Loading lib/galaxy/model/__init__.py +1 −1 Original line number Diff line number Diff line Loading @@ -7866,7 +7866,7 @@ class Workflow(Base, Dictifiable, RepresentById): has_cycles: Mapped[Optional[bool]] has_errors: Mapped[Optional[bool]] reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType) creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType) creator_metadata: Mapped[Optional[List[Dict[str, Any]]]] = mapped_column(JSONType) license: Mapped[Optional[str]] = mapped_column(TEXT) source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType) uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType) Loading lib/galaxy/model/store/ro_crate_utils.py +159 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import os from typing import ( Any, Dict, List, Optional, ) Loading Loading @@ -85,6 +86,8 @@ class WorkflowRunCrateProfileBuilder: self.file_entities: Dict[int, Any] = {} self.param_entities: Dict[int, Any] = {} self.pv_entities: Dict[str, Any] = {} # Cache for tools to avoid duplicating entities for the same tool self.tool_cache: Dict[str, ContextEntity] = {} def build_crate(self): crate = ROCrate() Loading Loading @@ -224,6 +227,162 @@ class WorkflowRunCrateProfileBuilder: crate.mainEntity["name"] = self.workflow.name crate.mainEntity["subjectOf"] = cwl_wf # Adding multiple creators if available if self.workflow.creator_metadata: for creator_data in self.workflow.creator_metadata: if creator_data.get("class") == "Person": # Create the person entity creator_entity = crate.add( ContextEntity( crate, creator_data.get("identifier", ""), # Default to empty string if identifier is missing properties={ "@type": "Person", "name": creator_data.get("name", ""), # Default to empty string if name is missing "identifier": creator_data.get( "identifier", "" ), # Assuming identifier is ORCID, or adjust as needed "url": creator_data.get("url", ""), # Add URL if available, otherwise empty string "email": creator_data.get( "email", "" ), # Add email if available, otherwise empty string }, ) ) # Append the person creator entity to the mainEntity crate.mainEntity.append_to("creator", creator_entity) elif creator_data.get("class") == "Organization": # Create the organization entity organization_entity = crate.add( ContextEntity( crate, creator_data.get( "url", "" ), # Use URL as identifier if available, otherwise empty string properties={ "@type": "Organization", "name": creator_data.get("name", ""), # Default to empty string if name is missing "url": creator_data.get("url", ""), # Add URL if available, otherwise empty string }, ) ) # Append the organization entity to the mainEntity crate.mainEntity.append_to("creator", organization_entity) # Add CWL workflow entity if exists crate.mainEntity["subjectOf"] = cwl_wf # Add tools used in the workflow self._add_tools(crate) self._add_steps(crate) def _add_steps(self, crate: ROCrate): """ Add workflow steps (HowToStep) to the RO-Crate. These are unique for each tool occurrence. """ step_entities: List[ContextEntity] = [] # Initialize the position as a list with a single element to keep it mutable position = [1] self._add_steps_recursive(self.workflow.steps, crate, step_entities, position) return step_entities def _add_steps_recursive(self, steps, crate: ROCrate, step_entities, position): """ Recursively add HowToStep entities from workflow steps, ensuring that the position index is maintained across subworkflows. """ for step in steps: if step.type == "tool": # Create a unique HowToStep entity for each step step_id = f"step_{position[0]}" step_description = None if step.annotations: annotations_list = [annotation.annotation for annotation in step.annotations if annotation] step_description = " ".join(annotations_list) if annotations_list else None # Add HowToStep entity to the crate step_entity = crate.add( ContextEntity( crate, step_id, properties={ "@type": "HowToStep", "position": position[0], "name": step.tool_id, "description": step_description, "workExample": f"#{step.tool_id}", }, ) ) # Append the HowToStep entity to the workflow steps list step_entities.append(step_entity) crate.mainEntity.append_to("step", step_entity) # Increment the position counter position[0] += 1 # Handle subworkflows recursively elif step.type == "subworkflow": subworkflow = step.subworkflow if subworkflow: self._add_steps_recursive(subworkflow.steps, crate, step_entities, position) def _add_tools(self, crate: ROCrate): tool_entities: List[ContextEntity] = [] self._add_tools_recursive(self.workflow.steps, crate, tool_entities) def _add_tools_recursive(self, steps, crate: ROCrate, tool_entities): """ Recursively add SoftwareApplication entities from workflow steps, reusing tools when necessary. """ for step in steps: if step.type == "tool": tool_id = step.tool_id tool_version = step.tool_version # Cache key based on tool ID and version tool_key = f"{tool_id}:{tool_version}" # Check if tool entity is already in cache if tool_key in self.tool_cache: tool_entity = self.tool_cache[tool_key] else: # Create a new tool entity tool_name = tool_id tool_description = None if step.annotations: annotations_list = [annotation.annotation for annotation in step.annotations if annotation] tool_description = " ".join(annotations_list) if annotations_list else None # Add tool entity to the RO-Crate tool_entity = crate.add( ContextEntity( crate, f"#{tool_id}", # Prepend # to tool_id properties={ "@type": "SoftwareApplication", "name": tool_name, "version": tool_version, "description": tool_description, }, ) ) # Store the tool entity in the cache self.tool_cache[tool_key] = tool_entity # Append the tool entity to the workflow (instrument) and store it in the list tool_entities.append(tool_entity) crate.mainEntity.append_to("hasPart", tool_entity) # Handle subworkflows recursively elif step.type == "subworkflow": subworkflow = step.subworkflow if subworkflow: self._add_tools_recursive(subworkflow.steps, crate, tool_entities) def _add_create_action(self, crate: ROCrate): self.create_action = crate.add( ContextEntity( Loading test/unit/data/model/test_model_store.py +125 −25 Original line number Diff line number Diff line Loading @@ -23,7 +23,6 @@ from sqlalchemy.orm.scoping import scoped_session from galaxy import model from galaxy.model import store from galaxy.model.metadata import MetadataTempFile from galaxy.model.orm.now import now from galaxy.model.unittest_utils import GalaxyDataTestApp from galaxy.model.unittest_utils.store_fixtures import ( deferred_hda_model_store_dict, Loading Loading @@ -440,7 +439,6 @@ def test_import_export_library(tmp_path): def test_import_export_invocation(): app = _mock_app() workflow_invocation = _setup_invocation(app) temp_directory = mkdtemp() with store.DirectoryModelExportStore(temp_directory, app=app) as export_store: export_store.export_workflow_invocation(workflow_invocation) Loading Loading @@ -482,6 +480,55 @@ def validate_has_mit_license(ro_crate: ROCrate): assert found_license def validate_creators(ro_crate: ROCrate): """ Validate that creators (Person and Organization) are correctly added. """ creators = ro_crate.mainEntity.get("creator") assert creators, "No creators found in the RO-Crate" for creator in creators: assert creator["@type"] in {"Person", "Organization"} if creator["@type"] == "Person": assert "name" in creator assert "orcid" in creator or "identifier" in creator assert "email" in creator elif creator["@type"] == "Organization": assert "name" in creator assert "url" in creator def validate_steps(ro_crate: ROCrate): """ Validate that workflow steps (HowToStep) are correctly added. """ steps = ro_crate.mainEntity.get("step") assert steps, "No steps found in the RO-Crate" for i, step in enumerate(steps, start=1): assert step["@type"] == "HowToStep" assert step["position"] == i assert "name" in step assert "description" in step or step["description"] is None def validate_tools(ro_crate: ROCrate): """ Validate that tools (SoftwareApplication) are correctly added. """ tools = ro_crate.mainEntity.get("hasPart") assert tools, "No tools found in the RO-Crate" tool_ids = set() for tool in tools: assert tool["@type"] == "SoftwareApplication" assert "name" in tool assert "version" in tool assert "description" in tool or tool["description"] is None assert tool.id not in tool_ids, "Duplicate tool found" tool_ids.add(tool.id) def validate_has_readme(ro_crate: ROCrate): found_readme = False for e in ro_crate.get_entities(): Loading Loading @@ -564,6 +611,9 @@ def validate_invocation_crate_directory(crate_directory): validate_has_pl_galaxy(crate) validate_organize_action(crate) validate_has_mit_license(crate) validate_creators(crate) validate_steps(crate) validate_tools(crate) # validate_has_readme(crate) Loading Loading @@ -971,31 +1021,64 @@ def _setup_simple_cat_job(app, state="ok"): def _setup_invocation(app): sa_session = app.model.context # Set up a user, history, datasets, and job u, h, d1, d2, j = _setup_simple_cat_job(app) j.parameters = [model.JobParameter(name="index_path", value='"/old/path/human"')] # Create a workflow workflow = model.Workflow() workflow.license = "MIT" workflow.name = "Test Workflow" workflow.creator_metadata = [ {"class": "Person", "name": "Alice", "identifier": "0000-0001-2345-6789", "email": "alice@example.com"}, ] # Create and associate a data_input step workflow_step_1 = model.WorkflowStep() workflow_step_1.order_index = 0 workflow_step_1.type = "data_input" sa_session.add(workflow_step_1) workflow_1 = _workflow_from_steps(u, [workflow_step_1]) workflow_1.license = "MIT" workflow_1.name = "Test Workflow" sa_session.add(workflow_1) workflow_invocation = _invocation_for_workflow(u, workflow_1) invocation_step = model.WorkflowInvocationStep() invocation_step.workflow_step = workflow_step_1 invocation_step.job = j sa_session.add(invocation_step) output_assoc = model.WorkflowInvocationStepOutputDatasetAssociation() output_assoc.dataset = d2 invocation_step.output_datasets = [output_assoc] workflow_invocation.steps = [invocation_step] workflow_step_1.label = "Input Step" workflow.steps.append(workflow_step_1) sa_session.add(workflow_step_1) # Persist step in the session # Create and associate a tool step workflow_step_2 = model.WorkflowStep() workflow_step_2.order_index = 0 workflow_step_2.type = "tool" workflow_step_2.tool_id = "example_tool" workflow_step_2.tool_version = "1.0" workflow_step_2.label = "Example Tool Step" workflow.steps.append(workflow_step_2) sa_session.add(workflow_step_2) # Persist step in the session sa_session.add(workflow) # Persist the workflow itself # Create a workflow invocation workflow_invocation = _invocation_for_workflow(u, workflow) # Associate invocation step for data_input invocation_step_1 = model.WorkflowInvocationStep() invocation_step_1.workflow_step = workflow_step_1 invocation_step_1.job = j sa_session.add(invocation_step_1) # Associate invocation step for tool invocation_step_2 = model.WorkflowInvocationStep() invocation_step_2.workflow_step = workflow_step_2 sa_session.add(invocation_step_2) # Add steps to the invocation workflow_invocation.steps = [invocation_step_1, invocation_step_2] workflow_invocation.user = u workflow_invocation.add_input(d1, step=workflow_step_1) wf_output = model.WorkflowOutput(workflow_step_1, label="output_label") workflow_invocation.add_output(wf_output, workflow_step_1, d2) # Add workflow output associated with the tool step wf_output = model.WorkflowOutput(workflow_step_2, label="output_label") workflow_invocation.add_output(wf_output, workflow_step_2, d2) # Commit the workflow and invocation app.add_and_commit(workflow_invocation) return workflow_invocation Loading Loading @@ -1074,9 +1157,11 @@ def _setup_collection_invocation(app): def _setup_simple_invocation(app): sa_session = app.model.context # Set up a simple user, history, datasets, and job u, h, d1, d2, j = _setup_simple_cat_job(app) j.parameters = [model.JobParameter(name="index_path", value='"/old/path/human"')] # Create a workflow workflow_step_1 = model.WorkflowStep() workflow_step_1.order_index = 0 workflow_step_1.type = "data_input" Loading @@ -1085,16 +1170,30 @@ def _setup_simple_invocation(app): workflow = _workflow_from_steps(u, [workflow_step_1]) workflow.license = "MIT" workflow.name = "Test Workflow" workflow.create_time = now() workflow.update_time = now() workflow.creator_metadata = [ {"class": "Person", "name": "Bob", "identifier": "0000-0002-3456-7890", "email": "bob@example.com"}, ] # Create and associate a tool step workflow_step_tool = model.WorkflowStep() workflow_step_tool.order_index = 1 workflow_step_tool.type = "tool" workflow_step_tool.tool_id = "example_tool" workflow_step_tool.tool_version = "1.0" workflow_step_tool.label = "Example Tool Step" workflow.steps.append(workflow_step_tool) sa_session.add(workflow) # Create a workflow invocation invocation = _invocation_for_workflow(u, workflow) invocation.create_time = now() invocation.update_time = now() invocation.add_input(d1, step=workflow_step_1) # Associate input dataset wf_output = model.WorkflowOutput(workflow_step_tool, label="output_label") invocation.add_output(wf_output, workflow_step_tool, d2) # Associate output dataset # Commit the workflow and invocation to the database app.add_and_commit(invocation) invocation.add_input(d1, step=workflow_step_1) wf_output = model.WorkflowOutput(workflow_step_1, label="output_label") invocation.add_output(wf_output, workflow_step_1, d2) return invocation Loading Loading @@ -1191,6 +1290,7 @@ def _mock_app(store_by=DEFAULT_OBJECT_STORE_BY): test_object_store_config = TestConfig(store_by=store_by) app.object_store = test_object_store_config.object_store app.model.Dataset.object_store = app.object_store return app Loading Loading
lib/galaxy/model/__init__.py +1 −1 Original line number Diff line number Diff line Loading @@ -7866,7 +7866,7 @@ class Workflow(Base, Dictifiable, RepresentById): has_cycles: Mapped[Optional[bool]] has_errors: Mapped[Optional[bool]] reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType) creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType) creator_metadata: Mapped[Optional[List[Dict[str, Any]]]] = mapped_column(JSONType) license: Mapped[Optional[str]] = mapped_column(TEXT) source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType) uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType) Loading
lib/galaxy/model/store/ro_crate_utils.py +159 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import os from typing import ( Any, Dict, List, Optional, ) Loading Loading @@ -85,6 +86,8 @@ class WorkflowRunCrateProfileBuilder: self.file_entities: Dict[int, Any] = {} self.param_entities: Dict[int, Any] = {} self.pv_entities: Dict[str, Any] = {} # Cache for tools to avoid duplicating entities for the same tool self.tool_cache: Dict[str, ContextEntity] = {} def build_crate(self): crate = ROCrate() Loading Loading @@ -224,6 +227,162 @@ class WorkflowRunCrateProfileBuilder: crate.mainEntity["name"] = self.workflow.name crate.mainEntity["subjectOf"] = cwl_wf # Adding multiple creators if available if self.workflow.creator_metadata: for creator_data in self.workflow.creator_metadata: if creator_data.get("class") == "Person": # Create the person entity creator_entity = crate.add( ContextEntity( crate, creator_data.get("identifier", ""), # Default to empty string if identifier is missing properties={ "@type": "Person", "name": creator_data.get("name", ""), # Default to empty string if name is missing "identifier": creator_data.get( "identifier", "" ), # Assuming identifier is ORCID, or adjust as needed "url": creator_data.get("url", ""), # Add URL if available, otherwise empty string "email": creator_data.get( "email", "" ), # Add email if available, otherwise empty string }, ) ) # Append the person creator entity to the mainEntity crate.mainEntity.append_to("creator", creator_entity) elif creator_data.get("class") == "Organization": # Create the organization entity organization_entity = crate.add( ContextEntity( crate, creator_data.get( "url", "" ), # Use URL as identifier if available, otherwise empty string properties={ "@type": "Organization", "name": creator_data.get("name", ""), # Default to empty string if name is missing "url": creator_data.get("url", ""), # Add URL if available, otherwise empty string }, ) ) # Append the organization entity to the mainEntity crate.mainEntity.append_to("creator", organization_entity) # Add CWL workflow entity if exists crate.mainEntity["subjectOf"] = cwl_wf # Add tools used in the workflow self._add_tools(crate) self._add_steps(crate) def _add_steps(self, crate: ROCrate): """ Add workflow steps (HowToStep) to the RO-Crate. These are unique for each tool occurrence. """ step_entities: List[ContextEntity] = [] # Initialize the position as a list with a single element to keep it mutable position = [1] self._add_steps_recursive(self.workflow.steps, crate, step_entities, position) return step_entities def _add_steps_recursive(self, steps, crate: ROCrate, step_entities, position): """ Recursively add HowToStep entities from workflow steps, ensuring that the position index is maintained across subworkflows. """ for step in steps: if step.type == "tool": # Create a unique HowToStep entity for each step step_id = f"step_{position[0]}" step_description = None if step.annotations: annotations_list = [annotation.annotation for annotation in step.annotations if annotation] step_description = " ".join(annotations_list) if annotations_list else None # Add HowToStep entity to the crate step_entity = crate.add( ContextEntity( crate, step_id, properties={ "@type": "HowToStep", "position": position[0], "name": step.tool_id, "description": step_description, "workExample": f"#{step.tool_id}", }, ) ) # Append the HowToStep entity to the workflow steps list step_entities.append(step_entity) crate.mainEntity.append_to("step", step_entity) # Increment the position counter position[0] += 1 # Handle subworkflows recursively elif step.type == "subworkflow": subworkflow = step.subworkflow if subworkflow: self._add_steps_recursive(subworkflow.steps, crate, step_entities, position) def _add_tools(self, crate: ROCrate): tool_entities: List[ContextEntity] = [] self._add_tools_recursive(self.workflow.steps, crate, tool_entities) def _add_tools_recursive(self, steps, crate: ROCrate, tool_entities): """ Recursively add SoftwareApplication entities from workflow steps, reusing tools when necessary. """ for step in steps: if step.type == "tool": tool_id = step.tool_id tool_version = step.tool_version # Cache key based on tool ID and version tool_key = f"{tool_id}:{tool_version}" # Check if tool entity is already in cache if tool_key in self.tool_cache: tool_entity = self.tool_cache[tool_key] else: # Create a new tool entity tool_name = tool_id tool_description = None if step.annotations: annotations_list = [annotation.annotation for annotation in step.annotations if annotation] tool_description = " ".join(annotations_list) if annotations_list else None # Add tool entity to the RO-Crate tool_entity = crate.add( ContextEntity( crate, f"#{tool_id}", # Prepend # to tool_id properties={ "@type": "SoftwareApplication", "name": tool_name, "version": tool_version, "description": tool_description, }, ) ) # Store the tool entity in the cache self.tool_cache[tool_key] = tool_entity # Append the tool entity to the workflow (instrument) and store it in the list tool_entities.append(tool_entity) crate.mainEntity.append_to("hasPart", tool_entity) # Handle subworkflows recursively elif step.type == "subworkflow": subworkflow = step.subworkflow if subworkflow: self._add_tools_recursive(subworkflow.steps, crate, tool_entities) def _add_create_action(self, crate: ROCrate): self.create_action = crate.add( ContextEntity( Loading
test/unit/data/model/test_model_store.py +125 −25 Original line number Diff line number Diff line Loading @@ -23,7 +23,6 @@ from sqlalchemy.orm.scoping import scoped_session from galaxy import model from galaxy.model import store from galaxy.model.metadata import MetadataTempFile from galaxy.model.orm.now import now from galaxy.model.unittest_utils import GalaxyDataTestApp from galaxy.model.unittest_utils.store_fixtures import ( deferred_hda_model_store_dict, Loading Loading @@ -440,7 +439,6 @@ def test_import_export_library(tmp_path): def test_import_export_invocation(): app = _mock_app() workflow_invocation = _setup_invocation(app) temp_directory = mkdtemp() with store.DirectoryModelExportStore(temp_directory, app=app) as export_store: export_store.export_workflow_invocation(workflow_invocation) Loading Loading @@ -482,6 +480,55 @@ def validate_has_mit_license(ro_crate: ROCrate): assert found_license def validate_creators(ro_crate: ROCrate): """ Validate that creators (Person and Organization) are correctly added. """ creators = ro_crate.mainEntity.get("creator") assert creators, "No creators found in the RO-Crate" for creator in creators: assert creator["@type"] in {"Person", "Organization"} if creator["@type"] == "Person": assert "name" in creator assert "orcid" in creator or "identifier" in creator assert "email" in creator elif creator["@type"] == "Organization": assert "name" in creator assert "url" in creator def validate_steps(ro_crate: ROCrate): """ Validate that workflow steps (HowToStep) are correctly added. """ steps = ro_crate.mainEntity.get("step") assert steps, "No steps found in the RO-Crate" for i, step in enumerate(steps, start=1): assert step["@type"] == "HowToStep" assert step["position"] == i assert "name" in step assert "description" in step or step["description"] is None def validate_tools(ro_crate: ROCrate): """ Validate that tools (SoftwareApplication) are correctly added. """ tools = ro_crate.mainEntity.get("hasPart") assert tools, "No tools found in the RO-Crate" tool_ids = set() for tool in tools: assert tool["@type"] == "SoftwareApplication" assert "name" in tool assert "version" in tool assert "description" in tool or tool["description"] is None assert tool.id not in tool_ids, "Duplicate tool found" tool_ids.add(tool.id) def validate_has_readme(ro_crate: ROCrate): found_readme = False for e in ro_crate.get_entities(): Loading Loading @@ -564,6 +611,9 @@ def validate_invocation_crate_directory(crate_directory): validate_has_pl_galaxy(crate) validate_organize_action(crate) validate_has_mit_license(crate) validate_creators(crate) validate_steps(crate) validate_tools(crate) # validate_has_readme(crate) Loading Loading @@ -971,31 +1021,64 @@ def _setup_simple_cat_job(app, state="ok"): def _setup_invocation(app): sa_session = app.model.context # Set up a user, history, datasets, and job u, h, d1, d2, j = _setup_simple_cat_job(app) j.parameters = [model.JobParameter(name="index_path", value='"/old/path/human"')] # Create a workflow workflow = model.Workflow() workflow.license = "MIT" workflow.name = "Test Workflow" workflow.creator_metadata = [ {"class": "Person", "name": "Alice", "identifier": "0000-0001-2345-6789", "email": "alice@example.com"}, ] # Create and associate a data_input step workflow_step_1 = model.WorkflowStep() workflow_step_1.order_index = 0 workflow_step_1.type = "data_input" sa_session.add(workflow_step_1) workflow_1 = _workflow_from_steps(u, [workflow_step_1]) workflow_1.license = "MIT" workflow_1.name = "Test Workflow" sa_session.add(workflow_1) workflow_invocation = _invocation_for_workflow(u, workflow_1) invocation_step = model.WorkflowInvocationStep() invocation_step.workflow_step = workflow_step_1 invocation_step.job = j sa_session.add(invocation_step) output_assoc = model.WorkflowInvocationStepOutputDatasetAssociation() output_assoc.dataset = d2 invocation_step.output_datasets = [output_assoc] workflow_invocation.steps = [invocation_step] workflow_step_1.label = "Input Step" workflow.steps.append(workflow_step_1) sa_session.add(workflow_step_1) # Persist step in the session # Create and associate a tool step workflow_step_2 = model.WorkflowStep() workflow_step_2.order_index = 0 workflow_step_2.type = "tool" workflow_step_2.tool_id = "example_tool" workflow_step_2.tool_version = "1.0" workflow_step_2.label = "Example Tool Step" workflow.steps.append(workflow_step_2) sa_session.add(workflow_step_2) # Persist step in the session sa_session.add(workflow) # Persist the workflow itself # Create a workflow invocation workflow_invocation = _invocation_for_workflow(u, workflow) # Associate invocation step for data_input invocation_step_1 = model.WorkflowInvocationStep() invocation_step_1.workflow_step = workflow_step_1 invocation_step_1.job = j sa_session.add(invocation_step_1) # Associate invocation step for tool invocation_step_2 = model.WorkflowInvocationStep() invocation_step_2.workflow_step = workflow_step_2 sa_session.add(invocation_step_2) # Add steps to the invocation workflow_invocation.steps = [invocation_step_1, invocation_step_2] workflow_invocation.user = u workflow_invocation.add_input(d1, step=workflow_step_1) wf_output = model.WorkflowOutput(workflow_step_1, label="output_label") workflow_invocation.add_output(wf_output, workflow_step_1, d2) # Add workflow output associated with the tool step wf_output = model.WorkflowOutput(workflow_step_2, label="output_label") workflow_invocation.add_output(wf_output, workflow_step_2, d2) # Commit the workflow and invocation app.add_and_commit(workflow_invocation) return workflow_invocation Loading Loading @@ -1074,9 +1157,11 @@ def _setup_collection_invocation(app): def _setup_simple_invocation(app): sa_session = app.model.context # Set up a simple user, history, datasets, and job u, h, d1, d2, j = _setup_simple_cat_job(app) j.parameters = [model.JobParameter(name="index_path", value='"/old/path/human"')] # Create a workflow workflow_step_1 = model.WorkflowStep() workflow_step_1.order_index = 0 workflow_step_1.type = "data_input" Loading @@ -1085,16 +1170,30 @@ def _setup_simple_invocation(app): workflow = _workflow_from_steps(u, [workflow_step_1]) workflow.license = "MIT" workflow.name = "Test Workflow" workflow.create_time = now() workflow.update_time = now() workflow.creator_metadata = [ {"class": "Person", "name": "Bob", "identifier": "0000-0002-3456-7890", "email": "bob@example.com"}, ] # Create and associate a tool step workflow_step_tool = model.WorkflowStep() workflow_step_tool.order_index = 1 workflow_step_tool.type = "tool" workflow_step_tool.tool_id = "example_tool" workflow_step_tool.tool_version = "1.0" workflow_step_tool.label = "Example Tool Step" workflow.steps.append(workflow_step_tool) sa_session.add(workflow) # Create a workflow invocation invocation = _invocation_for_workflow(u, workflow) invocation.create_time = now() invocation.update_time = now() invocation.add_input(d1, step=workflow_step_1) # Associate input dataset wf_output = model.WorkflowOutput(workflow_step_tool, label="output_label") invocation.add_output(wf_output, workflow_step_tool, d2) # Associate output dataset # Commit the workflow and invocation to the database app.add_and_commit(invocation) invocation.add_input(d1, step=workflow_step_1) wf_output = model.WorkflowOutput(workflow_step_1, label="output_label") invocation.add_output(wf_output, workflow_step_1, d2) return invocation Loading Loading @@ -1191,6 +1290,7 @@ def _mock_app(store_by=DEFAULT_OBJECT_STORE_BY): test_object_store_config = TestConfig(store_by=store_by) app.object_store = test_object_store_config.object_store app.model.Dataset.object_store = app.object_store return app Loading