diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py
index 7b0a47c3c9b2c3a86b53ab4b63dd32444089e382..c121bf28be757506ec5211af3df36d1b4358c659 100644
--- a/pulsar/client/staging/__init__.py
+++ b/pulsar/client/staging/__init__.py
@@ -1,3 +1,4 @@
+import re
 from os.path import basename
 from os.path import join
 from os.path import dirname
@@ -6,6 +7,7 @@ from os import sep
 from ..util import PathHelper
 
 COMMAND_VERSION_FILENAME = "COMMAND_VERSION"
+DEFAULT_DYNAMIC_COLLECTION_PATTERN = [r"primary_.*|galaxy.json|metadata_.*|dataset_\d+\.dat|__instrument_.*|dataset_\d+_files.+"]
 
 
 class ClientJobDescription(object):
@@ -50,12 +52,12 @@ class ClientJobDescription(object):
 
     def __init__(
             self,
-            tool,
             command_line,
-            config_files,
-            input_files,
-            client_outputs,
-            working_directory,
+            tool=None,
+            config_files=[],
+            input_files=[],
+            client_outputs=None,
+            working_directory=None,  # More sensible default?
             dependencies_description=None,
             env=[],
             arbitrary_files=None,
@@ -65,7 +67,7 @@ class ClientJobDescription(object):
         self.command_line = command_line
         self.config_files = config_files
         self.input_files = input_files
-        self.client_outputs = client_outputs
+        self.client_outputs = client_outputs or ClientOutputs()
         self.working_directory = working_directory
         self.dependencies_description = dependencies_description
         self.env = env
@@ -95,18 +97,28 @@ class ClientOutputs(object):
     runner client.
     """
 
-    def __init__(self, working_directory, output_files, work_dir_outputs=None, version_file=None):
+    def __init__(
+            self,
+            working_directory=None,
+            output_files=[],
+            work_dir_outputs=None,
+            version_file=None,
+            dynamic_outputs=None
+    ):
         self.working_directory = working_directory
-        self.work_dir_outputs = work_dir_outputs
-        self.output_files = output_files
+        self.work_dir_outputs = work_dir_outputs or []
+        self.output_files = output_files or []
         self.version_file = version_file
+        self.dynamic_outputs = dynamic_outputs or DEFAULT_DYNAMIC_COLLECTION_PATTERN
+        self.__dynamic_patterns = list(map(re.compile, self.dynamic_outputs))
 
     def to_dict(self):
         return dict(
             working_directory=self.working_directory,
             work_dir_outputs=self.work_dir_outputs,
             output_files=self.output_files,
-            version_file=self.version_file
+            version_file=self.version_file,
+            dynamic_outputs=self.dynamic_outputs,
         )
 
     @staticmethod
@@ -116,8 +128,12 @@ class ClientOutputs(object):
             work_dir_outputs=config_dict.get('work_dir_outputs'),
             output_files=config_dict.get('output_files'),
             version_file=config_dict.get('version_file'),
+            dynamic_outputs=config_dict.get('dynamic_outputs'),
         )
 
+    def dynamic_match(self, filename):
+        return any(map(lambda pattern: pattern.match(filename), self.__dynamic_patterns))
+
 
 class PulsarOutputs(object):
     """ Abstraction describing the output files PRODUCED by the remote Pulsar
diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py
index 00191009e7b13e323ad10e640b787b8c2f9a06f6..8feeefc4a2b07b1a2a151057a8d804054bf75861 100644
--- a/pulsar/client/staging/down.py
+++ b/pulsar/client/staging/down.py
@@ -110,7 +110,8 @@ class ResultsCollector(object):
         for name in self.working_directory_contents:
             if name in self.downloaded_working_directory_files:
                 continue
-            if COPY_FROM_WORKING_DIRECTORY_PATTERN.match(name):
+            if self.client_outputs.dynamic_match(name):
+                log.info("collecting %s" % name)
                 output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name))
                 if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name):
                    self.downloaded_working_directory_files.append(name)
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index b88bfea23d39a794fa0ca3d73ee2d6df39852405..46ebe3192909f2ba1c4532d3969afc665e83cd58 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -65,9 +65,14 @@ class FileStager(object):
         self.config_files = client_job_description.config_files
         self.input_files = client_job_description.input_files
         self.output_files = client_job_description.output_files
