From 39de377e8fc017e5307ad2d2c863f37233bff020 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Fri, 10 May 2019 11:43:51 -0400
Subject: [PATCH] Implement staging metadata inputs in the client.

---
 pulsar/client/__init__.py         | 15 ++++++++++++---
 pulsar/client/path_mapper.py      |  8 ++++++--
 pulsar/client/staging/__init__.py |  4 ++--
 pulsar/client/staging/up.py       | 11 ++++++++++-
 pulsar/client/test/check.py       | 18 +++++++++++++++---
 test/client_staging_test.py       |  1 +
 6 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/pulsar/client/__init__.py b/pulsar/client/__init__.py
index 951cb3c7..8ecdb1c1 100644
--- a/pulsar/client/__init__.py
+++ b/pulsar/client/__init__.py
@@ -44,9 +44,14 @@ from .destination import url_to_destination_params
 from .exceptions import PulsarClientTransportError
 from .manager import build_client_manager
 from .path_mapper import PathMapper
-from .staging import ClientJobDescription
-from .staging import PulsarOutputs
-from .staging import ClientOutputs
+from .staging import (
+    ClientJobDescription,
+    ClientInputs,
+    ClientInput,
+    ClientOutputs,
+    CLIENT_INPUT_PATH_TYPES,
+    PulsarOutputs,
+)
 from .staging.down import finish_job
 from .staging.up import submit_job
 
@@ -58,6 +63,10 @@ __all__ = [
     'submit_job',
     'ClientJobDescription',
     'PulsarOutputs',
+    'ClientInput',
+    'ClientInputs',
+    'ClientOutputs',
+    'CLIENT_INPUT_PATH_TYPES',
     'ClientOutputs',
     'PathMapper',
     'PulsarClientTransportError',
diff --git a/pulsar/client/path_mapper.py b/pulsar/client/path_mapper.py
index 3383ed39..dab809e5 100644
--- a/pulsar/client/path_mapper.py
+++ b/pulsar/client/path_mapper.py
@@ -4,6 +4,7 @@ from galaxy.util import in_directory
 
 from .action_mapper import FileActionMapper
 from .action_mapper import path_type
+from .staging import CLIENT_INPUT_PATH_TYPES
 from .util import PathHelper
 
 
@@ -46,8 +47,11 @@ class PathMapper(object):
         remote_path = self.__remote_path_rewrite(local_path, output_type)
         return remote_path
 
-    def remote_input_path_rewrite(self, local_path):
-        remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT)
+    def remote_input_path_rewrite(self, local_path, client_input_path_type=None):
+        name = None
+        if client_input_path_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
+            name = "metadata_%s" % os.path.basename(local_path)
+        remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT, name=name)
         return remote_path
 
     def remote_version_path_rewrite(self, local_path):
diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py
index 17e257ea..453d276a 100644
--- a/pulsar/client/staging/__init__.py
+++ b/pulsar/client/staging/__init__.py
@@ -84,7 +84,7 @@ class ClientJobDescription(object):
             # Deprecated input but provided for backward compatibility.
             assert client_inputs is None
             client_inputs = ClientInputs.for_simple_input_paths(input_files)
-        self.client_inputs = client_inputs
+        self.client_inputs = client_inputs or ClientInputs([])
         self.client_outputs = client_outputs or ClientOutputs()
         self.working_directory = working_directory
         self.metadata_directory = metadata_directory
@@ -141,7 +141,7 @@ class ClientInputs(object):
             client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
             files_path = "%s_files" % input_file[0:-len(".dat")]
             if exists(files_path):
-                client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
+                client_inputs.append(ClientInput(files_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
 
         return ClientInputs(client_inputs)
 
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index a6d8f8f2..f88d29b4 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -215,14 +215,17 @@ class FileStager(object):
             path = client_input.path
             if path in handled_inputs:
                 continue
+
             if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
                 self.__upload_input_file(path)
                 handled_inputs.add(path)
             elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH:
                 self.__upload_input_extra_files(path)
                 handled_inputs.add(path)
+            elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
+                self.__upload_input_metadata_file(path)
+                handled_inputs.add(path)
             else:
-                # TODO: implement metadata...
                 raise NotImplementedError()
 
     def __upload_input_file(self, input_file):
@@ -241,6 +244,12 @@ class FileStager(object):
                 remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path)))
                 self.transfer_tracker.handle_transfer(extra_file_path, path_type.INPUT, name=remote_name)
 
