Skip to content
Snippets Groups Projects
Commit 4c6dda36 authored by John Chilton's avatar John Chilton
Browse files

Progress toward structured inputs...

parent 8bd55111
No related branches found
No related tags found
No related merge requests found
......@@ -159,6 +159,10 @@ class ClientInput(object):
self.path = path
self.input_type = input_type
@property
def action_source(self):
return {"path": self.path}
class ClientOutputs(object):
""" Abstraction describing the output datasets EXPECTED by the Galaxy job
......
......@@ -212,43 +212,42 @@ class FileStager(object):
def __upload_input_files(self):
handled_inputs = set()
for client_input in self.client_inputs:
# TODO: use object identity to handle this.
path = client_input.path
if path in handled_inputs:
continue
if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
self.__upload_input_file(path)
self.__upload_input_file(client_input.action_source)
handled_inputs.add(path)
elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH:
self.__upload_input_extra_files(path)
self.__upload_input_extra_files(client_input.action_source)
handled_inputs.add(path)
elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
self.__upload_input_metadata_file(path)
self.__upload_input_metadata_file(client_input.action_source)
handled_inputs.add(path)
else:
raise NotImplementedError()
def __upload_input_file(self, input_file):
if self.__stage_input(input_file):
if exists(input_file):
self.transfer_tracker.handle_transfer_path(input_file, path_type.INPUT)
else:
message = "Pulsar: __upload_input_file called on empty or missing dataset." + \
" No such file: [%s]" % input_file
log.debug(message)
def __upload_input_file(self, input_action_source):
if self.__stage_input(input_action_source):
self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT)
def __upload_input_extra_files(self, files_path):
if self.__stage_input(files_path):
def __upload_input_extra_files(self, input_action_source):
if self.__stage_input(input_action_source):
# TODO: needs to happen else where if using remote object store staging
# but we don't have the action type yet.
files_path = input_action_source['path']
for extra_file_name in directory_files(files_path):
extra_file_path = join(files_path, extra_file_name)
remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path)))
self.transfer_tracker.handle_transfer_path(extra_file_path, path_type.INPUT, name=remote_name)
def __upload_input_metadata_file(self, path):
if self.__stage_input(path):
def __upload_input_metadata_file(self, input_action_source):
if self.__stage_input(input_action_source):
# Name must match what is generated in remote_input_path_rewrite in path_mapper.
remote_name = "metadata_%s" % basename(path)
self.transfer_tracker.handle_transfer_path(path, path_type.INPUT, name=remote_name)
remote_name = "metadata_%s" % basename(input_action_source['path'])
self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT, name=remote_name)
def __upload_working_directory_files(self):
# Task manager stages files into working directory, these need to be
......@@ -317,10 +316,13 @@ class FileStager(object):
"""
return self.job_inputs.command_line
def __stage_input(self, file_path):
def __stage_input(self, source):
if not self.rewrite_paths:
return True
# If we have disabled path rewriting, just assume everything needs to be transferred,
# else check to ensure the file is referenced before transferring it.
return (not self.rewrite_paths) or self.job_inputs.path_referenced(file_path)
return self.job_inputs.path_referenced(source['path'])
class JobInputs(object):
......@@ -448,6 +450,12 @@ class TransferTracker(object):
local_action = action.staging_action_local
if local_action:
path = source['path']
if not exists(path):
message = "Pulsar: __upload_input_file called on empty or missing dataset." + \
" No such file: [%s]" % path
log.debug(message)
return
response = self.client.put_file(path, type, name=name, contents=contents, action_type=action.action_type)
def get_path():
......@@ -490,7 +498,7 @@ class TransferTracker(object):
else:
path = source.get("path")
if path is not None and not exists(path):
message = "handle_transfer_path called on non-existent file - [%s]" % path
message = "__action_for_transfer called on non-existent file - [%s]" % path
log.warn(message)
raise Exception(message)
action = self.__action(source, type)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment