diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py
index a71045def53bcc94d1d372a904db8e51ebb337be..34e40b753f91d4fd385bd1b7ce4b9e0735066e76 100644
--- a/pulsar/client/action_mapper.py
+++ b/pulsar/client/action_mapper.py
@@ -39,12 +39,17 @@ path_type = Bunch(
     CONFIG="config",
     # Files from tool's tool_dir (for now just wrapper if available).
     TOOL="tool",
-    # Input work dir files - e.g. metadata files, task-split input files, etc..
+    # Input work dir files - e.g. task-split input files.
     WORKDIR="workdir",
+    # Input metadata dir files - e.g. metadata files, etc.
+    METADATA="metadata",
     # Galaxy output datasets in their final home.
     OUTPUT="output",
     # Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
     OUTPUT_WORKDIR="output_workdir",
+    # Job metadata output files (e.g. Galaxy metadata generation files and
+    # metric instrumentation files).
+    OUTPUT_METADATA="output_metadata",
     # Other fixed tool parameter paths (likely coming from tool data, but not
     # nessecarily). Not sure this is the best name...
     UNSTRUCTURED="unstructured",
@@ -56,8 +61,10 @@ ACTION_DEFAULT_PATH_TYPES = [
     path_type.CONFIG,
     path_type.TOOL,
     path_type.WORKDIR,
+    path_type.METADATA,
     path_type.OUTPUT,
     path_type.OUTPUT_WORKDIR,
+    path_type.OUTPUT_METADATA,
 ]
 ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED]
 
@@ -206,7 +213,7 @@ class FileActionMapper(object):
         action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none"
         if mapper:
             action_type = mapper.action_type
-        if type in ["workdir", "output_workdir"] and action_type == "none":
+        if type in ["workdir", "output_workdir", "output_metadata"] and action_type == "none":
             # We are changing the working_directory relative to what
             # Galaxy would use, these need to be copied over.
             action_type = "copy"
diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index d7f3439bac9174d1a3e5049848f0ab830e91c3a7..5bcd21feeff2b04e9e919ed7135c3a5be563f3c9 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -204,8 +204,8 @@ class JobClient(BaseJobClient):
             used if targetting an older Pulsar server that didn't return statuses
             allowing this to be inferred.
         """
-        if output_type == 'output_workdir':
-            self._fetch_work_dir_output(name, working_directory, path, action_type=action_type)
+        if output_type in ['output_workdir', 'output_metadata']:
+            self._populate_output_path(name, path, action_type, output_type)
         elif output_type == 'output':
             self._fetch_output(path=path, name=name, action_type=action_type)
         else:
@@ -219,22 +219,14 @@ class JobClient(BaseJobClient):
             # Extra files will send in the path.
             name = os.path.basename(path)
 
-        self.__populate_output_path(name, path, action_type)
+        self._populate_output_path(name, path, action_type, path_type.OUTPUT)
 
-    def _fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'):
+    def _populate_output_path(self, name, output_path, action_type, path_type):
         ensure_directory(output_path)
         if action_type == 'transfer':
-            self.__raw_download_output(name, self.job_id, path_type.OUTPUT_WORKDIR, output_path)
-        else:  # Even if action is none - Pulsar has a different work_dir so this needs to be copied.
-            pulsar_path = self._output_path(name, self.job_id, path_type.OUTPUT_WORKDIR)['path']
-            copy(pulsar_path, output_path)
-
-    def __populate_output_path(self, name, output_path, action_type):
-        ensure_directory(output_path)
-        if action_type == 'transfer':
-            self.__raw_download_output(name, self.job_id, path_type.OUTPUT, output_path)
+            self.__raw_download_output(name, self.job_id, path_type, output_path)
         elif action_type == 'copy':
-            pulsar_path = self._output_path(name, self.job_id, path_type.OUTPUT)['path']
+            pulsar_path = self._output_path(name, self.job_id, path_type)['path']
             copy(pulsar_path, output_path)
 
     @parseJson()
diff --git a/pulsar/client/job_directory.py b/pulsar/client/job_directory.py
index c4026aead9a7d5bcc29bf5bf11a3eae8266daeb9..f8d49b1c12151cfb5b0928a881626dd307d3cf50 100644
--- a/pulsar/client/job_directory.py
+++ b/pulsar/client/job_directory.py
@@ -17,8 +17,10 @@ TYPES_TO_METHOD = dict(
     config="configs_directory",
     tool="tool_files_directory",
     workdir="working_directory",
+    metadata="metadata_directory",
     output="outputs_directory",
     output_workdir="working_directory",
+    output_metadata="metadata_directory",
 )
 
 
@@ -33,6 +35,9 @@ class RemoteJobDirectory(object):
             remote_id
         )
 
+    def metadata_directory(self):
+        return self._sub_dir('metadata')
+
     def working_directory(self):
         return self._sub_dir('working')
 
@@ -72,7 +77,7 @@ class RemoteJobDirectory(object):
         # Obviously this client won't be legacy because this is in the
         # client module, but this code is reused on server which may
         # serve legacy clients.
-        allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir']
+        allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir', 'metadata', 'output_metadata']
         directory_function = getattr(self, TYPES_TO_METHOD.get(file_type, None), None)
         if not directory_function:
             raise Exception("Unknown file_type specified %s" % file_type)
diff --git a/pulsar/client/path_mapper.py b/pulsar/client/path_mapper.py
index 97c1a770311e51ab9b4c5c77dc54f0815fba6cf3..8e963387bb171fe37eb641ae9b764a5a6bde4d81 100644
--- a/pulsar/client/path_mapper.py
+++ b/pulsar/client/path_mapper.py
@@ -32,6 +32,7 @@ class PathMapper(object):
         self.input_directory = remote_job_config["inputs_directory"]
         self.output_directory = remote_job_config["outputs_directory"]
         self.working_directory = remote_job_config["working_directory"]
+        self.metadata_directory = remote_job_config.get("metadata_directory", None)
         self.unstructured_files_directory = remote_job_config["unstructured_files_directory"]
         self.config_directory = remote_job_config["configs_directory"]
         separator = remote_job_config["system_properties"]["separator"]
@@ -87,6 +88,8 @@ class PathMapper(object):
     def __remote_directory(self, dataset_path_type):
         if dataset_path_type in [path_type.OUTPUT]:
             return self.output_directory
+        elif dataset_path_type in [path_type.METADATA, path_type.OUTPUT_METADATA]:
+            return self.metadata_directory
         elif dataset_path_type in [path_type.WORKDIR, path_type.OUTPUT_WORKDIR]:
             return self.working_directory
         elif dataset_path_type in [path_type.INPUT]:
diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py
index c121bf28be757506ec5211af3df36d1b4358c659..029e08bc9c2bd60d04580933fdad641a1aeed6dd 100644
--- a/pulsar/client/staging/__init__.py
+++ b/pulsar/client/staging/__init__.py
@@ -31,7 +31,9 @@ class ClientJobDescription(object):
         Directory containing tool to execute (if a wrapper is used, it will
         be transferred to remote server).
     working_directory : str
-        Local path created by Galaxy for running this job.
+        Local path created by Galaxy for running this job (job_wrapper.tool_working_directory).
+    metadata_directory : str
+        Local path created by Galaxy for this job's metadata files (job_wrapper.working_directory).
     dependencies_description : list
         galaxy.tools.deps.dependencies.DependencyDescription object describing
         tool dependency context for remote depenency resolution.
@@ -57,7 +59,8 @@ class ClientJobDescription(object):
         config_files=[],
         input_files=[],
         client_outputs=None,
-        working_directory=None,  # More sensible default?
+        working_directory=None,
+        metadata_directory=None,
         dependencies_description=None,
         env=[],
         arbitrary_files=None,
@@ -69,6 +72,7 @@ class ClientJobDescription(object):
         self.input_files = input_files
         self.client_outputs = client_outputs or ClientOutputs()
         self.working_directory = working_directory
+        self.metadata_directory = metadata_directory
         self.dependencies_description = dependencies_description
         self.env = env
         self.rewrite_paths = rewrite_paths
@@ -103,9 +107,11 @@ class ClientOutputs(object):
         output_files=[],
         work_dir_outputs=None,
         version_file=None,
-        dynamic_outputs=None
+        dynamic_outputs=None,
+        metadata_directory=None,
     ):
         self.working_directory = working_directory
+        self.metadata_directory = metadata_directory
         self.work_dir_outputs = work_dir_outputs or []
         self.output_files = output_files or []
         self.version_file = version_file
@@ -115,6 +121,7 @@ class ClientOutputs(object):
     def to_dict(self):
         return dict(
             working_directory=self.working_directory,
+            metadata_directory=self.metadata_directory,
             work_dir_outputs=self.work_dir_outputs,
             output_files=self.output_files,
             version_file=self.version_file,
@@ -125,6 +132,7 @@ class ClientOutputs(object):
     def from_dict(config_dict):
         return ClientOutputs(
             working_directory=config_dict.get('working_directory'),
+            metadata_directory=config_dict.get('metadata_directory'),
             work_dir_outputs=config_dict.get('work_dir_outputs'),
             output_files=config_dict.get('output_files'),
             version_file=config_dict.get('version_file'),
@@ -139,9 +147,12 @@ class PulsarOutputs(object):
     """ Abstraction describing the output files PRODUCED by the remote Pulsar
     server. """
 
-    def __init__(self, working_directory_contents, output_directory_contents, remote_separator=sep):
+    def __init__(
+        self, working_directory_contents, output_directory_contents, metadata_directory_contents, remote_separator=sep
+    ):
         self.working_directory_contents = working_directory_contents
         self.output_directory_contents = output_directory_contents
+        self.metadata_directory_contents = metadata_directory_contents
         self.path_helper = PathHelper(remote_separator)
 
     @staticmethod
@@ -150,6 +161,7 @@ class PulsarOutputs(object):
         # by the Pulsar - older Pulsar instances will not set these in complete response.
         working_directory_contents = complete_response.get("working_directory_contents")
         output_directory_contents = complete_response.get("outputs_directory_contents")
+        metadata_directory_contents = complete_response.get("metadata_directory_contents")
         # Older (pre-2014) Pulsar servers will not include separator in response,
         # so this should only be used when reasoning about outputs in
         # subdirectories (which was not previously supported prior to that).
@@ -157,6 +169,7 @@ class PulsarOutputs(object):
         return PulsarOutputs(
             working_directory_contents,
             output_directory_contents,
+            metadata_directory_contents,
             remote_separator
         )
 
diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py
index 37f2e4965c372db0297fd53e73b59da24d5b7a1d..e03a0fe0bb8a572144238411d82e6da86cac9b11 100644
--- a/pulsar/client/staging/down.py
+++ b/pulsar/client/staging/down.py
@@ -1,6 +1,5 @@
 from os.path import join
 from os.path import relpath
-from re import compile
 from contextlib import contextmanager
 
 from ..staging import COMMAND_VERSION_FILENAME
@@ -10,12 +9,6 @@ from ..action_mapper import FileActionMapper
 from logging import getLogger
 log = getLogger(__name__)
 
-# All output files marked with from_work_dir attributes will copied or downloaded
-# this pattern picks up attiditional files to copy back - such as those
-# associated with multiple outputs and metadata configuration. Set to .* to just
-# copy everything
-COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*|dataset_\d+\.dat|__instrument_.*|dataset_\d+_files.+")
-
 
 def finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs):
     """ Responsible for downloading results from remote server and cleaning up
@@ -63,12 +56,14 @@ class ResultsCollector(object):
         self.exception_tracker = DownloadExceptionTracker()
         self.output_files = client_outputs.output_files
         self.working_directory_contents = pulsar_outputs.working_directory_contents or []
+        self.metadata_directory_contents = pulsar_outputs.metadata_directory_contents or []
 
     def collect(self):
         self.__collect_working_directory_outputs()
         self.__collect_outputs()
         self.__collect_version_file()
         self.__collect_other_working_directory_files()
+        self.__collect_metadata_directory_files()
         return self.exception_tracker.collection_failure_exceptions
 
     def __collect_working_directory_outputs(self):
@@ -105,15 +100,31 @@ class ResultsCollector(object):
             self._attempt_collect_output('output', version_file, name=COMMAND_VERSION_FILENAME)
 
     def __collect_other_working_directory_files(self):
-        working_directory = self.client_outputs.working_directory
+        self.__collect_directory_files(
+            self.client_outputs.working_directory,
+            self.working_directory_contents,
+            'output_workdir',
+        )
+
+    def __collect_metadata_directory_files(self):
+        self.__collect_directory_files(
+            self.client_outputs.metadata_directory,
+            self.metadata_directory_contents,
+            'output_metadata',
+        )
+
+    def __collect_directory_files(self, directory, contents, output_type):
+        if directory is None:  # e.g. no metadata_directory set for this job
+            return
+
         # Fetch remaining working directory outputs of interest.
-        for name in self.working_directory_contents:
+        for name in contents:
             if name in self.downloaded_working_directory_files:
                 continue
             if self.client_outputs.dynamic_match(name):
-                log.debug("collecting dynamic output %s" % name)
-                output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name))
-                if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name):
+                log.debug("collecting dynamic %s file %s" % (output_type, name))
+                output_file = join(directory, self.pulsar_outputs.path_helper.local_name(name))
+                if self._attempt_collect_output(output_type=output_type, path=output_file, name=name):
                     self.downloaded_working_directory_files.append(name)
 
     def _attempt_collect_output(self, output_type, path, name=None):
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index 46ebe3192909f2ba1c4532d3969afc665e83cd58..9e648d35761415369bc13e4a92e6b53b3229235a 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -74,6 +74,7 @@ class FileStager(object):
             self.tool_version = None
             self.tool_dir = None
         self.working_directory = client_job_description.working_directory
+        self.metadata_directory = client_job_description.metadata_directory
         self.version_file = client_job_description.version_file
         self.arbitrary_files = client_job_description.arbitrary_files
         self.rewrite_paths = client_job_description.rewrite_paths
@@ -95,6 +96,7 @@ class FileStager(object):
         self.__upload_tool_files()
         self.__upload_input_files()
         self.__upload_working_directory_files()
+        self.__upload_metadata_directory_files()
         self.__upload_arbitrary_files()
 
         if self.rewrite_paths:
@@ -190,12 +192,24 @@ class FileStager(object):
             path = join(self.working_directory, working_directory_file)
             self.transfer_tracker.handle_transfer(path, path_type.WORKDIR)
 
+    def __upload_metadata_directory_files(self):
+        metadata_directory_files = self.__metadata_directory_files()
+        for metadata_directory_file in metadata_directory_files:
+            path = join(self.metadata_directory, metadata_directory_file)
+            self.transfer_tracker.handle_transfer(path, path_type.METADATA)
+
     def __working_directory_files(self):
         if self.working_directory and exists(self.working_directory):
             return listdir(self.working_directory)
         else:
             return []
 
+    def __metadata_directory_files(self):
+        if self.metadata_directory and exists(self.metadata_directory):
+            return listdir(self.metadata_directory)
+        else:
+            return []
+
     def __initialize_version_file_rename(self):
         version_file = self.version_file
         if version_file:
diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py
index 50ccb4ca4923b64e80968f4d7b19c48a9de2668e..9273b6425696ba9f54052e18505e9394611c2f12 100644
--- a/pulsar/client/test/check.py
+++ b/pulsar/client/test/check.py
@@ -33,13 +33,24 @@ import sys
 from os import getenv
 from os import makedirs
 from os import listdir
+from os.path import exists
 from os.path import join
 from os.path import basename
 from os.path import dirname
 
+
+def assert_path_contents(path, expected_contents):
+    if not exists(path):
+        message = "Expected path [%s] to exist, but it doesn't."
+        raise AssertionError(message % path)
+
+    with open(path, 'r') as f:
+        contents = f.read()
+        if contents != expected_contents:
+            message = "Expected path [%s] to be have contents [%s], but contains [%s]."
+            raise AssertionError(message % (path, expected_contents, contents))
+
 config_input = open(sys.argv[1], 'r')
-input_input = open(sys.argv[2], 'r')
-input_extra = open(sys.argv[8], 'r')
 output = open(sys.argv[3], 'w')
 output2 = open(sys.argv[5], 'w')
 output2_contents = sys.argv[6]
@@ -49,9 +60,12 @@ index_path = sys.argv[10]
 assert len(listdir(dirname(index_path))) == 2
 assert len(listdir(join(dirname(dirname(index_path)), "seq"))) == 1
 output4_index_path = open(sys.argv[11], 'w')
+metadata_dir = dirname(sys.argv[13])
+output_metadata_path = join(metadata_dir, "metadata_output")
 try:
-    assert input_input.read() == "Hello world input!!@!"
-    assert input_extra.read() == "INPUT_EXTRA_CONTENTS"
+    assert_path_contents(sys.argv[2], "Hello world input!!@!")
+    assert_path_contents(sys.argv[8], "INPUT_EXTRA_CONTENTS")
+    assert_path_contents(sys.argv[13], "meta input")
     contents = config_input.read(1024)
     output.write(contents)
     open("workdir_output", "w").write("WORK DIR OUTPUT")
@@ -59,6 +73,7 @@ try:
     open("rewrite_action_test", "w").write(sys.argv[12])
     output2.write(output2_contents)
     with open("galaxy.json", "w") as f: f.write("GALAXY_JSON")
+    with open(output_metadata_path, "w") as f: f.write("meta output")
     output3.write(getenv("MOO", "moo_default"))
     output1_extras_path = "%s_files" % sys.argv[3][0:-len(".dat")]
     makedirs(output1_extras_path)
@@ -111,9 +126,17 @@ def run(options):
         temp_index_dir_sibbling = os.path.join(temp_directory, "idx", "seq")
         temp_shared_dir = os.path.join(temp_directory, "shared", "test1")
         temp_work_dir = os.path.join(temp_directory, "w")
+        temp_metadata_dir = os.path.join(temp_directory, "m")
         temp_tool_dir = os.path.join(temp_directory, "t")
 
-        __makedirs([temp_tool_dir, temp_work_dir, temp_index_dir, temp_index_dir_sibbling, temp_shared_dir])
+        __makedirs([
+            temp_tool_dir,
+            temp_work_dir,
+            temp_index_dir,
+            temp_index_dir_sibbling,
+            temp_shared_dir,
+            temp_metadata_dir,
+        ])
 
         temp_input_path = os.path.join(temp_directory, "dataset_0.dat")
         temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra")
@@ -128,6 +151,7 @@ def run(options):
         temp_version_output_path = os.path.join(temp_directory, "GALAXY_VERSION_1234")
         temp_output_workdir_destination = os.path.join(temp_directory, "dataset_77.dat")
         temp_output_workdir = os.path.join(temp_work_dir, "env_test")
+        temp_metadata_path = os.path.join(temp_metadata_dir, "metadata_test123")
 
         temp_output_workdir_destination2 = os.path.join(temp_directory, "dataset_78.dat")
         temp_output_workdir2 = os.path.join(temp_work_dir, "rewrite_action_test")
@@ -135,6 +159,7 @@ def run(options):
         __write_to_file(temp_input_path, b"Hello world input!!@!")
         __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS")
         __write_to_file(temp_config_path, EXPECTED_OUTPUT)
+        __write_to_file(temp_metadata_path, "meta input")
         __write_to_file(temp_tool_path, TEST_SCRIPT)
         __write_to_file(temp_index_path, b"AGTC")
         # Implicit files that should also get transferred since depth > 0
@@ -159,9 +184,10 @@ def run(options):
             temp_index_path,
             temp_output4_path,
             temp_shared_dir,
+            temp_metadata_path,
         )
         assert os.path.exists(temp_index_path)
-        command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
+        command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
         config_files = [temp_config_path]
         input_files = [temp_input_path, empty_input]
         output_files = [
@@ -176,6 +202,7 @@ def run(options):
         waiter = Waiter(client, client_manager)
         client_outputs = ClientOutputs(
             working_directory=temp_work_dir,
+            metadata_directory=temp_metadata_dir,
             work_dir_outputs=[
                 (temp_output_workdir, temp_output_workdir_destination),
                 (temp_output_workdir2, temp_output_workdir_destination2),
@@ -191,6 +218,7 @@ def run(options):
             input_files=input_files,
             client_outputs=client_outputs,
             working_directory=temp_work_dir,
+            metadata_directory=temp_metadata_dir,
             **__extra_job_description_kwargs(options)
         )
         submit_job(client, job_description)
@@ -201,6 +229,7 @@ def run(options):
         __assert_contents(temp_output2_path, cmd_text, result_status)
         __assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON", result_status)
         __assert_contents(os.path.join(temp_directory, "dataset_1_files", "extra"), b"EXTRA_OUTPUT_CONTENTS", result_status)
+        __assert_contents(os.path.join(temp_metadata_dir, "metadata_output"), b"meta output", result_status)
         if getattr(options, "test_rewrite_action", False):
             __assert_contents(temp_output_workdir_destination2, os.path.join(temp_directory, "shared2", "test1"), result_status)
         if job_description.env:
diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py
index 707da732e75477cd16342727f3accdf7a1b62481..b48b9c1788f07eb2f8a22eec6351afc24c92be04 100644
--- a/pulsar/manager_endpoint_util.py
+++ b/pulsar/manager_endpoint_util.py
@@ -40,6 +40,7 @@ def __job_complete_dict(complete_status, manager, job_id):
         stderr=stderr_contents,
         working_directory=job_directory.working_directory(),
         working_directory_contents=job_directory.working_directory_contents(),
+        metadata_directory_contents=job_directory.metadata_directory_contents(),
         outputs_directory_contents=job_directory.outputs_directory_contents(),
         system_properties=manager.system_properties(),
     )
diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py
index 6c448df40881a415df169e2d6b9f99084f407c93..a1365d0108e90904ef07337a1611521402398177 100644
--- a/pulsar/managers/base/__init__.py
+++ b/pulsar/managers/base/__init__.py
@@ -29,6 +29,7 @@ from pulsar.client.job_directory import verify_is_in_directory
 JOB_DIRECTORY_INPUTS = "inputs"
 JOB_DIRECTORY_OUTPUTS = "outputs"
 JOB_DIRECTORY_WORKING = "working"
+JOB_DIRECTORY_METADATA = "metadata"
 JOB_DIRECTORY_CONFIGS = "configs"
 JOB_DIRECTORY_TOOL_FILES = "tool_files"
 
@@ -145,7 +146,8 @@ class BaseManager(ManagerInterface):
                           JOB_DIRECTORY_WORKING,
                           JOB_DIRECTORY_OUTPUTS,
                           JOB_DIRECTORY_CONFIGS,
-                          JOB_DIRECTORY_TOOL_FILES]:
+                          JOB_DIRECTORY_TOOL_FILES,
+                          JOB_DIRECTORY_METADATA]:
             job_directory.make_directory(directory)
         return job_directory
 
@@ -283,6 +285,10 @@ class JobDirectory(RemoteJobDirectory):
         outputs_directory = self.outputs_directory()
         return self.__directory_contents(outputs_directory)
 
+    def metadata_directory_contents(self):
+        metadata_directory = self.metadata_directory()
+        return self.__directory_contents(metadata_directory)
+
     def __directory_contents(self, directory):
         contents = []
         for path, _, files in walk(directory):
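
metadata_directory_contents() reuses the same recursive listing as the other *_contents()
helpers, so the complete response reports every file under the job's metadata/ subdirectory
by its relative path (assumed behavior, mirroring __directory_contents). A hedged standalone
sketch of that listing:

    import os

    def directory_contents(directory):
        # Walk the tree and report each file relative to `directory`,
        # e.g. ["metadata_output", "sub/metrics.json"].
        contents = []
        for path, _, files in os.walk(directory):
            for name in files:
                contents.append(os.path.relpath(os.path.join(path, name), directory))
        return contents
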
diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py
index a13792b0089c9a6c30576cf06c34a0853b060448..060dcb053d07adeef0d83b00f0c82700dc382852 100644
--- a/pulsar/managers/staging/post.py
+++ b/pulsar/managers/staging/post.py
@@ -30,6 +30,6 @@ def __collect_outputs(job_directory, staging_config, action_executor):
         pulsar_outputs = __pulsar_outputs(job_directory)
         output_collector = PulsarServerOutputCollector(job_directory, action_executor)
         results_collector = ResultsCollector(output_collector, file_action_mapper, client_outputs, pulsar_outputs)
         collection_failure_exceptions = results_collector.collect()
         if collection_failure_exceptions:
             log.warn("Failures collecting results %s" % collection_failure_exceptions)
@@ -62,9 +63,11 @@ class PulsarServerOutputCollector(object):
 def __pulsar_outputs(job_directory):
     working_directory_contents = job_directory.working_directory_contents()
     output_directory_contents = job_directory.outputs_directory_contents()
+    metadata_directory_contents = job_directory.metadata_directory_contents()
     return PulsarOutputs(
         working_directory_contents,
         output_directory_contents,
+        metadata_directory_contents,
     )
 
 __all__ = ['postprocess']
diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py
index cc8680e3628f1df81fed615b54c94061ae63bea3..45fb1ede07402f4c226b8e891320f8c53d9410dc 100644
--- a/pulsar/web/routes.py
+++ b/pulsar/web/routes.py
@@ -89,14 +89,14 @@ def cancel(manager, job_id):
 
 @PulsarController(path="/jobs/{job_id}/files", method="POST", response_type='json')
 def upload_file(manager, type, file_cache, job_id, name, body, cache_token=None):
-    # Input type should be one of input, config, workdir, tool, or unstructured (see action_mapper.path_type)
+    # Input type should be one of input, config, workdir, metadata, tool, or unstructured (see action_mapper.path_type)
     path = manager.job_directory(job_id).calculate_path(name, type)
     return _handle_upload(file_cache, path, body, cache_token=cache_token)
 
 
 @PulsarController(path="/jobs/{job_id}/files/path", method="GET", response_type='json')
 def path(manager, type, job_id, name):
-    if type in [path_type.OUTPUT, path_type.OUTPUT_WORKDIR]:
+    if type in [path_type.OUTPUT, path_type.OUTPUT_WORKDIR, path_type.OUTPUT_METADATA]:
         path = _output_path(manager, job_id, name, type)
     else:
         path = manager.job_directory(job_id).calculate_path(name, type)
@@ -121,6 +121,8 @@ def _output_path(manager, job_id, name, output_type):
     directory = manager.job_directory(job_id).outputs_directory()
     if output_type == path_type.OUTPUT_WORKDIR:  # action_mapper.path_type.OUTPUT_WORKDIR
         directory = manager.job_directory(job_id).working_directory()
+    elif output_type == path_type.OUTPUT_METADATA:
+        directory = manager.job_directory(job_id).metadata_directory()
     path = os.path.join(directory, name)
     verify_is_in_directory(path, directory)
     return path
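
On the web layer, OUTPUT_METADATA simply selects a third base directory when resolving output
file paths. A simplified restatement of the selection in _output_path above (job_directory
stands in for manager.job_directory(job_id)):

    def base_directory_for(job_directory, output_type):
        # Default to the outputs directory, then special-case the two
        # directory-relative output types.
        directory = job_directory.outputs_directory()
        if output_type == "output_workdir":
            directory = job_directory.working_directory()
        elif output_type == "output_metadata":
            directory = job_directory.metadata_directory()
        return directory
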