-        self.tool_id = client_job_description.tool.id
-        self.tool_version = client_job_description.tool.version
-        self.tool_dir = abspath(client_job_description.tool.tool_dir)
+        if client_job_description.tool is not None:
+            self.tool_id = client_job_description.tool.id
+            self.tool_version = client_job_description.tool.version
+            self.tool_dir = abspath(client_job_description.tool.tool_dir)
+        else:
+            self.tool_id = None
+            self.tool_version = None
+            self.tool_dir = None
         self.working_directory = client_job_description.working_directory
         self.version_file = client_job_description.version_file
         self.arbitrary_files = client_job_description.arbitrary_files
@@ -180,11 +185,17 @@ class FileStager(object):
     def __upload_working_directory_files(self):
         # Task manager stages files into working directory, these need to be
         # uploaded if present.
-        working_directory_files = listdir(self.working_directory) if exists(self.working_directory) else []
+        working_directory_files = self.__working_directory_files()
         for working_directory_file in working_directory_files:
             path = join(self.working_directory, working_directory_file)
             self.transfer_tracker.handle_transfer(path, path_type.WORKDIR)
 
+    def __working_directory_files(self):
+        if self.working_directory and exists(self.working_directory):
+            return listdir(self.working_directory)
+        else:
+            return []
+
     def __initialize_version_file_rename(self):
         version_file = self.version_file
         if version_file:
