From 7d92304acb45aa65c42f3d85ed1940ca2f327ce1 Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Tue, 18 Feb 2014 16:23:16 -0600 Subject: [PATCH] Add new remote action "remote_transfer". This action will transfer files via URL get/post (using pycurl currently). Added test case (including dummy app to mock a potential Galaxy files API) to test this functionality. Still more work to do - there is no way to populate such urls during setup process and Galaxy still doesn't have a job files API in order to access these files. --- lwr/framework.py | 16 ++++-- lwr/lwr_client/action_mapper.py | 55 ++++++++++++++++-- lwr/lwr_client/transport/__init__.py | 7 ++- lwr/lwr_client/transport/curl.py | 42 ++++++++++---- lwr/managers/staging/postprocess.py | 7 +-- test/test_utils.py | 6 ++ test/transfer_action_test.py | 83 ++++++++++++++++++++++++++++ 7 files changed, 189 insertions(+), 27 deletions(-) create mode 100644 test/transfer_action_test.py diff --git a/lwr/framework.py b/lwr/framework.py index 78aeb81e..6932b55b 100644 --- a/lwr/framework.py +++ b/lwr/framework.py @@ -124,12 +124,7 @@ class Controller(object): def __build_response(self, result): if self.response_type == 'file': - resp = Response() - path = result - if exists(path): - resp.app_iter = FileIterator(path) - else: - raise exc.HTTPNotFound("No file found with path %s." % path) + resp = file_response(result) else: resp = Response(body=self.body(result)) return resp @@ -164,6 +159,15 @@ class Controller(object): pass +def file_response(path): + resp = Response() + if exists(path): + resp.app_iter = FileIterator(path) + else: + raise exc.HTTPNotFound("No file found with path %s." % path) + return resp + + class FileIterator(Iterator): def __init__(self, path): diff --git a/lwr/lwr_client/action_mapper.py b/lwr/lwr_client/action_mapper.py index 68255bbb..ad97bce1 100644 --- a/lwr/lwr_client/action_mapper.py +++ b/lwr/lwr_client/action_mapper.py @@ -11,6 +11,9 @@ import galaxy.util from galaxy.util.bunch import Bunch from .util import directory_files from .util import unique_path_prefix +from .transport import get_file +from .transport import post_file + DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception? DEFAULT_PATH_MAPPER_TYPE = 'prefix' @@ -233,7 +236,7 @@ class RemoteCopyAction(BaseAction): staging = STAGING_ACTION_REMOTE def to_dict(self): - return dict(path=self.path, action_type=RemoteCopyAction.action_type) + return dict(path=self.path, action_type=self.action_type) @classmethod def from_dict(cls, action_dict): @@ -242,6 +245,37 @@ class RemoteCopyAction(BaseAction): def write_to_path(self, path): galaxy.util.copy_to_path(open(self.path, "rb"), path) + def write_from_path(self, lwr_path): + with open(lwr_path, "rb") as f: + galaxy.util.copy_to_path(f, self.path) + + +class RemoteTransferAction(BaseAction): + """ This action indicates the LWR server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the LWR server instead of on + the client. + """ + action_type = "remote_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, path, file_lister=None, url=None): + super(RemoteTransferAction, self).__init__(path, file_lister=file_lister) + self.url = url + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type, url=self.url) + + @classmethod + def from_dict(cls, action_dict): + return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"]) + + def write_to_path(self, path): + get_file(self.url, path) + + def write_from_path(self, lwr_path): + post_file(self.url, lwr_path) + class MessageAction(object): """ Sort of pseudo action describing "files" store in memory and @@ -275,7 +309,8 @@ class MessageAction(object): def write_to_path(self, path): open(path, "w").write(self.contents) -DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, MessageAction] + +DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction] def from_dict(action_dict): @@ -402,8 +437,20 @@ class FileLister(object): DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) -ACTION_CLASSES = [NoneAction, TransferAction, CopyAction, RemoteCopyAction] +ACTION_CLASSES = [ + NoneAction, + TransferAction, + CopyAction, + RemoteCopyAction, + RemoteTransferAction +] actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) -__all__ = [FileActionMapper, path_type, from_dict, MessageAction] +__all__ = [ + FileActionMapper, + path_type, + from_dict, + MessageAction, + RemoteTransferAction, # For testing +] diff --git a/lwr/lwr_client/transport/__init__.py b/lwr/lwr_client/transport/__init__.py index 9071d7b7..5e0a62ec 100644 --- a/lwr/lwr_client/transport/__init__.py +++ b/lwr/lwr_client/transport/__init__.py @@ -23,4 +23,9 @@ def __get_transport_type(transport_type, os_module): transport_type = 'curl' return transport_type -__all__ = [get_transport] +# TODO: Provide urllib implementation if these unavailable, +# also explore a requests+poster option. +from .curl import get_file +from .curl import post_file + +__all__ = [get_transport, get_file, post_file] diff --git a/lwr/lwr_client/transport/curl.py b/lwr/lwr_client/transport/curl.py index 72ee1d92..b96d92cd 100644 --- a/lwr/lwr_client/transport/curl.py +++ b/lwr/lwr_client/transport/curl.py @@ -4,7 +4,7 @@ except ImportError: from io import StringIO try: from pycurl import Curl -except: +except ImportError: pass from os.path import getsize @@ -16,9 +16,9 @@ PYCURL_UNAVAILABLE_MESSAGE = \ class PycurlTransport(object): def execute(self, url, data=None, input_path=None, output_path=None): - buf = self._open_output(output_path) + buf = _open_output(output_path) try: - c = self._new_curl_object() + c = _new_curl_object() c.setopt(c.URL, url.encode('ascii')) c.setopt(c.WRITEFUNCTION, buf.write) if input_path: @@ -37,11 +37,33 @@ class PycurlTransport(object): finally: buf.close() - def _new_curl_object(self): - try: - return Curl() - except NameError: - raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) - def _open_output(self, output_path): - return open(output_path, 'wb') if output_path else StringIO() +def post_file(url, path): + c = _new_curl_object() + c.setopt(c.URL, url.encode('ascii')) + c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, path))]) + c.perform() + + +def get_file(url, path): + buf = _open_output(path) + try: + c = _new_curl_object() + c.setopt(c.URL, url.encode('ascii')) + c.setopt(c.WRITEFUNCTION, buf.write) + c.perform() + finally: + buf.close() + + +def _open_output(output_path): + return open(output_path, 'wb') if output_path else StringIO() + + +def _new_curl_object(): + try: + return Curl() + except NameError: + raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) + +___all__ = [PycurlTransport, post_file, get_file] diff --git a/lwr/managers/staging/postprocess.py b/lwr/managers/staging/postprocess.py index 0a4b0939..b6b073f8 100644 --- a/lwr/managers/staging/postprocess.py +++ b/lwr/managers/staging/postprocess.py @@ -46,12 +46,7 @@ class LwrServerOutputCollector(object): return # Galaxy (client) will collect output. lwr_path = self.job_directory.calculate_path(name, output_type) - # TODO: Handle other action types... - if action.action_type == "remote_copy": - with open(lwr_path, "rb") as f: - copy_to_path(f, action.path) - else: - log.warn("Unable to handle action %s on LWR side." % action) + action.write_from_path(lwr_path) def __lwr_outputs(job_directory): diff --git a/test/test_utils.py b/test/test_utils.py index 2c872cd7..c2676e80 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -12,6 +12,11 @@ if version_info < (2, 7): else: from unittest import TestCase, skip +try: + from nose.tools import nottest +except ImportError: + nottest = lambda x: x + from webtest import TestApp from webtest.http import StopableWSGIServer @@ -119,6 +124,7 @@ class TestDependencyManager(object): return [] +@nottest @contextmanager def server_for_test_app(app): try: diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py new file mode 100644 index 00000000..7d8a6032 --- /dev/null +++ b/test/transfer_action_test.py @@ -0,0 +1,83 @@ +import os + +from .test_utils import server_for_test_app +from .test_utils import temp_directory + +import galaxy.util + +import webob +import webtest +import contextlib + +from lwr.lwr_client.action_mapper import RemoteTransferAction +from lwr.lwr_client.transport import curl +from lwr.framework import file_response + + +def app_factory(global_conf, **local_conf): + return JobFilesApp() + + +class JobFilesApp(object): + + def __init__(self, root_directory=None): + self.root_directory = root_directory + + def __call__(self, environ, start_response): + req = webob.Request(environ) + params = req.params.mixed() + method = req.method + if method == "POST": + resp = self._post(req, params) + elif method == "GET": + resp = self._get(req, params) + return resp(environ, start_response) + + def _post(self, request, params): + path = params['path'] + assert galaxy.util.in_directory(path, self.root_directory) + galaxy.util.copy_to_path(params["file"].file, path) + return webob.Response(body='') + + def _get(self, request, params): + path = params['path'] + assert galaxy.util.in_directory(path, self.root_directory) + return file_response(path) + + +@contextlib.contextmanager +def files_server(directory=None): + if not directory: + with temp_directory() as directory: + app = webtest.TestApp(JobFilesApp(directory)) + with server_for_test_app(app) as server: + yield server, directory + else: + app = webtest.TestApp(JobFilesApp(directory)) + with server_for_test_app(app) as server: + yield server + + +def test_write_to_file(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "remote_get") + open(from_path, "wb").write(u"123456") + + to_path = os.path.join(directory, "local_get") + url = server.application_url + "?path=%s" % from_path + RemoteTransferAction(to_path, url=url).write_to_path(to_path) + + assert open(to_path, "rb").read() == u"123456" + + +def test_write_from_file(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "local_post") + open(from_path, "wb").write(u"123456") + + to_path = os.path.join(directory, "remote_post") + url = server.application_url + "?path=%s" % to_path + RemoteTransferAction(to_path, url=url).write_from_path(from_path) + + posted_contents = open(to_path, "rb").read() + assert posted_contents == u"123456", posted_contents -- GitLab