diff --git a/lwr/lwr_client/action_mapper.py b/lwr/lwr_client/action_mapper.py index 85b12b2898f727bda72cde7dff211cb915a5f359..a5096b45c681dff14bd9f5237ec5ec2fde604230 100644 --- a/lwr/lwr_client/action_mapper.py +++ b/lwr/lwr_client/action_mapper.py @@ -1,4 +1,6 @@ from json import load +from os import makedirs +from os.path import exists from os.path import abspath from os.path import dirname from os.path import join @@ -265,8 +267,12 @@ class RemoteCopyAction(BaseAction): galaxy.util.copy_to_path(open(self.path, "rb"), path) def write_from_path(self, lwr_path): + destination = self.path + parent_directory = dirname(destination) + if not exists(parent_directory): + makedirs(parent_directory) with open(lwr_path, "rb") as f: - galaxy.util.copy_to_path(f, self.path) + galaxy.util.copy_to_path(f, destination) class RemoteTransferAction(BaseAction): diff --git a/lwr/lwr_client/staging/up.py b/lwr/lwr_client/staging/up.py index 23f678e2b039cdb4ea179ec2af2f48a76d209469..251ee65c56612fa6affd4a86f4aa699878992ca9 100644 --- a/lwr/lwr_client/staging/up.py +++ b/lwr/lwr_client/staging/up.py @@ -381,7 +381,7 @@ class TransferTracker(object): def register_rewrite(self, local_path, remote_path, type, force=False): action = self.__action(local_path, type) - if action.action_type in ['transfer', 'copy'] or force: + if action.staging_needed or force: self.file_renames[local_path] = remote_path def rewrite_input_paths(self): diff --git a/lwr/lwr_client/transport/curl.py b/lwr/lwr_client/transport/curl.py index b96d92cd1c3b0748100ba97046d39b8445022904..99f6788f488698f100df0e6926a6c2697d5be0cb 100644 --- a/lwr/lwr_client/transport/curl.py +++ b/lwr/lwr_client/transport/curl.py @@ -41,7 +41,7 @@ class PycurlTransport(object): def post_file(url, path): c = _new_curl_object() c.setopt(c.URL, url.encode('ascii')) - c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, path))]) + c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, path.encode('ascii')))]) c.perform() diff --git a/lwr/managers/staging/postprocess.py b/lwr/managers/staging/postprocess.py index bb99d472d75d58dff1a161dbf142ac1d57ba23b9..67a26560348055201c9a3df9b8ebacbf192291db 100644 --- a/lwr/managers/staging/postprocess.py +++ b/lwr/managers/staging/postprocess.py @@ -1,3 +1,5 @@ +import os + from lwr.lwr_client import action_mapper from lwr.lwr_client import staging from lwr.lwr_client.staging import LwrOutputs @@ -44,6 +46,11 @@ class LwrServerOutputCollector(object): if action.staging_action_local: return # Galaxy (client) will collect output. + if not name: + # TODO: Would not work on Windows. Any use in allowing + # remote_transfer action for Windows? + name = os.path.basename(action.path) + lwr_path = self.job_directory.calculate_path(name, output_type) action.write_from_path(lwr_path) diff --git a/lwr/managers/stateful.py b/lwr/managers/stateful.py index dc98b972efde539cff7da026d8a5e370e73360a8..4fb6d9ade42c58c67f97e3383861d0d1f5ce785b 100644 --- a/lwr/managers/stateful.py +++ b/lwr/managers/stateful.py @@ -45,20 +45,21 @@ class StatefulManagerProxy(ManagerProxy): def handle_remote_staging(self, job_id, staging_config): job_directory = self._proxied_manager.job_directory(job_id) - - def do_preprocess(): - preprocess(job_directory, staging_config.get("setup", [])) - new_thread_for_manager(self, "preprocess", do_preprocess, daemon=False) - job_directory.store_metadata("staging_config", staging_config) def launch(self, job_id, *args, **kwargs): job_directory = self._proxied_manager.job_directory(job_id) - result = self._proxied_manager.launch(job_id, *args, **kwargs) - with job_directory.lock("status"): - job_directory.store_metadata(JOB_FILE_PREPROCESSED, True) - self.active_jobs.activate_job(job_id) - return result + + def do_preprocess(): + # TODO: Handle preprocess or launch failures! + staging_config = job_directory.load_metadata("staging_config", {}) + preprocess(job_directory, staging_config.get("setup", [])) + self._proxied_manager.launch(job_id, *args, **kwargs) + with job_directory.lock("status"): + job_directory.store_metadata(JOB_FILE_PREPROCESSED, True) + self.active_jobs.activate_job(job_id) + + new_thread_for_manager(self, "preprocess", do_preprocess, daemon=False) def get_status(self, job_id): """ Compute status used proxied manager and handle state transitions diff --git a/test/test_utils.py b/test/test_utils.py index 856145621e9c83e2a64b6d99140f432505574b2b..c3ab5ffd32dcede98c33e22521188717088b661b 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -2,7 +2,9 @@ from contextlib import contextmanager from stat import S_IXOTH import json from os import pardir, stat, chmod, access, X_OK, pathsep, environ +from os import makedirs from os.path import join, dirname, isfile, split +from os.path import exists from tempfile import mkdtemp from shutil import rmtree @@ -244,6 +246,9 @@ class JobFilesApp(object): path = params['path'] if not galaxy.util.in_directory(path, self.root_directory): assert False, "%s not in %s" % (path, self.root_directory) + parent_directory = dirname(path) + if not exists(parent_directory): + makedirs(parent_directory) galaxy.util.copy_to_path(params["file"].file, path) return webob.Response(body='')