lib/galaxy/managers/workflows.py (+4 −3)

```diff
@@ -592,7 +592,7 @@ class WorkflowContentsManager(UsesAnnotations):
         import_options = ImportOptions()
         import_options.deduplicate_subworkflows = True
         as_dict = python_to_workflow(as_dict, galaxy_interface, workflow_directory=None, import_options=import_options)
-        raw_description = RawWorkflowDescription(as_dict, path)
+        raw_description = RawWorkflowDescription(as_dict)
         created_workflow = self.build_workflow_from_raw_description(trans, raw_description, WorkflowCreateOptions())
         return created_workflow.workflow
@@ -925,6 +925,7 @@ class WorkflowContentsManager(UsesAnnotations):
         return wf_dict

     def _sync_stored_workflow(self, trans, stored_workflow):
-        workflow_path = stored_workflow.from_path
-        self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)
+        if trans.user_is_admin:
+            workflow_path = stored_workflow.from_path
+            self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)
```
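The second hunk gates filesystem syncing of path-imported workflows behind an admin check (the reindentation of the two existing lines shown above is inferred from the +4 −3 count). A rough illustration of the behavioral change; the scenario is an assumption for orientation, not a test from this PR:

```python
# Names come from the diff; the path and the edit flow are hypothetical.
#
# stored_workflow.from_path = "/data/workflows/qc.gxwf.yml"
#
# admin edits the workflow    -> _sync_stored_workflow writes the latest
#                                version back to qc.gxwf.yml on disk
# regular user edits it       -> the method is now a no-op, so a non-admin
#                                can no longer trigger a server-side file write
```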
lib/galaxy/model/__init__.py (+13 −6)

```diff
@@ -2489,7 +2489,7 @@ class PostJobAction(Base, RepresentById):
     workflow_step_id = Column(Integer, ForeignKey("workflow_step.id"), index=True, nullable=True)
     action_type = Column(String(255), nullable=False)
     output_name = Column(String(255), nullable=True)
-    action_arguments = Column(MutableJSONType, nullable=True)
+    _action_arguments = Column("action_arguments", MutableJSONType, nullable=True)
     workflow_step = relationship(
         "WorkflowStep",
         back_populates="post_job_actions",
@@ -2503,6 +2503,18 @@ class PostJobAction(Base, RepresentById):
         self.workflow_step = workflow_step
         ensure_object_added_to_session(self, object_in_session=workflow_step)

+    @property
+    def action_arguments(self):
+        if self.action_type in ("HideDatasetAction", "DeleteIntermediatesAction") and self._action_arguments is True:
+            # Fix up broken workflows resulting from imports with gxformat2 <= 0.20.0
+            return {}
+        else:
+            return self._action_arguments
+
+    @action_arguments.setter
+    def action_arguments(self, value: Dict[str, Any]):
+        self._action_arguments = value
+

 class PostJobActionAssociation(Base, RepresentById):
     __tablename__ = "post_job_action_association"
@@ -6701,11 +6713,6 @@ class HistoryDatasetCollectionAssociation(
         primaryjoin=copied_from_history_dataset_collection_association_id == id,
         remote_side=[id],
         uselist=False,
-        back_populates="copied_to_history_dataset_collection_association",
     )
-    copied_to_history_dataset_collection_association = relationship(
-        "HistoryDatasetCollectionAssociation",
-        back_populates="copied_from_history_dataset_collection_association",
-    )
     implicit_input_collections = relationship(
         "ImplicitlyCreatedDatasetCollectionInput",
```
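The rename to `_action_arguments` keeps the database column name unchanged (`Column("action_arguments", ...)`) while freeing the public attribute for a normalizing property, so legacy rows need no migration. A self-contained sketch of the same pattern; the `Action` table and the demo values are illustrative, not Galaxy's models:

```python
from typing import Any, Dict, Optional

from sqlalchemy import JSON, Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Action(Base):
    __tablename__ = "action"

    id = Column(Integer, primary_key=True)
    action_type = Column(String(255), nullable=False)
    # Private attribute mapped onto the *unchanged* "action_arguments" column.
    _action_arguments = Column("action_arguments", JSON, nullable=True)

    @property
    def action_arguments(self) -> Optional[Dict[str, Any]]:
        # gxformat2 <= 0.20.0 serialized "no arguments" as a bare True for
        # these action types; normalize that legacy value to an empty dict.
        if self.action_type in ("HideDatasetAction", "DeleteIntermediatesAction") and self._action_arguments is True:
            return {}
        return self._action_arguments

    @action_arguments.setter
    def action_arguments(self, value: Dict[str, Any]) -> None:
        self._action_arguments = value


pja = Action(action_type="HideDatasetAction")
pja._action_arguments = True  # legacy value from an old import
assert pja.action_arguments == {}  # reads are transparently fixed up
```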
lib/galaxy/model/store/__init__.py (+22 −18)

```diff
@@ -996,14 +996,16 @@ class ModelImportStore(metaclass=abc.ABCMeta):
         # sense.
         hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks
         if copied_from_object_key in object_import_tracker.hdcas_by_key:
-            hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[
-                copied_from_object_key
-            ]
+            source_hdca = object_import_tracker.hdcas_by_key[copied_from_object_key]
+            if source_hdca is not hdca:
+                # We may not have the copied source, in which case the first included HDCA in the chain
+                # acts as the source, so here we make sure we don't create a cycle.
+                hdca.copied_from_history_dataset_collection_association = source_hdca
         else:
             if copied_from_object_key in hdca_copied_from_sinks:
-                hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[
-                    hdca_copied_from_sinks[copied_from_object_key]
-                ]
+                source_hdca = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]]
+                if source_hdca is not hdca:
+                    hdca.copied_from_history_dataset_collection_association = source_hdca
             else:
                 hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key
@@ -1070,7 +1072,7 @@ class ModelImportStore(metaclass=abc.ABCMeta):
         for step_attrs in invocation_attrs["steps"]:
             imported_invocation_step = model.WorkflowInvocationStep()
             imported_invocation_step.workflow_invocation = imported_invocation
-            ensure_object_added_to_session(imported_invocation, session=self.sa_session)
+            ensure_object_added_to_session(imported_invocation_step, session=self.sa_session)
             attach_workflow_step(imported_invocation_step, step_attrs)
             restore_times(imported_invocation_step, step_attrs)
             imported_invocation_step.action = step_attrs["action"]
@@ -1921,12 +1923,14 @@ class DirectoryModelExportStore(ModelExportStore):
         self.export_files = export_files
         self.included_datasets: Dict[model.DatasetInstance, Tuple[model.DatasetInstance, bool]] = {}
         self.dataset_implicit_conversions: Dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {}
-        self.included_collections: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
+        self.included_collections: Dict[
+            Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
+            Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
+        ] = {}
         self.included_libraries: List[model.Library] = []
         self.included_library_folders: List[model.LibraryFolder] = []
         self.included_invocations: List[model.WorkflowInvocation] = []
         self.collection_datasets: Set[int] = set()
-        self.collections_attrs: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
         self.dataset_id_to_path: Dict[int, Tuple[Optional[str], Optional[str]]] = {}
         self.job_output_dataset_associations: Dict[int, Dict[str, model.DatasetInstance]] = {}
@@ -2287,8 +2291,7 @@ class DirectoryModelExportStore(ModelExportStore):
     def add_dataset_collection(
         self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]
     ) -> None:
-        self.collections_attrs.append(collection)
-        self.included_collections.append(collection)
+        self.included_collections[collection] = collection

     def add_implicit_conversion_dataset(
         self,
@@ -2343,7 +2346,7 @@ class DirectoryModelExportStore(ModelExportStore):
         collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS)
         with open(collections_attrs_filename, "w") as collections_attrs_out:
-            collections_attrs_out.write(to_json(self.collections_attrs))
+            collections_attrs_out.write(to_json(self.included_collections.values()))

         conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS)
         with open(conversions_attrs_filename, "w") as conversions_attrs_out:
@@ -2364,12 +2367,12 @@ class DirectoryModelExportStore(ModelExportStore):
         #
         # Get all jobs associated with included HDAs.
-        jobs_dict: Dict[str, model.Job] = {}
+        jobs_dict: Dict[int, model.Job] = {}
         implicit_collection_jobs_dict = {}

         def record_job(job):
-            if not job:
-                # No viable job.
+            if not job or job.id in jobs_dict:
+                # No viable job or job already recorded.
                 return
             jobs_dict[job.id] = job
@@ -2395,10 +2398,11 @@ class DirectoryModelExportStore(ModelExportStore):
             )

             job_hda = hda
             while job_hda.copied_from_history_dataset_association:  # should this check library datasets as well?
+                # record job (if one exists) even if dataset was copied
+                # copy could have been created manually through UI/API or using database operation tool,
+                # in which case we have a relevant job to export.
+                record_associated_jobs(job_hda)
                 job_hda = job_hda.copied_from_history_dataset_association

             if not job_hda.creating_job_associations:
                 # No viable HDA found.
                 continue

             record_associated_jobs(job_hda)
```
test/integration/test_workflow_tasks.py (+44 −0)

```diff
@@ -17,6 +17,7 @@ from galaxy_test.api.test_workflows import RunsWorkflowFixtures
 from galaxy_test.base import api_asserts
 from galaxy_test.base.api import UsesCeleryTasks
 from galaxy_test.base.populators import (
+    DatasetCollectionPopulator,
     DatasetPopulator,
     RunJobsSummary,
     WorkflowPopulator,
@@ -27,6 +28,7 @@ from galaxy_test.driver.integration_util import IntegrationTestCase

 class TestWorkflowTasksIntegration(PosixFileSourceSetup, IntegrationTestCase, UsesCeleryTasks, RunsWorkflowFixtures):
     dataset_populator: DatasetPopulator
+    dataset_collection_populator: DatasetCollectionPopulator
     framework_tool_and_types = True

     @classmethod
@@ -37,6 +39,7 @@ class TestWorkflowTasksIntegration(PosixFileSourceSetup, IntegrationTestCase, Us
     def setUp(self):
         super().setUp()
         self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
+        self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
         self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
         self._write_file_fixtures()
@@ -124,6 +127,47 @@ class TestWorkflowTasksIntegration(PosixFileSourceSetup, IntegrationTestCase, Us
         invocation_details = self._export_and_import_workflow_invocation(summary, use_uris)
         self._rerun_imported_workflow(summary, invocation_details)

+    def test_export_import_invocation_with_copied_hdca_and_database_operation_tool(self):
+        with self.dataset_populator.test_history() as history_id:
+            self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True).json()
+            new_history = self.dataset_populator.copy_history(history_id=history_id).json()
+            copied_collection = self.dataset_populator.get_history_collection_details(new_history["id"])
+            workflow_id = self.workflow_populator.upload_yaml_workflow(
+                """class: GalaxyWorkflow
+inputs:
+  input:
+    type: collection
+    collection_type: list
+steps:
+  extract_dataset:
+    tool_id: __EXTRACT_DATASET__
+    in:
+      input:
+        source: input
+"""
+            )
+            inputs = {"input": {"src": "hdca", "id": copied_collection["id"]}}
+            workflow_request = {"history": f"hist_id={new_history['id']}", "inputs_by": "name", "inputs": inputs}
+            invocation = self.workflow_populator.invoke_workflow_raw(
+                workflow_id, workflow_request, assert_ok=True
+            ).json()
+            invocation_id = invocation["id"]
+            self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
+            jobs = self.workflow_populator.get_invocation_jobs(invocation_id)
+            summary = RunJobsSummary(
+                history_id=history_id,
+                workflow_id=workflow_id,
+                invocation_id=invocation["id"],
+                inputs=inputs,
+                jobs=jobs,
+                invocation=invocation,
+                workflow_request=workflow_request,
+            )
+            imported_invocation_details = self._export_and_import_workflow_invocation(summary)
+            original_contents = self.dataset_populator.get_history_contents(new_history["id"])
+            contents = self.dataset_populator.get_history_contents(imported_invocation_details["history_id"])
+            assert len(contents) == len(original_contents) == 5
+
     def _export_and_import_workflow_invocation(
         self, summary: RunJobsSummary, use_uris: bool = True, model_store_format="tgz"
     ) -> Dict[str, Any]:
```
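The new test exercises both fixes at once: the copied HDCA's `copied_from_history_dataset_collection_association` points into a history that is not part of the export, and `__EXTRACT_DATASET__` is a database operation job whose input is that copy. A rough accounting of the final assertion, assuming `create_list_in_history` builds a three-element list (that default is an assumption here, not stated in the diff):

```python
# Expected 5 items in both the copied history and the re-imported one:
#   3  list-element datasets (from create_list_in_history)
# + 1  the list HDCA itself (the copy the workflow consumes)
# + 1  the dataset produced by the __EXTRACT_DATASET__ step
# ----
#   5  == len(original_contents) == len(contents)
```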