Loading lib/galaxy/datatypes/text.py +1 −0 Original line number Diff line number Diff line Loading @@ -138,6 +138,7 @@ class DataManagerJson(Json): MetadataElement( name="data_tables", default=None, desc="Data tables represented by this dataset", readonly=True, visible=True ) MetadataElement(name="is_bundle", default=False, desc="Dataset represents bundle", readonly=True, visible=True) def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd): super().set_meta(dataset=dataset, overwrite=overwrite, **kwd) Loading lib/galaxy/managers/workflows.py +3 −17 Original line number Diff line number Diff line Loading @@ -31,8 +31,6 @@ from pydantic import ( ) from sqlalchemy import ( and_, Cast, ColumnElement, desc, false, func, Loading @@ -40,7 +38,6 @@ from sqlalchemy import ( select, true, ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import ( aliased, joinedload, Loading Loading @@ -72,6 +69,7 @@ from galaxy.model import ( StoredWorkflow, StoredWorkflowTagAssociation, StoredWorkflowUserShareAssociation, to_json, User, Workflow, WorkflowInvocation, Loading Loading @@ -2069,26 +2067,14 @@ class WorkflowContentsManager(UsesAnnotations): ) -> Optional[model.StoredWorkflow]: sa_session = self.app.model.session def to_json(column, keys: List[str]): assert sa_session.bind if sa_session.bind.dialect.name == "postgresql": cast: Union[ColumnElement[Any], Cast[Any]] = func.cast(func.convert_from(column, "UTF8"), JSONB) for key in keys: cast = cast.__getitem__(key) return cast.astext else: for key in keys: column = func.json_extract(column, f"$.{key}") return column stmnt = ( select(model.StoredWorkflow) .join(model.Workflow, model.Workflow.id == model.StoredWorkflow.latest_workflow_id) .filter( and_( model.StoredWorkflow.deleted == false(), to_json(model.Workflow.source_metadata, ["trs_tool_id"]) == trs_id, to_json(model.Workflow.source_metadata, ["trs_version_id"]) == trs_version, to_json(sa_session, model.Workflow.source_metadata, ["trs_tool_id"]) == trs_id, to_json(sa_session, model.Workflow.source_metadata, ["trs_version_id"]) == trs_version, ) ) ) Loading lib/galaxy/model/__init__.py +17 −1 Original line number Diff line number Diff line Loading @@ -68,8 +68,10 @@ from sqlalchemy import ( bindparam, Boolean, case, Cast, Column, column, ColumnElement, DateTime, delete, desc, Loading Loading @@ -100,6 +102,7 @@ from sqlalchemy import ( update, VARCHAR, ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.exc import ( CompileError, OperationalError, Loading Loading @@ -318,6 +321,19 @@ def get_uuid(uuid: Optional[Union[UUID, str]] = None) -> UUID: return UUID(str(uuid)) def to_json(sa_session, column, keys: List[str]): assert sa_session.bind if sa_session.bind.dialect.name == "postgresql": cast: Union[ColumnElement[Any], Cast[Any]] = func.cast(func.convert_from(column, "UTF8"), JSONB) for key in keys: cast = cast.__getitem__(key) return cast.astext else: for key in keys: column = func.json_extract(column, f"$.{key}") return column class Base(DeclarativeBase, _HasTable): __abstract__ = True metadata = MetaData(naming_convention=NAMING_CONVENTION) Loading Loading @@ -904,8 +920,8 @@ class User(Base, Dictifiable, RepresentById): Dataset.state == "ok", # excludes data manager runs that actually populated tables. # maybe track this formally by creating a different datatype for bundles ? Dataset.total_size != Dataset.file_size, HistoryDatasetAssociation._metadata.contains(data_table), to_json(session, HistoryDatasetAssociation._metadata, ["is_bundle"]) == "true", ) .order_by(HistoryDatasetAssociation.id) ) Loading lib/galaxy/tools/__init__.py +1 −0 Original line number Diff line number Diff line Loading @@ -3261,6 +3261,7 @@ class DataManagerTool(OutputParameterJSONTool): create=True, preserve_symlinks=True, ) hda.metadata.is_bundle = True else: raise Exception("Unknown data manager mode encountered type...") Loading lib/galaxy/tools/parameters/basic.py +64 −50 Original line number Diff line number Diff line Loading @@ -13,10 +13,11 @@ import urllib.parse from collections.abc import MutableMapping from typing import ( Any, cast, Dict, List, Optional, Tuple, Sequence, TYPE_CHECKING, Union, ) Loading @@ -43,12 +44,14 @@ from galaxy.model.dataset_collections import builder from galaxy.schema.fetch_data import FilesPayload from galaxy.tool_util.parameters.factory import get_color_value from galaxy.tool_util.parser import get_input_source as ensure_input_source from galaxy.tool_util.parser.interface import DrillDownOptionsDict from galaxy.tool_util.parser.util import ( boolean_is_checked, boolean_true_and_false_values, ParameterParseException, text_input_is_optional, ) from galaxy.tools.parameters.options import ParameterOption from galaxy.tools.parameters.workflow_utils import ( NO_REPLACEMENT, workflow_building_modes, Loading Loading @@ -131,6 +134,10 @@ def parse_dynamic_options(param, input_source): return dynamic_options.DynamicOptions(options_elem, param) def serialize_options(security: "IdEncodingHelper", options: Sequence[ParameterOption]): return [option.serialize(security) for option in options] # Describe a parameter value error where there is no actual supplied # parameter - e.g. just a specification issue. NO_PARAMETER_VALUE = object() Loading Loading @@ -175,7 +182,7 @@ class ToolParameter(UsesDictVisibleKeys): >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None) >>> trans = Bunch(app=None, security=lambda x: x) >>> p = ToolParameter(None, XML('<param argument="--parameter-name" type="text" value="default" />')) >>> assert p.name == 'parameter_name' >>> assert sorted(p.to_dict(trans).items()) == [('argument', '--parameter-name'), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ToolParameter'), ('name', 'parameter_name'), ('optional', False), ('refresh_on_change', False), ('type', 'text'), ('value', None)] Loading Loading @@ -925,7 +932,7 @@ class SelectToolParameter(ToolParameter): >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False) >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False, security=lambda x: x) >>> p = SelectToolParameter(None, XML( ... ''' ... <param name="_name" type="select"> Loading Loading @@ -983,13 +990,15 @@ class SelectToolParameter(ToolParameter): call_other_values.update(other_values.dict) return call_other_values def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[Union[ParameterOption, DrillDownOptionsDict]]: if self.options: return self.options.get_options(trans, other_values) elif self.dynamic_options: call_other_values = self._get_dynamic_options_call_other_values(trans, other_values) try: return eval(self.dynamic_options, self.tool.code_namespace, call_other_values) return [ ParameterOption(*o) for o in eval(self.dynamic_options, self.tool.code_namespace, call_other_values) ] except Exception as e: log.debug( "Error determining dynamic options for parameter '%s' in tool '%s':", Loading @@ -999,22 +1008,21 @@ class SelectToolParameter(ToolParameter): ) return [] else: return self.static_options return [ParameterOption(*o) for o in self.static_options] def get_legal_values(self, trans, other_values, value): """ determine the set of values of legal options """ return { history_item_dict_to_python(v, trans.app, self.name) or v for _, v, _ in self.get_options(trans, other_values) } options = cast(List[ParameterOption], self.get_options(trans, other_values)) return {option.dataset or option.value for option in options} def get_legal_names(self, trans, other_values): """ determine a mapping from names to values for all legal options determine the set of values of legal options """ return {n: v for n, v, _ in self.get_options(trans, other_values)} options = cast(List[ParameterOption], self.get_options(trans, other_values)) return {option.name: option.value for option in options} def from_json(self, value, trans, other_values=None): return self._select_from_json(value, trans, other_values=other_values, require_legal_value=True) Loading Loading @@ -1136,17 +1144,17 @@ class SelectToolParameter(ToolParameter): def get_initial_value(self, trans, other_values): try: options = list(self.get_options(trans, other_values)) options = cast(List[ParameterOption], self.get_options(trans, other_values)) except ImplicitConversionRequired: return None if not options: return None value = [optval for _, optval, selected in options if selected] value = [option.value for option in options if option.selected] if len(value) == 0: if not self.optional and not self.multiple and options: # Nothing selected, but not optional and not a multiple select, with some values, # so we have to default to something (the HTML form will anyway) value2 = options[0][1] value2: Optional[Union[str, List[str]]] = options[0].value else: value2 = None elif len(value) == 1 or not self.multiple: Loading Loading @@ -1186,8 +1194,8 @@ class SelectToolParameter(ToolParameter): d = super().to_dict(trans, other_values) # Get options, value. options = self.get_options(trans, other_values) d["options"] = options options = cast(List[ParameterOption], self.get_options(trans, other_values)) d["options"] = serialize_options(trans.security, options) d["display"] = self.display d["multiple"] = self.multiple d["textable"] = is_runtime_context(trans, other_values) Loading @@ -1212,7 +1220,7 @@ class GenomeBuildParameter(SelectToolParameter): >>> # Create a mock transaction with 'hg17' as the current build >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None)) >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None), security=lambda x:x) >>> p = GenomeBuildParameter(None, XML('<param name="_name" type="genomebuild" value="hg17" />')) >>> print(p.name) _name Loading @@ -1232,12 +1240,14 @@ class GenomeBuildParameter(SelectToolParameter): self.static_options = [(value, key, False) for key, value in self._get_dbkey_names()] self.is_dynamic = True def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[ParameterOption]: last_used_build = object() if trans.history: last_used_build = trans.history.genome_build for dbkey, build_name in self._get_dbkey_names(trans=trans): yield build_name, dbkey, (dbkey == last_used_build) return [ ParameterOption(build_name, dbkey, (dbkey == last_used_build)) for dbkey, build_name in self._get_dbkey_names(trans=trans) ] def get_legal_values(self, trans, other_values, value): return {dbkey for dbkey, _ in self._get_dbkey_names(trans=trans)} Loading @@ -1247,16 +1257,16 @@ class GenomeBuildParameter(SelectToolParameter): d = ToolParameter.to_dict(self, trans) # Get options, value - options is a generator here, so compile to list options = list(self.get_options(trans, {})) value = options[0][1] options = self.get_options(trans, {}) value = options[0].value for option in options: if option[2]: if option.selected: # Found selected option. value = option[1] value = option.value d.update( { "options": options, "options": serialize_options(trans, options), "value": value, "display": self.display, "multiple": self.multiple, Loading Loading @@ -1344,13 +1354,13 @@ class SelectTagParameter(SelectToolParameter): tags.add(tag.user_value) return list(tags) def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[ParameterOption]: """ Show tags """ options = [] for tag in self.get_tag_list(other_values): options.append((f"Tags: {tag}", tag, False)) options.append(ParameterOption(f"Tags: {tag}", tag, False)) return options def get_initial_value(self, trans, other_values): Loading Loading @@ -1517,11 +1527,11 @@ class ColumnListParameter(SelectToolParameter): column_list = [c for c in column_list if c in this_column_list] return column_list def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[ParameterOption]: """ Show column labels rather than c1..cn if use_header_names=True """ options: List[Tuple[str, Union[str, Tuple[str, str]], bool]] = [] options: Sequence[ParameterOption] = [] column_list = self.get_column_list(trans, other_values) if not column_list: return options Loading @@ -1535,7 +1545,10 @@ class ColumnListParameter(SelectToolParameter): and dataset.metadata.element_is_set("column_names") ): try: options = [(f"c{c}: {dataset.metadata.column_names[int(c) - 1]}", c, False) for c in column_list] options = [ ParameterOption(f"c{c}: {dataset.metadata.column_names[int(c) - 1]}", c, False) for c in column_list ] except IndexError: # ignore and rely on fallback pass Loading @@ -1544,13 +1557,13 @@ class ColumnListParameter(SelectToolParameter): with open(dataset.get_file_name()) as f: head = f.readline() cnames = head.rstrip("\n\r ").split("\t") options = [(f"c{c}: {cnames[int(c) - 1]}", c, False) for c in column_list] options = [ParameterOption(f"c{c}: {cnames[int(c) - 1]}", c, False) for c in column_list] except Exception: # ignore and rely on fallback pass if not options: # fallback if no options list could be built so far options = [(f"Column: {col}", col, False) for col in column_list] options = [ParameterOption(f"Column: {col}", col, False) for col in column_list] return options def get_initial_value(self, trans, other_values): Loading Loading @@ -1613,7 +1626,7 @@ class DrillDownSelectToolParameter(SelectToolParameter): >>> from galaxy.util.bunch import Bunch >>> app = Bunch(config=Bunch(tool_data_path=None)) >>> tool = Bunch(app=app) >>> trans = Bunch(app=app, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None)) >>> trans = Bunch(app=app, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None), security=lambda x: x) >>> p = DrillDownSelectToolParameter(tool, XML( ... ''' ... <param name="_name" type="drill_down" display="checkbox" hierarchy="recurse" multiple="true"> Loading Loading @@ -1680,18 +1693,17 @@ class DrillDownSelectToolParameter(SelectToolParameter): except Exception: return [] def get_options(self, trans=None, other_values=None): def get_options(self, trans=None, other_values=None) -> List[DrillDownOptionsDict]: other_values = other_values or {} if self.is_dynamic: if self.dynamic_options: options = self._get_options_from_code(trans=trans, other_values=other_values) else: options = [] return options return self._get_options_from_code(trans=trans, other_values=other_values) return [] return self.options def get_legal_values(self, trans, other_values, value): def recurse_options(legal_values, options): def recurse_options(legal_values, options: List[DrillDownOptionsDict]): for option in options: legal_values.append(option["value"]) recurse_options(legal_values, option["options"]) Loading Loading @@ -1737,16 +1749,16 @@ class DrillDownSelectToolParameter(SelectToolParameter): other_values = other_values or {} def get_options_list(value): def get_base_option(value, options): def get_base_option(value, options: List[DrillDownOptionsDict]): for option in options: if value == option["value"]: return option rval = get_base_option(value, option["options"]) rval = get_base_option(value, option["options"] or []) if rval: return rval return None # not found def recurse_option(option_list, option): def recurse_option(option_list, option: DrillDownOptionsDict): if not option["options"]: option_list.append(option["value"]) else: Loading @@ -1754,7 +1766,9 @@ class DrillDownSelectToolParameter(SelectToolParameter): recurse_option(option_list, opt) rval: List[str] = [] base_option = get_base_option(value, self.get_options(other_values=other_values)) options = self.get_options(other_values=other_values) base_option = get_base_option(value, options) if base_option: recurse_option(rval, base_option) return rval or [value] Loading @@ -1781,11 +1795,11 @@ class DrillDownSelectToolParameter(SelectToolParameter): return rval def get_initial_value(self, trans, other_values): def recurse_options(initial_values, options): def recurse_options(initial_values, options: List[DrillDownOptionsDict]): for option in options: if option["selected"]: initial_values.append(option["value"]) recurse_options(initial_values, option["options"]) recurse_options(initial_values, option["options"] or []) # More working around dynamic options for workflow options = self.get_options(trans=trans, other_values=other_values) Loading @@ -1798,11 +1812,11 @@ class DrillDownSelectToolParameter(SelectToolParameter): return initial_values def to_text(self, value): def get_option_display(value, options): def get_option_display(value, options: List[DrillDownOptionsDict]): for option in options: if value == option["value"]: return option["name"] rval = get_option_display(value, option["options"]) rval = get_option_display(value, option["options"] or []) if rval: return rval return None # not found Loading @@ -1824,7 +1838,7 @@ class DrillDownSelectToolParameter(SelectToolParameter): else: rval = [] for val in value: rval.append(get_option_display(val, self.options) or val) rval.append(get_option_display(val, self.options or val)) if rval: return "\n".join(map(str, rval)) return "Nothing selected." Loading Loading
lib/galaxy/datatypes/text.py +1 −0 Original line number Diff line number Diff line Loading @@ -138,6 +138,7 @@ class DataManagerJson(Json): MetadataElement( name="data_tables", default=None, desc="Data tables represented by this dataset", readonly=True, visible=True ) MetadataElement(name="is_bundle", default=False, desc="Dataset represents bundle", readonly=True, visible=True) def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd): super().set_meta(dataset=dataset, overwrite=overwrite, **kwd) Loading
lib/galaxy/managers/workflows.py +3 −17 Original line number Diff line number Diff line Loading @@ -31,8 +31,6 @@ from pydantic import ( ) from sqlalchemy import ( and_, Cast, ColumnElement, desc, false, func, Loading @@ -40,7 +38,6 @@ from sqlalchemy import ( select, true, ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import ( aliased, joinedload, Loading Loading @@ -72,6 +69,7 @@ from galaxy.model import ( StoredWorkflow, StoredWorkflowTagAssociation, StoredWorkflowUserShareAssociation, to_json, User, Workflow, WorkflowInvocation, Loading Loading @@ -2069,26 +2067,14 @@ class WorkflowContentsManager(UsesAnnotations): ) -> Optional[model.StoredWorkflow]: sa_session = self.app.model.session def to_json(column, keys: List[str]): assert sa_session.bind if sa_session.bind.dialect.name == "postgresql": cast: Union[ColumnElement[Any], Cast[Any]] = func.cast(func.convert_from(column, "UTF8"), JSONB) for key in keys: cast = cast.__getitem__(key) return cast.astext else: for key in keys: column = func.json_extract(column, f"$.{key}") return column stmnt = ( select(model.StoredWorkflow) .join(model.Workflow, model.Workflow.id == model.StoredWorkflow.latest_workflow_id) .filter( and_( model.StoredWorkflow.deleted == false(), to_json(model.Workflow.source_metadata, ["trs_tool_id"]) == trs_id, to_json(model.Workflow.source_metadata, ["trs_version_id"]) == trs_version, to_json(sa_session, model.Workflow.source_metadata, ["trs_tool_id"]) == trs_id, to_json(sa_session, model.Workflow.source_metadata, ["trs_version_id"]) == trs_version, ) ) ) Loading
lib/galaxy/model/__init__.py +17 −1 Original line number Diff line number Diff line Loading @@ -68,8 +68,10 @@ from sqlalchemy import ( bindparam, Boolean, case, Cast, Column, column, ColumnElement, DateTime, delete, desc, Loading Loading @@ -100,6 +102,7 @@ from sqlalchemy import ( update, VARCHAR, ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.exc import ( CompileError, OperationalError, Loading Loading @@ -318,6 +321,19 @@ def get_uuid(uuid: Optional[Union[UUID, str]] = None) -> UUID: return UUID(str(uuid)) def to_json(sa_session, column, keys: List[str]): assert sa_session.bind if sa_session.bind.dialect.name == "postgresql": cast: Union[ColumnElement[Any], Cast[Any]] = func.cast(func.convert_from(column, "UTF8"), JSONB) for key in keys: cast = cast.__getitem__(key) return cast.astext else: for key in keys: column = func.json_extract(column, f"$.{key}") return column class Base(DeclarativeBase, _HasTable): __abstract__ = True metadata = MetaData(naming_convention=NAMING_CONVENTION) Loading Loading @@ -904,8 +920,8 @@ class User(Base, Dictifiable, RepresentById): Dataset.state == "ok", # excludes data manager runs that actually populated tables. # maybe track this formally by creating a different datatype for bundles ? Dataset.total_size != Dataset.file_size, HistoryDatasetAssociation._metadata.contains(data_table), to_json(session, HistoryDatasetAssociation._metadata, ["is_bundle"]) == "true", ) .order_by(HistoryDatasetAssociation.id) ) Loading
lib/galaxy/tools/__init__.py +1 −0 Original line number Diff line number Diff line Loading @@ -3261,6 +3261,7 @@ class DataManagerTool(OutputParameterJSONTool): create=True, preserve_symlinks=True, ) hda.metadata.is_bundle = True else: raise Exception("Unknown data manager mode encountered type...") Loading
lib/galaxy/tools/parameters/basic.py +64 −50 Original line number Diff line number Diff line Loading @@ -13,10 +13,11 @@ import urllib.parse from collections.abc import MutableMapping from typing import ( Any, cast, Dict, List, Optional, Tuple, Sequence, TYPE_CHECKING, Union, ) Loading @@ -43,12 +44,14 @@ from galaxy.model.dataset_collections import builder from galaxy.schema.fetch_data import FilesPayload from galaxy.tool_util.parameters.factory import get_color_value from galaxy.tool_util.parser import get_input_source as ensure_input_source from galaxy.tool_util.parser.interface import DrillDownOptionsDict from galaxy.tool_util.parser.util import ( boolean_is_checked, boolean_true_and_false_values, ParameterParseException, text_input_is_optional, ) from galaxy.tools.parameters.options import ParameterOption from galaxy.tools.parameters.workflow_utils import ( NO_REPLACEMENT, workflow_building_modes, Loading Loading @@ -131,6 +134,10 @@ def parse_dynamic_options(param, input_source): return dynamic_options.DynamicOptions(options_elem, param) def serialize_options(security: "IdEncodingHelper", options: Sequence[ParameterOption]): return [option.serialize(security) for option in options] # Describe a parameter value error where there is no actual supplied # parameter - e.g. just a specification issue. NO_PARAMETER_VALUE = object() Loading Loading @@ -175,7 +182,7 @@ class ToolParameter(UsesDictVisibleKeys): >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None) >>> trans = Bunch(app=None, security=lambda x: x) >>> p = ToolParameter(None, XML('<param argument="--parameter-name" type="text" value="default" />')) >>> assert p.name == 'parameter_name' >>> assert sorted(p.to_dict(trans).items()) == [('argument', '--parameter-name'), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ToolParameter'), ('name', 'parameter_name'), ('optional', False), ('refresh_on_change', False), ('type', 'text'), ('value', None)] Loading Loading @@ -925,7 +932,7 @@ class SelectToolParameter(ToolParameter): >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False) >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False, security=lambda x: x) >>> p = SelectToolParameter(None, XML( ... ''' ... <param name="_name" type="select"> Loading Loading @@ -983,13 +990,15 @@ class SelectToolParameter(ToolParameter): call_other_values.update(other_values.dict) return call_other_values def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[Union[ParameterOption, DrillDownOptionsDict]]: if self.options: return self.options.get_options(trans, other_values) elif self.dynamic_options: call_other_values = self._get_dynamic_options_call_other_values(trans, other_values) try: return eval(self.dynamic_options, self.tool.code_namespace, call_other_values) return [ ParameterOption(*o) for o in eval(self.dynamic_options, self.tool.code_namespace, call_other_values) ] except Exception as e: log.debug( "Error determining dynamic options for parameter '%s' in tool '%s':", Loading @@ -999,22 +1008,21 @@ class SelectToolParameter(ToolParameter): ) return [] else: return self.static_options return [ParameterOption(*o) for o in self.static_options] def get_legal_values(self, trans, other_values, value): """ determine the set of values of legal options """ return { history_item_dict_to_python(v, trans.app, self.name) or v for _, v, _ in self.get_options(trans, other_values) } options = cast(List[ParameterOption], self.get_options(trans, other_values)) return {option.dataset or option.value for option in options} def get_legal_names(self, trans, other_values): """ determine a mapping from names to values for all legal options determine the set of values of legal options """ return {n: v for n, v, _ in self.get_options(trans, other_values)} options = cast(List[ParameterOption], self.get_options(trans, other_values)) return {option.name: option.value for option in options} def from_json(self, value, trans, other_values=None): return self._select_from_json(value, trans, other_values=other_values, require_legal_value=True) Loading Loading @@ -1136,17 +1144,17 @@ class SelectToolParameter(ToolParameter): def get_initial_value(self, trans, other_values): try: options = list(self.get_options(trans, other_values)) options = cast(List[ParameterOption], self.get_options(trans, other_values)) except ImplicitConversionRequired: return None if not options: return None value = [optval for _, optval, selected in options if selected] value = [option.value for option in options if option.selected] if len(value) == 0: if not self.optional and not self.multiple and options: # Nothing selected, but not optional and not a multiple select, with some values, # so we have to default to something (the HTML form will anyway) value2 = options[0][1] value2: Optional[Union[str, List[str]]] = options[0].value else: value2 = None elif len(value) == 1 or not self.multiple: Loading Loading @@ -1186,8 +1194,8 @@ class SelectToolParameter(ToolParameter): d = super().to_dict(trans, other_values) # Get options, value. options = self.get_options(trans, other_values) d["options"] = options options = cast(List[ParameterOption], self.get_options(trans, other_values)) d["options"] = serialize_options(trans.security, options) d["display"] = self.display d["multiple"] = self.multiple d["textable"] = is_runtime_context(trans, other_values) Loading @@ -1212,7 +1220,7 @@ class GenomeBuildParameter(SelectToolParameter): >>> # Create a mock transaction with 'hg17' as the current build >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None)) >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None), security=lambda x:x) >>> p = GenomeBuildParameter(None, XML('<param name="_name" type="genomebuild" value="hg17" />')) >>> print(p.name) _name Loading @@ -1232,12 +1240,14 @@ class GenomeBuildParameter(SelectToolParameter): self.static_options = [(value, key, False) for key, value in self._get_dbkey_names()] self.is_dynamic = True def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[ParameterOption]: last_used_build = object() if trans.history: last_used_build = trans.history.genome_build for dbkey, build_name in self._get_dbkey_names(trans=trans): yield build_name, dbkey, (dbkey == last_used_build) return [ ParameterOption(build_name, dbkey, (dbkey == last_used_build)) for dbkey, build_name in self._get_dbkey_names(trans=trans) ] def get_legal_values(self, trans, other_values, value): return {dbkey for dbkey, _ in self._get_dbkey_names(trans=trans)} Loading @@ -1247,16 +1257,16 @@ class GenomeBuildParameter(SelectToolParameter): d = ToolParameter.to_dict(self, trans) # Get options, value - options is a generator here, so compile to list options = list(self.get_options(trans, {})) value = options[0][1] options = self.get_options(trans, {}) value = options[0].value for option in options: if option[2]: if option.selected: # Found selected option. value = option[1] value = option.value d.update( { "options": options, "options": serialize_options(trans, options), "value": value, "display": self.display, "multiple": self.multiple, Loading Loading @@ -1344,13 +1354,13 @@ class SelectTagParameter(SelectToolParameter): tags.add(tag.user_value) return list(tags) def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[ParameterOption]: """ Show tags """ options = [] for tag in self.get_tag_list(other_values): options.append((f"Tags: {tag}", tag, False)) options.append(ParameterOption(f"Tags: {tag}", tag, False)) return options def get_initial_value(self, trans, other_values): Loading Loading @@ -1517,11 +1527,11 @@ class ColumnListParameter(SelectToolParameter): column_list = [c for c in column_list if c in this_column_list] return column_list def get_options(self, trans, other_values): def get_options(self, trans, other_values) -> Sequence[ParameterOption]: """ Show column labels rather than c1..cn if use_header_names=True """ options: List[Tuple[str, Union[str, Tuple[str, str]], bool]] = [] options: Sequence[ParameterOption] = [] column_list = self.get_column_list(trans, other_values) if not column_list: return options Loading @@ -1535,7 +1545,10 @@ class ColumnListParameter(SelectToolParameter): and dataset.metadata.element_is_set("column_names") ): try: options = [(f"c{c}: {dataset.metadata.column_names[int(c) - 1]}", c, False) for c in column_list] options = [ ParameterOption(f"c{c}: {dataset.metadata.column_names[int(c) - 1]}", c, False) for c in column_list ] except IndexError: # ignore and rely on fallback pass Loading @@ -1544,13 +1557,13 @@ class ColumnListParameter(SelectToolParameter): with open(dataset.get_file_name()) as f: head = f.readline() cnames = head.rstrip("\n\r ").split("\t") options = [(f"c{c}: {cnames[int(c) - 1]}", c, False) for c in column_list] options = [ParameterOption(f"c{c}: {cnames[int(c) - 1]}", c, False) for c in column_list] except Exception: # ignore and rely on fallback pass if not options: # fallback if no options list could be built so far options = [(f"Column: {col}", col, False) for col in column_list] options = [ParameterOption(f"Column: {col}", col, False) for col in column_list] return options def get_initial_value(self, trans, other_values): Loading Loading @@ -1613,7 +1626,7 @@ class DrillDownSelectToolParameter(SelectToolParameter): >>> from galaxy.util.bunch import Bunch >>> app = Bunch(config=Bunch(tool_data_path=None)) >>> tool = Bunch(app=app) >>> trans = Bunch(app=app, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None)) >>> trans = Bunch(app=app, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None), security=lambda x: x) >>> p = DrillDownSelectToolParameter(tool, XML( ... ''' ... <param name="_name" type="drill_down" display="checkbox" hierarchy="recurse" multiple="true"> Loading Loading @@ -1680,18 +1693,17 @@ class DrillDownSelectToolParameter(SelectToolParameter): except Exception: return [] def get_options(self, trans=None, other_values=None): def get_options(self, trans=None, other_values=None) -> List[DrillDownOptionsDict]: other_values = other_values or {} if self.is_dynamic: if self.dynamic_options: options = self._get_options_from_code(trans=trans, other_values=other_values) else: options = [] return options return self._get_options_from_code(trans=trans, other_values=other_values) return [] return self.options def get_legal_values(self, trans, other_values, value): def recurse_options(legal_values, options): def recurse_options(legal_values, options: List[DrillDownOptionsDict]): for option in options: legal_values.append(option["value"]) recurse_options(legal_values, option["options"]) Loading Loading @@ -1737,16 +1749,16 @@ class DrillDownSelectToolParameter(SelectToolParameter): other_values = other_values or {} def get_options_list(value): def get_base_option(value, options): def get_base_option(value, options: List[DrillDownOptionsDict]): for option in options: if value == option["value"]: return option rval = get_base_option(value, option["options"]) rval = get_base_option(value, option["options"] or []) if rval: return rval return None # not found def recurse_option(option_list, option): def recurse_option(option_list, option: DrillDownOptionsDict): if not option["options"]: option_list.append(option["value"]) else: Loading @@ -1754,7 +1766,9 @@ class DrillDownSelectToolParameter(SelectToolParameter): recurse_option(option_list, opt) rval: List[str] = [] base_option = get_base_option(value, self.get_options(other_values=other_values)) options = self.get_options(other_values=other_values) base_option = get_base_option(value, options) if base_option: recurse_option(rval, base_option) return rval or [value] Loading @@ -1781,11 +1795,11 @@ class DrillDownSelectToolParameter(SelectToolParameter): return rval def get_initial_value(self, trans, other_values): def recurse_options(initial_values, options): def recurse_options(initial_values, options: List[DrillDownOptionsDict]): for option in options: if option["selected"]: initial_values.append(option["value"]) recurse_options(initial_values, option["options"]) recurse_options(initial_values, option["options"] or []) # More working around dynamic options for workflow options = self.get_options(trans=trans, other_values=other_values) Loading @@ -1798,11 +1812,11 @@ class DrillDownSelectToolParameter(SelectToolParameter): return initial_values def to_text(self, value): def get_option_display(value, options): def get_option_display(value, options: List[DrillDownOptionsDict]): for option in options: if value == option["value"]: return option["name"] rval = get_option_display(value, option["options"]) rval = get_option_display(value, option["options"] or []) if rval: return rval return None # not found Loading @@ -1824,7 +1838,7 @@ class DrillDownSelectToolParameter(SelectToolParameter): else: rval = [] for val in value: rval.append(get_option_display(val, self.options) or val) rval.append(get_option_display(val, self.options or val)) if rval: return "\n".join(map(str, rval)) return "Nothing selected." Loading