Unverified Commit 54481446 authored by mvdbeek's avatar mvdbeek
Browse files

Only ``stream`` stdout/stderr on kubernetes

parent 09621441
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ def build_command(
    create_tool_working_directory=True,
    remote_command_params=None,
    remote_job_directory=None,
    stream_stdout_stderr=False,
):
    """
    Compose the sequence of commands necessary to execute a job. This will
@@ -94,7 +95,7 @@ def build_command(

    for_pulsar = 'script_directory' in remote_command_params
    if not for_pulsar:
        commands_builder.capture_stdout_stderr('../outputs/tool_stdout', '../outputs/tool_stderr')
        commands_builder.capture_stdout_stderr('../outputs/tool_stdout', '../outputs/tool_stderr', stream_stdout_stderr=stream_stdout_stderr)

    # Don't need to create a separate tool working directory for Pulsar
    # jobs - that is handled by Pulsar.
@@ -277,7 +278,10 @@ class CommandsBuilder:
    def append_commands(self, commands):
        self.append_command("; ".join(c for c in commands if c))

    def capture_stdout_stderr(self, stdout_file, stderr_file):
    def capture_stdout_stderr(self, stdout_file, stderr_file, stream_stdout_stderr=False):
        if not stream_stdout_stderr:
            self.append_command(f'> "{stdout_file}" 2> "{stderr_file}"', sep="")
            return
        trap_command = """trap 'rm -f "$__out" "$__err"' EXIT"""
        if TRAP_KILL_CONTAINER in self.commands:
            # We need to replace the container kill trap with one that removes the named pipes and kills the container
+4 −0
Original line number Diff line number Diff line
@@ -224,6 +224,7 @@ class BaseJobRunner:
        include_metadata=False,
        include_work_dir_outputs=True,
        modify_command_for_container=True,
        stream_stdout_stderr=False,
    ):
        """Some sanity checks that all runners' queue_job() methods are likely to want to do"""
        job_id = job_wrapper.get_id_tag()
@@ -250,6 +251,7 @@ class BaseJobRunner:
                include_metadata=include_metadata,
                include_work_dir_outputs=include_work_dir_outputs,
                modify_command_for_container=modify_command_for_container,
                stream_stdout_stderr=stream_stdout_stderr,
            )
        except Exception as e:
            log.exception("(%s) Failure preparing job", job_id)
@@ -277,6 +279,7 @@ class BaseJobRunner:
                           include_metadata=False,
                           include_work_dir_outputs=True,
                           modify_command_for_container=True,
                           stream_stdout_stderr=False,
                           ):
        container = self._find_container(job_wrapper)
        if not container and job_wrapper.requires_containerization:
@@ -288,6 +291,7 @@ class BaseJobRunner:
            include_work_dir_outputs=include_work_dir_outputs,
            modify_command_for_container=modify_command_for_container,
            container=container,
            stream_stdout_stderr=stream_stdout_stderr,
        )

    def get_work_dir_outputs(self, job_wrapper, job_working_directory=None, tool_working_directory=None):
+1 −0
Original line number Diff line number Diff line
@@ -149,6 +149,7 @@ class KubernetesJobRunner(AsynchronousJobRunner):
            job_wrapper,
            include_metadata=False,
            modify_command_for_container=False,
            stream_stdout_stderr=True,
        ):
            return

+39 −29
Original line number Diff line number Diff line
@@ -18,17 +18,15 @@ TEST_METADATA_LINE = "set_metadata_and_stuff.sh"
TEST_FILES_PATH = "file_path"
TEE_REDIRECT = '> "$__out" 2> "$__err"'
RETURN_CODE_CAPTURE = "; return_code=$?; echo $return_code > galaxy_1.ec"
CAPTURE_AND_REDIRECT = f"{TEE_REDIRECT}{RETURN_CODE_CAPTURE}"
CP_WORK_DIR_OUTPUTS = '; \nif [ -f "foo" ] ; then cp "foo" "bar" ; fi'
TEE_LOG = """__out="${TMPDIR:-.}/out.$$" __err="${TMPDIR:-.}/err.$$"
mkfifo "$__out" "$__err"
trap 'rm -f "$__out" "$__err"' EXIT
tee -a '../outputs/tool_stdout' < "$__out" &
tee -a '../outputs/tool_stderr' < "$__err" >&2 & """


