Skip to content
Snippets Groups Projects
Commit be363295 authored by John Chilton's avatar John Chilton
Browse files

Fix Pulsar for exit code handling changes in...

Fix Pulsar for exit code handling changes in https://github.com/galaxyproject/galaxy/pull/13557/files
parent 4311a342
No related branches found
No related tags found
3 merge requests!44Try to fix CI.,!38Generalize orchestrated container scheduling.,!22update Pulsar to 0.15.3
...@@ -6,6 +6,7 @@ import errno ...@@ -6,6 +6,7 @@ import errno
import json import json
import logging import logging
import os import os
import platform
from os import ( from os import (
curdir, curdir,
getenv, getenv,
...@@ -78,6 +79,10 @@ class BaseManager(ManagerInterface): ...@@ -78,6 +79,10 @@ class BaseManager(ManagerInterface):
self.job_metrics = app.job_metrics self.job_metrics = app.job_metrics
self.object_store = app.object_store self.object_store = app.object_store
@property
def _is_windows(self) -> bool:
return platform.system().lower() == "windows"
def clean(self, job_id): def clean(self, job_id):
if self.debug: if self.debug:
# In debug mode skip cleaning job directories. # In debug mode skip cleaning job directories.
...@@ -193,7 +198,7 @@ class BaseManager(ManagerInterface): ...@@ -193,7 +198,7 @@ class BaseManager(ManagerInterface):
else: else:
return listdir(directory_or_none) return listdir(directory_or_none)
def _expand_command_line(self, command_line, dependencies_description, job_directory=None): def _expand_command_line(self, job_id, command_line: str, dependencies_description, job_directory=None) -> str:
if dependencies_description is None: if dependencies_description is None:
return command_line return command_line
......
...@@ -117,9 +117,21 @@ class DirectoryBaseManager(BaseManager): ...@@ -117,9 +117,21 @@ class DirectoryBaseManager(BaseManager):
tool_id = job_directory.load_metadata(JOB_FILE_TOOL_ID) tool_id = job_directory.load_metadata(JOB_FILE_TOOL_ID)
return tool_id return tool_id
def _expand_command_line(self, job_id, command_line: str, dependencies_description, job_directory=None) -> str:
command_line = super()._expand_command_line(
job_id, command_line, dependencies_description, job_directory=job_directory
)
if not self._is_windows:
rc_path = self._return_code_path(job_id)
CAPTURE_RETURN_CODE = "return_code=$?"
command_line = f"{command_line}; {CAPTURE_RETURN_CODE}; echo $return_code > {rc_path};"
return command_line
# Helpers methods related to setting up job script files. # Helpers methods related to setting up job script files.
def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=[], setup_params=None): def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=[], setup_params=None):
command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) command_line = self._expand_command_line(
job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory
)
script_env = self._job_template_env(job_id, command_line=command_line, env=env, setup_params=setup_params) script_env = self._job_template_env(job_id, command_line=command_line, env=env, setup_params=setup_params)
script = job_script(**script_env) script = job_script(**script_env)
return self._write_job_script(job_id, script) return self._write_job_script(job_id, script)
...@@ -138,7 +150,6 @@ class DirectoryBaseManager(BaseManager): ...@@ -138,7 +150,6 @@ class DirectoryBaseManager(BaseManager):
return tmp_dir return tmp_dir
def _job_template_env(self, job_id, command_line=None, env=[], setup_params=None): def _job_template_env(self, job_id, command_line=None, env=[], setup_params=None):
return_code_path = self._return_code_path(job_id)
# TODO: Add option to ignore remote env. # TODO: Add option to ignore remote env.
env = env + self.env_vars env = env + self.env_vars
setup_params = setup_params or {} setup_params = setup_params or {}
...@@ -149,7 +160,6 @@ class DirectoryBaseManager(BaseManager): ...@@ -149,7 +160,6 @@ class DirectoryBaseManager(BaseManager):
'galaxy_lib': self._galaxy_lib(), 'galaxy_lib': self._galaxy_lib(),
'preserve_python_environment': setup_params.get('preserve_galaxy_python_environment', False), 'preserve_python_environment': setup_params.get('preserve_galaxy_python_environment', False),
'env_setup_commands': env_setup_commands, 'env_setup_commands': env_setup_commands,
'exit_code_path': return_code_path,
'job_directory': self.job_directory(job_id).job_directory, 'job_directory': self.job_directory(job_id).job_directory,
'working_directory': self.job_directory(job_id).working_directory(), 'working_directory': self.job_directory(job_id).working_directory(),
'metadata_directory': self.job_directory(job_id).metadata_directory(), 'metadata_directory': self.job_directory(job_id).metadata_directory(),
......
...@@ -30,7 +30,9 @@ class CliQueueManager(ExternalBaseManager): ...@@ -30,7 +30,9 @@ class CliQueueManager(ExternalBaseManager):
stdout_path = self._stdout_path(job_id) stdout_path = self._stdout_path(job_id)
stderr_path = self._stderr_path(job_id) stderr_path = self._stderr_path(job_id)
job_name = self._job_name(job_id) job_name = self._job_name(job_id)
command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) command_line = self._expand_command_line(
job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory
)
job_script_kwargs = self._job_template_env( job_script_kwargs = self._job_template_env(
job_id, job_id,
command_line=command_line, command_line=command_line,
......
...@@ -60,10 +60,12 @@ class BaseUnqueuedManager(DirectoryBaseManager): ...@@ -60,10 +60,12 @@ class BaseUnqueuedManager(DirectoryBaseManager):
def _prepare_run(self, job_id, command_line, dependencies_description, env, setup_params=None): def _prepare_run(self, job_id, command_line, dependencies_description, env, setup_params=None):
self._check_execution_with_tool_file(job_id, command_line) self._check_execution_with_tool_file(job_id, command_line)
self._record_submission(job_id) self._record_submission(job_id)
if platform.system().lower() == "windows": if self._is_windows:
# TODO: Don't ignore requirements and env without warning. Ideally # TODO: Don't ignore requirements and env without warning. Ideally
# process them or at least warn about them being ignored. # process them or at least warn about them being ignored.
command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) command_line = self._expand_command_line(
job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory
)
else: else:
command_line = self._setup_job_file( command_line = self._setup_job_file(
job_id, job_id,
......
...@@ -42,7 +42,7 @@ INTEGRITY_SYNC_COMMAND = "/bin/sync" ...@@ -42,7 +42,7 @@ INTEGRITY_SYNC_COMMAND = "/bin/sync"
DEFAULT_INTEGRITY_CHECK = True DEFAULT_INTEGRITY_CHECK = True
DEFAULT_INTEGRITY_COUNT = 35 DEFAULT_INTEGRITY_COUNT = 35
DEFAULT_INTEGRITY_SLEEP = 0.25 DEFAULT_INTEGRITY_SLEEP = 0.25
REQUIRED_TEMPLATE_PARAMS = ["working_directory", "command", "exit_code_path"] REQUIRED_TEMPLATE_PARAMS = ["working_directory", "command"]
OPTIONAL_TEMPLATE_PARAMS: Dict[str, Any] = { OPTIONAL_TEMPLATE_PARAMS: Dict[str, Any] = {
"galaxy_lib": None, "galaxy_lib": None,
"galaxy_virtual_env": None, "galaxy_virtual_env": None,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment