Commit db1e1d7f authored by r8s

DatasetCollection and Workflow implementation

Added working DatasetCollection and Workflow classes. Also refactored
Dataset creation to go through a DatasetFactory.
parent 3ecc572b
dataset.py  +43 −10
@@ -7,7 +7,7 @@ as well as output data from Galaxy tools.
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from bioblend.galaxy.dataset_collections import DatasetCollectionClient
from bioblend.galaxy.datasets import DatasetClient
@@ -15,6 +15,9 @@ from bioblend.galaxy.datasets import DatasetClient
if TYPE_CHECKING:
    from .data_store import Datastore

from .parameters import Parameters
from .tool import Tool


class DataState(Enum):
    """The state of a dataset in Galaxy."""
@@ -103,15 +106,47 @@ class Dataset(AbstractData):
class DatasetCollection(AbstractData):
    """A group of files that can be uploaded as a collection and collectively be used in a Galaxy tool."""

    def __init__(self, path: str, name: Optional[str] = None):
        self.path = path
        self.name = name or Path(path).name
    def __init__(self, paths: List[str], name: str = ""):
        self.paths = paths
        self.name = name
        self.id: str
        self.store: "Datastore"

    def upload(self, store: "Datastore") -> None:
        """Will need to handle this differently than single datasets."""
        raise NotImplementedError
        """ Upload several files as a collection.

        Parameters
        ----------
        store: "Datastore"
            The Datastore to upload the collection under
        """

        self.store = store

        # Map of generated input names to Datasets, used for the parallel upload
        input_dictionary = {}

        # Parameters for the Build List tool
        params = Parameters()

        # Create a Dataset for each requested path and add it to the dictionary and Parameters
        for i, path in enumerate(self.paths):
            new_dataset = Dataset(path)
            input_dictionary[f"Input Dataset{i}"] = new_dataset
            params.add_input(f"Input Dataset{i}", new_dataset)

        # Parallel upload of all the Datasets
        upload_datasets(store, input_dictionary)

        # Run the Build List tool on all the uploaded datasets, turning them into a DatasetCollection
        build_list_tool = Tool("__BUILD_LIST__")
        outputs = build_list_tool.run(data_store=store, params=params)

        # Store the id of the new DatasetCollection produced by the tool
        new_collection = outputs.get_dataset("(as list)")
        self.id = new_collection.id

    def download(self, local_path: str) -> AbstractData:
        """Downloads this dataset collection to the local path given."""
@@ -126,10 +161,8 @@ class DatasetCollection(AbstractData):
        if self.store and self.id:
            dataset_client = DatasetCollectionClient(self.store.nova_connection.galaxy_instance)
            info = dataset_client.show_dataset_collection(self.id)
            output = ""
            for element in info["elements"]:
                output += f"{element['element_identifier']}\n"
            return output
            self.info = info
            return info
        else:
            raise Exception("Dataset collection is not present in Galaxy.")

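For context, a minimal usage sketch of the new collection upload flow; `store` and the file paths are illustrative assumptions, and DatasetCollection is assumed to be imported from the dataset module:

# Hypothetical usage; assumes `store` is an existing Datastore connected to Galaxy.
collection = DatasetCollection(["data/a.txt", "data/b.txt"], name="my inputs")
collection.upload(store)  # parallel upload of each file, then a __BUILD_LIST__ run
print(collection.id)      # id of the resulting collection in Galaxy
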
dataset_factory.py  +54 −0
from typing import Optional

from bioblend import galaxy

from .parameters import Parameters

class DatasetFactory:
    """Static factory that builds Dataset and DatasetCollection objects from Galaxy output descriptions."""

    @staticmethod
    def create_dataset(store, config):
        
        # Import locally to avoid a circular import with the dataset module
        from .dataset import Dataset
        
        d = Dataset(config["output_name"])
        d.id = config["id"]
        d.store = store
        return d
    
    @staticmethod
    def create_dataset_collection(store, config):
        
        # Import locally to avoid a circular import with the dataset module
        from .dataset import DatasetCollection
        
        dc = DatasetCollection([], config["output_name"])
        dc.id = config["id"]
        dc.store = store
        return dc

    @staticmethod
    def upload_data(store, params: Optional[Parameters]):
    
        # Import locally to avoid a circular import with the dataset module
        from .dataset import Dataset, DatasetCollection, upload_datasets

        datasets_to_upload = {}

        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        if params:
            for param, val in params.inputs.items():
                if isinstance(val, Dataset):
                    datasets_to_upload[param] = val
                elif isinstance(val, DatasetCollection):
                    val.upload(store)
                    tool_inputs.set_dataset_param(param, val.id)
                else:
                    tool_inputs.set_param(param, val)
            ids = upload_datasets(store=store, datasets=datasets_to_upload)
            for param, val in ids.items():
                tool_inputs.set_dataset_param(param, val)

        return tool_inputs
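For reference, a sketch of the output description the factory consumes; the keys match the fields read above, while the concrete values are illustrative:

# Hypothetical entry from results["outputs"] after a tool run; only the keys
# the factory reads ("output_name" and "id") are shown, and the values are made up.
output_entry = {"output_name": "out_file1", "id": "f2db41e1fa331b3e"}

dataset = DatasetFactory.create_dataset(store, output_entry)  # store: an existing Datastore
assert dataset.name == "out_file1"
assert dataset.id == "f2db41e1fa331b3e"
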
job.py  +42 −16
@@ -9,7 +9,8 @@ from bioblend import galaxy

if TYPE_CHECKING:
    from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets

from .dataset_factory import DatasetFactory
from .outputs import Outputs
from .parameters import Parameters
from .util import WorkState
@@ -39,6 +40,10 @@ class Job:

    def _run_and_wait(self, params: Optional[Parameters]) -> None:
        """Runs tools and waits for result."""
        
        if params and params.workflow_parameter_set:
            self.submit_workflow(params)
        else:
            self.submit(params)
        try:
            self.wait_for_results()
@@ -92,14 +97,7 @@ class Job:
        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        if params:
            for param, val in params.inputs.items():
                if isinstance(val, AbstractData):
                    datasets_to_upload[param] = val
                else:
                    tool_inputs.set_param(param, val)
            ids = upload_datasets(store=self.store, datasets=datasets_to_upload)
            for param, val in ids.items():
                tool_inputs.set_dataset_param(param, val)
            tool_inputs = DatasetFactory.upload_data(self.store, params)

        # Run tool and wait for job to finish
        self.status.state = WorkState.QUEUED
@@ -110,6 +108,38 @@ class Job:
        self.datasets = results["outputs"]
        self.collections = results["output_collections"]
        
    def submit_workflow(self, params: Optional[Parameters]) -> None:
        self.status.state = WorkState.UPLOADING_DATA
        self.url = None

        # Set tool inputs, uploading any datasets or collections referenced in params
        tool_inputs = galaxy.tools.inputs.inputs()
        if params:
            tool_inputs = DatasetFactory.upload_data(self.store, params)

        # Convert the InputsBuilder into a plain dictionary
        inputs = tool_inputs.to_dict()

        # NOTE: the step id "355" and the input label "Image files" are currently
        # hardcoded for a specific workflow and still need to be generalized
        params_dict = {
            "355": {
                "Image files": inputs["Image files"]
            }
        }
        # Invoke the workflow and wait for the job to finish
        self.status.state = WorkState.QUEUED
        results = self.galaxy_instance.workflows.invoke_workflow(
            workflow_id=self.tool, history_id=self.store.history_id, inputs=inputs, params=params_dict
        )
        self.id = results["jobs"][0]["id"]
        self.datasets = results["outputs"]
        self.collections = results["output_collections"]

    def cancel(self, check_results: bool = False) -> bool:
        """Cancels or stops a job in Galaxy."""
        self.url = None
@@ -149,15 +179,11 @@ class Job:
            outputs = Outputs()
            if self.datasets:
                for dataset in self.datasets:
                    d = Dataset(dataset["output_name"])
                    d.id = dataset["id"]
                    d.store = self.store
                    d = DatasetFactory.create_dataset(self.store, dataset)
                    outputs.add_output(d)
            if self.collections:
                for collection in self.collections:
                    dc = DatasetCollection(collection["output_name"])
                    dc.id = collection["id"]
                    dc.store = self.store
                    dc = DatasetFactory.create_dataset_collection(self.store, collection)
                    outputs.add_output(dc)

            return outputs
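For reference, bioblend's invoke_workflow expects roughly the following shapes for inputs and params; the ids and labels below are illustrative, since in this commit the step id and input label are still hardcoded:

# Illustrative argument shapes for bioblend's WorkflowClient.invoke_workflow;
# real step ids and input labels come from show_workflow()/get_workflow_inputs().
# gi: an authenticated bioblend.galaxy.GalaxyInstance (assumed).
inputs = {
    "0": {"id": "f2db41e1fa331b3e", "src": "hdca"},  # map a workflow input to a collection
}
params = {
    "355": {"Image files": "some value"},  # per-step tool parameters, keyed by step id
}
gi.workflows.invoke_workflow(
    workflow_id="deadbeef01234567", history_id="cafe012345678901", inputs=inputs, params=params
)
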
outputs.py  +14 −13
"""Encapsulates the output datasets and collections for a Tool."""

from typing import Any, List
from typing import TYPE_CHECKING, Any, List

if TYPE_CHECKING:
    from .dataset import AbstractData, Dataset, DatasetCollection


@@ -9,14 +10,14 @@ class Outputs:
    """Contains the output datasets and collections for a Tool."""

    def __init__(self) -> None:
        self.data: List[AbstractData] = []
        self.data: List['AbstractData'] = []

    def __iter__(self) -> Any:
        """Iterator."""
        self._iterator = 0
        return self

    def __next__(self) -> AbstractData:
    def __next__(self) -> 'AbstractData':
        """Get next element for iterator."""
        if self._iterator >= len(self.data):
            raise StopIteration
@@ -24,17 +25,17 @@ class Outputs:
        self._iterator += 1
        return d

    def add_output(self, data: AbstractData) -> None:
    def add_output(self, data: 'AbstractData') -> None:
        self.data.append(data)

    def get_dataset(self, name: str) -> AbstractData:
    def get_dataset(self, name: str) -> 'AbstractData':
        try:
            return next(filter(lambda x: isinstance(x, Dataset) and x.name == name, self.data))
            for d in self.data:
                if d.name == name:
                    return d
            # Fall back to the first output when no exact name match is found
            return self.data[0]
        except StopIteration as e:
            raise Exception(f"There is no dataset: {name}") from e

    def get_collection(self, name: str) -> 'AbstractData':
        # Import locally so the isinstance check works at runtime without a circular import
        from .dataset import DatasetCollection

        try:
            return next(filter(lambda x: isinstance(x, DatasetCollection) and x.name == name, self.data))
        except StopIteration as e:
            raise Exception(f"There is no dataset collection: {name}") from e
parameters.py  +1 −0
@@ -8,6 +8,7 @@ class Parameters:

    def __init__(self) -> None:
        self.inputs: Dict[str, Any] = {}
        # True when these parameters target a workflow rather than a single tool
        self.workflow_parameter_set = False

    def add_input(self, name: str, value: Any) -> None:
        self.inputs[name] = value
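And a sketch of how the new flag is meant to be used with Job._run_and_wait; `collection` is an assumed, already-constructed DatasetCollection:

# Hypothetical: flag a Parameters object as workflow input so that
# Job._run_and_wait() routes it to submit_workflow() instead of submit().
params = Parameters()
params.add_input("Image files", collection)
params.workflow_parameter_set = True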