class TestCommandFactory(TestCase):

    stream_stdout_stderr = False
    TEE_LOG = " "
    CAPTURE_AND_REDIRECT = f'> "../outputs/tool_stdout" 2> "../outputs/tool_stderr"{RETURN_CODE_CAPTURE}'

    def setUp(self):
        self.job_dir = mkdtemp()
        self.job_wrapper = MockJobWrapper(self.job_dir)
@@ -47,29 +45,20 @@ class TestCommandFactory(TestCase):

    def test_simplest_command(self):
        self.include_work_dir_outputs = False
        self.__assert_command_is(self._surround_command(MOCK_COMMAND_LINE))

    def test_kill_trap_replaced(self):
        self.include_work_dir_outputs = False
        self.job_wrapper.command_line = f"{TRAP_KILL_CONTAINER}{MOCK_COMMAND_LINE}"
        expected_command_line = self._surround_command(MOCK_COMMAND_LINE).replace(
            """trap 'rm -f "$__out" "$__err"' EXIT""",
            """trap 'rm -f "$__out" "$__err"; _on_exit' EXIT"""
        )
        self.__assert_command_is(expected_command_line)
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE))

    def test_shell_commands(self):
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self.__assert_command_is(self._surround_command(f"{dep_commands[0]}; {MOCK_COMMAND_LINE}"))
        self._assert_command_is(self._surround_command(f"{dep_commands[0]}; {MOCK_COMMAND_LINE}"))

    def test_shell_commands_external(self):
        self.job_wrapper.commands_in_new_shell = True
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self.__assert_command_is(self._surround_command(
        self._assert_command_is(self._surround_command(
            '{} {}/tool_script.sh'.format(
                self.job_wrapper.shell,
                self.job_wrapper.working_directory,
@@ -80,35 +69,35 @@ class TestCommandFactory(TestCase):
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self.__assert_command_is(self._surround_command(MOCK_COMMAND_LINE), remote_command_params=dict(dependency_resolution="remote"))
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE), remote_command_params=dict(dependency_resolution="remote"))

    def test_explicit_local_dependency_resolution(self):
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self.__assert_command_is(self._surround_command(f"{dep_commands[0]}; {MOCK_COMMAND_LINE}"),
        self._assert_command_is(self._surround_command(f"{dep_commands[0]}; {MOCK_COMMAND_LINE}"),
                                remote_command_params=dict(dependency_resolution="local"))

    def test_task_prepare_inputs(self):
        self.include_work_dir_outputs = False
        self.job_wrapper.prepare_input_files_cmds = ["/opt/split1", "/opt/split2"]
        self.__assert_command_is(self._surround_command(f"/opt/split1; /opt/split2; {MOCK_COMMAND_LINE}"))
        self._assert_command_is(self._surround_command(f"/opt/split1; /opt/split2; {MOCK_COMMAND_LINE}"))

    def test_workdir_outputs(self):
        self.include_work_dir_outputs = True
        self.workdir_outputs = [("foo", "bar")]
        self.__assert_command_is(self._surround_command(MOCK_COMMAND_LINE, CP_WORK_DIR_OUTPUTS))
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE, CP_WORK_DIR_OUTPUTS))

    def test_workdir_outputs_with_glob(self):
        self.include_work_dir_outputs = True
        self.workdir_outputs = [("foo*bar", "foo_x_bar")]
        self.__assert_command_is(self._surround_command(
        self._assert_command_is(self._surround_command(
            MOCK_COMMAND_LINE, '; \nif [ -f "foo"*"bar" ] ; then cp "foo"*"bar" "foo_x_bar" ; fi'))

    def test_set_metadata_skipped_if_unneeded(self):
        self.include_metadata = True
        self.include_work_dir_outputs = False
        self.__assert_command_is(self._surround_command(MOCK_COMMAND_LINE))
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE))

    def test_set_metadata(self):
        self._test_set_metadata()
@@ -122,7 +111,7 @@ class TestCommandFactory(TestCase):
        self.include_work_dir_outputs = False
        self.job_wrapper.metadata_line = TEST_METADATA_LINE
        expected_command = self._surround_command(MOCK_COMMAND_LINE, f"; cd '{self.job_dir}'; {SETUP_GALAXY_FOR_METADATA}{TEST_METADATA_LINE}")
        self.__assert_command_is(expected_command)
        self._assert_command_is(expected_command)

    def test_empty_metadata(self):
        """
@@ -133,7 +122,7 @@ class TestCommandFactory(TestCase):
        self.job_wrapper.metadata_line = ' '
        # Empty metadata command do not touch command line.
        expected_command = self._surround_command(MOCK_COMMAND_LINE, f"; cd '{self.job_dir}'")
        self.__assert_command_is(expected_command)
        self._assert_command_is(expected_command)

    def test_metadata_kwd_defaults(self):
        configured_kwds = self.__set_metadata_with_kwds()
@@ -164,7 +153,7 @@ class TestCommandFactory(TestCase):
            self.__command()
        return self.job_wrapper.configured_external_metadata_kwds

    def __assert_command_is(self, expected_command, **command_kwds):
    def _assert_command_is(self, expected_command, **command_kwds):
        command = self.__command(**command_kwds)
        self.assertEqual(command, expected_command)

@@ -181,15 +170,36 @@ class TestCommandFactory(TestCase):
            job_wrapper=self.job_wrapper,
            include_metadata=self.include_metadata,
            include_work_dir_outputs=self.include_work_dir_outputs,
            stream_stdout_stderr=self.stream_stdout_stderr,
            **extra_kwds
        )
        return build_command(**kwds)

    def _surround_command(self, command, post_command=""):
        command = f'''{PREPARE_DIRS}; {TEE_LOG}{command} {CAPTURE_AND_REDIRECT}{post_command}; sh -c "exit $return_code"'''
        command = f'''{PREPARE_DIRS};{self.TEE_LOG}{command} {self.CAPTURE_AND_REDIRECT}{post_command}; sh -c "exit $return_code"'''
        return command.replace("galaxy_1.ec", os.path.join(self.job_wrapper.working_directory, "galaxy_1.ec"), 1)


class TestCommandFactoryStreamStdoutStderr(TestCommandFactory):

    stream_stdout_stderr = True
    TEE_LOG = """ __out="${TMPDIR:-.}/out.$$" __err="${TMPDIR:-.}/err.$$"
mkfifo "$__out" "$__err"
trap 'rm -f "$__out" "$__err"' EXIT
tee -a '../outputs/tool_stdout' < "$__out" &
tee -a '../outputs/tool_stderr' < "$__err" >&2 & """
    CAPTURE_AND_REDIRECT = f"{TEE_REDIRECT}{RETURN_CODE_CAPTURE}"

    def test_kill_trap_replaced(self):
        self.include_work_dir_outputs = False
        self.job_wrapper.command_line = f"{TRAP_KILL_CONTAINER}{MOCK_COMMAND_LINE}"
        expected_command_line = self._surround_command(MOCK_COMMAND_LINE).replace(
            """trap 'rm -f "$__out" "$__err"' EXIT""",
            """trap 'rm -f "$__out" "$__err"; _on_exit' EXIT"""
        )
        self._assert_command_is(expected_command_line)


class MockJobWrapper:

    def __init__(self, job_dir):