Commit 23aadd11 authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Provide an option to get stdout as part of api status request

parent d2c74366
Loading
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ import logging
import os
import re
import subprocess
from pathlib import Path
from time import sleep

import packaging.version
@@ -644,6 +645,11 @@ class PulsarJobRunner(AsynchronousJobRunner):
            run_results = client.full_status()
            remote_metadata_directory = run_results.get("metadata_directory", None)
            stdout = run_results.get("stdout", "")
            if stdout == "":
                stdout_path = Path(".").parent.parent.parent.parent.parent / "database/jobs_directory/000" / str(
                    run_results["job_id"]) / "outputs/tool_stdout"
                stdout_file = open(stdout_path, "r")
                stdout = stdout_file.read()
            stderr = run_results.get("stderr", "")
            exit_code = run_results.get("returncode", None)
            pulsar_outputs = PulsarOutputs.from_status_response(run_results)
+12 −8
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ from galaxy.util.search import (

log = logging.getLogger(__name__)

STDOUT_PAGE_SIZE_CHARS = 5000

class JobLock(BaseModel):
    active: bool = Field(title="Job lock status", description="If active, jobs will not dispatch")
@@ -220,7 +221,7 @@ class JobManager:
        )
        return self.job_lock()

    def get_accessible_job(self, trans, decoded_job_id):
    def get_accessible_job(self, trans, decoded_job_id, stdout_page=0):
        job = trans.sa_session.query(trans.app.model.Job).filter(trans.app.model.Job.id == decoded_job_id).first()
        if job is None:
            raise ObjectNotFound()
@@ -237,17 +238,20 @@ class JobManager:
                if not self.dataset_manager.is_accessible(data_assoc.dataset.dataset, trans.user):
                    raise ItemAccessibilityException("You are not allowed to rerun this job.")
        trans.sa_session.refresh(job)
        if job.state == job.states.RUNNING:
        if job.state == job.states.RUNNING and stdout_page != 0:
            try:
                stdout_path = Path(".").parent.parent.parent.parent.parent / "database/jobs_directory/000" / str(
                    job.id) / "outputs/tool_stdout"
                stdout_file = open(stdout_path, "r")
                # stderr_file = open("../../../../../database/jobs_directory/000/" + str(job.id) + "/outputs/tool_stderr")
                job.job_stdout = stdout_file.read()
                stdout_file = open(stdout_path, "rb")
                if stdout_page < 0:
                    stdout_file.seek((stdout_page) * STDOUT_PAGE_SIZE_CHARS, 2)
                    job.job_stdout = stdout_file.read(STDOUT_PAGE_SIZE_CHARS).decode("utf-8")
                else:
                    stdout_file.seek((stdout_page - 1) * STDOUT_PAGE_SIZE_CHARS)
                    job.job_stdout = stdout_file.read(STDOUT_PAGE_SIZE_CHARS).decode("utf-8")
                job.tool_stdout = job.job_stdout
                # job.stderr = stderr_file.read()
            except:
                log.error("Could not load std out")
            except Exception as e:
                log.error("Could not read STDOUT: %s", e)
        return job

    def stop(self, job, message=None):
+5 −1
Original line number Diff line number Diff line
@@ -104,6 +104,10 @@ class JobFilesAPIController(BaseGalaxyAPIController):
        target_dir = os.path.dirname(path)
        util.safe_makedirs(target_dir)
        try:
            if os.path.exists(path):
                with open(path, 'ab') as dest:
                    shutil.copyfileobj(open(input_file.name, 'rb'), dest)
            else:
                shutil.move(input_file.name, path)
        finally:
            try:
+2 −1
Original line number Diff line number Diff line
@@ -178,6 +178,7 @@ class FastAPIJobs:
        id: EncodedDatabaseIdField,
        trans: ProvidesUserContext = DependsOnTrans,
        full: Optional[bool] = False,
        stdout_page: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Return dictionary containing description of job data
@@ -186,7 +187,7 @@ class FastAPIJobs:
        - id: ID of job to return
        - full: Return extra information ?
        """
        return self.service.show(trans, id, bool(full))
        return self.service.show(trans, id, bool(full), int(stdout_page) if stdout_page else 0)

    @router.get("/api/jobs")
    def index(
+2 −1
Original line number Diff line number Diff line
@@ -48,9 +48,10 @@ class JobsService:
        trans: ProvidesUserContext,
        id: EncodedDatabaseIdField,
        full: bool = False,
        stdout_page: int = 0,
    ) -> Dict[str, Any]:
        id = trans.app.security.decode_id(id)
        job = self.job_manager.get_accessible_job(trans, id)
        job = self.job_manager.get_accessible_job(trans, id, stdout_page)
        return view_show_job(trans, job, bool(full))

    def index(