+    def __upload_input_metadata_file(self, path):
+        if self.__stage_input(path):
+            # Name must match what is generated in remote_input_path_rewrite in path_mapper.
+            remote_name = "metadata_%s" % basename(path)
+            self.transfer_tracker.handle_transfer(path, path_type.INPUT, name=remote_name)
+
     def __upload_working_directory_files(self):
         # Task manager stages files into working directory, these need to be
         # uploaded if present.
diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py
index dbc4550e..5935c9c4 100644
--- a/pulsar/client/test/check.py
+++ b/pulsar/client/test/check.py
@@ -24,7 +24,10 @@ from six import binary_type
 from pulsar.client import (
     build_client_manager,
     ClientJobDescription,
+    ClientInputs,
+    ClientInput,
     ClientOutputs,
+    CLIENT_INPUT_PATH_TYPES,
     finish_job,
     PulsarOutputs,
     submit_job,
@@ -73,6 +76,7 @@ try:
     assert_path_contents(sys.argv[2], "Hello world input!!@!")
     assert_path_contents(sys.argv[8], "INPUT_EXTRA_CONTENTS")
     assert_path_contents(sys.argv[13], "meta input")
+    assert_path_contents(sys.argv[14], "INPUT METADATA CONTENTS...")
     contents = config_input.read(1024)
     output.write(contents)
     open("workdir_output", "w").write("WORK DIR OUTPUT")
@@ -149,6 +153,7 @@ def run(options):
 
         temp_input_path = os.path.join(temp_directory, "dataset_0.dat")
         temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra")
+        temp_input_metadata_path = os.path.join(temp_directory, "metadata", "12312231231231.dat")
         temp_index_path = os.path.join(temp_index_dir, "human.fa")
 
         temp_config_path = os.path.join(temp_work_dir, "config.txt")
@@ -167,6 +172,7 @@ def run(options):
 
         __write_to_file(temp_input_path, b"Hello world input!!@!")
         __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS")
+        __write_to_file(temp_input_metadata_path, b"INPUT METADATA CONTENTS...")
         __write_to_file(temp_config_path, EXPECTED_OUTPUT)
         __write_to_file(temp_metadata_path, "meta input")
         __write_to_file(temp_tool_path, TEST_SCRIPT)
@@ -194,11 +200,17 @@ def run(options):
             temp_output4_path,
             temp_shared_dir,
             temp_metadata_path,
+            temp_input_metadata_path,
         )
         assert os.path.exists(temp_index_path)
-        command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
+        command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
         config_files = [temp_config_path]
-        input_files = [temp_input_path, temp_input_path, empty_input]
+        client_inputs = []
+        client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
+        client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
+        client_inputs.append(ClientInput(empty_input, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
+        client_inputs.append(ClientInput(os.path.join(temp_directory, "dataset_0_files"), CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
+        client_inputs.append(ClientInput(temp_input_metadata_path, CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH))
         output_files = [
             temp_output_path,
             temp_output2_path,
@@ -224,7 +236,7 @@ def run(options):
             command_line=command_line,
             tool=MockTool(temp_tool_dir),
             config_files=config_files,
-            input_files=input_files,
+            client_inputs=ClientInputs(client_inputs),
             client_outputs=client_outputs,
             working_directory=temp_work_dir,
             metadata_directory=temp_metadata_dir,
diff --git a/test/client_staging_test.py b/test/client_staging_test.py
index d6ae5155..cc570557 100644
--- a/test/client_staging_test.py
+++ b/test/client_staging_test.py
@@ -45,6 +45,7 @@ class TestStager(TempDirectoryTestCase):
         os.makedirs(files_directory)
         self.input1 = os.path.join(files_directory, "dataset_1.dat")
         self.input1_files_path = os.path.join(files_directory, "dataset_1_files")
+        os.makedirs(self.input1_files_path)
         open(self.input1, "wb").write(b"012345")
         self.input2 = os.path.join(files_directory, "dataset_2.dat")
         open(self.input2, "wb").write(b"6789")
-- 
GitLab