Unverified Commit 055a6428 authored by Nicola Soranzo's avatar Nicola Soranzo Committed by GitHub
Browse files

Merge pull request #13642 from nsoranzo/release_22.01_fix_13637

[22.01] Only stream stdout/stderr on kubernetes
parents dcd0473d 3db96f40
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ def build_command(
    create_tool_working_directory=True,
    remote_command_params=None,
    remote_job_directory=None,
    stream_stdout_stderr=False,
):
    """
    Compose the sequence of commands necessary to execute a job. This will
@@ -94,7 +95,7 @@ def build_command(

    for_pulsar = 'script_directory' in remote_command_params
    if not for_pulsar:
        commands_builder.capture_stdout_stderr('../outputs/tool_stdout', '../outputs/tool_stderr')
        commands_builder.capture_stdout_stderr('../outputs/tool_stdout', '../outputs/tool_stderr', stream_stdout_stderr=stream_stdout_stderr)

    # Don't need to create a separate tool working directory for Pulsar
    # jobs - that is handled by Pulsar.
@@ -277,7 +278,10 @@ class CommandsBuilder:
    def append_commands(self, commands):
        self.append_command("; ".join(c for c in commands if c))

    def capture_stdout_stderr(self, stdout_file, stderr_file):
    def capture_stdout_stderr(self, stdout_file, stderr_file, stream_stdout_stderr=False):
        if not stream_stdout_stderr:
            self.append_command(f"> '{stdout_file}' 2> '{stderr_file}'", sep="")
            return
        trap_command = """trap 'rm -f "$__out" "$__err"' EXIT"""
        if TRAP_KILL_CONTAINER in self.commands:
            # We need to replace the container kill trap with one that removes the named pipes and kills the container
+4 −0
Original line number Diff line number Diff line
@@ -224,6 +224,7 @@ class BaseJobRunner:
        include_metadata=False,
        include_work_dir_outputs=True,
        modify_command_for_container=True,
        stream_stdout_stderr=False,
    ):
        """Some sanity checks that all runners' queue_job() methods are likely to want to do"""
        job_id = job_wrapper.get_id_tag()
@@ -250,6 +251,7 @@ class BaseJobRunner:
                include_metadata=include_metadata,
                include_work_dir_outputs=include_work_dir_outputs,
                modify_command_for_container=modify_command_for_container,
                stream_stdout_stderr=stream_stdout_stderr,
            )
        except Exception as e:
            log.exception("(%s) Failure preparing job", job_id)
@@ -277,6 +279,7 @@ class BaseJobRunner:
                           include_metadata=False,
                           include_work_dir_outputs=True,
                           modify_command_for_container=True,
                           stream_stdout_stderr=False,
                           ):
        container = self._find_container(job_wrapper)
        if not container and job_wrapper.requires_containerization:
@@ -288,6 +291,7 @@ class BaseJobRunner:
            include_work_dir_outputs=include_work_dir_outputs,
            modify_command_for_container=modify_command_for_container,
            container=container,
            stream_stdout_stderr=stream_stdout_stderr,
        )

    def get_work_dir_outputs(self, job_wrapper, job_working_directory=None, tool_working_directory=None):
+1 −0
Original line number Diff line number Diff line
@@ -149,6 +149,7 @@ class KubernetesJobRunner(AsynchronousJobRunner):
            job_wrapper,
            include_metadata=False,
            modify_command_for_container=False,
            stream_stdout_stderr=True,
        ):
            return

+34 −9
Original line number Diff line number Diff line
@@ -205,25 +205,50 @@ steps:

    @uses_test_history(require_new=True)
    def test_show(self, history_id):
        # Create HDA to ensure at least one job exists...
        self.__history_with_new_dataset(history_id)

        jobs_response = self._get("jobs")
        first_job = jobs_response.json()[0]
        job_properties_tool_run = self.dataset_populator.run_tool(
            tool_id="job_properties",
            inputs={},
            history_id=history_id,
        )
        first_job = self.__jobs_index()[0]
        self._assert_has_key(first_job, 'id', 'state', 'exit_code', 'update_time', 'create_time')

        job_id = first_job["id"]
        show_jobs_response = self._get(f"jobs/{job_id}")
        job_id = job_properties_tool_run["jobs"][0]["id"]
        show_jobs_response = self.dataset_populator.get_job_details(job_id)
        self._assert_status_code_is(show_jobs_response, 200)

        job_details = show_jobs_response.json()
        self._assert_has_key(job_details, 'id', 'state', 'exit_code', 'update_time', 'create_time')

        show_jobs_response = self._get(f"jobs/{job_id}", {"full": True})
        show_jobs_response = self.dataset_populator.get_job_details(job_id, full=True)
        self._assert_status_code_is(show_jobs_response, 200)

        job_details = show_jobs_response.json()
        self._assert_has_key(job_details, 'id', 'state', 'exit_code', 'update_time', 'create_time', 'stdout', 'stderr', 'job_messages')
        self._assert_has_key(
            job_details,
            "create_time",
            "exit_code",
            "id",
            "job_messages",
            "job_stderr",
            "job_stdout",
            "state",
            "stderr",
            "stdout",
            "tool_stderr",
            "tool_stdout",
            "update_time",
        )

        self.dataset_populator.wait_for_job(job_id, assert_ok=True)
        show_jobs_response = self.dataset_populator.get_job_details(job_id, full=True)
        job_details = show_jobs_response.json()
        assert "The bool is not true\n" not in job_details["job_stdout"]
        assert "The bool is very not true\n" not in job_details["job_stderr"]
        assert job_details["tool_stdout"] == "The bool is not true\n"
        assert job_details["tool_stderr"] == "The bool is very not true\n"
        assert "The bool is not true\n" in job_details["stdout"]
        assert "The bool is very not true\n" in job_details["stderr"]

    @uses_test_history(require_new=True)
    def test_show_security(self, history_id):
+1 −1
Original line number Diff line number Diff line
@@ -557,7 +557,7 @@ class BaseDatasetPopulator(BasePopulator):
        return wait_on_state(lambda: self.get_job_details(job_id, full=True), desc="job state", assert_ok=assert_ok, timeout=timeout)

    def get_job_details(self, job_id: str, full: bool = False) -> Response:
        return self._get(f"jobs/{job_id}?full={full}")
        return self._get(f"jobs/{job_id}", {"full": full})

    def cancel_history_jobs(self, history_id: str, wait=True) -> None:
        active_jobs = self.active_history_jobs(history_id)
Loading