Loading src/nova/galaxy/data_store.py +17 −7 Original line number Diff line number Diff line Loading @@ -28,11 +28,20 @@ class Datastore: """ self.persist_store = True def recover_tools(self) -> List[Tool]: def recover_tools(self, filter_running: bool = True) -> List[Tool]: """Recovers all running tools in this data_store. Mainly used to recover all the running tools inside of this data store or any past persisted data stores that used the same name. Can also be used to simply get a list of all running tools in a store as well. Parameters ---------- filter_running: bool If this should only recover tools that are running (true). Returns ------- List of tools from this data store. """ history_contents = self.nova_connection.galaxy_instance.histories.show_history( self.history_id, contents=True, deleted=False, details="all" Loading @@ -41,10 +50,11 @@ class Datastore: for dataset in history_contents: job_id = dataset.get("creating_job", None) if job_id: tool_id = self.nova_connection.galaxy_instance.jobs.show_job(job_id)["tool_id"] info = self.nova_connection.galaxy_instance.jobs.show_job(job_id) if info["state"] == "running" or info["state"] == "queued" or not filter_running: tool_id = info["tool_id"] t = Tool(tool_id) t.assign_id(job_id, self) t.get_url() t.get_status() t.get_url(max_tries=1) tools.append(t) return tools src/nova/galaxy/tool.py +63 −7 Original line number Diff line number Diff line Loading @@ -102,7 +102,14 @@ class Tool(AbstractWork): return self._job.run_interactive(params, wait=wait, max_tries=max_tries, check_url=check_url) def get_status(self) -> WorkState: """Returns the current status of the tool.""" """Returns the current status of the tool. Returns ------- WorkState Returns the status of the tool which will be one of the following values: not_started, uploading, queued, running, finished, error """ if self._job: return self._job.get_state().state else: Loading @@ -114,6 +121,10 @@ class Tool(AbstractWork): Throws an Exception if the tool has not finished yet. Will be overridden if this tool is run again. Returns ------- Optional[Outputs] An instance of Outputs that holds the datasets and collections from this tool execution if it is finished. """ if self._job: return self._job.get_results() Loading @@ -130,30 +141,75 @@ class Tool(AbstractWork): self._job.cancel(check_results=False) def get_stdout(self) -> Optional[str]: """Get the current STDOUT for a tool. Will be overridden everytime this tool is run.""" """Get the current STDOUT for a tool. Will be overridden everytime this tool is run. Returns ------- Optional[str] Returns the current STDOUT of the tool if it is running or finished. """ if self._job: return self._job.get_console_output()["stdout"] return None def get_stderr(self) -> Optional[str]: """Get the current STDERR for a tool. Will be overridden everytime this tool is run.""" """Get the current STDERR for a tool. Will be overridden everytime this tool is run. Returns ------- Optional[str] Returns the current STDERR of the tool if it is running or finished. """ if self._job: return self._job.get_console_output()["stderr"] return None def get_url(self) -> Optional[str]: """Get the URL for this tool. If this is an interactive tool, then will return the endpoint to the tool.""" def get_url(self, max_tries: int = 5) -> Optional[str]: """Get the URL for this tool. If this is an interactive tool, then will return the endpoint to the tool. The first call may need to query Galaxy, but the url is cached for subsequent calls. Parameters ---------- max_tries: int How many attempts to obtain the url. """ if self._job: return self._job.get_url() return self._job.get_url(max_tries=max_tries, check_url=False) return None def get_uid(self) -> Optional[str]: """Get the unique ID for this tool. Will only be available if Tool.run() has been successfully invoked.""" """Get the unique ID for this tool. Will only be available if Tool.run() has been successfully invoked. Returns ------- Optional[str] Returns the uid of this tool if it is running or finished. """ if self._job: return self._job.id return None def assign_id(self, new_id: str, data_store: "Datastore") -> None: """Assigns an id to this tool. Assigns this tool a new id, so that it can track already existing tools. Useful for recovering old tools if you have kept track of the id. Parameters ---------- new_id: str The new id to assign to this tool. data_store: Datastore The datastore in which the tool should be tracked. """ if self._job: raise Exception("Tool cannot be currently assigned an ID. Do not directly call this method.") self._job = Job(self.id, data_store) Loading src/nova/galaxy/util.py +6 −6 Original line number Diff line number Diff line Loading @@ -6,9 +6,9 @@ from enum import Enum class WorkState(Enum): """The state of a tool in Galaxy.""" NOT_STARTED = 1 UPLOADING_DATA = 2 QUEUED = 3 RUNNING = 4 FINISHED = 5 ERROR = 6 NOT_STARTED = "not_started" UPLOADING_DATA = "uploading" QUEUED = "queued" RUNNING = "running" FINISHED = "finished" ERROR = "error" Loading
src/nova/galaxy/data_store.py +17 −7 Original line number Diff line number Diff line Loading @@ -28,11 +28,20 @@ class Datastore: """ self.persist_store = True def recover_tools(self) -> List[Tool]: def recover_tools(self, filter_running: bool = True) -> List[Tool]: """Recovers all running tools in this data_store. Mainly used to recover all the running tools inside of this data store or any past persisted data stores that used the same name. Can also be used to simply get a list of all running tools in a store as well. Parameters ---------- filter_running: bool If this should only recover tools that are running (true). Returns ------- List of tools from this data store. """ history_contents = self.nova_connection.galaxy_instance.histories.show_history( self.history_id, contents=True, deleted=False, details="all" Loading @@ -41,10 +50,11 @@ class Datastore: for dataset in history_contents: job_id = dataset.get("creating_job", None) if job_id: tool_id = self.nova_connection.galaxy_instance.jobs.show_job(job_id)["tool_id"] info = self.nova_connection.galaxy_instance.jobs.show_job(job_id) if info["state"] == "running" or info["state"] == "queued" or not filter_running: tool_id = info["tool_id"] t = Tool(tool_id) t.assign_id(job_id, self) t.get_url() t.get_status() t.get_url(max_tries=1) tools.append(t) return tools
src/nova/galaxy/tool.py +63 −7 Original line number Diff line number Diff line Loading @@ -102,7 +102,14 @@ class Tool(AbstractWork): return self._job.run_interactive(params, wait=wait, max_tries=max_tries, check_url=check_url) def get_status(self) -> WorkState: """Returns the current status of the tool.""" """Returns the current status of the tool. Returns ------- WorkState Returns the status of the tool which will be one of the following values: not_started, uploading, queued, running, finished, error """ if self._job: return self._job.get_state().state else: Loading @@ -114,6 +121,10 @@ class Tool(AbstractWork): Throws an Exception if the tool has not finished yet. Will be overridden if this tool is run again. Returns ------- Optional[Outputs] An instance of Outputs that holds the datasets and collections from this tool execution if it is finished. """ if self._job: return self._job.get_results() Loading @@ -130,30 +141,75 @@ class Tool(AbstractWork): self._job.cancel(check_results=False) def get_stdout(self) -> Optional[str]: """Get the current STDOUT for a tool. Will be overridden everytime this tool is run.""" """Get the current STDOUT for a tool. Will be overridden everytime this tool is run. Returns ------- Optional[str] Returns the current STDOUT of the tool if it is running or finished. """ if self._job: return self._job.get_console_output()["stdout"] return None def get_stderr(self) -> Optional[str]: """Get the current STDERR for a tool. Will be overridden everytime this tool is run.""" """Get the current STDERR for a tool. Will be overridden everytime this tool is run. Returns ------- Optional[str] Returns the current STDERR of the tool if it is running or finished. """ if self._job: return self._job.get_console_output()["stderr"] return None def get_url(self) -> Optional[str]: """Get the URL for this tool. If this is an interactive tool, then will return the endpoint to the tool.""" def get_url(self, max_tries: int = 5) -> Optional[str]: """Get the URL for this tool. If this is an interactive tool, then will return the endpoint to the tool. The first call may need to query Galaxy, but the url is cached for subsequent calls. Parameters ---------- max_tries: int How many attempts to obtain the url. """ if self._job: return self._job.get_url() return self._job.get_url(max_tries=max_tries, check_url=False) return None def get_uid(self) -> Optional[str]: """Get the unique ID for this tool. Will only be available if Tool.run() has been successfully invoked.""" """Get the unique ID for this tool. Will only be available if Tool.run() has been successfully invoked. Returns ------- Optional[str] Returns the uid of this tool if it is running or finished. """ if self._job: return self._job.id return None def assign_id(self, new_id: str, data_store: "Datastore") -> None: """Assigns an id to this tool. Assigns this tool a new id, so that it can track already existing tools. Useful for recovering old tools if you have kept track of the id. Parameters ---------- new_id: str The new id to assign to this tool. data_store: Datastore The datastore in which the tool should be tracked. """ if self._job: raise Exception("Tool cannot be currently assigned an ID. Do not directly call this method.") self._job = Job(self.id, data_store) Loading
src/nova/galaxy/util.py +6 −6 Original line number Diff line number Diff line Loading @@ -6,9 +6,9 @@ from enum import Enum class WorkState(Enum): """The state of a tool in Galaxy.""" NOT_STARTED = 1 UPLOADING_DATA = 2 QUEUED = 3 RUNNING = 4 FINISHED = 5 ERROR = 6 NOT_STARTED = "not_started" UPLOADING_DATA = "uploading" QUEUED = "queued" RUNNING = "running" FINISHED = "finished" ERROR = "error"