Commit 902025e0 authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

Merge branch '128-stdout-not-none' into 'dev'

Ensure stdout/stderr from pulsar is not processed as None

Closes #128

See merge request !104
parents dca01854 0b1f04cf
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -9,8 +9,8 @@ variables:
  CONTAINER_GALAXY_URL: "${NDIP_DOCKER_REPOSITORY}/${CI_PROJECT_PATH}"
  CONTAINER_GALAXY_BASE_URL: "${CONTAINER_GALAXY_URL}/base"
  CONTAINER_GALAXY_COMMIT_URL: "${CONTAINER_GALAXY_URL}/commit"
  GALAXY_VERSION_PYTHON: 24.1.dev3+ornl
  GALAXY_VERSION_DOCKER: 24.1.dev3.ornl
  GALAXY_VERSION_PYTHON: 24.1.dev4+ornl
  GALAXY_VERSION_DOCKER: 24.1.dev4.ornl

# This import is for the func_rse_docker_* functions
before_script:
+5 −2
Original line number Diff line number Diff line
@@ -1240,7 +1240,7 @@ class DefaultJobDispatcher:
            log.debug(f"({job_wrapper.job_id}) Dispatching to {job_wrapper.job_destination.runner} runner")
        runner.put(job_wrapper)

    def stop(self, job, job_wrapper):
    def stop(self, job, job_wrapper, soft_kill=False):
        """
        Stop the given job. The input variable job may be either a Job or a Task.
        """
@@ -1260,6 +1260,9 @@ class DefaultJobDispatcher:
            runner_name = job_runner_name.split(":", 1)[0]
            log.debug(f"Stopping job {job_wrapper.get_id_tag()} in {runner_name} runner")
            try:
                try:
                    self.job_runners[runner_name].stop_job(job_wrapper, soft_kill=soft_kill)
                except:
                    self.job_runners[runner_name].stop_job(job_wrapper)
            except KeyError:
                log.error(f"stop(): ({job_wrapper.get_id_tag()}) Invalid job runner: {runner_name}")
+5 −1
Original line number Diff line number Diff line
@@ -684,6 +684,10 @@ class PulsarJobRunner(AsynchronousJobRunner):
                        tool_stderr = file_content.read()
                except Exception:
                    pass
            if not tool_stdout:
                tool_stdout = ''
            if not tool_stderr:
                tool_stderr = ''
            job_stdout = run_results.get("job_stdout")
            job_stderr = run_results.get("job_stderr")
            exit_code = run_results.get("returncode")
@@ -758,7 +762,7 @@ class PulsarJobRunner(AsynchronousJobRunner):
                )
            return False

    def stop_job(self, job_wrapper, soft_kill=True):
    def stop_job(self, job_wrapper, soft_kill=False):
        job = job_wrapper.get_job()
        if not job.job_runner_external_id:
            return
+3 −1
Original line number Diff line number Diff line
@@ -337,12 +337,14 @@ class JobManager:
        if not job.finished:
            try:
                job.mark_stopped(self.app.config.track_jobs_in_database)
                job_wrapper = self.app.job_manager.job_handler.job_queue.job_wrapper(job)
                self.app.job_manager.job_handler.dispatcher.stop(job, job_wrapper, soft_kill=True)
                session = self.app.model.session
                with transaction(session):
                    session.commit()
                self.app.job_manager.stop(job, message="")
                return True
            except Exception:
            except Exception as e:
                log.error("Job Runner does not support stopping job early.")
        return False