@@ -298,6 +309,9 @@ class JobInputs(object):
             Full path to directory to search.
 
         """
+        if directory is None:
+            return []
+
         pattern = r"(%s%s\S+)" % (directory, sep)
         return self.find_pattern_references(pattern)
 
diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py
index 5dc739ef5ae64441d578ccaf48da91dac8edd270..f2b0df128a14ff565c8e99462656a608483d5d9d 100644
--- a/pulsar/client/test/check.py
+++ b/pulsar/client/test/check.py
@@ -12,6 +12,8 @@ import tempfile
 import threading
 import time
 import traceback
+from collections import namedtuple
+
 from io import open
 
 from six import binary_type
@@ -83,12 +85,15 @@ HELP_SUPPRESS_OUTPUT = ""
 HELP_DISABLE_CLEANUP = ("Specify to disable cleanup after the job, this "
                         "is useful to checking the files generated during "
                         "the job and stored on the Pulsar server.")
+HELP_JOB_ID = "Submit the Pulsar job with this 'external' id."
EXPECTED_OUTPUT = b"hello world output" EXAMPLE_UNICODE_TEXT = u'єχαмÏâ„“Ñ” συтÏÏ…Ñ‚' TEST_REQUIREMENT = ToolRequirement(name="dep1", version="1.1", type="package") TEST_DEPENDENCIES = DependenciesDescription(requirements=[TEST_REQUIREMENT]) +ClientInfo = namedtuple("ClientInfo", ["client", "client_manager"]) + class MockTool(object): @@ -303,7 +308,8 @@ def __exercise_errors(options, client, temp_output_path, temp_directory): def __client(temp_directory, options): - default_file_action = getattr(options, "default_file_action", None) + client_options = extract_client_options(options) + default_file_action = client_options.get("default_file_action", None) unstructured_action = default_file_action or "transfer" path_defs = [ dict(path=os.path.join(temp_directory, "idx"), path_types="unstructured", depth=2, action=unstructured_action), @@ -317,17 +323,7 @@ def __client(temp_directory, options): destination_directory=os.path.join(temp_directory, "shared2") ) path_defs.append(rewrite_def) - client_options = { - "url": getattr(options, "url", None), - "private_token": getattr(options, "private_token", None), - "file_action_config": write_config(temp_directory, dict(paths=path_defs)), - } - if default_file_action: - client_options["default_file_action"] = default_file_action - if hasattr(options, "jobs_directory"): - client_options["jobs_directory"] = getattr(options, "jobs_directory") - if hasattr(options, "files_endpoint"): - client_options["files_endpoint"] = getattr(options, "files_endpoint") + client_options["file_action_config"] = write_config(temp_directory, dict(paths=path_defs)) if default_file_action in ["remote_scp_transfer", "remote_rsync_transfer"]: test_key = os.environ["PULSAR_TEST_KEY"] if not test_key.startswith("----"): @@ -340,12 +336,42 @@ def __client(temp_directory, options): user = getattr(options, 'user', None) if user: client_options["submit_user"] = user - client_manager = __client_manager(options) - client = client_manager.get_client(client_options, "123456") - return client, client_manager + job_id = getattr(options, "job_id", "123456") + return client_info(options, client_options, job_id=job_id) + + +def extract_client_options(options): + """ Exract options explicitly related to build client from + configured client manager. + + """ + default_file_action = getattr(options, "default_file_action", None) + client_options = { + "url": getattr(options, "url", None), + "private_token": getattr(options, "private_token", None), + } + if default_file_action: + client_options["default_file_action"] = default_file_action + if hasattr(options, "jobs_directory"): + client_options["jobs_directory"] = getattr(options, "jobs_directory") + if hasattr(options, "files_endpoint"): + client_options["files_endpoint"] = getattr(options, "files_endpoint") + return client_options + + +def client_info(options, client_options, job_id=None): + """ From command-line arguments ``options`` - extract options + related to build a client manager and build it. Then get a client + with supplied client options and optional job id. 
+ """ + if job_id is None: + job_id = options.job_id + client_manager = client_manager_from_args(options) + client = client_manager.get_client(client_options, job_id) + return ClientInfo(client, client_manager) -def __client_manager(options): +def client_manager_from_args(options): manager_args = {} simple_client_manager_options = ['cache', 'job_manager', 'file_cache'] for client_manager_option in simple_client_manager_options: @@ -403,7 +429,7 @@ def __finish(options, client, client_outputs, result_status): assert False, failed_message -def main(): +def main(argv=None): """ Exercises a running Pulsar with the Pulsar client. """ mod_docstring = sys.modules[__name__].__doc__ parser = optparse.OptionParser(mod_docstring) @@ -414,7 +440,8 @@ def main(): parser.add_option('--test_errors', default=False, action="store_true", help=HELP_TEST_ERRORS) parser.add_option('--suppress_output', default=False, action="store_true", help=HELP_SUPPRESS_OUTPUT) parser.add_option('--disable_cleanup', dest="cleanup", default=True, action="store_false", help=HELP_DISABLE_CLEANUP) - (options, args) = parser.parse_args() + parser.add_option('--job_id', default="123456", help=HELP_JOB_ID) + (options, args) = parser.parse_args(argv) run(options) if __name__ == "__main__": diff --git a/pulsar/scripts/mesos_executor.py b/pulsar/scripts/mesos_executor.py index df39ecbd24a4f2f022c7bec2ec407a7c876ff007..c89417ae5eb3e027d32e0e5cc7e3a437dccb6828 100644 --- a/pulsar/scripts/mesos_executor.py +++ b/pulsar/scripts/mesos_executor.py @@ -8,7 +8,7 @@ from pulsar.mesos import ( ensure_mesos_libs, ) from pulsar.client.util import from_base64_json -from pulsar.scripts.submit import ( +from pulsar.scripts.submit_util import ( manager_from_args, wait_for_job ) diff --git a/pulsar/scripts/run.py b/pulsar/scripts/run.py new file mode 100644 index 0000000000000000000000000000000000000000..ea6be32e2539b0b63ac539ea51dc31977a7eac2b --- /dev/null +++ b/pulsar/scripts/run.py @@ -0,0 +1,114 @@ +""" CLI related utilities for submitting Pulsar jobs. +""" +import fnmatch +import sys +import uuid + +from pulsar.main import ArgumentParser +from pulsar.scripts.submit_util import ( + add_common_submit_args, + run_server_for_job, +) + +from pulsar.client.test.check import ( + HELP_URL, + HELP_PRIVATE_TOKEN, + HELP_TRANSPORT, + HELP_SUPPRESS_OUTPUT, + HELP_DISABLE_CLEANUP, + HELP_JOB_ID, + extract_client_options, + client_info, + Waiter, +) +from pulsar.client import ClientJobDescription +from pulsar.client import ClientOutputs +from pulsar.client import PulsarOutputs +from pulsar.client import submit_job +from pulsar.client import finish_job +from pulsar.client.util import json_dumps + +HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL." +HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one." +HELP_COMMAND = "Shell command to execute on Pulsar server." +HELP_WORKING_DIRECTORY = "Local working directory (will be translated to a new directory)." +HELP_OUTPUT = "Output glob to collect from job (relative to remote working directory)." +HELP_OUTPUT_PATTERN = "Output pattern to collect from job (relative to remote working directory)." 
+
+DEFAULT_CLIENT_URL = 'http://localhost:8913/'
+
+
+def main(argv=None):
+    mod_docstring = sys.modules[__name__].__doc__
+    arg_parser = ArgumentParser(description=mod_docstring)
+    add_common_submit_args(arg_parser)
+    arg_parser.add_argument('--url', default=DEFAULT_CLIENT_URL, help=HELP_URL)
+    arg_parser.add_argument('--amqp_url', default=DEFAULT_CLIENT_URL, help=HELP_AMQP_URL)
+    arg_parser.add_argument('--private_token', default=None, help=HELP_PRIVATE_TOKEN)
+    # TODO: choices...
+    arg_parser.add_argument('--default_file_action', default="none")
+    arg_parser.add_argument('--file_action_config', default=None)
+    arg_parser.add_argument('--transport', default=None, choices=["urllib", "curl"], help=HELP_TRANSPORT)  # set to curl to use pycurl
+    arg_parser.add_argument('--suppress_output', default=False, action="store_true", help=HELP_SUPPRESS_OUTPUT)
+    arg_parser.add_argument('--disable_cleanup', dest="cleanup", default=True, action="store_false", help=HELP_DISABLE_CLEANUP)
+    arg_parser.add_argument('--server', default=False, action="store_true", help=HELP_SERVER)
+    arg_parser.add_argument('--job_id', default=None, help=HELP_JOB_ID)
+    arg_parser.add_argument('--command', help=HELP_COMMAND)
+    arg_parser.add_argument('--working_directory', default=".", help=HELP_WORKING_DIRECTORY)
+    arg_parser.add_argument('--result_json', default=None)
+    arg_parser.add_argument('--output', default=[], action="append", help=HELP_OUTPUT)
+    arg_parser.add_argument('--output_pattern', default=[], action="append", help=HELP_OUTPUT_PATTERN)
+
+    args = arg_parser.parse_args(argv)
+    if args.server:
+        return run_server_for_job(args)
+    else:
+        failed = _run_client_for_job(args)
+        if failed:
+            return 1
+        else:
+            return 0
+
+
+def _run_client_for_job(args):
+    if args.job_id is None:
+        args.job_id = str(uuid.uuid4())
+    output_patterns = []
+    output_patterns.extend(args.output_pattern)
+    for output in args.output:
+        output_patterns.append(fnmatch.translate(output))
+
+    client_options = extract_client_options(args)
+    client, client_manager = client_info(args, client_options)
+    try:
+        working_directory = args.working_directory
+        client_outputs = ClientOutputs(
+            working_directory=working_directory,
+            dynamic_outputs=output_patterns,
+        )
+        job_description = ClientJobDescription(
+            command_line=args.command,
+            working_directory=working_directory,
+            client_outputs=client_outputs,
+        )
+        submit_job(client, job_description)
+        waiter = Waiter(client, client_manager)
+        result_status = waiter.wait()
+        pulsar_outputs = PulsarOutputs.from_status_response(result_status)
+        if args.result_json:
+            open(args.result_json, "w").write(json_dumps(result_status))
+        finish_args = dict(
+            client=client,
+            job_completed_normally=True,
+            cleanup_job=args.cleanup,
+            client_outputs=client_outputs,
+            pulsar_outputs=pulsar_outputs,
+        )
+        failed = finish_job(**finish_args)
+        return failed
+    finally:
+        client_manager.shutdown()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pulsar/scripts/submit.py b/pulsar/scripts/submit.py
index 449d90d277b858ca9f42fc16fca406556bfc56c7..2fdbe2c00b8f81659fd8bc37d5e878dfe2cadc4f 100644
--- a/pulsar/scripts/submit.py
+++ b/pulsar/scripts/submit.py
@@ -1,71 +1,21 @@
-import time
-import json
+"""Submit a job and wait for it.
+""" +import sys from pulsar.main import ArgumentParser -from pulsar.client.util import from_base64_json -from pulsar.main import ( - load_pulsar_app, - PulsarManagerConfigBuilder +from pulsar.scripts.submit_util import ( + run_server_for_job, + add_common_submit_args, ) -from pulsar.manager_endpoint_util import submit_job -from pulsar.managers.status import is_job_done - -import logging -log = logging.getLogger(__name__) - -DESCRIPTION = "Submit a job and wait for it." -DEFAULT_POLL_TIME = 2 def main(args=None): - arg_parser = ArgumentParser(description=DESCRIPTION) - arg_parser.add_argument("--file", default=None) - arg_parser.add_argument("--base64", default=None) - PulsarManagerConfigBuilder.populate_options(arg_parser) + mod_docstring = sys.modules[__name__].__doc__ + arg_parser = ArgumentParser(description=mod_docstring) + add_common_submit_args(arg_parser) args = arg_parser.parse_args(args) + run_server_for_job(args) - config_builder = PulsarManagerConfigBuilder(args) - manager, app = manager_from_args(config_builder) - try: - job_config = __load_job_config(args) - submit_job(manager, job_config) - wait_for_job(manager, job_config) - except BaseException: - log.exception("Failure submitting or waiting on job.") - finally: - app.shutdown() - - -def wait_for_job(manager, job_config, poll_time=DEFAULT_POLL_TIME): - job_id = job_config.get('job_id') - while True: - status = manager.get_status(job_id) - if is_job_done(status): - break - time.sleep(poll_time) - - -def __load_job_config(args): - if args.base64: - base64_job_config = args.base64 - job_config = from_base64_json(base64_job_config) - else: - job_config = json.load(open(args.file, "r")) - return job_config - - -def manager_from_args(config_builder): - manager_name = config_builder.manager - - pulsar_app = load_pulsar_app( - config_builder, - # Set message_queue_consume so this Pulsar app doesn't try to consume - # setup/kill messages and only publishes status updates to configured - # queue. - message_queue_consume=False, - ) - manager = pulsar_app.managers[manager_name] - return manager, pulsar_app if __name__ == "__main__": main() diff --git a/pulsar/scripts/submit_util.py b/pulsar/scripts/submit_util.py new file mode 100644 index 0000000000000000000000000000000000000000..87ce6df17f25482385e9ce7b86b7b8548626be48 --- /dev/null +++ b/pulsar/scripts/submit_util.py @@ -0,0 +1,68 @@ +""" CLI related utilities for submitting Pulsar jobs. 
+""" +import time +import json + +from pulsar.client.util import from_base64_json +from pulsar.main import ( + load_pulsar_app, + PulsarManagerConfigBuilder +) +from pulsar.manager_endpoint_util import submit_job +from pulsar.managers.status import is_job_done + +import logging +log = logging.getLogger(__name__) + +DEFAULT_POLL_TIME = 2 + + +def add_common_submit_args(arg_parser): + arg_parser.add_argument("--file", default=None) + arg_parser.add_argument("--base64", default=None) + PulsarManagerConfigBuilder.populate_options(arg_parser) + + +def run_server_for_job(args): + config_builder = PulsarManagerConfigBuilder(args) + manager, app = manager_from_args(config_builder) + try: + job_config = _load_job_config(args) + submit_job(manager, job_config) + wait_for_job(manager, job_config) + except BaseException: + log.exception("Failure submitting or waiting on job.") + finally: + app.shutdown() + + +def wait_for_job(manager, job_config, poll_time=DEFAULT_POLL_TIME): + job_id = job_config.get('job_id') + while True: + status = manager.get_status(job_id) + if is_job_done(status): + break + time.sleep(poll_time) + + +def _load_job_config(args): + if args.base64: + base64_job_config = args.base64 + job_config = from_base64_json(base64_job_config) + else: + job_config = json.load(open(args.file, "r")) + return job_config + + +def manager_from_args(config_builder): + manager_name = config_builder.manager + + pulsar_app = load_pulsar_app( + config_builder, + # Set message_queue_consume so this Pulsar app doesn't try to consume + # setup/kill messages and only publishes status updates to configured + # queue. + message_queue_consume=False, + ) + manager = pulsar_app.managers[manager_name] + return manager, pulsar_app diff --git a/setup.py b/setup.py index 7eb8513741cb9f984fa738268b8b84f58571797a..a556a008ed5d061fa567d51222ddf09f729e6ff7 100644 --- a/setup.py +++ b/setup.py @@ -95,6 +95,7 @@ setup( pulsar-drmaa-launch=pulsar.scripts.drmaa_launch:main pulsar-drmaa-kill=pulsar.scripts.drmaa_kill:main pulsar-chown-working-directory=pulsar.scripts.chown_working_directory:main + pulsar-submit=pulsar.scripts.run:main ''', scripts=scripts, package_data={'pulsar': [ diff --git a/test/cli_help_tests.py b/test/cli_help_tests.py index 693535a68f4d61b77fb506fea7fe717c2de5b220..2eb81e63f5b93b9d3058e38bb59f92d31609d7b3 100644 --- a/test/cli_help_tests.py +++ b/test/cli_help_tests.py @@ -4,6 +4,7 @@ import pulsar.scripts.drmaa_launch import pulsar.scripts.mesos_executor import pulsar.scripts.mesos_framework import pulsar.scripts.submit +import pulsar.client.test.check MODULES = [ pulsar.scripts.drmaa_kill, @@ -11,6 +12,7 @@ MODULES = [ pulsar.scripts.mesos_executor, pulsar.scripts.mesos_framework, pulsar.scripts.submit, + pulsar.client.test.check, ] diff --git a/test/script_run_test.py b/test/script_run_test.py new file mode 100644 index 0000000000000000000000000000000000000000..5998e1536f9be6ff7db7a3adad451d285d62568c --- /dev/null +++ b/test/script_run_test.py @@ -0,0 +1,60 @@ +from __future__ import print_function +import os +from .test_utils import ( + TempDirectoryTestCase, + test_pulsar_server, +) +from pulsar.scripts import run + + +class ScriptRunTestCase(TempDirectoryTestCase): + + def setUp(self): + super(ScriptRunTestCase, self).setUp() + input1 = os.path.join(self._working_directory, "input1") + open(input1, "w").write("Hello World!") + self.input1 = input1 + + def simple_test(self): + app_conf = {} + with test_pulsar_server(app_conf=app_conf) as server: + url = server.application_url + self._run([ + 
"--url", url, + "--default_file_action", "transfer", + ]) + self._check_outputs() + + def _run(self, pulsar_args): + run_args = pulsar_args[:] + run_args.extend([ + "--command", "echo `pwd` > output1; cp input1 output_test2", + "--working_directory", self._working_directory, + "--output", "output1", + "--output_pattern", "output_test\d", + "--result_json", self._result, + ]) + exit_code = run.main(run_args) + if os.path.exists(self._result): + print(open(self._result, "r").read()) + else: + assert False, "No result json file found" + assert exit_code == 0 + + def _check_outputs(self): + output1 = os.path.join(self._working_directory, "output1") + output_test2 = os.path.join(self._working_directory, "output_test2") + # Prove it went somewhere else :) + assert open(output1, "r").read() != self._working_directory + assert open(output_test2, "r").read() == "Hello World!" + + @property + def _result(self): + return os.path.join(self.temp_directory, "r.json") + + @property + def _working_directory(self): + work_dir = os.path.join(self.temp_directory, "work") + if not os.path.exists(work_dir): + os.makedirs(work_dir) + return work_dir