Commit 6f4328eb authored by John Chilton

Introduce the concept of a metadata directory...

... for metadata about ... data and jobs. So formal Galaxy metadata stuff, job instrument files, etc....

Handle transfer of metadata and working directory outputs correctly in Galaxy 16.04.
parent 72c929d0
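The diff below assumes a per-job directory layout on the Pulsar side in which a new metadata subdirectory sits next to the existing inputs, outputs, working, configs and tool_files subdirectories. A minimal, self-contained sketch of that layout (the helper function is hypothetical; the subdirectory names are the JOB_DIRECTORY_* values changed later in the diff):

import os
import tempfile

# Subdirectory names matching the JOB_DIRECTORY_* constants changed below;
# "metadata" is the one introduced by this commit.
JOB_SUBDIRECTORIES = ["inputs", "outputs", "working", "configs", "tool_files", "metadata"]


def make_job_directory(base_directory, job_id):
    """Create a fresh job directory with one subdirectory per staging category."""
    job_directory = os.path.join(base_directory, str(job_id))
    for name in JOB_SUBDIRECTORIES:
        os.makedirs(os.path.join(job_directory, name))
    return job_directory


if __name__ == "__main__":
    print(sorted(os.listdir(make_job_directory(tempfile.mkdtemp(), 42))))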
@@ -39,12 +39,17 @@ path_type = Bunch(
     CONFIG="config",
     # Files from tool's tool_dir (for now just wrapper if available).
     TOOL="tool",
-    # Input work dir files - e.g. metadata files, task-split input files, etc..
+    # Input work dir files - e.g. task-split input file
     WORKDIR="workdir",
+    # Input work dir files - e.g. metadata files, etc..
+    METADATA="metadata",
     # Galaxy output datasets in their final home.
     OUTPUT="output",
     # Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
     OUTPUT_WORKDIR="output_workdir",
+    # Meta job and data files (e.g. Galaxy metadata generation files and
+    # metric instrumentation files)
+    OUTPUT_METADATA="output_metadata",
     # Other fixed tool parameter paths (likely coming from tool data, but not
     # nessecarily). Not sure this is the best name...
     UNSTRUCTURED="unstructured",
@@ -56,8 +61,10 @@ ACTION_DEFAULT_PATH_TYPES = [
     path_type.CONFIG,
     path_type.TOOL,
     path_type.WORKDIR,
+    path_type.METADATA,
     path_type.OUTPUT,
     path_type.OUTPUT_WORKDIR,
+    path_type.OUTPUT_METADATA,
 ]
 ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED]
@@ -206,7 +213,7 @@ class FileActionMapper(object):
         action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none"
         if mapper:
             action_type = mapper.action_type
-        if type in ["workdir", "output_workdir"] and action_type == "none":
+        if type in ["workdir", "output_workdir", "output_metadata"] and action_type == "none":
            # We are changing the working_directory relative to what
            # Galaxy would use, these need to be copied over.
            action_type = "copy"
...
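Read as plain logic, the action selection above now treats the metadata path types like the other staged types: they pick up the mapper's default action, and an explicit "none" is promoted to "copy" for the directory-backed types because Pulsar runs the job out of its own working and metadata directories. A standalone sketch of that decision (not the FileActionMapper API; the path-type list is abridged to the values visible in the hunk):

# Path types that receive the configured default action (abridged).
ACTION_DEFAULT_PATH_TYPES = [
    "config", "tool", "workdir", "metadata",
    "output", "output_workdir", "output_metadata",
]


def choose_action_type(file_type, default_action="transfer", mapped_action=None):
    """Sketch of how an action might be chosen for a given path type."""
    action_type = default_action if file_type in ACTION_DEFAULT_PATH_TYPES else "none"
    if mapped_action:
        action_type = mapped_action
    if file_type in ["workdir", "output_workdir", "output_metadata"] and action_type == "none":
        # Pulsar stages these into its own directories, so at minimum a copy is needed.
        action_type = "copy"
    return action_type


assert choose_action_type("output_metadata", mapped_action="none") == "copy"
assert choose_action_type("unstructured") == "none"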
@@ -204,8 +204,8 @@ class JobClient(BaseJobClient):
         used if targetting an older Pulsar server that didn't return statuses
         allowing this to be inferred.
         """
-        if output_type == 'output_workdir':
-            self._fetch_work_dir_output(name, working_directory, path, action_type=action_type)
+        if output_type in ['output_workdir', 'output_metadata']:
+            self._populate_output_path(name, path, action_type, output_type)
         elif output_type == 'output':
             self._fetch_output(path=path, name=name, action_type=action_type)
         else:
@@ -219,22 +219,14 @@ class JobClient(BaseJobClient):
         # Extra files will send in the path.
         name = os.path.basename(path)
-        self.__populate_output_path(name, path, action_type)
+        self._populate_output_path(name, path, action_type, path_type.OUTPUT)

-    def _fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'):
+    def _populate_output_path(self, name, output_path, action_type, path_type):
         ensure_directory(output_path)
         if action_type == 'transfer':
-            self.__raw_download_output(name, self.job_id, path_type.OUTPUT_WORKDIR, output_path)
-        else:  # Even if action is none - Pulsar has a different work_dir so this needs to be copied.
-            pulsar_path = self._output_path(name, self.job_id, path_type.OUTPUT_WORKDIR)['path']
-            copy(pulsar_path, output_path)
-
-    def __populate_output_path(self, name, output_path, action_type):
-        ensure_directory(output_path)
-        if action_type == 'transfer':
-            self.__raw_download_output(name, self.job_id, path_type.OUTPUT, output_path)
+            self.__raw_download_output(name, self.job_id, path_type, output_path)
         elif action_type == 'copy':
-            pulsar_path = self._output_path(name, self.job_id, path_type.OUTPUT)['path']
+            pulsar_path = self._output_path(name, self.job_id, path_type)['path']
             copy(pulsar_path, output_path)

     @parseJson()
...
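The two download helpers are collapsed above into a single _populate_output_path parameterized by the remote path type. A rough standalone sketch of that shape, with the client's download and remote-path-lookup calls stubbed out as hypothetical callables:

import os
import shutil


def populate_output_path(name, local_path, action_type, remote_path_type,
                         download_output, remote_output_path):
    """Fetch one output: download for 'transfer', copy locally for 'copy'.

    download_output and remote_output_path stand in for the client's raw
    download helper and remote path lookup; they are not real Pulsar APIs.
    """
    # Equivalent of ensure_directory(local_path) in the diff above.
    os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
    if action_type == "transfer":
        download_output(name, remote_path_type, local_path)
    elif action_type == "copy":
        # Even for a local copy the source lives under Pulsar's own job directory.
        shutil.copy(remote_output_path(name, remote_path_type), local_path)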
@@ -17,8 +17,10 @@ TYPES_TO_METHOD = dict(
     config="configs_directory",
     tool="tool_files_directory",
     workdir="working_directory",
+    metadata="metadata_directory",
     output="outputs_directory",
     output_workdir="working_directory",
+    output_metadata="metadata_directory",
 )
@@ -33,6 +35,9 @@ class RemoteJobDirectory(object):
             remote_id
         )

+    def metadata_directory(self):
+        return self._sub_dir('metadata')
+
     def working_directory(self):
         return self._sub_dir('working')
@@ -72,7 +77,7 @@ class RemoteJobDirectory(object):
         # Obviously this client won't be legacy because this is in the
         # client module, but this code is reused on server which may
         # serve legacy clients.
-        allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir']
+        allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir', 'metadata', 'output_metadata']
         directory_function = getattr(self, TYPES_TO_METHOD.get(file_type, None), None)
         if not directory_function:
             raise Exception("Unknown file_type specified %s" % file_type)
...
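The TYPES_TO_METHOD table above is a straight lookup from a staging file type to the directory that holds it; with this commit both "metadata" and "output_metadata" resolve to the new metadata subdirectory. A self-contained sketch of that resolution (subdirectory names taken from elsewhere in the diff; the function is illustrative, not the RemoteJobDirectory class):

import os

# Maps a staging file type to a subdirectory of the remote job directory.
TYPES_TO_SUBDIR = {
    "config": "configs",
    "tool": "tool_files",
    "workdir": "working",
    "metadata": "metadata",
    "output": "outputs",
    "output_workdir": "working",
    "output_metadata": "metadata",
}


def calculate_path(job_directory, name, file_type):
    """Resolve where a file of the given type lives inside a job directory."""
    subdir = TYPES_TO_SUBDIR.get(file_type)
    if subdir is None:
        raise Exception("Unknown file_type specified %s" % file_type)
    return os.path.join(job_directory, subdir, name)


print(calculate_path("/pulsar/files/staging/123", "metadata_results", "output_metadata"))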
@@ -32,6 +32,7 @@ class PathMapper(object):
         self.input_directory = remote_job_config["inputs_directory"]
         self.output_directory = remote_job_config["outputs_directory"]
         self.working_directory = remote_job_config["working_directory"]
+        self.metadata_directory = remote_job_config.get("working_directory", None)
         self.unstructured_files_directory = remote_job_config["unstructured_files_directory"]
         self.config_directory = remote_job_config["configs_directory"]
         separator = remote_job_config["system_properties"]["separator"]
@@ -87,6 +88,8 @@ class PathMapper(object):
     def __remote_directory(self, dataset_path_type):
         if dataset_path_type in [path_type.OUTPUT]:
             return self.output_directory
+        elif dataset_path_type in [path_type.METADATA, path_type.OUTPUT_METADATA]:
+            return self.metadata_directory
         elif dataset_path_type in [path_type.WORKDIR, path_type.OUTPUT_WORKDIR]:
             return self.working_directory
         elif dataset_path_type in [path_type.INPUT]:
...
@@ -31,7 +31,9 @@ class ClientJobDescription(object):
         Directory containing tool to execute (if a wrapper is used, it will
         be transferred to remote server).
     working_directory : str
-        Local path created by Galaxy for running this job.
+        Local path created by Galaxy for running this job (job_wrapper.tool_working_directory).
+    metadata_directory : str
+        Local path created by Galaxy for running this job (job_wrapper.working_directory).
     dependencies_description : list
         galaxy.tools.deps.dependencies.DependencyDescription object describing
         tool dependency context for remote depenency resolution.
@@ -57,7 +59,8 @@ class ClientJobDescription(object):
         config_files=[],
         input_files=[],
         client_outputs=None,
-        working_directory=None,  # More sensible default?
+        working_directory=None,
+        metadata_directory=None,
         dependencies_description=None,
         env=[],
         arbitrary_files=None,
@@ -69,6 +72,7 @@ class ClientJobDescription(object):
         self.input_files = input_files
         self.client_outputs = client_outputs or ClientOutputs()
         self.working_directory = working_directory
+        self.metadata_directory = metadata_directory
         self.dependencies_description = dependencies_description
         self.env = env
         self.rewrite_paths = rewrite_paths
@@ -103,9 +107,11 @@ class ClientOutputs(object):
         output_files=[],
         work_dir_outputs=None,
         version_file=None,
-        dynamic_outputs=None
+        dynamic_outputs=None,
+        metadata_directory=None,
     ):
         self.working_directory = working_directory
+        self.metadata_directory = metadata_directory
         self.work_dir_outputs = work_dir_outputs or []
         self.output_files = output_files or []
         self.version_file = version_file
@@ -115,6 +121,7 @@ class ClientOutputs(object):
     def to_dict(self):
         return dict(
             working_directory=self.working_directory,
+            metadata_directory=self.metadata_directory,
             work_dir_outputs=self.work_dir_outputs,
             output_files=self.output_files,
             version_file=self.version_file,
@@ -125,6 +132,7 @@ class ClientOutputs(object):
     def from_dict(config_dict):
         return ClientOutputs(
             working_directory=config_dict.get('working_directory'),
+            metadata_directory=config_dict.get('metadata_directory'),
             work_dir_outputs=config_dict.get('work_dir_outputs'),
             output_files=config_dict.get('output_files'),
             version_file=config_dict.get('version_file'),
@@ -139,9 +147,12 @@ class PulsarOutputs(object):
     """ Abstraction describing the output files PRODUCED by the remote Pulsar
     server. """

-    def __init__(self, working_directory_contents, output_directory_contents, remote_separator=sep):
+    def __init__(
+        self, working_directory_contents, output_directory_contents, metadata_directory_contents, remote_separator=sep
+    ):
         self.working_directory_contents = working_directory_contents
         self.output_directory_contents = output_directory_contents
+        self.metadata_directory_contents = metadata_directory_contents
         self.path_helper = PathHelper(remote_separator)

     @staticmethod
@@ -150,6 +161,7 @@ class PulsarOutputs(object):
         # by the Pulsar - older Pulsar instances will not set these in complete response.
         working_directory_contents = complete_response.get("working_directory_contents")
         output_directory_contents = complete_response.get("outputs_directory_contents")
+        metadata_directory_contents = complete_response.get("metadata_directory_contents")
         # Older (pre-2014) Pulsar servers will not include separator in response,
         # so this should only be used when reasoning about outputs in
         # subdirectories (which was not previously supported prior to that).
@@ -157,6 +169,7 @@ class PulsarOutputs(object):
         return PulsarOutputs(
             working_directory_contents,
             output_directory_contents,
+            metadata_directory_contents,
             remote_separator
         )
...
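The serialization round trip is what lets a Pulsar server reconstruct the client's output expectations, and metadata_directory is now part of it. A toy stand-in (deliberately reduced to three fields, with made-up example paths) showing the to_dict/from_dict shape used above:

class ClientOutputsSketch(object):
    """Toy stand-in for ClientOutputs showing the new field in the dict round trip."""

    def __init__(self, working_directory=None, metadata_directory=None, output_files=None):
        self.working_directory = working_directory
        self.metadata_directory = metadata_directory
        self.output_files = output_files or []

    def to_dict(self):
        return dict(
            working_directory=self.working_directory,
            metadata_directory=self.metadata_directory,
            output_files=self.output_files,
        )

    @staticmethod
    def from_dict(config_dict):
        # Absent keys (e.g. payloads from older clients) simply come back as None.
        return ClientOutputsSketch(
            working_directory=config_dict.get('working_directory'),
            metadata_directory=config_dict.get('metadata_directory'),
            output_files=config_dict.get('output_files'),
        )


round_tripped = ClientOutputsSketch.from_dict(
    ClientOutputsSketch(working_directory="/tmp/job/working", metadata_directory="/tmp/job").to_dict()
)
print(round_tripped.metadata_directory)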
 from os.path import join
 from os.path import relpath
-from re import compile
 from contextlib import contextmanager

 from ..staging import COMMAND_VERSION_FILENAME
@@ -10,12 +9,6 @@ from ..action_mapper import FileActionMapper
 from logging import getLogger
 log = getLogger(__name__)

-# All output files marked with from_work_dir attributes will copied or downloaded
-# this pattern picks up attiditional files to copy back - such as those
-# associated with multiple outputs and metadata configuration. Set to .* to just
-# copy everything
-COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*|dataset_\d+\.dat|__instrument_.*|dataset_\d+_files.+")
-

 def finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs):
     """ Responsible for downloading results from remote server and cleaning up
@@ -63,12 +56,14 @@ class ResultsCollector(object):
         self.exception_tracker = DownloadExceptionTracker()
         self.output_files = client_outputs.output_files
         self.working_directory_contents = pulsar_outputs.working_directory_contents or []
+        self.metadata_directory_contents = pulsar_outputs.metadata_directory_contents or []

     def collect(self):
         self.__collect_working_directory_outputs()
         self.__collect_outputs()
         self.__collect_version_file()
         self.__collect_other_working_directory_files()
+        self.__collect_metadata_directory_files()
         return self.exception_tracker.collection_failure_exceptions

     def __collect_working_directory_outputs(self):
@@ -105,15 +100,31 @@ class ResultsCollector(object):
             self._attempt_collect_output('output', version_file, name=COMMAND_VERSION_FILENAME)

     def __collect_other_working_directory_files(self):
-        working_directory = self.client_outputs.working_directory
+        self.__collect_directory_files(
+            self.client_outputs.working_directory,
+            self.working_directory_contents,
+            'output_workdir',
+        )
+
+    def __collect_metadata_directory_files(self):
+        self.__collect_directory_files(
+            self.client_outputs.metadata_directory,
+            self.metadata_directory_contents,
+            'output_metadata',
+        )
+
+    def __collect_directory_files(self, directory, contents, output_type):
+        if directory is None:  # e.g. output_metadata_directory
+            return
         # Fetch remaining working directory outputs of interest.
-        for name in self.working_directory_contents:
+        for name in contents:
             if name in self.downloaded_working_directory_files:
                 continue
             if self.client_outputs.dynamic_match(name):
-                log.debug("collecting dynamic output %s" % name)
-                output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name))
-                if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name):
+                log.debug("collecting dynamic %s file %s" % (output_type, name))
+                output_file = join(directory, self.pulsar_outputs.path_helper.local_name(name))
+                if self._attempt_collect_output(output_type=output_type, path=output_file, name=name):
                     self.downloaded_working_directory_files.append(name)

     def _attempt_collect_output(self, output_type, path, name=None):
...
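The working-directory collection loop is generalized into __collect_directory_files so the same pass can run over the metadata directory listing. A compact standalone sketch of that loop (the match predicate and download callback are hypothetical stand-ins for the collector's helpers):

import os


def collect_directory_files(directory, contents, output_type, already_downloaded,
                            should_collect, attempt_collect):
    """Fetch remaining files of interest from one remote directory listing.

    should_collect and attempt_collect stand in for ClientOutputs.dynamic_match
    and ResultsCollector._attempt_collect_output in the diff above.
    """
    if directory is None:  # e.g. no metadata directory was configured by the caller
        return
    for name in contents:
        if name in already_downloaded:
            continue
        if should_collect(name):
            local_path = os.path.join(directory, name)
            if attempt_collect(output_type, local_path, name):
                already_downloaded.append(name)


# Minimal usage: pretend everything matches and every fetch succeeds.
downloaded = []
collect_directory_files("/tmp/job/metadata", ["metadata_output"], "output_metadata",
                        downloaded, lambda name: True, lambda *args: True)
print(downloaded)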
@@ -74,6 +74,7 @@ class FileStager(object):
         self.tool_version = None
         self.tool_dir = None
         self.working_directory = client_job_description.working_directory
+        self.metadata_directory = client_job_description.metadata_directory
         self.version_file = client_job_description.version_file
         self.arbitrary_files = client_job_description.arbitrary_files
         self.rewrite_paths = client_job_description.rewrite_paths
@@ -95,6 +96,7 @@ class FileStager(object):
         self.__upload_tool_files()
         self.__upload_input_files()
         self.__upload_working_directory_files()
+        self.__upload_metadata_directory_files()
         self.__upload_arbitrary_files()

         if self.rewrite_paths:
@@ -190,12 +192,24 @@ class FileStager(object):
             path = join(self.working_directory, working_directory_file)
             self.transfer_tracker.handle_transfer(path, path_type.WORKDIR)

+    def __upload_metadata_directory_files(self):
+        metadata_directory_files = self.__metadata_directory_files()
+        for metadata_directory_file in metadata_directory_files:
+            path = join(self.metadata_directory, metadata_directory_file)
+            self.transfer_tracker.handle_transfer(path, path_type.METADATA)
+
     def __working_directory_files(self):
         if self.working_directory and exists(self.working_directory):
             return listdir(self.working_directory)
         else:
             return []

+    def __metadata_directory_files(self):
+        if self.metadata_directory and exists(self.metadata_directory):
+            return listdir(self.metadata_directory)
+        else:
+            return []
+
     def __initialize_version_file_rename(self):
         version_file = self.version_file
         if version_file:
...
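Upload staging gains a mirror of the working-directory pass: if a local metadata directory was provided and exists, each file in it is handed to the transfer machinery under the new "metadata" path type. A small sketch under those assumptions (the transfer callback is a hypothetical stand-in for the stager's transfer tracker):

import os


def upload_metadata_directory_files(metadata_directory, handle_transfer):
    """List a local metadata directory, if any, and stage each file it contains."""
    if metadata_directory and os.path.exists(metadata_directory):
        names = os.listdir(metadata_directory)
    else:
        names = []  # nothing to stage; older callers may not supply a metadata directory
    for name in names:
        handle_transfer(os.path.join(metadata_directory, name), "metadata")


# Minimal usage with a print in place of the real transfer tracker.
upload_metadata_directory_files("/nonexistent/metadata",
                                lambda path, file_type: print(file_type, path))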
@@ -33,13 +33,24 @@ import sys
 from os import getenv
 from os import makedirs
 from os import listdir
+from os.path import exists
 from os.path import join
 from os.path import basename
 from os.path import dirname

+
+def assert_path_contents(path, expected_contents):
+    if not exists(path):
+        message = "Expected path [%s] to exist, but it doesn't."
+        raise AssertionError(message % path)
+    with open(path, 'r') as f:
+        contents = f.read()
+    if contents != expected_contents:
+        message = "Expected path [%s] to be have contents [%s], but contains [%s]."
+        raise AssertionError(message % (path, expected_contents, contents))
+
 config_input = open(sys.argv[1], 'r')
-input_input = open(sys.argv[2], 'r')
-input_extra = open(sys.argv[8], 'r')
 output = open(sys.argv[3], 'w')
 output2 = open(sys.argv[5], 'w')
 output2_contents = sys.argv[6]
@@ -49,9 +60,12 @@ index_path = sys.argv[10]
 assert len(listdir(dirname(index_path))) == 2
 assert len(listdir(join(dirname(dirname(index_path)), "seq"))) == 1
 output4_index_path = open(sys.argv[11], 'w')
+metadata_dir = dirname(sys.argv[13])
+output_metadata_path = join(metadata_dir, "metadata_output")
 try:
-    assert input_input.read() == "Hello world input!!@!"
-    assert input_extra.read() == "INPUT_EXTRA_CONTENTS"
+    assert_path_contents(sys.argv[2], "Hello world input!!@!")
+    assert_path_contents(sys.argv[8], "INPUT_EXTRA_CONTENTS")
+    assert_path_contents(sys.argv[13], "meta input")
     contents = config_input.read(1024)
     output.write(contents)
     open("workdir_output", "w").write("WORK DIR OUTPUT")
@@ -59,6 +73,7 @@ try:
     open("rewrite_action_test", "w").write(sys.argv[12])
     output2.write(output2_contents)
     with open("galaxy.json", "w") as f: f.write("GALAXY_JSON")
+    with open(output_metadata_path, "w") as f: f.write("meta output")
     output3.write(getenv("MOO", "moo_default"))
     output1_extras_path = "%s_files" % sys.argv[3][0:-len(".dat")]
     makedirs(output1_extras_path)
@@ -111,9 +126,17 @@ def run(options):
     temp_index_dir_sibbling = os.path.join(temp_directory, "idx", "seq")
     temp_shared_dir = os.path.join(temp_directory, "shared", "test1")
     temp_work_dir = os.path.join(temp_directory, "w")
+    temp_metadata_dir = os.path.join(temp_directory, "m")
     temp_tool_dir = os.path.join(temp_directory, "t")
-    __makedirs([temp_tool_dir, temp_work_dir, temp_index_dir, temp_index_dir_sibbling, temp_shared_dir])
+    __makedirs([
+        temp_tool_dir,
+        temp_work_dir,
+        temp_index_dir,
+        temp_index_dir_sibbling,
+        temp_shared_dir,
+        temp_metadata_dir,
+    ])

     temp_input_path = os.path.join(temp_directory, "dataset_0.dat")
     temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra")
@@ -128,6 +151,7 @@ def run(options):
     temp_version_output_path = os.path.join(temp_directory, "GALAXY_VERSION_1234")
     temp_output_workdir_destination = os.path.join(temp_directory, "dataset_77.dat")
     temp_output_workdir = os.path.join(temp_work_dir, "env_test")
+    temp_metadata_path = os.path.join(temp_metadata_dir, "metadata_test123")
     temp_output_workdir_destination2 = os.path.join(temp_directory, "dataset_78.dat")
     temp_output_workdir2 = os.path.join(temp_work_dir, "rewrite_action_test")
@@ -135,6 +159,7 @@ def run(options):
     __write_to_file(temp_input_path, b"Hello world input!!@!")
     __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS")
     __write_to_file(temp_config_path, EXPECTED_OUTPUT)
+    __write_to_file(temp_metadata_path, "meta input")
     __write_to_file(temp_tool_path, TEST_SCRIPT)
     __write_to_file(temp_index_path, b"AGTC")
     # Implicit files that should also get transferred since depth > 0
@@ -159,9 +184,10 @@ def run(options):
         temp_index_path,
         temp_output4_path,
         temp_shared_dir,
+        temp_metadata_path,
     )
     assert os.path.exists(temp_index_path)
-    command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
+    command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
     config_files = [temp_config_path]
     input_files = [temp_input_path, empty_input]
     output_files = [
@@ -176,6 +202,7 @@ def run(options):
     waiter = Waiter(client, client_manager)
     client_outputs = ClientOutputs(
         working_directory=temp_work_dir,
+        metadata_directory=temp_metadata_dir,
         work_dir_outputs=[
             (temp_output_workdir, temp_output_workdir_destination),
             (temp_output_workdir2, temp_output_workdir_destination2),
@@ -191,6 +218,7 @@ def run(options):
         input_files=input_files,
         client_outputs=client_outputs,
         working_directory=temp_work_dir,
+        metadata_directory=temp_metadata_dir,
         **__extra_job_description_kwargs(options)
     )
     submit_job(client, job_description)
@@ -201,6 +229,7 @@ def run(options):
     __assert_contents(temp_output2_path, cmd_text, result_status)
     __assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON", result_status)
     __assert_contents(os.path.join(temp_directory, "dataset_1_files", "extra"), b"EXTRA_OUTPUT_CONTENTS", result_status)
+    __assert_contents(os.path.join(temp_metadata_dir, "metadata_output"), b"meta output", result_status)
     if getattr(options, "test_rewrite_action", False):
         __assert_contents(temp_output_workdir_destination2, os.path.join(temp_directory, "shared2", "test1"), result_status)
     if job_description.env:
...
@@ -40,6 +40,7 @@ def __job_complete_dict(complete_status, manager, job_id):
         stderr=stderr_contents,
         working_directory=job_directory.working_directory(),
         working_directory_contents=job_directory.working_directory_contents(),
+        metadata_directory_contents=job_directory.metadata_directory_contents(),
         outputs_directory_contents=job_directory.outputs_directory_contents(),
         system_properties=manager.system_properties(),
     )
...
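With this change the job-complete payload reports the metadata directory listing alongside the working and outputs listings. A sketch of what the relevant part of that payload might look like, using only keys visible in this diff with illustrative values; older servers simply leave the new key out and the client normalizes the missing value to an empty list:

# Illustrative completion payload; only metadata_directory_contents is new.
job_complete_dict = {
    "stderr": "",
    "working_directory": "/pulsar/files/staging/123/working",
    "working_directory_contents": ["galaxy.json", "workdir_output"],
    "metadata_directory_contents": ["metadata_output"],
    "outputs_directory_contents": ["dataset_1.dat"],
    "system_properties": {"separator": "/"},
}

# Client-side handling for servers that predate this change.
metadata_contents = job_complete_dict.get("metadata_directory_contents") or []
print(metadata_contents)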
@@ -29,6 +29,7 @@ from pulsar.client.job_directory import verify_is_in_directory
 JOB_DIRECTORY_INPUTS = "inputs"
 JOB_DIRECTORY_OUTPUTS = "outputs"
 JOB_DIRECTORY_WORKING = "working"
+JOB_DIRECTORY_METADATA = "metadata"
 JOB_DIRECTORY_CONFIGS = "configs"
 JOB_DIRECTORY_TOOL_FILES = "tool_files"
@@ -145,7 +146,8 @@ class BaseManager(ManagerInterface):
                           JOB_DIRECTORY_WORKING,
                           JOB_DIRECTORY_OUTPUTS,
                           JOB_DIRECTORY_CONFIGS,
-                          JOB_DIRECTORY_TOOL_FILES]:
+                          JOB_DIRECTORY_TOOL_FILES,
+                          JOB_DIRECTORY_METADATA]:
             job_directory.make_directory(directory)
         return job_directory
@@ -283,6 +285,10 @@ class JobDirectory(RemoteJobDirectory):
         outputs_directory = self.outputs_directory()
         return self.__directory_contents(outputs_directory)

+    def metadata_directory_contents(self):
+        metadata_directory = self.metadata_directory()
+        return self.__directory_contents(metadata_directory)
+
     def __directory_contents(self, directory):
         contents = []
         for path, _, files in walk(directory):
...
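On the server side, metadata_directory_contents mirrors the existing outputs and working listings: walk the job's metadata subdirectory and return paths relative to it. A self-contained sketch of that walk (the helper is illustrative and only approximates __directory_contents in the class above):

import os
import tempfile


def directory_contents(directory):
    """Return every file under directory as a path relative to it."""
    contents = []
    for path, _, files in os.walk(directory):
        for name in files:
            contents.append(os.path.relpath(os.path.join(path, name), directory))
    return contents


# Minimal usage against a throwaway metadata directory.
metadata_directory = tempfile.mkdtemp()
open(os.path.join(metadata_directory, "metadata_output"), "w").close()
print(directory_contents(metadata_directory))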
@@ -30,6 +30,7 @@ def __collect_outputs(job_directory, staging_config, action_executor):
     pulsar_outputs = __pulsar_outputs(job_directory)
     output_collector = PulsarServerOutputCollector(job_directory, action_executor)
     results_collector = ResultsCollector(output_collector, file_action_mapper, client_outputs, pulsar_outputs)
     collection_failure_exceptions = results_collector.collect()
     if collection_failure_exceptions:
         log.warn("Failures collecting results %s" % collection_failure_exceptions)
@@ -62,9 +63,11 @@ class PulsarServerOutputCollector(object):
 def __pulsar_outputs(job_directory):
     working_directory_contents = job_directory.working_directory_contents()
     output_directory_contents = job_directory.outputs_directory_contents()
+    metadata_directory_contents = job_directory.metadata_directory_contents()
     return PulsarOutputs(
         working_directory_contents,
         output_directory_contents,
+        metadata_directory_contents,
     )

 __all__ = ['postprocess']
@@ -89,14 +89,14 @@ def cancel(manager, job_id):
 @PulsarController(path="/jobs/{job_id}/files", method="POST", response_type='json')
 def upload_file(manager, type, file_cache, job_id, name, body, cache_token=None):
-    # Input type should be one of input, config, workdir, tool, or unstructured (see action_mapper.path_type)
+    # Input type should be one of input, config, workdir, metadata, tool, or unstructured (see action_mapper.path_type)
     path = manager.job_directory(job_id).calculate_path(name, type)
     return _handle_upload(file_cache, path, body, cache_token=cache_token)

 @PulsarController(path="/jobs/{job_id}/files/path", method="GET", response_type='json')
 def path(manager, type, job_id, name):
-    if type in [path_type.OUTPUT, path_type.OUTPUT_WORKDIR]:
+    if type in [path_type.OUTPUT, path_type.OUTPUT_WORKDIR, path_type.OUTPUT_METADATA]:
         path = _output_path(manager, job_id, name, type)
     else:
         path = manager.job_directory(job_id).calculate_path(name, type)
@@ -121,6 +121,8 @@ def _output_path(manager, job_id, name, output_type):
     directory = manager.job_directory(job_id).outputs_directory()
     if output_type == path_type.OUTPUT_WORKDIR:  # action_mapper.path_type.OUTPUT_WORKDIR
         directory = manager.job_directory(job_id).working_directory()
+    elif output_type == path_type.OUTPUT_METADATA:
+        directory = manager.job_directory(job_id).metadata_directory()
     path = os.path.join(directory, name)
     verify_is_in_directory(path, directory)
     return path
...
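The path endpoint resolves output-style types to a base directory before joining the requested name and verifying containment; output_metadata now resolves to the metadata directory. A standalone sketch of that resolution with a simple containment check (the directory layout and helper are illustrative, not the web controller itself):

import os


def output_path(job_directory, name, output_type):
    """Pick the base directory for an output-style type, join, and verify containment."""
    subdirs = {
        "output": "outputs",
        "output_workdir": "working",
        "output_metadata": "metadata",  # new in this commit
    }
    directory = os.path.join(job_directory, subdirs[output_type])
    path = os.path.join(directory, name)
    # Rough equivalent of verify_is_in_directory: reject names that escape the directory.
    if not os.path.abspath(path).startswith(os.path.abspath(directory) + os.sep):
        raise Exception("Path %s is not inside directory %s" % (path, directory))
    return path


print(output_path("/pulsar/files/staging/123", "metadata_output", "output_metadata"))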