diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000000000000000000000000000000000000..c383ae144a167b8236686dc59c2f6b19f1c21dba --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,14 @@ +[settings] +combine_as_imports=true +extend_skip=doc/*,scripts/* +force_alphabetical_sort_within_sections=true +# Override force_grid_wrap value from profile=black, but black is still happy +force_grid_wrap=2 +line_length=140 +no_lines_before=LOCALFOLDER +profile=black +reverse_relative=true +skip_gitignore=true +# Make isort run faster by skipping database +skip_glob=database/* +src_paths=pulsar,test diff --git a/Makefile b/Makefile index c318eb6f500b22dc4a7a7467c504a99595d48423..6bbc78b567bc810951bd214546c3c3cbad28e4e5 100644 --- a/Makefile +++ b/Makefile @@ -128,6 +128,9 @@ open-rtd: docs open-project: open $(PROJECT_URL) || xdg-open $(PROJECT_URL) +format: ## Format Python code base + $(IN_VENV) isort . + dist: clean-build clean-pyc $(IN_VENV) python setup.py sdist bdist_wheel ls -l dist diff --git a/pulsar/client/__init__.py b/pulsar/client/__init__.py index cac005fc42d440dc63791d9411f8d83ed00d88e6..734ddb8aaa270dea195806285999b3529b4bc403 100644 --- a/pulsar/client/__init__.py +++ b/pulsar/client/__init__.py @@ -45,11 +45,11 @@ from .exceptions import PulsarClientTransportError from .manager import build_client_manager from .path_mapper import PathMapper from .staging import ( - ClientJobDescription, - ClientInputs, + CLIENT_INPUT_PATH_TYPES, ClientInput, + ClientInputs, + ClientJobDescription, ClientOutputs, - CLIENT_INPUT_PATH_TYPES, EXTENDED_METADATA_DYNAMIC_COLLECTION_PATTERN, PulsarOutputs, ) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ed289d18750856fadc92fadac20152ea08731027..f6ae3dc45b424cbfb3a497d13d0810bf9ef7d7be 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -1,9 +1,10 @@ import fnmatch import tempfile - from contextlib import contextmanager -from os import makedirs -from os import unlink +from os import ( + makedirs, + unlink, +) from os.path import ( abspath, basename, @@ -12,25 +13,34 @@ from os.path import ( join, sep, ) -from re import compile, escape -from typing import Any, Dict, List, Type +from re import ( + compile, + escape, +) +from typing import ( + Any, + Dict, + List, + Type, +) from urllib.parse import urlencode from galaxy.util.bunch import Bunch from .config_util import read_file -from .transport import get_file -from .transport import post_file from .transport import ( + get_file, + post_file, rsync_get_file, rsync_post_file, scp_get_file, scp_post_file, ) -from .util import copy_to_path -from .util import directory_files -from .util import unique_path_prefix - +from .util import ( + copy_to_path, + directory_files, + unique_path_prefix, +) DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception? 
DEFAULT_PATH_MAPPER_TYPE = 'prefix' diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 5009f5f2fc94250905a700b8e65b1ac3a29624d7..edc274585c5a987e0d755973ccfe7209bc9bebb3 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -3,8 +3,10 @@ import logging import socket import threading import uuid - -from time import sleep, time +from time import ( + sleep, + time, +) try: import kombu diff --git a/pulsar/client/amqp_exchange_factory.py b/pulsar/client/amqp_exchange_factory.py index c36242c44fb29bdcda20b583787f937db48ffab0..65b3ae9e6acebdf218d627498084fc678506c24c 100644 --- a/pulsar/client/amqp_exchange_factory.py +++ b/pulsar/client/amqp_exchange_factory.py @@ -1,5 +1,8 @@ from .amqp_exchange import PulsarExchange -from .util import filter_destination_params, MessageQueueUUIDStore +from .util import ( + filter_destination_params, + MessageQueueUUIDStore, +) def get_exchange(url, manager_name, params): diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 2f56b27592894ff2ed219e2082eab9e62a9c3982..a214b1dd31fad6beaab0f83a6cb6bfa84b51debb 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -2,32 +2,36 @@ import logging import os from pulsar.managers.util.pykube_util import ( + delete_job, ensure_pykube, find_job_object_by_name, find_pod_object_by_name, galaxy_instance_id, Job, job_object_dict, - produce_unique_k8s_job_name, + produce_k8s_job_prefix, pull_policy, pykube_client_from_dict, - stop_job, ) from .action_mapper import ( actions, path_type, ) from .amqp_exchange import ACK_FORCE_NOACK_KEY -from .decorators import parseJson -from .decorators import retry +from .decorators import ( + parseJson, + retry, +) from .destination import submit_params from .job_directory import RemoteJobDirectory from .setup_handler import build as build_setup_handler -from .util import copy -from .util import ensure_directory -from .util import json_dumps -from .util import json_loads -from .util import to_base64_json +from .util import ( + copy, + ensure_directory, + json_dumps, + json_loads, + to_base64_json, +) log = logging.getLogger(__name__) @@ -475,7 +479,7 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient): base64_message = to_base64_json(launch_params) base64_app_conf = to_base64_json(pulsar_app_config) - job_name = self._k8s_job_name + k8s_job_prefix = self._k8s_job_prefix params = self.destination_params pulsar_container_image = self.pulsar_container_image @@ -523,7 +527,7 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient): template = { "metadata": { - "labels": {"app": job_name}, + "labels": {"app": k8s_job_prefix}, }, "spec": { "volumes": volumes, @@ -534,23 +538,23 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient): spec = {"template": template} if "k8s_walltime_limit" in params: spec["activeDeadlineSeconds"] = int(params["k8s_walltime_limit"]) - k8s_job_obj = job_object_dict(params, job_name, spec) + k8s_job_obj = job_object_dict(params, k8s_job_prefix, spec) pykube_client = self._pykube_client job = Job(pykube_client, k8s_job_obj) job.create() def kill(self): - job_name = self._k8s_job_name + job_name = self._k8s_job_prefix pykube_client = self._pykube_client job = find_job_object_by_name(pykube_client, job_name) if job: log.info("Kill k8s job with name %s" % job_name) - stop_job(job) + delete_job(job) else: log.info("Attempted to kill k8s job but it is unavailable.") def job_ip(self): - job_name = self._k8s_job_name + job_name = self._k8s_job_prefix pykube_client = 
self._pykube_client pod = find_pod_object_by_name(pykube_client, job_name) if pod: @@ -569,10 +573,10 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient): return pykube_client_from_dict(self.destination_params) @property - def _k8s_job_name(self): + def _k8s_job_prefix(self): job_id = self.job_id - job_name = produce_unique_k8s_job_name(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id) - return job_name + job_prefix = produce_k8s_job_prefix(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id) + return job_prefix def _pulsar_container_resources(self, params): return self._container_resources(params, container='pulsar') diff --git a/pulsar/client/config_util.py b/pulsar/client/config_util.py index 5b57b913a0e0854eebd00a47293bbed023ecc7e0..8c74e5402ca34427d6b9314a749c22c18f49965c 100644 --- a/pulsar/client/config_util.py +++ b/pulsar/client/config_util.py @@ -7,9 +7,8 @@ try: import yaml except ImportError: yaml = None # type: ignore -from configparser import ConfigParser import json - +from configparser import ConfigParser CONFIG_TYPE_JSON = "json" CONFIG_TYPE_YAML = "yaml" diff --git a/pulsar/client/job_directory.py b/pulsar/client/job_directory.py index d93502c3cdc6773925caa921475bd7ba2e0dd3cb..6295727a446d4cad955a592cbbb0d61db5be051d 100644 --- a/pulsar/client/job_directory.py +++ b/pulsar/client/job_directory.py @@ -2,7 +2,6 @@ """ import os.path import posixpath - from collections import deque from glob import glob from logging import getLogger @@ -10,6 +9,7 @@ from logging import getLogger from galaxy.util import in_directory from .util import PathHelper + log = getLogger(__name__) diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index 47e0b89c4c804f4740f9b5d0947f67c57f6c6ebb..630f0a0f19f93aed7ab69afc5cbeed027c91bc4b 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -7,32 +7,34 @@ specific actions. 
import functools import threading -from typing import Any, Dict, Type - from logging import getLogger from os import getenv from queue import Queue +from typing import ( + Any, + Dict, + Type, +) from .amqp_exchange_factory import get_exchange from .client import ( BaseJobClient, InputCachingJobClient, JobClient, - MessageJobClient, MessageCLIJobClient, MessageCoexecutionPodJobClient, + MessageJobClient, ) from .destination import url_to_destination_params +from .object_client import ObjectStoreClient from .server_interface import ( HttpPulsarInterface, LocalPulsarInterface, PulsarInterface, ) -from .object_client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager - log = getLogger(__name__) DEFAULT_TRANSFER_THREADS = 2 diff --git a/pulsar/client/path_mapper.py b/pulsar/client/path_mapper.py index c3350d069b0ff80a0b38430940da7b63a6207c30..50b3d5e7bc7f5569d4cc66a5b33a28603e0d9816 100644 --- a/pulsar/client/path_mapper.py +++ b/pulsar/client/path_mapper.py @@ -2,8 +2,10 @@ import os.path from galaxy.util import in_directory -from .action_mapper import FileActionMapper -from .action_mapper import path_type +from .action_mapper import ( + FileActionMapper, + path_type, +) from .staging import CLIENT_INPUT_PATH_TYPES from .util import PathHelper diff --git a/pulsar/client/server_interface.py b/pulsar/client/server_interface.py index d5d39a63adf9e8c9d1f83cbb31df2baf7447f451..d4b2434c6287de7ed1bea74e708f7fcd7b439af5 100644 --- a/pulsar/client/server_interface.py +++ b/pulsar/client/server_interface.py @@ -1,9 +1,14 @@ import logging -from abc import ABCMeta -from abc import abstractmethod +from abc import ( + ABCMeta, + abstractmethod, +) from io import BytesIO from string import Template -from urllib.parse import urlencode, urljoin +from urllib.parse import ( + urlencode, + urljoin, +) from galaxy.util import unicodify diff --git a/pulsar/client/setup_handler.py b/pulsar/client/setup_handler.py index 38f1dc18f0e1b152b89a2854d5d9c16dbe721309..65710ef96d07e157245aa02a4e34ddb2b0951f40 100644 --- a/pulsar/client/setup_handler.py +++ b/pulsar/client/setup_handler.py @@ -2,7 +2,6 @@ import os from uuid import uuid4 from pulsar import __version__ as pulsar_version - from .util import filter_destination_params REMOTE_SYSTEM_PROPERTY_PREFIX = "remote_property_" diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index afce33474be85399178b56003ca9f4f497adff89..4b8aa1b8bb3b3ae799b1b84b4ac346eb23929e7a 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -1,6 +1,5 @@ import re from enum import Enum - from os import sep from os.path import ( basename, diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index bfb9b7af30499723f0cfcd57d48d8315cb648d39..c14a9beccf06078d9dbe01cfd56bc37db1750593 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -1,14 +1,16 @@ """Code run on the client side for unstaging complete Pulsar jobs.""" import fnmatch from contextlib import contextmanager -from logging import getLogger -from os.path import join, relpath from json import loads +from logging import getLogger +from os.path import ( + join, + relpath, +) from ..action_mapper import FileActionMapper from ..staging import COMMAND_VERSION_FILENAME - log = getLogger(__name__) diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index a111746bcffdcd28f59c60f3b2d96f574b8b262d..00246d012418ee32ae70e5734eee5850bcb553fd 100644 --- 
a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -9,15 +9,25 @@ from os.path import ( join, relpath, ) -from re import escape, findall +from re import ( + escape, + findall, +) -from ..action_mapper import FileActionMapper -from ..action_mapper import MessageAction -from ..action_mapper import path_type +from ..action_mapper import ( + FileActionMapper, + MessageAction, + path_type, +) from ..job_directory import RemoteJobDirectory -from ..staging import CLIENT_INPUT_PATH_TYPES, COMMAND_VERSION_FILENAME -from ..util import directory_files -from ..util import PathHelper +from ..staging import ( + CLIENT_INPUT_PATH_TYPES, + COMMAND_VERSION_FILENAME, +) +from ..util import ( + directory_files, + PathHelper, +) log = getLogger(__name__) diff --git a/pulsar/client/transport/__init__.py b/pulsar/client/transport/__init__.py index a286c905cdae8293880a5ee85d7331dc63a87f2f..c2a8a59458d5b5038dbb8a7c968850a9a404a24d 100644 --- a/pulsar/client/transport/__init__.py +++ b/pulsar/client/transport/__init__.py @@ -1,21 +1,33 @@ import os -from .curl import curl_available -from .curl import PycurlTransport +from .curl import ( + curl_available, + PycurlTransport, +) from .requests import requests_multipart_post_available -from .ssh import rsync_get_file, scp_get_file -from .ssh import rsync_post_file, scp_post_file +from .ssh import ( + rsync_get_file, + rsync_post_file, + scp_get_file, + scp_post_file, +) from .standard import UrllibTransport if curl_available: - from .curl import get_file - from .curl import post_file + from .curl import ( + get_file, + post_file, + ) elif requests_multipart_post_available: - from .requests import get_file - from .requests import post_file + from .requests import ( + get_file, + post_file, + ) else: - from .poster import get_file - from .poster import post_file + from .poster import ( + get_file, + post_file, + ) def get_transport(transport_type=None, os_module=os, transport_params=None): diff --git a/pulsar/client/transport/curl.py b/pulsar/client/transport/curl.py index 4eb9441c989745e5f68fa256902d3378772d9969..49dea4478e02c922799c1d312052b22e4b1702f5 100644 --- a/pulsar/client/transport/curl.py +++ b/pulsar/client/transport/curl.py @@ -4,14 +4,17 @@ import os.path try: import pycurl - from pycurl import Curl, HTTP_CODE, error + from pycurl import ( + Curl, + error, + HTTP_CODE, + ) curl_available = True except ImportError: curl_available = False from ..exceptions import PulsarClientTransportError - PYCURL_UNAVAILABLE_MESSAGE = \ "You are attempting to use the Pycurl version of the Pulsar client but pycurl is unavailable." 
diff --git a/pulsar/client/transport/standard.py b/pulsar/client/transport/standard.py index 60d8ff963b493e9f75340718675e942dc2182ae8..2bd472f3a9c531bc53afb67d3dd77609bca8a9c4 100644 --- a/pulsar/client/transport/standard.py +++ b/pulsar/client/transport/standard.py @@ -5,10 +5,12 @@ Pulsar HTTP Client layer based on Python Standard Library (urllib) import mmap import socket - from os.path import getsize -from urllib.request import Request, urlopen from urllib.error import URLError +from urllib.request import ( + Request, + urlopen, +) from ..exceptions import PulsarClientTransportError diff --git a/pulsar/client/util.py b/pulsar/client/util.py index 743b7cc469838a9a419219ad814324fd1650b03b..c01abfa2e44685246e3f78cc472b0c8083f193b5 100644 --- a/pulsar/client/util.py +++ b/pulsar/client/util.py @@ -2,8 +2,14 @@ import hashlib import json import os.path import shutil -from base64 import b64decode as _b64decode, b64encode as _b64encode -from errno import EEXIST, ENOENT +from base64 import ( + b64decode as _b64decode, + b64encode as _b64encode, +) +from errno import ( + EEXIST, + ENOENT, +) from functools import wraps from os import ( curdir, @@ -18,7 +24,10 @@ from os.path import ( join, relpath, ) -from threading import Event, Lock +from threading import ( + Event, + Lock, +) from weakref import WeakValueDictionary # TODO: move to galaxy.util so it doesn't have to be duplicated diff --git a/pulsar/core.py b/pulsar/core.py index 204cd2ff3c0acb2b08e4659ae7cb094461f4894d..4f12406741e9e60245fb578d6d8bd549abc1ac13 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -1,19 +1,20 @@ """ """ import os +from logging import getLogger from tempfile import tempdir -from pulsar.manager_factory import build_managers -from pulsar.cache import Cache -from pulsar.tools import ToolBox -from pulsar.tools.authorization import get_authorizer -from pulsar import messaging +from galaxy.job_metrics import JobMetrics from galaxy.objectstore import build_object_store_from_config from galaxy.tool_util.deps import build_dependency_manager -from galaxy.job_metrics import JobMetrics from galaxy.util.bunch import Bunch -from logging import getLogger +from pulsar import messaging +from pulsar.cache import Cache +from pulsar.manager_factory import build_managers +from pulsar.tools import ToolBox +from pulsar.tools.authorization import get_authorizer + log = getLogger(__name__) DEFAULT_PRIVATE_TOKEN = None diff --git a/pulsar/locks.py b/pulsar/locks.py index 25f87e50b33c84d4b7eda7dc6ab3c9aa85f6f4d9..0f5eeeb177f3b48e6eb206a88d16aa2272b15dfb 100644 --- a/pulsar/locks.py +++ b/pulsar/locks.py @@ -3,9 +3,9 @@ try: except ImportError: lockfile = None +import logging import threading -import logging log = logging.getLogger(__name__) NO_PYLOCKFILE_MESSAGE = "pylockfile module not found, skipping experimental lockfile handling." diff --git a/pulsar/main.py b/pulsar/main.py index 77050c77a3bfc08364e8a375e51e22f638c8ce89..e22157a4a5c523f47d2a6dfdc11262fbb061424f 100644 --- a/pulsar/main.py +++ b/pulsar/main.py @@ -17,14 +17,13 @@ This script can be used in a standalone fashion, but it is generally better to run the ``pulsar`` script with ``--mode webless`` - which will in turn delegate to this script. 
""" +import configparser +import functools import logging -from logging.config import fileConfig - import os -import functools -import time import sys -import configparser +import time +from logging.config import fileConfig try: import yaml @@ -36,8 +35,10 @@ try: except ImportError: Daemonize = None -from argparse import ArgumentParser -from argparse import RawDescriptionHelpFormatter +from argparse import ( + ArgumentParser, + RawDescriptionHelpFormatter, +) log = logging.getLogger(__name__) diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index 7c40a16e15f6cf6c35a9dff4d3e4bf5c841c6517..257ffed8a31fd89309499f75f357d91e608c9f3e 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -7,8 +7,10 @@ import os from pulsar import __version__ as pulsar_version from pulsar.client.setup_handler import build_job_config -from pulsar.managers import status -from pulsar.managers import PULSAR_UNKNOWN_RETURN_CODE +from pulsar.managers import ( + PULSAR_UNKNOWN_RETURN_CODE, + status, +) from pulsar.managers.staging import realized_dynamic_file_sources log = logging.getLogger(__name__) diff --git a/pulsar/manager_factory.py b/pulsar/manager_factory.py index 4264d85f91b575196907c4e0f52ae3ca8f7c20c9..746778e257ada8e19c7b0a75091436445dc8a3af 100644 --- a/pulsar/manager_factory.py +++ b/pulsar/manager_factory.py @@ -94,7 +94,7 @@ def _get_manager_modules(): managers_dir = pulsar.managers.__path__[0] module_names = [] for fname in os.listdir(managers_dir): - if not(fname.startswith("_")) and fname.endswith(".py"): + if not (fname.startswith("_")) and fname.endswith(".py"): manager_module_name = "pulsar.managers.%s" % fname[:-len(".py")] module_names.append(manager_module_name) return module_names diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py index 2ab54f600566e13cc31eeac508a83923fdc607ac..e7af36b2f102588168c4afd32735eacd18077aa6 100644 --- a/pulsar/managers/__init__.py +++ b/pulsar/managers/__init__.py @@ -1,6 +1,9 @@ """ """ -from abc import ABCMeta, abstractmethod +from abc import ( + ABCMeta, + abstractmethod, +) PULSAR_UNKNOWN_RETURN_CODE = '__unknown__' diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 97ed73fc1ac61b8b2483522d686cc01b086afd23..2bbe0e8ea3293f67dedd30bcf568cdbbd120f3b9 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -6,6 +6,7 @@ import errno import json import logging import os +import platform from os import ( curdir, getenv, @@ -26,8 +27,8 @@ from uuid import uuid4 from pulsar import locks from pulsar.client.job_directory import ( - RemoteJobDirectory, get_mapped_file, + RemoteJobDirectory, ) from pulsar.managers import ManagerInterface @@ -78,6 +79,10 @@ class BaseManager(ManagerInterface): self.job_metrics = app.job_metrics self.object_store = app.object_store + @property + def _is_windows(self) -> bool: + return platform.system().lower() == "windows" + def clean(self, job_id): if self.debug: # In debug mode skip cleaning job directories. 
@@ -193,7 +198,7 @@ class BaseManager(ManagerInterface):
         else:
             return listdir(directory_or_none)
 
-    def _expand_command_line(self, command_line, dependencies_description, job_directory=None):
+    def _expand_command_line(self, job_id, command_line: str, dependencies_description, job_directory=None) -> str:
         if dependencies_description is None:
             return command_line
 
diff --git a/pulsar/managers/base/base_drmaa.py b/pulsar/managers/base/base_drmaa.py
index 3f0f4e5b465136ab877b22c13f87598bf19e14e1..e966a1de74a808087c8d91943668ae1bca8974ba 100644
--- a/pulsar/managers/base/base_drmaa.py
+++ b/pulsar/managers/base/base_drmaa.py
@@ -6,12 +6,10 @@ try:
 except (OSError, ImportError, RuntimeError):
     JobState = None
 
+from pulsar.managers import status
 from .external import ExternalBaseManager
 from ..util.drmaa import DrmaaSessionFactory
 
-from pulsar.managers import status
-
-
 log = logging.getLogger(__name__)
 
 IGNORE_SUBMISSION_SPEC_MESSAGE = "Submission received native_specification but being overridden by manager specification."
diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py
index 7816a34f247f9cf5313ba476ea71d7d598342217..98e133e5d8a27291850e498c2eec03c4bf207230 100644
--- a/pulsar/managers/base/directory.py
+++ b/pulsar/managers/base/directory.py
@@ -3,10 +3,11 @@
 import os
 import stat
 
 from galaxy.util import asbool
-from pulsar.managers.base import BaseManager
+
 from pulsar.managers import PULSAR_UNKNOWN_RETURN_CODE
-from ..util.job_script import job_script
+from pulsar.managers.base import BaseManager
 from ..util.env import env_to_statement
+from ..util.job_script import job_script
 
 log = logging.getLogger(__name__)
 
@@ -116,9 +117,21 @@ class DirectoryBaseManager(BaseManager):
         tool_id = job_directory.load_metadata(JOB_FILE_TOOL_ID)
         return tool_id
 
+    def _expand_command_line(self, job_id, command_line: str, dependencies_description, job_directory=None) -> str:
+        command_line = super()._expand_command_line(
+            job_id, command_line, dependencies_description, job_directory=job_directory
+        )
+        if not self._is_windows:
+            rc_path = self._return_code_path(job_id)
+            CAPTURE_RETURN_CODE = "return_code=$?"
+            command_line = f"{command_line}; {CAPTURE_RETURN_CODE}; echo $return_code > {rc_path};"
+        return command_line
+
     # Helper methods related to setting up job script files.
     def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=[], setup_params=None):
-        command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory)
+        command_line = self._expand_command_line(
+            job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory
+        )
         script_env = self._job_template_env(job_id, command_line=command_line, env=env, setup_params=setup_params)
         script = job_script(**script_env)
         return self._write_job_script(job_id, script)
@@ -137,7 +150,6 @@ class DirectoryBaseManager(BaseManager):
         return tmp_dir
 
     def _job_template_env(self, job_id, command_line=None, env=[], setup_params=None):
-        return_code_path = self._return_code_path(job_id)
         # TODO: Add option to ignore remote env.
env = env + self.env_vars setup_params = setup_params or {} @@ -148,7 +160,6 @@ class DirectoryBaseManager(BaseManager): 'galaxy_lib': self._galaxy_lib(), 'preserve_python_environment': setup_params.get('preserve_galaxy_python_environment', False), 'env_setup_commands': env_setup_commands, - 'exit_code_path': return_code_path, 'job_directory': self.job_directory(job_id).job_directory, 'working_directory': self.job_directory(job_id).working_directory(), 'metadata_directory': self.job_directory(job_id).metadata_directory(), diff --git a/pulsar/managers/base/external.py b/pulsar/managers/base/external.py index f050ded1eaf9224e87d03e1c9ed936a013822d3e..35cf6c84f22b158e5d301b0fdbb9a4d74def69f4 100644 --- a/pulsar/managers/base/external.py +++ b/pulsar/managers/base/external.py @@ -2,7 +2,6 @@ import logging from string import Template from pulsar.managers import status - from .directory import DirectoryBaseManager DEFAULT_JOB_NAME_TEMPLATE = "pulsar_$job_id" diff --git a/pulsar/managers/queued_cli.py b/pulsar/managers/queued_cli.py index e9fe3d52d9d80b2e744435142bbbe1b7b32b23a2..bc24dd2f11a41c4da85a4029ee3e2fd7de718cf5 100644 --- a/pulsar/managers/queued_cli.py +++ b/pulsar/managers/queued_cli.py @@ -3,12 +3,16 @@ Pulsar job manager that uses a CLI interface to a job queue (e.g. Torque's qsub, qstat, etc...). """ +from logging import getLogger + from .base.external import ExternalBaseManager +from .util.cli import ( + CliInterface, + split_params, +) from .util.external import parse_external_id -from .util.cli import CliInterface, split_params from .util.job_script import job_script -from logging import getLogger log = getLogger(__name__) @@ -17,7 +21,7 @@ class CliQueueManager(ExternalBaseManager): def __init__(self, name, app, **kwds): super().__init__(name, app, **kwds) - self.cli_interface = CliInterface(code_dir='.') + self.cli_interface = CliInterface() self.shell_params, self.job_params = split_params(kwds) def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): @@ -26,7 +30,9 @@ class CliQueueManager(ExternalBaseManager): stdout_path = self._stdout_path(job_id) stderr_path = self._stderr_path(job_id) job_name = self._job_name(job_id) - command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) + command_line = self._expand_command_line( + job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory + ) job_script_kwargs = self._job_template_env( job_id, command_line=command_line, diff --git a/pulsar/managers/queued_condor.py b/pulsar/managers/queued_condor.py index 96266b96746062bfd1539bab0b5beee6f3d6288d..688aca944f08efa4967a344e6f3f587378c6a190 100644 --- a/pulsar/managers/queued_condor.py +++ b/pulsar/managers/queued_condor.py @@ -1,12 +1,17 @@ -from os.path import exists +from logging import getLogger from os import stat +from os.path import exists -from .util.condor import build_submit_description -from .util.condor import condor_submit, condor_stop, summarize_condor_log, submission_params from .base.external import ExternalBaseManager +from .util.condor import ( + build_submit_description, + condor_stop, + condor_submit, + submission_params, + summarize_condor_log, +) from ..managers import status -from logging import getLogger log = getLogger(__name__) diff --git a/pulsar/managers/queued_drmaa.py b/pulsar/managers/queued_drmaa.py index 
d4efb9cc0db96d7f56524cc2e47a0c72d217f854..8257f65a08b81a5c9c154a544d4e359ea8d04aa6 100644 --- a/pulsar/managers/queued_drmaa.py +++ b/pulsar/managers/queued_drmaa.py @@ -1,6 +1,7 @@ +import logging + from .base.base_drmaa import BaseDrmaaManager -import logging log = logging.getLogger(__name__) diff --git a/pulsar/managers/queued_drmaa_xsede.py b/pulsar/managers/queued_drmaa_xsede.py index 797bda2d61de4e737d95245487d49e2ab3219253..7641e9248775bb0dded82f3ca4b0a8dae9e1c774 100644 --- a/pulsar/managers/queued_drmaa_xsede.py +++ b/pulsar/managers/queued_drmaa_xsede.py @@ -1,8 +1,13 @@ -from subprocess import check_call, Popen, PIPE, CalledProcessError +import logging +from subprocess import ( + CalledProcessError, + check_call, + PIPE, + Popen, +) from .queued_drmaa import DrmaaQueueManager -import logging log = logging.getLogger(__name__) diff --git a/pulsar/managers/queued_external_drmaa.py b/pulsar/managers/queued_external_drmaa.py index 1614631a082e6166a8734c73b23daf38232d8466..4eb97192e93c851f5471b1879a39411004fcbb52 100644 --- a/pulsar/managers/queued_external_drmaa.py +++ b/pulsar/managers/queued_external_drmaa.py @@ -1,5 +1,5 @@ -from json import dumps from getpass import getuser +from json import dumps from .base.base_drmaa import BaseDrmaaManager from .util.sudo import sudo_popen @@ -11,6 +11,7 @@ except ImportError: from galaxy.tool_util.deps.commands import which from logging import getLogger + log = getLogger(__name__) DEFAULT_CHOWN_WORKING_DIRECTORY_SCRIPT = "scripts/chown_working_directory.bash" diff --git a/pulsar/managers/staging/__init__.py b/pulsar/managers/staging/__init__.py index d2f6962980c43612c411ace497ce653cd21a7a7a..c66b9769394d4b87be1eaa9fe920927d1b53e7ea 100644 --- a/pulsar/managers/staging/__init__.py +++ b/pulsar/managers/staging/__init__.py @@ -3,7 +3,10 @@ preprocessing (currently this means downloading or copying files) and then unsta or send results back to client during postprocessing. 
""" -from .post import postprocess, realized_dynamic_file_sources +from .post import ( + postprocess, + realized_dynamic_file_sources, +) from .pre import preprocess __all__ = ['preprocess', 'postprocess', 'realized_dynamic_file_sources'] diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index 0befd0330c4138bac64bca08d5b4f3dc687b647d..2fe6d06f166eea1261c3a4133c2a4deec4c252e4 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -1,13 +1,15 @@ """ """ +import logging import os -from pulsar.client import action_mapper -from pulsar.client import staging +from pulsar.client import ( + action_mapper, + staging, +) from pulsar.client.staging import PulsarOutputs from pulsar.client.staging.down import ResultsCollector -import logging log = logging.getLogger(__name__) diff --git a/pulsar/managers/staging/pre.py b/pulsar/managers/staging/pre.py index 262e79187b590d8a95c3066310223f14dca941d6..543e9cdfe01a7af3fac3a6ac0ef51757c5e5f034 100644 --- a/pulsar/managers/staging/pre.py +++ b/pulsar/managers/staging/pre.py @@ -1,8 +1,9 @@ """ """ -from pulsar.client.action_mapper import from_dict import logging +from pulsar.client.action_mapper import from_dict + log = logging.getLogger(__name__) diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index bbfde0cfe1030c1c638eb5c2101946f120c135c3..32b843d4dd733f15404acda48e043ce7c9610df0 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -1,8 +1,8 @@ import contextlib import datetime import os -import time import threading +import time try: # If galaxy-lib or Galaxy 19.05 present. @@ -11,14 +11,19 @@ except ImportError: # If galaxy-tool-util or Galaxy 19.09 present. from galaxy.tool_util.deps.dependencies import DependenciesDescription +import logging + from pulsar.client.util import filter_destination_params -from pulsar.managers import ManagerProxy -from pulsar.managers import status +from pulsar.managers import ( + ManagerProxy, + status, +) from pulsar.managers.util.retry import RetryActionExecutor -from .staging import preprocess -from .staging import postprocess +from .staging import ( + postprocess, + preprocess, +) -import logging log = logging.getLogger(__name__) DEFAULT_DO_MONITOR = False diff --git a/pulsar/managers/unqueued.py b/pulsar/managers/unqueued.py index 75e282e7accc78b525a280771d311061de77a228..9a79114850ce9c516a5d7a0141b20f1e3d021be3 100644 --- a/pulsar/managers/unqueued.py +++ b/pulsar/managers/unqueued.py @@ -60,10 +60,12 @@ class BaseUnqueuedManager(DirectoryBaseManager): def _prepare_run(self, job_id, command_line, dependencies_description, env, setup_params=None): self._check_execution_with_tool_file(job_id, command_line) self._record_submission(job_id) - if platform.system().lower() == "windows": + if self._is_windows: # TODO: Don't ignore requirements and env without warning. Ideally # process them or at least warn about them being ignored. 
- command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) + command_line = self._expand_command_line( + job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory + ) else: command_line = self._setup_job_file( job_id, diff --git a/pulsar/managers/util/__init__.py b/pulsar/managers/util/__init__.py index 6f204e8d81400e79338cde7d590d864083ef96bb..897ac69afebb21ef6b374811ee0e4ad7b00cdca6 100644 --- a/pulsar/managers/util/__init__.py +++ b/pulsar/managers/util/__init__.py @@ -4,6 +4,21 @@ processes and interfacing with job managers. This module should contain functionality shared between Galaxy and the Pulsar. """ from galaxy.util.bunch import Bunch + from .kill import kill_pid -__all__ = ('kill_pid', 'Bunch') +runner_states = Bunch( + WALLTIME_REACHED="walltime_reached", + MEMORY_LIMIT_REACHED="memory_limit_reached", + JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER="Job output not returned from cluster", + UNKNOWN_ERROR="unknown_error", + GLOBAL_WALLTIME_REACHED="global_walltime_reached", + OUTPUT_SIZE_LIMIT="output_size_limit", + TOOL_DETECT_ERROR="tool_detected", # job runner interaction worked fine but the tool indicated error +) + + +__all__ = ( + "kill_pid", + "runner_states", +) diff --git a/pulsar/managers/util/cli/__init__.py b/pulsar/managers/util/cli/__init__.py index 8fcecc87c1d44d9b66de65c2de58b8baba70c173..9b677bea0a8e08ce0e4e23102dbe5e8dd59bd0c4 100644 --- a/pulsar/managers/util/cli/__init__.py +++ b/pulsar/managers/util/cli/__init__.py @@ -1,14 +1,10 @@ """ """ import json -from glob import glob -from os import getcwd -from os.path import ( - basename, - join -) -DEFAULT_SHELL_PLUGIN = 'LocalShell' +from galaxy.util.plugin_config import plugins_dict + +DEFAULT_SHELL_PLUGIN = "LocalShell" ERROR_MESSAGE_NO_JOB_PLUGIN = "No job plugin parameter found, cannot create CLI job interface" ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN = "Failed to find job_plugin of type %s, available types include %s" @@ -20,31 +16,12 @@ class CliInterface: them to specified parameters. 
""" - def __init__(self, code_dir='lib'): - """ - """ - def __load(module_path, d): - module_pattern = join(join(getcwd(), code_dir, *module_path.split('.')), '*.py') - for file in glob(module_pattern): - if basename(file).startswith('_'): - continue - module_name = '{}.{}'.format(module_path, basename(file).rsplit('.py', 1)[0]) - module = __import__(module_name) - for comp in module_name.split(".")[1:]: - module = getattr(module, comp) - for name in module.__all__: - try: - d[name] = getattr(module, name) - except TypeError: - raise TypeError("Invalid type for name %s" % name) - - self.cli_shells = {} - self.cli_job_interfaces = {} - self.active_cli_shells = {} - + def __init__(self): + """ """ module_prefix = self.__module__ - __load('%s.shell' % module_prefix, self.cli_shells) - __load('%s.job' % module_prefix, self.cli_job_interfaces) + self.cli_shells = plugins_dict(f"{module_prefix}.shell", "__name__") + self.cli_job_interfaces = plugins_dict(f"{module_prefix}.job", "__name__") + self.active_cli_shells = {} def get_plugins(self, shell_params, job_params): """ @@ -56,25 +33,28 @@ class CliInterface: return shell, job_interface def get_shell_plugin(self, shell_params): - shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN) + shell_plugin = shell_params.get("plugin", DEFAULT_SHELL_PLUGIN) requested_shell_settings = json.dumps(shell_params, sort_keys=True) if requested_shell_settings not in self.active_cli_shells: - self.active_cli_shells[requested_shell_settings] = self.cli_shells[shell_plugin](**shell_params) + shell_plugin_class = self.cli_shells.get(shell_plugin) + if not shell_plugin_class: + raise ValueError( + f"Unknown shell_plugin [{shell_plugin}], available plugins are {list(self.cli_shells.keys())}" + ) + self.active_cli_shells[requested_shell_settings] = shell_plugin_class(**shell_params) return self.active_cli_shells[requested_shell_settings] def get_job_interface(self, job_params): - job_plugin = job_params.get('plugin', None) + job_plugin = job_params.get("plugin") if not job_plugin: raise ValueError(ERROR_MESSAGE_NO_JOB_PLUGIN) - job_plugin_class = self.cli_job_interfaces.get(job_plugin, None) + job_plugin_class = self.cli_job_interfaces.get(job_plugin) if not job_plugin_class: raise ValueError(ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN % (job_plugin, list(self.cli_job_interfaces.keys()))) - job_interface = job_plugin_class(**job_params) - - return job_interface + return job_plugin_class(**job_params) def split_params(params): - shell_params = {k.replace('shell_', '', 1): v for k, v in params.items() if k.startswith('shell_')} - job_params = {k.replace('job_', '', 1): v for k, v in params.items() if k.startswith('job_')} + shell_params = {k.replace("shell_", "", 1): v for k, v in params.items() if k.startswith("shell_")} + job_params = {k.replace("job_", "", 1): v for k, v in params.items() if k.startswith("job_")} return shell_params, job_params diff --git a/pulsar/managers/util/cli/factory.py b/pulsar/managers/util/cli/factory.py index 89554931d4ca4c5260ab341285eeeb0e175e3e8a..e7fe72b3c37746ba81ac89ef4c3aa78522780701 100644 --- a/pulsar/managers/util/cli/factory.py +++ b/pulsar/managers/util/cli/factory.py @@ -1,19 +1,17 @@ try: from galaxy.jobs.runners.util.cli import ( CliInterface, - split_params + split_params, ) - code_dir = 'lib' except ImportError: - from pulsar.managers.util.cli import ( + from pulsar.managers.util.cli import ( # type: ignore[no-redef] CliInterface, - split_params + split_params, ) - code_dir = '.' 
def build_cli_interface(): - return CliInterface(code_dir=code_dir) + return CliInterface() def get_shell(params): diff --git a/pulsar/managers/util/cli/job/__init__.py b/pulsar/managers/util/cli/job/__init__.py index 3eef5d390fd6f9b23214a0b03624539b66104542..829724143edbeccd46384540c951b41c8c8929c1 100644 --- a/pulsar/managers/util/cli/job/__init__.py +++ b/pulsar/managers/util/cli/job/__init__.py @@ -3,20 +3,33 @@ Abstract base class for cli job plugins. """ from abc import ( ABCMeta, - abstractmethod + abstractmethod, ) +from enum import Enum +try: + from galaxy.model import Job -class BaseJobExec(metaclass=ABCMeta): + job_states = Job.states +except ImportError: - @abstractmethod + # Not in Galaxy, map Galaxy job states to Pulsar ones. + class job_states(str, Enum): # type: ignore[no-redef] + RUNNING = "running" + OK = "complete" + QUEUED = "queued" + ERROR = "failed" + + +class BaseJobExec(metaclass=ABCMeta): def __init__(self, **params): """ Constructor for CLI job executor. """ + self.params = params.copy() def job_script_kwargs(self, ofile, efile, job_name): - """ Return extra keyword argument for consumption by job script + """Return extra keyword argument for consumption by job script module. """ return {} @@ -70,3 +83,9 @@ class BaseJobExec(metaclass=ABCMeta): Parses the failure reason, assigning it against a """ return None + + +__all__ = ( + "BaseJobExec", + "job_states", +) diff --git a/pulsar/managers/util/cli/job/lsf.py b/pulsar/managers/util/cli/job/lsf.py index 6904b2c107d99a2ed148f7f8624395147e3896f2..68c96345096a1606da345ecfcd574409d9474b8d 100644 --- a/pulsar/managers/util/cli/job/lsf.py +++ b/pulsar/managers/util/cli/job/lsf.py @@ -1,57 +1,50 @@ # A simple CLI runner for slurm that can be used when running Galaxy from a # non-submit host and using a Slurm cluster. from logging import getLogger +from os import path -try: - from galaxy.model import Job - job_states = Job.states -except ImportError: - # Not in Galaxy, map Galaxy job states to Pulsar ones. - from pulsar.util import enum - job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed") -from ..job import BaseJobExec +from ..job import ( + BaseJobExec, + job_states, +) +from ... import runner_states log = getLogger(__name__) argmap = { - 'memory': '-M', # There is code in job_script_kwargs relying on this name's setting - 'cores': '-n', - 'queue': '-q', - 'working_dir': '-cwd', - 'project': '-P' + "memory": "-M", # There is code in job_script_kwargs relying on this name's setting + "cores": "-n", + "queue": "-q", + "working_dir": "-cwd", + "project": "-P", } class LSF(BaseJobExec): - - def __init__(self, **params): - self.params = {} - for k, v in params.items(): - self.params[k] = v - def job_script_kwargs(self, ofile, efile, job_name): - scriptargs = {'-o': ofile, - '-e': efile, - '-J': job_name} + scriptargs = {"-o": ofile, "-e": efile, "-J": job_name} # Map arguments using argmap. for k, v in self.params.items(): - if k == 'plugin': + if k == "plugin" or k == "excluded_hosts": continue try: - if k == 'memory': + if k == "memory": # Memory requires both -m and -R rusage[mem=v] request - scriptargs['-R'] = "\"rusage[mem=%s]\"" % v - if not k.startswith('-'): + scriptargs["-R"] = f'"rusage[mem={v}]"' + if not k.startswith("-"): k = argmap[k] scriptargs[k] = v except Exception: - log.warning('Unrecognized long argument passed to LSF CLI plugin: %s' % k) + log.warning(f"Unrecognized long argument passed to LSF CLI plugin: {k}") # Generated template. 
-        template_scriptargs = ''
+        template_scriptargs = ""
         for k, v in scriptargs.items():
-            template_scriptargs += '#BSUB {} {}\n'.format(k, v)
+            template_scriptargs += f"#BSUB {k} {v}\n"
+        # Excluded hosts use the same -R option already in use for mem, so easier adding here.
+        for host in self._get_excluded_hosts():
+            template_scriptargs += f"#BSUB -R \"select[hname!='{host}']\"\n"
         return dict(headers=template_scriptargs)
 
     def submit(self, script_file):
@@ -62,13 +55,13 @@ class LSF(BaseJobExec):
         return "bsub <%s | awk '{ print $2}' | sed 's/[<>]//g'" % script_file
 
     def delete(self, job_id):
-        return 'bkill %s' % job_id
+        return f"bkill {job_id}"
 
     def get_status(self, job_ids=None):
-        return "bjobs -a -o \"id stat\" -noheader"  # check this
+        return 'bjobs -a -o "id stat" -noheader'  # check this
 
     def get_single_status(self, job_id):
-        return "bjobs -o stat -noheader " + job_id
+        return f"bjobs -o stat -noheader {job_id}"
 
     def parse_status(self, status, job_ids):
         # Get status for each job, skipping header.
@@ -88,12 +81,12 @@
         # which would be badly handled here. So this only works well when Galaxy
         # is constantly monitoring the jobs. The logic here is that DONE jobs get forgotten
         # faster than failed jobs.
-            log.warning("Job id '%s' not found LSF status check" % job_id)
+            log.warning(f"Job id '{job_id}' not found in LSF status check")
             return job_states.OK
         return self._get_job_state(status)
 
     def get_failure_reason(self, job_id):
-        return "bjobs -l " + job_id
+        return f"bjobs -l {job_id}"
 
     def parse_failure_reason(self, reason, job_id):
         # LSF will produce the following in the job output file:
@@ -101,8 +94,7 @@
         # Exited with exit code 143.
         for line in reason.splitlines():
             if "TERM_MEMLIMIT" in line:
-                from galaxy.jobs import JobState
-                return JobState.runner_states.MEMORY_LIMIT_REACHED
+                return runner_states.MEMORY_LIMIT_REACHED
         return None
 
     def _get_job_state(self, state):
@@ -111,19 +103,44 @@
         # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
         try:
             return {
-                'EXIT': job_states.ERROR,
-                'RUN': job_states.RUNNING,
-                'PEND': job_states.QUEUED,
-                'DONE': job_states.OK,
-                'PSUSP': job_states.ERROR,
-                'USUSP': job_states.ERROR,
-                'SSUSP': job_states.ERROR,
-                'UNKWN': job_states.ERROR,
-                'WAIT': job_states.QUEUED,
-                'ZOMBI': job_states.ERROR
+                "EXIT": job_states.ERROR,
+                "RUN": job_states.RUNNING,
+                "PEND": job_states.QUEUED,
+                "DONE": job_states.OK,
+                "PSUSP": job_states.ERROR,
+                "USUSP": job_states.ERROR,
+                "SSUSP": job_states.ERROR,
+                "UNKWN": job_states.ERROR,
+                "WAIT": job_states.QUEUED,
+                "ZOMBI": job_states.ERROR,
             }.get(state)
         except KeyError:
-            raise KeyError("Failed to map LSF status code [%s] to job state." % state)
-
-
-__all__ = ('LSF',)
+            raise KeyError(f"Failed to map LSF status code [{state}] to job state.")
+
+    def _get_excluded_hosts(self):
+        """
+        Reads a file in the set path with one node name per line. All these nodes will be added
+        to the exclusion list for execution.
+
+        The path can be added to destinations like this:
+
+        <destination id="lsf_8cpu_16GbRam" runner="cli">
+            <param id="shell_plugin">LocalShell</param>
+            <param id="job_plugin">LSF</param>
+            <param id="job_memory">16000</param>
+            <param id="job_cores">7</param>
+            <param id="job_excluded_hosts">/path/to/file/with/hosts/to/exclude/one/per/line.txt</param>
+        </destination>
+
+        The path is taken from the ``excluded_hosts`` param.
+        :return: list with node names
+        """
+        if "excluded_hosts" in self.params:
+            path_excluded = self.params["excluded_hosts"]
+            if path.isfile(path_excluded):
+                with open(path_excluded) as f:
+                    return f.read().splitlines()
+        return []
+
+
+__all__ = ("LSF",)
diff --git a/pulsar/managers/util/cli/job/pbs.py b/pulsar/managers/util/cli/job/pbs.py
new file mode 100644
index 0000000000000000000000000000000000000000..b885a2de1188926973e699d40b87f8443de80c95
--- /dev/null
+++ b/pulsar/managers/util/cli/job/pbs.py
@@ -0,0 +1,33 @@
+import json
+from logging import getLogger
+
+from .torque import Torque
+
+log = getLogger(__name__)
+
+
+class OpenPBS(Torque):
+
+    ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to OpenPBS CLI plugin: %s"
+
+    def get_status(self, job_ids=None):
+        return "qstat -f -F json"
+
+    def get_single_status(self, job_id):
+        return f"qstat -f {job_id}"
+
+    def parse_status(self, status, job_ids):
+        try:
+            data = json.loads(status)
+        except Exception:
+            log.warning(f"No valid qstat JSON return from `qstat -f -F json`, got the following: {status}")
+            return None
+        rval = {}
+        for job_id, job in data.get("Jobs", {}).items():
+            if job_id in job_ids:
+                # map PBS job states to Galaxy job states.
+                rval[job_id] = self._get_job_state(job["job_state"])
+        return rval
+
+
+__all__ = ("OpenPBS",)
diff --git a/pulsar/managers/util/cli/job/slurm.py b/pulsar/managers/util/cli/job/slurm.py
index 618ebc3b2a444be9a8f80a0b569d85063b17422e..5b694b132537905a1d01c3ddbf8ca10ff17e8ea0 100644
--- a/pulsar/managers/util/cli/job/slurm.py
+++ b/pulsar/managers/util/cli/job/slurm.py
@@ -2,65 +2,48 @@
 # non-submit host and using a Slurm cluster.
 from logging import getLogger
 
-try:
-    from galaxy.model import Job
-    job_states = Job.states
-except ImportError:
-    # Not in Galaxy, map Galaxy job states to Pulsar ones.
-    from pulsar.util import enum
-    job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")
-
-from ..job import BaseJobExec
+from ..job import (
+    BaseJobExec,
+    job_states,
+)
 
 log = getLogger(__name__)
 
-argmap = {
-    'time': '-t',
-    'ncpus': '-c',
-    'partition': '-p'
-}
+argmap = {"time": "-t", "ncpus": "-c", "partition": "-p"}
 
 
 class Slurm(BaseJobExec):
-
-    def __init__(self, **params):
-        self.params = {}
-        for k, v in params.items():
-            self.params[k] = v
-
     def job_script_kwargs(self, ofile, efile, job_name):
-        scriptargs = {'-o': ofile,
-                      '-e': efile,
-                      '-J': job_name}
+        scriptargs = {"-o": ofile, "-e": efile, "-J": job_name}
 
         # Map arguments using argmap.
         for k, v in self.params.items():
-            if k == 'plugin':
+            if k == "plugin":
                 continue
             try:
-                if not k.startswith('-'):
+                if not k.startswith("-"):
                     k = argmap[k]
                 scriptargs[k] = v
             except Exception:
-                log.warning('Unrecognized long argument passed to Slurm CLI plugin: %s' % k)
+                log.warning(f"Unrecognized long argument passed to Slurm CLI plugin: {k}")
 
         # Generated template.
- template_scriptargs = '' + template_scriptargs = "" for k, v in scriptargs.items(): - template_scriptargs += '#SBATCH {} {}\n'.format(k, v) + template_scriptargs += f"#SBATCH {k} {v}\n" return dict(headers=template_scriptargs) def submit(self, script_file): - return 'sbatch %s' % script_file + return f"sbatch {script_file}" def delete(self, job_id): - return 'scancel %s' % job_id + return f"scancel {job_id}" def get_status(self, job_ids=None): return "squeue -a -o '%A %t'" def get_single_status(self, job_id): - return "squeue -a -o '%A %t' -j " + job_id + return f"squeue -a -o '%A %t' -j {job_id}" def parse_status(self, status, job_ids): # Get status for each job, skipping header. @@ -84,14 +67,14 @@ class Slurm(BaseJobExec): def _get_job_state(self, state): try: return { - 'F': job_states.ERROR, - 'R': job_states.RUNNING, - 'CG': job_states.RUNNING, - 'PD': job_states.QUEUED, - 'CD': job_states.OK + "F": job_states.ERROR, + "R": job_states.RUNNING, + "CG": job_states.RUNNING, + "PD": job_states.QUEUED, + "CD": job_states.OK, }.get(state) except KeyError: - raise KeyError("Failed to map slurm status code [%s] to job state." % state) + raise KeyError(f"Failed to map slurm status code [{state}] to job state.") -__all__ = ('Slurm',) +__all__ = ("Slurm",) diff --git a/pulsar/managers/util/cli/job/slurm_torque.py b/pulsar/managers/util/cli/job/slurm_torque.py index 78f2576ca385e60ccd9d9f06cf00e57fd96cfc23..1cc097087676ce6dc7503ef247515fa99927346d 100644 --- a/pulsar/managers/util/cli/job/slurm_torque.py +++ b/pulsar/managers/util/cli/job/slurm_torque.py @@ -2,17 +2,17 @@ import re from .torque import Torque -__all__ = ('SlurmTorque',) +__all__ = ("SlurmTorque",) class SlurmTorque(Torque): - """ A CLI job executor for Slurm's Torque compatibility mode. This differs + """A CLI job executor for Slurm's Torque compatibility mode. This differs from real torque CLI in that -x command line is not available so job status needs to be parsed from qstat table instead of XML. """ def get_status(self, job_ids=None): - return 'qstat' + return "qstat" def parse_status(self, status, job_ids): rval = {} diff --git a/pulsar/managers/util/cli/job/torque.py b/pulsar/managers/util/cli/job/torque.py index 2e40b98621abe1d5088896cdb7d5c435d62b73bf..12c843583c92ea0c56d2a49f56aac308dd44f4b0 100644 --- a/pulsar/managers/util/cli/job/torque.py +++ b/pulsar/managers/util/cli/job/torque.py @@ -1,78 +1,70 @@ -import xml.etree.ElementTree as et from logging import getLogger -try: - from galaxy.model import Job - job_states = Job.states -except ImportError: - # Not in Galaxy, map Galaxy job states to Pulsar ones. 
- from pulsar.util import enum - job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') +from galaxy.util import parse_xml_string -from ..job import BaseJobExec +from ..job import ( + BaseJobExec, + job_states, +) log = getLogger(__name__) -ERROR_MESSAGE_UNRECOGNIZED_ARG = 'Unrecognized long argument passed to Torque CLI plugin: %s' - -argmap = {'destination': '-q', - 'Execution_Time': '-a', - 'Account_Name': '-A', - 'Checkpoint': '-c', - 'Error_Path': '-e', - 'Group_List': '-g', - 'Hold_Types': '-h', - 'Join_Paths': '-j', - 'Keep_Files': '-k', - 'Resource_List': '-l', - 'Mail_Points': '-m', - 'Mail_Users': '-M', - 'Job_Name': '-N', - 'Output_Path': '-o', - 'Priority': '-p', - 'Rerunable': '-r', - 'Shell_Path_List': '-S', - 'job_array_request': '-t', - 'User_List': '-u', - 'Variable_List': '-v'} + +argmap = { + "destination": "-q", + "Execution_Time": "-a", + "Account_Name": "-A", + "Checkpoint": "-c", + "Error_Path": "-e", + "Group_List": "-g", + "Hold_Types": "-h", + "Join_Paths": "-j", + "Keep_Files": "-k", + "Resource_List": "-l", + "Mail_Points": "-m", + "Mail_Users": "-M", + "Job_Name": "-N", + "Output_Path": "-o", + "Priority": "-p", + "Rerunable": "-r", + "Shell_Path_List": "-S", + "job_array_request": "-t", + "User_List": "-u", + "Variable_List": "-v", +} class Torque(BaseJobExec): - def __init__(self, **params): - self.params = {} - for k, v in params.items(): - self.params[k] = v + ERROR_MESSAGE_UNRECOGNIZED_ARG = "Unrecognized long argument passed to Torque CLI plugin: %s" def job_script_kwargs(self, ofile, efile, job_name): - pbsargs = {'-o': ofile, - '-e': efile, - '-N': job_name} + pbsargs = {"-o": ofile, "-e": efile, "-N": job_name} for k, v in self.params.items(): - if k == 'plugin': + if k == "plugin": continue try: - if not k.startswith('-'): + if not k.startswith("-"): k = argmap[k] pbsargs[k] = v except KeyError: - log.warning(ERROR_MESSAGE_UNRECOGNIZED_ARG % k) - template_pbsargs = '' + log.warning(self.ERROR_MESSAGE_UNRECOGNIZED_ARG, k) + template_pbsargs = "" for k, v in pbsargs.items(): - template_pbsargs += '#PBS {} {}\n'.format(k, v) + template_pbsargs += f"#PBS {k} {v}\n" return dict(headers=template_pbsargs) def submit(self, script_file): - return 'qsub %s' % script_file + return f"qsub {script_file}" def delete(self, job_id): - return 'qdel %s' % job_id + return f"qdel {job_id}" def get_status(self, job_ids=None): - return 'qstat -x' + return "qstat -x" def get_single_status(self, job_id): - return 'qstat -f %s' % job_id + return f"qstat -f {job_id}" def parse_status(self, status, job_ids): # in case there's noise in the output, find the big blob 'o xml @@ -80,41 +72,38 @@ class Torque(BaseJobExec): rval = {} for line in status.strip().splitlines(): try: - tree = et.fromstring(line.strip()) - assert tree.tag == 'Data' + tree = parse_xml_string(line.strip()) + assert tree.tag == "Data" break except Exception: tree = None if tree is None: - log.warning('No valid qstat XML return from `qstat -x`, got the following: %s' % status) + log.warning(f"No valid qstat XML return from `qstat -x`, got the following: {status}") return None else: - for job in tree.findall('Job'): - id = job.find('Job_Id').text + for job in tree.findall("Job"): + id = job.find("Job_Id").text if id in job_ids: - state = job.find('job_state').text + state = job.find("job_state").text # map PBS job states to Galaxy job states. 
rval[id] = self._get_job_state(state) return rval def parse_single_status(self, status, job_id): for line in status.splitlines(): - line = line.split(' = ') - if line[0].strip() == 'job_state': + line = line.split(" = ") + if line[0].strip() == "job_state": return self._get_job_state(line[1].strip()) # no state found, job has exited return job_states.OK def _get_job_state(self, state): try: - return { - 'E': job_states.RUNNING, - 'R': job_states.RUNNING, - 'Q': job_states.QUEUED, - 'C': job_states.OK - }.get(state) + return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}.get( + state + ) except KeyError: - raise KeyError("Failed to map torque status code [%s] to job state." % state) + raise KeyError(f"Failed to map torque status code [{state}] to job state.") -__all__ = ('Torque',) +__all__ = ("Torque",) diff --git a/pulsar/managers/util/cli/shell/__init__.py b/pulsar/managers/util/cli/shell/__init__.py index 54e77e22bd1bf71d58a5c7c05ab225f64ef89521..5e650fc28352f93ad4e16ba55949ed60d91b886e 100644 --- a/pulsar/managers/util/cli/shell/__init__.py +++ b/pulsar/managers/util/cli/shell/__init__.py @@ -3,12 +3,11 @@ Abstract base class for runners which execute commands via a shell. """ from abc import ( ABCMeta, - abstractmethod + abstractmethod, ) class BaseShellExec(metaclass=ABCMeta): - @abstractmethod def __init__(self, *args, **kwargs): """ diff --git a/pulsar/managers/util/cli/shell/local.py b/pulsar/managers/util/cli/shell/local.py index 0ff8cfff072c7ccda961bf26475539ae8215b1e5..03c5fac038a2b953519db78da1b1316e965d927e 100644 --- a/pulsar/managers/util/cli/shell/local.py +++ b/pulsar/managers/util/cli/shell/local.py @@ -1,20 +1,23 @@ +import os from logging import getLogger from subprocess import ( PIPE, - Popen + Popen, ) from tempfile import TemporaryFile from time import sleep -from ..shell import BaseShellExec -from ....util import ( - Bunch, - kill_pid +from galaxy.util.bunch import Bunch + +from . 
import BaseShellExec +from ....util.process_groups import ( + check_pg, + kill_pg, ) log = getLogger(__name__) -TIMEOUT_ERROR_MESSAGE = 'Execution timed out' +TIMEOUT_ERROR_MESSAGE = "Execution timed out" TIMEOUT_RETURN_CODE = -1 DEFAULT_TIMEOUT = 60 DEFAULT_TIMEOUT_CHECK_INTERVAL = 3 @@ -30,7 +33,9 @@ class LocalShell(BaseShellExec): True >>> exec_result.stdout.strip() == u'Hello World' True - >>> exec_result = exec_python("import time; time.sleep(90)", timeout=1, timeout_check_interval=.1) + >>> exec_result.returncode + 0 + >>> exec_result = exec_python("import time; time.sleep(10)", timeout=1, timeout_check_interval=.1) >>> exec_result.stdout == u'' True >>> exec_result.stderr == 'Execution timed out' @@ -44,28 +49,31 @@ class LocalShell(BaseShellExec): def __init__(self, **kwds): pass - def execute(self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds): + def execute( + self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds + ): is_cmd_string = isinstance(cmd, str) outf = TemporaryFile() - p = Popen(cmd, stdin=None, stdout=outf, stderr=PIPE, shell=is_cmd_string) - # poll until timeout + p = Popen(cmd, stdin=None, stdout=outf, stderr=PIPE, shell=is_cmd_string, preexec_fn=os.setpgrp) + # check process group until timeout for _ in range(int(timeout / timeout_check_interval)): sleep(0.1) # For fast returning commands - r = p.poll() - if r is not None: + if not check_pg(p.pid): break sleep(timeout_check_interval) else: - kill_pid(p.pid) - return Bunch(stdout='', stderr=TIMEOUT_ERROR_MESSAGE, returncode=TIMEOUT_RETURN_CODE) + kill_pg(p.pid) + return Bunch(stdout="", stderr=TIMEOUT_ERROR_MESSAGE, returncode=TIMEOUT_RETURN_CODE) outf.seek(0) + # Need to poll once to establish return code + p.poll() return Bunch(stdout=_read_str(outf), stderr=_read_str(p.stderr), returncode=p.returncode) def _read_str(stream): contents = stream.read() - return contents.decode('UTF-8') if isinstance(contents, bytes) else contents + return contents.decode("UTF-8") if isinstance(contents, bytes) else contents -__all__ = ('LocalShell',) +__all__ = ("LocalShell",) diff --git a/pulsar/managers/util/cli/shell/rsh.py b/pulsar/managers/util/cli/shell/rsh.py index bded5a09f7289d0d1628508a393513d190e1f00e..e4e814b4a38222139cbd5090a44f8f00ab2e7643 100644 --- a/pulsar/managers/util/cli/shell/rsh.py +++ b/pulsar/managers/util/cli/shell/rsh.py @@ -2,24 +2,24 @@ import logging import time import paramiko -from pulsar.managers.util.retry import RetryActionExecutor - from galaxy.util import ( smart_str, - unicodify + string_as_bool, + unicodify, ) from galaxy.util.bunch import Bunch + +from pulsar.managers.util.retry import RetryActionExecutor from .local import LocalShell log = logging.getLogger(__name__) logging.getLogger("paramiko").setLevel(logging.WARNING) # paramiko logging is very verbose -__all__ = ('RemoteShell', 'SecureShell', 'GlobusSecureShell', 'ParamikoShell') +__all__ = ("RemoteShell", "SecureShell", "GlobusSecureShell", "ParamikoShell") class RemoteShell(LocalShell): - - def __init__(self, rsh='rsh', rcp='rcp', hostname='localhost', username=None, options=None, **kwargs): + def __init__(self, rsh="rsh", rcp="rcp", hostname="localhost", username=None, options=None, **kwargs): super().__init__(**kwargs) self.rsh = rsh self.rcp = rcp @@ -40,28 +40,37 @@ class RemoteShell(LocalShell): class SecureShell(RemoteShell): - SSH_NEW_KEY_STRING = 'Are you sure you want to continue connecting' - - def 
__init__(self, rsh='ssh', rcp='scp', private_key=None, port=None, strict_host_key_checking=True, **kwargs): - strict_host_key_checking = "yes" if strict_host_key_checking else "no" - options = ["-o", "StrictHostKeyChecking=%s" % strict_host_key_checking] + def __init__(self, rsh="ssh", rcp="scp", private_key=None, port=None, strict_host_key_checking=True, **kwargs): + options = [] + if not string_as_bool(strict_host_key_checking): + options.extend(["-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]) options.extend(["-o", "ConnectTimeout=60"]) if private_key: - options.extend(['-i', private_key]) + options.extend(["-i", private_key]) if port: - options.extend(['-p', str(port)]) + options.extend(["-p", str(port)]) super().__init__(rsh=rsh, rcp=rcp, options=options, **kwargs) class ParamikoShell: - - def __init__(self, username, hostname, password=None, private_key=None, port=22, timeout=60, **kwargs): + def __init__( + self, + username, + hostname, + password=None, + private_key=None, + port=22, + timeout=60, + strict_host_key_checking=True, + **kwargs, + ): self.username = username self.hostname = hostname self.password = password self.private_key = private_key self.port = int(port) if port else port self.timeout = int(timeout) if timeout else timeout + self.strict_host_key_checking = string_as_bool(strict_host_key_checking) self.ssh = None self.retry_action_executor = RetryActionExecutor(max_retries=100, interval_max=300) self.connect() @@ -69,16 +78,20 @@ class ParamikoShell: def connect(self): log.info("Attempting establishment of new paramiko SSH channel") self.ssh = paramiko.SSHClient() - self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.ssh.connect(hostname=self.hostname, - port=self.port, - username=self.username, - password=self.password, - key_filename=self.private_key, - timeout=self.timeout) + self.ssh.set_missing_host_key_policy( + paramiko.RejectPolicy() if self.strict_host_key_checking else paramiko.WarningPolicy() + ) + self.ssh.load_system_host_keys() + self.ssh.connect( + hostname=self.hostname, + port=self.port, + username=self.username, + password=self.password, + key_filename=self.private_key, + timeout=self.timeout, + ) def execute(self, cmd, timeout=60): - def retry(): try: _, stdout, stderr = self._execute(cmd, timeout) @@ -98,6 +111,5 @@ class ParamikoShell: class GlobusSecureShell(SecureShell): - - def __init__(self, rsh='gsissh', rcp='gsiscp', **kwargs): + def __init__(self, rsh="gsissh", rcp="gsiscp", **kwargs): super().__init__(rsh=rsh, rcp=rcp, **kwargs) diff --git a/pulsar/managers/util/condor/__init__.py b/pulsar/managers/util/condor/__init__.py index f68b7ea1318f46e9a5027d2d31a32b469e07b3d4..95229b6ad25d37e8b79e84dcd970a59e6d17a897 100644 --- a/pulsar/managers/util/condor/__init__.py +++ b/pulsar/managers/util/condor/__init__.py @@ -4,23 +4,22 @@ Condor helper utilities. from subprocess import ( CalledProcessError, check_call, - PIPE, - Popen, - STDOUT +) + +from galaxy.util import ( + commands, + unicodify, ) from ..external import parse_external_id DEFAULT_QUERY_CLASSAD = dict( - universe='vanilla', - getenv='true', - notification='NEVER', + universe="vanilla", + getenv="true", + notification="NEVER", ) -PROBLEM_RUNNING_CONDOR_SUBMIT = \ - "Problem encountered while running condor_submit." 
-PROBLEM_PARSING_EXTERNAL_ID = \ - "Failed to find job id from condor_submit" +PROBLEM_PARSING_EXTERNAL_ID = "Failed to find job id from condor_submit" SUBMIT_PARAM_PREFIX = "submit_" @@ -60,13 +59,13 @@ def build_submit_description(executable, output, error, user_log, query_params): submit_description = [] for key, value in all_query_params.items(): - submit_description.append('{} = {}'.format(key, value)) - submit_description.append('executable = ' + executable) - submit_description.append('output = ' + output) - submit_description.append('error = ' + error) - submit_description.append('log = ' + user_log) - submit_description.append('queue') - return '\n'.join(submit_description) + submit_description.append(f"{key} = {value}") + submit_description.append(f"executable = {executable}") + submit_description.append(f"output = {output}") + submit_description.append(f"error = {error}") + submit_description.append(f"log = {user_log}") + submit_description.append("queue") + return "\n".join(submit_description) def condor_submit(submit_file): @@ -77,15 +76,14 @@ def condor_submit(submit_file): external_id = None failure_message = None try: - submit = Popen(('condor_submit', submit_file), stdout=PIPE, stderr=STDOUT) - outs, _ = submit.communicate() - condor_message = outs.decode() - if submit.returncode == 0: - external_id = parse_external_id(condor_message, type='condor') - else: - failure_message = "{}: {}".format(PROBLEM_PARSING_EXTERNAL_ID, condor_message) - except Exception as e: - failure_message = str(e) + condor_message = commands.execute(("condor_submit", submit_file)) + except commands.CommandLineException as e: + failure_message = unicodify(e) + else: + try: + external_id = parse_external_id(condor_message, type="condor") + except Exception: + failure_message = f"{PROBLEM_PARSING_EXTERNAL_ID}: {condor_message}" return external_id, failure_message @@ -96,30 +94,29 @@ def condor_stop(external_id): """ failure_message = None try: - check_call(('condor_rm', external_id)) + check_call(("condor_rm", external_id)) except CalledProcessError: failure_message = "condor_rm failed" except Exception as e: - "error encountered calling condor_rm: %s" % e + failure_message = f"error encountered calling condor_rm: {unicodify(e)}" return failure_message def summarize_condor_log(log_file, external_id): - """ - """ + """ """ log_job_id = external_id.zfill(3) s1 = s4 = s7 = s5 = s9 = False with open(log_file) as log_handle: for line in log_handle: - if '001 (' + log_job_id + '.' in line: + if f"001 ({log_job_id}." in line: s1 = True - if '004 (' + log_job_id + '.' in line: + if f"004 ({log_job_id}." in line: s4 = True - if '007 (' + log_job_id + '.' in line: + if f"007 ({log_job_id}." in line: s7 = True - if '005 (' + log_job_id + '.' in line: + if f"005 ({log_job_id}." in line: s5 = True - if '009 (' + log_job_id + '.' in line: + if f"009 ({log_job_id}." in line: s9 = True file_size = log_handle.tell() return s1, s4, s7, s5, s9, file_size diff --git a/pulsar/managers/util/drmaa/__init__.py b/pulsar/managers/util/drmaa/__init__.py index 568314459c77cf548d5f56f023615494f7431dab..168b41a57c8ef0d4f51db3d63a7f4c469defce25 100644 --- a/pulsar/managers/util/drmaa/__init__.py +++ b/pulsar/managers/util/drmaa/__init__.py @@ -3,7 +3,10 @@ import logging import threading try: - from drmaa import Session, JobControlAction + from drmaa import ( + JobControlAction, + Session, + ) except OSError as e: LOAD_ERROR_MESSAGE = "OSError - problem loading shared library [%s]." 
% e Session = None diff --git a/pulsar/managers/util/env.py b/pulsar/managers/util/env.py index f3ef1c197816bc820d4ef1ec366abc43f18e41ee..b7a30d89505565d8801e95696f2008fa4fb1b42a 100644 --- a/pulsar/managers/util/env.py +++ b/pulsar/managers/util/env.py @@ -2,7 +2,7 @@ RAW_VALUE_BY_DEFAULT = False def env_to_statement(env): - ''' Return the abstraction description of an environment variable definition + """Return the abstraction description of an environment variable definition into a statement for shell script. >>> env_to_statement(dict(name='X', value='Y')) @@ -20,20 +20,20 @@ def env_to_statement(env): '. "S"' >>> env_to_statement(dict(execute="module load java/1.5.1")) 'module load java/1.5.1' - ''' - source_file = env.get('file', None) + """ + source_file = env.get("file", None) if source_file: - return '. %s' % __escape(source_file, env) - execute = env.get('execute', None) + return f". {__escape(source_file, env)}" + execute = env.get("execute", None) if execute: return execute - name = env['name'] - value = __escape(env['value'], env) - return '{}={}; export {}'.format(name, value, name) + name = env["name"] + value = __escape(env["value"], env) + return f"{name}={value}; export {name}" def __escape(value, env): - raw = env.get('raw', RAW_VALUE_BY_DEFAULT) + raw = env.get("raw", RAW_VALUE_BY_DEFAULT) if not raw: value = '"' + value.replace('"', '\\"') + '"' return value diff --git a/pulsar/managers/util/external.py b/pulsar/managers/util/external.py index a02b448fec02918e5a4ed015f17232a6f0c33609..863ee0029332685d14fba05ec6d862ca36569f3a 100644 --- a/pulsar/managers/util/external.py +++ b/pulsar/managers/util/external.py @@ -3,9 +3,9 @@ from re import search EXTERNAL_ID_TYPE_ANY = None EXTERNAL_ID_PATTERNS = [ - ('condor', r'submitted to cluster (\d+)\.'), - ('slurm', r'Submitted batch job (\w+)'), - ('torque', r'(.+)'), # Default 'pattern' assumed by Galaxy code circa August 2013. + ("condor", r"submitted to cluster (\d+)\."), + ("slurm", r"Submitted batch job (\w+)"), + ("torque", r"(.+)"), # Default 'pattern' assumed by Galaxy code circa August 2013. 
] diff --git a/pulsar/managers/util/job_script/CLUSTER_SLOTS_STATEMENT.sh b/pulsar/managers/util/job_script/CLUSTER_SLOTS_STATEMENT.sh index 01dccf22c5deee89c8f3cafbd9a5551fc62dd028..06e0cbc0adb47471d688aa66d5dc1e2c3ed8c7e0 100644 --- a/pulsar/managers/util/job_script/CLUSTER_SLOTS_STATEMENT.sh +++ b/pulsar/managers/util/job_script/CLUSTER_SLOTS_STATEMENT.sh @@ -30,3 +30,4 @@ else GALAXY_SLOTS="1" unset GALAXY_SLOTS_CONFIGURED fi +export GALAXY_SLOTS diff --git a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh index 502e012fb49704b27565ecdffc377ee085c2d7d8..7b4d04d0a58b81bb04d66c0d2239cecf5fd24170 100644 --- a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh +++ b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh @@ -4,8 +4,8 @@ $headers _galaxy_setup_environment() { local _use_framework_galaxy="$1" - _GALAXY_JOB_DIR="$job_directory" - _GALAXY_JOB_HOME_DIR="$job_directory/home" + _GALAXY_JOB_DIR="$working_directory" + _GALAXY_JOB_HOME_DIR="$working_directory/home" _GALAXY_JOB_TMP_DIR=$tmp_dir_creation_statement $env_setup_commands if [ "$GALAXY_LIB" != "None" -a "$_use_framework_galaxy" = "True" ]; then @@ -27,7 +27,6 @@ _galaxy_setup_environment() { $integrity_injection $slots_statement -export GALAXY_SLOTS export PYTHONWARNINGS="ignore" GALAXY_VIRTUAL_ENV="$galaxy_virtual_env" _GALAXY_VIRTUAL_ENV="$galaxy_virtual_env" @@ -40,7 +39,5 @@ GALAXY_PYTHON=`command -v python` cd $working_directory $memory_statement $instrument_pre_commands -export _GALAXY_JOB_TMP_DIR $command -echo $? > $exit_code_path $instrument_post_commands diff --git a/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh b/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh new file mode 100644 index 0000000000000000000000000000000000000000..09fc2f60383833f7fc818d6c0cbb6d11e7565c86 --- /dev/null +++ b/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh @@ -0,0 +1,16 @@ +if [ -z "$GALAXY_MEMORY_MB" ]; then + if [ -n "$SLURM_JOB_ID" ]; then + GALAXY_MEMORY_MB=`scontrol -do show job "$SLURM_JOB_ID" | sed 's/.*\( \|^\)Mem=\([0-9][0-9]*\)\( \|$\).*/\2/p;d'` 2>memory_statement.log + fi + if [ -n "$SGE_HGR_h_vmem" ]; then + GALAXY_MEMORY_MB=`echo "$SGE_HGR_h_vmem" | sed 's/G$/ * 1024/' | bc | cut -d"." 
-f1` 2>memory_statement.log + fi +fi + +if [ -z "$GALAXY_MEMORY_MB_PER_SLOT" -a -n "$GALAXY_MEMORY_MB" ]; then + GALAXY_MEMORY_MB_PER_SLOT=$(($GALAXY_MEMORY_MB / $GALAXY_SLOTS)) +elif [ -z "$GALAXY_MEMORY_MB" -a -n "$GALAXY_MEMORY_MB_PER_SLOT" ]; then + GALAXY_MEMORY_MB=$(($GALAXY_MEMORY_MB_PER_SLOT * $GALAXY_SLOTS)) +fi +[ "${GALAXY_MEMORY_MB--1}" -gt 0 ] 2>>memory_statement.log && export GALAXY_MEMORY_MB || unset GALAXY_MEMORY_MB +[ "${GALAXY_MEMORY_MB_PER_SLOT--1}" -gt 0 ] 2>>memory_statement.log && export GALAXY_MEMORY_MB_PER_SLOT || unset GALAXY_MEMORY_MB_PER_SLOT diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py index dafdacf4160a59b7cac4738aee2bf883b2ad623b..3d662811cc1b64b25607a12963fb3b3383b42aca 100644 --- a/pulsar/managers/util/job_script/__init__.py +++ b/pulsar/managers/util/job_script/__init__.py @@ -3,24 +3,27 @@ import os import subprocess import time from string import Template -from typing import Any, Dict +from typing import ( + Any, + Dict, +) + +from typing_extensions import Protocol -from galaxy.util import unicodify -from pkg_resources import resource_string +from galaxy.util import ( + RWXR_XR_X, + unicodify, +) +from galaxy.util.resources import resource_string log = logging.getLogger(__name__) -DEFAULT_SHELL = '/bin/bash' +DEFAULT_SHELL = "/bin/bash" -DEFAULT_JOB_FILE_TEMPLATE = Template( - unicodify(resource_string(__name__, 'DEFAULT_JOB_FILE_TEMPLATE.sh')) -) +DEFAULT_JOB_FILE_TEMPLATE = Template(resource_string(__name__, "DEFAULT_JOB_FILE_TEMPLATE.sh")) -SLOTS_STATEMENT_CLUSTER_DEFAULT = \ - unicodify(resource_string(__name__, 'CLUSTER_SLOTS_STATEMENT.sh')) +SLOTS_STATEMENT_CLUSTER_DEFAULT = resource_string(__name__, "CLUSTER_SLOTS_STATEMENT.sh") -MEMORY_STATEMENT_DEFAULT_TEMPLATE = Template( - unicodify(resource_string(__name__, 'MEMORY_STATEMENT_TEMPLATE.sh')) -) +MEMORY_STATEMENT_DEFAULT = resource_string(__name__, "MEMORY_STATEMENT.sh") SLOTS_STATEMENT_SINGLE = """ GALAXY_SLOTS="1" @@ -38,20 +41,21 @@ fi INTEGRITY_SYNC_COMMAND = "/bin/sync" DEFAULT_INTEGRITY_CHECK = True DEFAULT_INTEGRITY_COUNT = 35 -DEFAULT_INTEGRITY_SLEEP = .25 -REQUIRED_TEMPLATE_PARAMS = ['working_directory', 'job_directory', 'command', 'exit_code_path'] +DEFAULT_INTEGRITY_SLEEP = 0.25 +REQUIRED_TEMPLATE_PARAMS = ["working_directory", "command"] OPTIONAL_TEMPLATE_PARAMS: Dict[str, Any] = { - 'galaxy_lib': None, - 'galaxy_virtual_env': None, - 'headers': '', - 'env_setup_commands': [], - 'slots_statement': SLOTS_STATEMENT_CLUSTER_DEFAULT, - 'instrument_pre_commands': '', - 'instrument_post_commands': '', - 'integrity_injection': INTEGRITY_INJECTION, - 'shell': DEFAULT_SHELL, - 'preserve_python_environment': True, - 'tmp_dir_creation_statement': '""', + "galaxy_lib": None, + "galaxy_virtual_env": None, + "headers": "", + "env_setup_commands": [], + "slots_statement": SLOTS_STATEMENT_CLUSTER_DEFAULT, + "memory_statement": MEMORY_STATEMENT_DEFAULT, + "instrument_pre_commands": "", + "instrument_post_commands": "", + "integrity_injection": INTEGRITY_INJECTION, + "shell": DEFAULT_SHELL, + "preserve_python_environment": True, + "tmp_dir_creation_statement": '""', } @@ -66,8 +70,6 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec') >>> '\\nuptime\\n' in script True - >>> 'echo $? 
> ec' in script - True >>> 'GALAXY_LIB="None"' in script True >>> script.startswith('#!/bin/sh\\n#PBS -test\\n') @@ -76,24 +78,21 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): >>> script.startswith('#!/bin/bash\\n\\n#PBS -test\\n') True >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', slots_statement='GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"') - >>> script.find('GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"\\nexport GALAXY_SLOTS\\n') > 0 + >>> script.find('GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"\\n') > 0 True >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', memory_statement='GALAXY_MEMORY_MB="32768"') >>> script.find('GALAXY_MEMORY_MB="32768"\\n') > 0 True """ - if any([param not in kwds for param in REQUIRED_TEMPLATE_PARAMS]): + if any(param not in kwds for param in REQUIRED_TEMPLATE_PARAMS): raise Exception("Failed to create job_script, a required parameter is missing.") job_instrumenter = kwds.get("job_instrumenter", None) - if "job_directory" not in kwds: - kwds["job_directory"] = kwds["working_directory"] - metadata_directory = kwds.get("metadata_directory", kwds["working_directory"]) if job_instrumenter: del kwds["job_instrumenter"] - kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(metadata_directory) or '' - kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(metadata_directory) or '' - if "memory_statement" not in kwds: - kwds["memory_statement"] = MEMORY_STATEMENT_DEFAULT_TEMPLATE.safe_substitute(metadata_directory=metadata_directory) + working_directory = kwds.get("metadata_directory", kwds["working_directory"]) + kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or "" + kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or "" + template_params = OPTIONAL_TEMPLATE_PARAMS.copy() template_params.update(**kwds) env_setup_commands_str = "\n".join(template_params["env_setup_commands"]) @@ -105,29 +104,28 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): return template.safe_substitute(template_params) -def check_script_integrity(config): - return getattr(config, "check_job_script_integrity", DEFAULT_INTEGRITY_CHECK) +class DescribesScriptIntegrityChecks(Protocol): + check_job_script_integrity: bool + check_job_script_integrity_count: int + check_job_script_integrity_sleep: float -def write_script(path, contents, config, mode=0o755): +def write_script(path, contents, job_io: DescribesScriptIntegrityChecks, mode=RWXR_XR_X) -> None: dir = os.path.dirname(path) if not os.path.exists(dir): os.makedirs(dir) - with open(path, 'w', encoding='utf-8') as f: + with open(path, "w", encoding="utf-8") as f: f.write(unicodify(contents)) os.chmod(path, mode) - _handle_script_integrity(path, config) + if job_io.check_job_script_integrity: + _handle_script_integrity(path, job_io.check_job_script_integrity_count, job_io.check_job_script_integrity_sleep) -def _handle_script_integrity(path, config): - if not check_script_integrity(config): - return +def _handle_script_integrity(path, check_job_script_integrity_count, check_job_script_integrity_sleep): script_integrity_verified = False - count = getattr(config, "check_job_script_integrity_count", DEFAULT_INTEGRITY_COUNT) - sleep_amt = getattr(config, "check_job_script_integrity_sleep", DEFAULT_INTEGRITY_SLEEP) - for _ in range(count): + for _ in range(check_job_script_integrity_count): try: returncode = subprocess.call([path], 
env={"ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ": "1"}) if returncode == 42: @@ -149,15 +147,14 @@ def _handle_script_integrity(path, config): except Exception as exc: log.debug("Script not available yet: %s", unicodify(exc)) - time.sleep(sleep_amt) + time.sleep(check_job_script_integrity_sleep) if not script_integrity_verified: - raise Exception("Failed to write job script '%s', could not verify job script integrity." % path) + raise Exception(f"Failed to write job script '{path}', could not verify job script integrity.") __all__ = ( - 'check_script_integrity', - 'job_script', - 'write_script', - 'INTEGRITY_INJECTION', + "job_script", + "write_script", + "INTEGRITY_INJECTION", ) diff --git a/pulsar/managers/util/kill.py b/pulsar/managers/util/kill.py index 1ba30980d4e6ba0603674446d766afd62f056185..cbc6afb3669d4374e4bef1c2704dfce9ae29060e 100644 --- a/pulsar/managers/util/kill.py +++ b/pulsar/managers/util/kill.py @@ -1,14 +1,17 @@ import os +import signal import subprocess from platform import system from time import sleep try: - from psutil import NoSuchProcess, Process + from psutil import ( + NoSuchProcess, + Process, + ) except ImportError: - """ Don't make psutil a strict requirement, but use if available. """ + """Don't make psutil a strict requirement, but use if available.""" Process = None # type: ignore - NoSuchProcess = Exception # type: ignore def kill_pid(pid: int, use_psutil: bool = True): @@ -32,7 +35,7 @@ def _psutil_kill_pid(pid: int): def _stock_kill_pid(pid: int): - is_windows = system() == 'Windows' + is_windows = system() == "Windows" if is_windows: __kill_windows(pid) @@ -42,7 +45,7 @@ def _stock_kill_pid(pid: int): def __kill_windows(pid): try: - subprocess.check_call(['taskkill', '/F', '/T', '/PID', pid]) + subprocess.check_call(["taskkill", "/F", "/T", "/PID", str(pid)]) except subprocess.CalledProcessError: pass @@ -56,7 +59,7 @@ def __kill_posix(pid: int): return False if __check_pid(): - for sig in [15, 9]: + for sig in [signal.SIGTERM, signal.SIGKILL]: try: os.killpg(pid, sig) except OSError: diff --git a/pulsar/managers/util/process_groups.py b/pulsar/managers/util/process_groups.py new file mode 100644 index 0000000000000000000000000000000000000000..c56042b31c33141cbcef9658e973d01fe9d946bc --- /dev/null +++ b/pulsar/managers/util/process_groups.py @@ -0,0 +1,50 @@ +import errno +import logging +import os +import signal +from time import sleep + +log = logging.getLogger(__name__) + + +def check_pg(pgid): + """Check whether processes in process group pgid are still alive.""" + try: + (pid, exit_status) = os.waitpid(-pgid, os.WNOHANG) + except OSError as e: + if e.errno == errno.ECHILD: + log.debug("check_pg(): No process found in process group %d", pgid) + else: + log.warning( + "check_pg(): Got errno %s when checking process group %d: %s", + errno.errorcode[e.errno], + pgid, + e.strerror, + ) + return False + # Since we are passing os.WNOHANG to os.waitpid(), pid is 0 if no process + # status is available immediately. 
+ return pid == 0 + + +def kill_pg(pgid): + """Kill all processes in process group pgid.""" + for sig in [signal.SIGTERM, signal.SIGKILL]: + try: + os.killpg(pgid, sig) + except OSError as e: + if e.errno == errno.ESRCH: + return + log.warning( + "Got errno %s when sending signal %d to process group %d: %s", + errno.errorcode[e.errno], + sig, + pgid, + e.strerror, + ) + sleep(1) + if not check_pg(pgid): + log.debug("Processes in process group %d successfully killed with signal %d", pgid, sig) + return + else: + log.warning("Some process in process group %d refuses to die after signaling TERM/KILL", pgid) diff --git a/pulsar/managers/util/pykube_util.py b/pulsar/managers/util/pykube_util.py index 53fb0f4447907c3142186275122ca8f266ba910c..7c3f32d87b09537a3f3a930a7c1cf00469167ebb 100644 --- a/pulsar/managers/util/pykube_util.py +++ b/pulsar/managers/util/pykube_util.py @@ -2,30 +2,42 @@ import logging import os import re -import uuid +from pathlib import PurePath try: from pykube.config import KubeConfig + from pykube.exceptions import HTTPError from pykube.http import HTTPClient from pykube.objects import ( + Ingress, Job, - Pod + Pod, + Service, ) except ImportError as exc: KubeConfig = None + Ingress = None Job = None Pod = None - K8S_IMPORT_MESSAGE = ('The Python pykube package is required to use ' - 'this feature, please install it or correct the ' - 'following error:\nImportError %s' % str(exc)) + Service = None + HTTPError = None + K8S_IMPORT_MESSAGE = ( + "The Python pykube package is required to use " + "this feature, please install it or correct the " + "following error:\nImportError %s" % str(exc) + ) log = logging.getLogger(__name__) DEFAULT_JOB_API_VERSION = "batch/v1" +DEFAULT_SERVICE_API_VERSION = "v1" +DEFAULT_INGRESS_API_VERSION = "extensions/v1beta1" DEFAULT_NAMESPACE = "default" -INSTANCE_ID_INVALID_MESSAGE = ("Galaxy instance [%s] is either too long " - "(>20 characters) or it includes non DNS " - "acceptable characters, ignoring it.") +INSTANCE_ID_INVALID_MESSAGE = ( + "Galaxy instance [%s] is either too long " + "(>20 characters) or it includes non DNS " + "acceptable characters, ignoring it." +) def ensure_pykube(): @@ -39,87 +51,233 @@ def pykube_client_from_dict(params): else: config_path = params.get("k8s_config_path") if config_path is None: - config_path = os.environ.get('KUBECONFIG', None) + config_path = os.environ.get("KUBECONFIG", None) if config_path is None: - config_path = '~/.kube/config' + config_path = "~/.kube/config" pykube_client = HTTPClient(KubeConfig.from_file(config_path)) return pykube_client -def produce_unique_k8s_job_name(app_prefix=None, instance_id=None, job_id=None): - if job_id is None: - job_id = str(uuid.uuid4()) - - job_name = "" - if app_prefix: - job_name += "%s-" % app_prefix - - if instance_id and len(instance_id) > 0: - job_name += "%s-" % instance_id - - return job_name + job_id +def produce_k8s_job_prefix(app_prefix=None, instance_id=None): + job_name_elems = [app_prefix or "", instance_id or ""] + return "-".join(elem for elem in job_name_elems if elem) def pull_policy(params): # If this doesn't validate it returns None, that seems odd? 
if "k8s_pull_policy" in params: - if params['k8s_pull_policy'] in ["Always", "IfNotPresent", "Never"]: - return params['k8s_pull_policy'] + if params["k8s_pull_policy"] in ["Always", "IfNotPresent", "Never"]: + return params["k8s_pull_policy"] return None +def find_service_object_by_name(pykube_api, service_name, namespace=None): + if not service_name: + raise ValueError("service name must not be empty") + return Service.objects(pykube_api).filter(field_selector={"metadata.name": service_name}, namespace=namespace) + + +def find_ingress_object_by_name(pykube_api, ingress_name, namespace=None): + if not ingress_name: + raise ValueError("ingress name must not be empty") + return Ingress.objects(pykube_api).filter(field_selector={"metadata.name": ingress_name}, namespace=namespace) + + def find_job_object_by_name(pykube_api, job_name, namespace=None): - return _find_object_by_name(Job, pykube_api, job_name, namespace=namespace) + if not job_name: + raise ValueError("job name must not be empty") + return Job.objects(pykube_api).filter(field_selector={"metadata.name": job_name}, namespace=namespace) -def find_pod_object_by_name(pykube_api, pod_name, namespace=None): - return _find_object_by_name(Pod, pykube_api, pod_name, namespace=namespace) +def find_pod_object_by_name(pykube_api, job_name, namespace=None): + return Pod.objects(pykube_api).filter(selector=f"job-name={job_name}", namespace=namespace) -def _find_object_by_name(clazz, pykube_api, object_name, namespace=None): - filter_kwd = dict(selector="app=%s" % object_name) - if namespace is not None: - filter_kwd["namespace"] = namespace +def is_pod_unschedulable(pykube_api, pod, namespace=None): + is_unschedulable = any(c.get("reason") == "Unschedulable" for c in pod.obj["status"].get("conditions", [])) + if pod.obj["status"].get("phase") == "Pending" and is_unschedulable: + return True - objs = clazz.objects(pykube_api).filter(**filter_kwd) - obj = None - if len(objs.response['items']) > 0: - obj = clazz(pykube_api, objs.response['items'][0]) - return obj + return False -def stop_job(job, cleanup="always"): - job_failed = (job.obj['status']['failed'] > 0 - if 'failed' in job.obj['status'] else False) +def delete_job(job, cleanup="always"): + job_failed = job.obj["status"]["failed"] > 0 if "failed" in job.obj["status"] else False # Scale down the job just in case even if cleanup is never job.scale(replicas=0) - if (cleanup == "always" or - (cleanup == "onsuccess" and not job_failed)): - delete_options = { - "apiVersion": "v1", - "kind": "DeleteOptions", - "propagationPolicy": "Background" - } + api_delete = cleanup == "always" + if not api_delete and cleanup == "onsuccess" and not job_failed: + api_delete = True + if api_delete: + delete_options = {"apiVersion": "v1", "kind": "DeleteOptions", "propagationPolicy": "Background"} r = job.api.delete(json=delete_options, **job.api_kwargs()) job.api.raise_for_status(r) -def job_object_dict(params, job_name, spec): +def delete_ingress(ingress, cleanup="always", job_failed=False): + api_delete = cleanup == "always" + if not api_delete and cleanup == "onsuccess" and not job_failed: + api_delete = True + if api_delete: + delete_options = {"apiVersion": "v1", "kind": "DeleteOptions", "propagationPolicy": "Background"} + r = ingress.api.delete(json=delete_options, **ingress.api_kwargs()) + ingress.api.raise_for_status(r) + + +def delete_service(service, cleanup="always", job_failed=False): + api_delete = cleanup == "always" + if not api_delete and cleanup == "onsuccess" and not job_failed: + 
api_delete = True + if api_delete: + delete_options = {"apiVersion": "v1", "kind": "DeleteOptions", "propagationPolicy": "Background"} + r = service.api.delete(json=delete_options, **service.api_kwargs()) + service.api.raise_for_status(r) + + +def job_object_dict(params, job_prefix, spec): k8s_job_obj = { - "apiVersion": params.get('k8s_job_api_version', DEFAULT_JOB_API_VERSION), + "apiVersion": params.get("k8s_job_api_version", DEFAULT_JOB_API_VERSION), "kind": "Job", "metadata": { - # metadata.name is the name of the pod resource created, and must be unique - # http://kubernetes.io/docs/user-guide/configuring-containers/ - "name": job_name, - "namespace": params.get('k8s_namespace', DEFAULT_NAMESPACE), - "labels": {"app": job_name} + "generateName": f"{job_prefix}-", + "namespace": params.get("k8s_namespace", DEFAULT_NAMESPACE), }, "spec": spec, } return k8s_job_obj +def service_object_dict(params, service_name, spec): + k8s_service_obj = { + "apiVersion": params.get("k8s_service_api_version", DEFAULT_SERVICE_API_VERSION), + "kind": "Service", + "metadata": { + "name": service_name, + "namespace": params.get("k8s_namespace", DEFAULT_NAMESPACE), + }, + } + k8s_service_obj["metadata"].update(spec.pop("metadata", {})) + k8s_service_obj.update(spec) + return k8s_service_obj + + +def ingress_object_dict(params, ingress_name, spec): + k8s_ingress_obj = { + "apiVersion": params.get("k8s_ingress_api_version", DEFAULT_INGRESS_API_VERSION), + "kind": "Ingress", + "metadata": { + "name": ingress_name, + "namespace": params.get("k8s_namespace", DEFAULT_NAMESPACE), + # TODO: Add default annotations + }, + } + k8s_ingress_obj["metadata"].update(spec.pop("metadata", {})) + k8s_ingress_obj.update(spec) + return k8s_ingress_obj + + +def parse_pvc_param_line(pvc_param): + """ + :type pvc_param: str + :param pvc_param: the pvc mount param in the format ``pvc-name/subpath/desired:/mountpath/desired[:r]`` + + :rtype: dict + :return: a dict + like:: + + {"name": "pvc-name", + "subPath": "subpath/desired", + "mountPath": "/mountpath/desired", + "readOnly": False} + """ + claim, _, rest = pvc_param.partition(":") + mount_path, _, mode = rest.partition(":") + read_only = mode == "r" + claim_name, _, subpath = claim.partition("/") + return { + "name": claim_name.strip(), + "subPath": subpath.strip(), + "mountPath": mount_path.strip(), + "readOnly": read_only, + } + + +def generate_relative_mounts(pvc_param, files): + """ + Maps a list of files as mounts, relative to the base volume mount. 
+ For example, given the pvc mount: + { + 'name': 'my_pvc', + 'mountPath': '/galaxy/database/jobs', + 'subPath': 'data', + 'readOnly': False + } + + and files: ['/galaxy/database/jobs/01/input.txt', '/galaxy/database/jobs/01/working'] + + returns each file as a relative mount as follows: + [ + { + 'name': 'my_pvc', + 'mountPath': '/galaxy/database/jobs/01/input.txt', + 'subPath': 'data/01/input.txt', + 'readOnly': False + }, + { + 'name': 'my_pvc', + 'mountPath': '/galaxy/database/jobs/01/working', + 'subPath': 'data/01/working', + 'readOnly': False + } + ] + + :param pvc_param: the pvc claim dict + :param files: a list of file or folder names + :return: A list of volume mounts + """ + if not pvc_param: + return + param_claim = parse_pvc_param_line(pvc_param) + claim_name = param_claim["name"] + base_subpath = PurePath(param_claim.get("subPath", "")) + base_mount = PurePath(param_claim["mountPath"]) + read_only = param_claim["readOnly"] + volume_mounts = [] + for f in files: + file_path = PurePath(str(f)) + if base_mount not in file_path.parents: + # force relative directory, needed for the job working directory in particular + file_path = base_mount.joinpath(file_path.relative_to("/") if file_path.is_absolute() else file_path) + relpath = file_path.relative_to(base_mount) + subpath = base_subpath.joinpath(relpath) + volume_mounts.append( + {"name": claim_name, "mountPath": str(file_path), "subPath": str(subpath), "readOnly": read_only} + ) + return volume_mounts + + +def deduplicate_entries(obj_list): + # remove duplicate entries in a list of dictionaries + # based on: https://stackoverflow.com/a/9428041 + return [i for n, i in enumerate(obj_list) if i not in obj_list[n + 1 :]] + + +def get_volume_mounts_for_job(job_wrapper, data_claim=None, working_claim=None): + volume_mounts = [] + if data_claim: + volume_mounts.extend(generate_relative_mounts(data_claim, job_wrapper.job_io.get_input_fnames())) + # for individual output files, mount the parent folder of each output as there could be wildcard outputs + output_folders = deduplicate_entries( + [str(PurePath(str(f)).parent) for f in job_wrapper.job_io.get_output_fnames()] + ) + volume_mounts.extend(generate_relative_mounts(data_claim, output_folders)) + + if working_claim: + volume_mounts.extend(generate_relative_mounts(working_claim, [job_wrapper.working_directory])) + + return deduplicate_entries(volume_mounts) + + def galaxy_instance_id(params): """Parse and validate the id of the Galaxy instance from supplied dict. @@ -133,7 +291,7 @@ def galaxy_instance_id(params): setup of a Job that is being recovered or restarted after a downtime/reboot. 
""" if "k8s_galaxy_instance_id" in params: - raw_value = params['k8s_galaxy_instance_id'] + raw_value = params["k8s_galaxy_instance_id"] if re.match(r"(?!-)[a-z\d-]{1,20}(?<!-)$", raw_value): return raw_value else: @@ -143,15 +301,29 @@ def galaxy_instance_id(params): __all__ = ( "DEFAULT_JOB_API_VERSION", + "DEFAULT_SERVICE_API_VERSION", + "DEFAULT_INGRESS_API_VERSION", "ensure_pykube", + "find_service_object_by_name", + "find_ingress_object_by_name", "find_job_object_by_name", "find_pod_object_by_name", "galaxy_instance_id", + "HTTPError", + "is_pod_unschedulable", "Job", + "Service", + "Ingress", "job_object_dict", + "service_object_dict", + "ingress_object_dict", "Pod", - "produce_unique_k8s_job_name", + "produce_k8s_job_prefix", "pull_policy", "pykube_client_from_dict", - "stop_job", + "delete_job", + "delete_service", + "delete_ingress", + "get_volume_mounts_for_job", + "parse_pvc_param_line", ) diff --git a/pulsar/managers/util/retry.py b/pulsar/managers/util/retry.py index cc778460714fafcf1651b24f42103a1de37327f1..b7e2bbd26b847ad148a5674bcbe169aee557e0da 100644 --- a/pulsar/managers/util/retry.py +++ b/pulsar/managers/util/retry.py @@ -1,7 +1,7 @@ +import logging from itertools import count from time import sleep -import logging log = logging.getLogger(__name__) DEFAULT_MAX_RETRIES = -1 # By default don't retry. diff --git a/pulsar/managers/util/sudo.py b/pulsar/managers/util/sudo.py index 0cfca1619c210fbb975d9d91c35921c9829faeb9..fdc8d74feda4389e695926a1100adbf9238cdf26 100644 --- a/pulsar/managers/util/sudo.py +++ b/pulsar/managers/util/sudo.py @@ -1,12 +1,12 @@ import logging from subprocess import ( PIPE, - Popen + Popen, ) -SUDO_PATH = '/usr/bin/sudo' -SUDO_PRESERVE_ENVIRONMENT_ARG = '-E' -SUDO_USER_ARG = '-u' +SUDO_PATH = "/usr/bin/sudo" +SUDO_PRESERVE_ENVIRONMENT_ARG = "-E" +SUDO_USER_ARG = "-u" log = logging.getLogger(__name__) @@ -21,6 +21,6 @@ def sudo_popen(*args, **kwargs): if user: full_command.extend([SUDO_USER_ARG, user]) full_command.extend(args) - log.info("About to execute the following sudo command - [%s]" % ' '.join(full_command)) + log.info(f"About to execute the following sudo command - [{' '.join(full_command)}]") p = Popen(full_command, shell=False, stdout=PIPE, stderr=PIPE) return p diff --git a/pulsar/mesos/__init__.py b/pulsar/mesos/__init__.py index a6b1233d1e942f957443a6da7841b7f7fe1ce2c9..53e04e8fca2f430e5ac2ef4222733272d764a966 100644 --- a/pulsar/mesos/__init__.py +++ b/pulsar/mesos/__init__.py @@ -2,12 +2,12 @@ """ try: from mesos.interface import ( + Executor, Scheduler, - Executor ) from mesos.native import ( + MesosExecutorDriver, MesosSchedulerDriver, - MesosExecutorDriver ) except ImportError: Scheduler = object diff --git a/pulsar/mesos/framework.py b/pulsar/mesos/framework.py index dc774247222582001006ade424c93f44594e3419..efd6b7903718557c937f4fb6989d40f3acbf9e9b 100644 --- a/pulsar/mesos/framework.py +++ b/pulsar/mesos/framework.py @@ -3,9 +3,7 @@ import logging import os from pulsar.client.util import to_base64_json -from pulsar.main import ( - PULSAR_ROOT_DIR, -) +from pulsar.main import PULSAR_ROOT_DIR from pulsar.mesos import ( mesos_pb2, MesosSchedulerDriver, @@ -13,7 +11,6 @@ from pulsar.mesos import ( ) from pulsar.messaging import bind_amqp - log = logging.getLogger(__name__) diff --git a/pulsar/messaging/bind_amqp.py b/pulsar/messaging/bind_amqp.py index c23ee5f05147ab2e226d79430b34cd447888f475..4d7798e98bfca0494c1f3c93eaf7273ecd61a1c1 100644 --- a/pulsar/messaging/bind_amqp.py +++ b/pulsar/messaging/bind_amqp.py @@ -2,7 +2,10 @@ 
import functools import logging import threading -from galaxy.util import asbool, mask_password_from_url +from galaxy.util import ( + asbool, + mask_password_from_url, +) from pulsar import manager_endpoint_util from pulsar.client import amqp_exchange_factory diff --git a/pulsar/scripts/_conda_init.py b/pulsar/scripts/_conda_init.py index dad2b8b7ca0dd525cc0d326100283384972cf909..65ca66fbfc9251938c176b1ba40a44b8972a29a2 100644 --- a/pulsar/scripts/_conda_init.py +++ b/pulsar/scripts/_conda_init.py @@ -7,7 +7,10 @@ import os.path import sys from argparse import ArgumentParser -from galaxy.tool_util.deps.conda_util import CondaContext, install_conda +from galaxy.tool_util.deps.conda_util import ( + CondaContext, + install_conda, +) from galaxy.util import safe_makedirs diff --git a/pulsar/scripts/_configure_galaxy_cvmfs.py b/pulsar/scripts/_configure_galaxy_cvmfs.py index 97e0bc5abf027de65cec6600d21f24642b78edf4..e732230a29fd60eeae90fd028ed724284be5b294 100644 --- a/pulsar/scripts/_configure_galaxy_cvmfs.py +++ b/pulsar/scripts/_configure_galaxy_cvmfs.py @@ -1,6 +1,7 @@ -import pkg_resources import shutil +import pkg_resources + def main(): config_path = pkg_resources.resource_filename('pulsar.scripts', 'cvmfs_data/default.local') diff --git a/pulsar/scripts/chown_working_directory.py b/pulsar/scripts/chown_working_directory.py index 4b1ce755d6b6c39036b94cf57512644ea1a3dabd..c04654f189883af6f6b8dfb4fae9b63e6e7da29f 100644 --- a/pulsar/scripts/chown_working_directory.py +++ b/pulsar/scripts/chown_working_directory.py @@ -1,8 +1,14 @@ from os import system -from os.path import join, abspath -from pulsar.main import ArgumentParser, PulsarConfigBuilder -from pulsar.core import DEFAULT_STAGING_DIRECTORY +from os.path import ( + abspath, + join, +) +from pulsar.core import DEFAULT_STAGING_DIRECTORY +from pulsar.main import ( + ArgumentParser, + PulsarConfigBuilder, +) DESCRIPTION = "Change ownership of a job working directory." # Switch this to true to tighten up security somewhat in production mode, diff --git a/pulsar/scripts/config.py b/pulsar/scripts/config.py index c4236c5279715bbcec79ed08f4fc3855fcc45fbd..bfebe18ab663667277ced34b6507e7fe1242bf8d 100644 --- a/pulsar/scripts/config.py +++ b/pulsar/scripts/config.py @@ -7,7 +7,7 @@ import sys from pulsar.main import ( ArgumentParser, DEFAULT_APP_YAML, - DEFAULT_INI + DEFAULT_INI, ) try: diff --git a/pulsar/scripts/drmaa_kill.py b/pulsar/scripts/drmaa_kill.py index 2e6915ba7575004085ec5ca077d964faabf12ee6..beb7907ff3af65cae656b1008c102490d16d4a47 100644 --- a/pulsar/scripts/drmaa_kill.py +++ b/pulsar/scripts/drmaa_kill.py @@ -1,7 +1,7 @@ from json import load -from pulsar.managers.util.drmaa import DrmaaSessionFactory -from pulsar.main import ArgumentParser +from pulsar.main import ArgumentParser +from pulsar.managers.util.drmaa import DrmaaSessionFactory DESCRIPTION = "Kill a job via DRMAA interface." diff --git a/pulsar/scripts/drmaa_launch.py b/pulsar/scripts/drmaa_launch.py index 5a574af047cca6cbf49a9b62a4ea5731a88ee18b..cb746272a6b70b3caf1ce807d15d06ab79d73844 100644 --- a/pulsar/scripts/drmaa_launch.py +++ b/pulsar/scripts/drmaa_launch.py @@ -1,7 +1,7 @@ from json import load -from pulsar.managers.util.drmaa import DrmaaSessionFactory -from pulsar.main import ArgumentParser +from pulsar.main import ArgumentParser +from pulsar.managers.util.drmaa import DrmaaSessionFactory DESCRIPTION = "Submit a DRMAA job." 
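
The hunks above are pure isort regroupings with no behavior change. For orientation, the layout they converge on is: standard-library imports first, then third-party packages such as galaxy.*, then pulsar and relative imports, with each section sorted alphabetically case-insensitively and multi-name imports parenthesized, one name per line, with a trailing comma. A representative sketch, reusing names from the drmaa_kill.py and bind_amqp.py hunks above (the grouping, not these particular names, is the point):

# Standard-library section.
from json import load

# Third-party section; galaxy-util is a separate distribution from pulsar.
from galaxy.util import (
    asbool,
    mask_password_from_url,
)

# First-party section; pulsar.main sorts before pulsar.managers.
from pulsar.main import ArgumentParser
from pulsar.managers.util.drmaa import DrmaaSessionFactory
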
diff --git a/pulsar/scripts/mesos_executor.py b/pulsar/scripts/mesos_executor.py index c89417ae5eb3e027d32e0e5cc7e3a437dccb6828..79e479c6f7d03e44ad56d2a0eb1b6b92af866528 100644 --- a/pulsar/scripts/mesos_executor.py +++ b/pulsar/scripts/mesos_executor.py @@ -1,25 +1,24 @@ +import logging import sys import threading +from pulsar.client.util import from_base64_json +from pulsar.main import ( + ArgumentParser, + PulsarManagerConfigBuilder, +) +from pulsar.manager_endpoint_util import submit_job from pulsar.mesos import ( + ensure_mesos_libs, Executor, - MesosExecutorDriver, mesos_pb2, - ensure_mesos_libs, + MesosExecutorDriver, ) -from pulsar.client.util import from_base64_json from pulsar.scripts.submit_util import ( manager_from_args, - wait_for_job + wait_for_job, ) -from pulsar.manager_endpoint_util import submit_job -from pulsar.main import ( - ArgumentParser, - PulsarManagerConfigBuilder, -) - -import logging log = logging.getLogger(__name__) DESCRIPTION = "Mesos executor for Pulsar" diff --git a/pulsar/scripts/mesos_framework.py b/pulsar/scripts/mesos_framework.py index 15d859e88e31a0ebc6acd0e69d08a064fbb83d6f..d5d13089effc36ecfe9bcf8b826a3a329715dc6c 100644 --- a/pulsar/scripts/mesos_framework.py +++ b/pulsar/scripts/mesos_framework.py @@ -1,12 +1,9 @@ -from pulsar.mesos import ( - ensure_mesos_libs -) -from pulsar.mesos.framework import run - from pulsar.main import ( ArgumentParser, PulsarManagerConfigBuilder, ) +from pulsar.mesos import ensure_mesos_libs +from pulsar.mesos.framework import run DESCRIPTION = "Pulsar Mesos Framework Entry Point." diff --git a/pulsar/scripts/run.py b/pulsar/scripts/run.py index ea6be32e2539b0b63ac539ea51dc31977a7eac2b..9d27076dc17d4b4ced969234b8f947ba01b66660 100644 --- a/pulsar/scripts/run.py +++ b/pulsar/scripts/run.py @@ -4,29 +4,30 @@ import fnmatch import sys import uuid -from pulsar.main import ArgumentParser -from pulsar.scripts.submit_util import ( - add_common_submit_args, - run_server_for_job, +from pulsar.client import ( + ClientJobDescription, + ClientOutputs, + finish_job, + PulsarOutputs, + submit_job, ) - from pulsar.client.test.check import ( - HELP_URL, - HELP_PRIVATE_TOKEN, - HELP_TRANSPORT, - HELP_SUPPRESS_OUTPUT, + client_info, + extract_client_options, HELP_DISABLE_CLEANUP, HELP_JOB_ID, - extract_client_options, - client_info, + HELP_PRIVATE_TOKEN, + HELP_SUPPRESS_OUTPUT, + HELP_TRANSPORT, + HELP_URL, Waiter, ) -from pulsar.client import ClientJobDescription -from pulsar.client import ClientOutputs -from pulsar.client import PulsarOutputs -from pulsar.client import submit_job -from pulsar.client import finish_job from pulsar.client.util import json_dumps +from pulsar.main import ArgumentParser +from pulsar.scripts.submit_util import ( + add_common_submit_args, + run_server_for_job, +) HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL." HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one." 
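
The mesos_executor.py hunk above decodes its job payload with from_base64_json, the counterpart of the to_base64_json helper used by the framework-side code earlier in this patch. A minimal round trip, assuming only that the two helpers are exact inverses (the payload keys here are made up for illustration):

from pulsar.client.util import (
    from_base64_json,
    to_base64_json,
)

# Encode a payload the way the framework side does, then decode it the way
# the executor side does; the structure should survive the round trip.
payload = {"manager": "_default_", "args": ["--debug"]}
encoded = to_base64_json(payload)
assert from_base64_json(encoded) == payload
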
diff --git a/pulsar/scripts/submit.py b/pulsar/scripts/submit.py index 2fdbe2c00b8f81659fd8bc37d5e878dfe2cadc4f..626981b239bd87bc6cb1986b67377fd6ebd8067d 100644 --- a/pulsar/scripts/submit.py +++ b/pulsar/scripts/submit.py @@ -4,8 +4,8 @@ import sys from pulsar.main import ArgumentParser from pulsar.scripts.submit_util import ( - run_server_for_job, add_common_submit_args, + run_server_for_job, ) diff --git a/pulsar/scripts/submit_util.py b/pulsar/scripts/submit_util.py index 04f71973be5954eca9197e9a6a06267324121671..2dfc1bbacf04b22334df2b094746fb7306e8bd3a 100644 --- a/pulsar/scripts/submit_util.py +++ b/pulsar/scripts/submit_util.py @@ -7,7 +7,7 @@ import time from pulsar.client.util import from_base64_json from pulsar.main import ( load_pulsar_app, - PulsarManagerConfigBuilder + PulsarManagerConfigBuilder, ) from pulsar.manager_endpoint_util import submit_job from pulsar.managers.status import is_job_done diff --git a/pulsar/tools/toolbox.py b/pulsar/tools/toolbox.py index a48c05198cbd756bf76346b56dd5c18778d7d8d5..97fbe5b92240f2f3a966e571f504d0be4fd8b00b 100644 --- a/pulsar/tools/toolbox.py +++ b/pulsar/tools/toolbox.py @@ -1,10 +1,13 @@ from logging import getLogger -from os.path import abspath, dirname, join +from os.path import ( + abspath, + dirname, + join, +) from xml.etree import ElementTree from pulsar.tools.validator import ExpressionValidator - log = getLogger(__name__) diff --git a/pulsar/tools/validator.py b/pulsar/tools/validator.py index a11fc40a35e6aedb066d0fb94f5487bac16f3956..727d128f924e22a829de97b885db2167ab13ae9d 100644 --- a/pulsar/tools/validator.py +++ b/pulsar/tools/validator.py @@ -1,5 +1,8 @@ from os.path import join -from re import compile, escape +from re import ( + compile, + escape, +) from xml.etree.ElementTree import fromstring from galaxy.util import in_directory diff --git a/pulsar/util/pastescript/loadwsgi.py b/pulsar/util/pastescript/loadwsgi.py index 635c0cfbd3ae7a6c3574917a3be1940dc6004fe0..5270cb8ea6f00eb604ef986f3dcfacb254b2d49b 100644 --- a/pulsar/util/pastescript/loadwsgi.py +++ b/pulsar/util/pastescript/loadwsgi.py @@ -7,13 +7,18 @@ import inspect import os import re import sys -from typing import Callable, Dict, List, Optional, Union +from typing import ( + Callable, + Dict, + List, + Optional, + Union, +) from urllib.parse import unquote import pkg_resources from galaxy.util.properties import NicerConfigParser - __all__ = ('loadapp', 'loadserver', 'loadfilter', 'appconfig') # ---- from paste.deploy.compat -------------------------------------- diff --git a/pulsar/util/pastescript/serve.py b/pulsar/util/pastescript/serve.py index 1287731b26c604ff7fca26cab8b2d99411aad152..5eee811e1b70921c8642627e5e7ae1c675807189 100644 --- a/pulsar/util/pastescript/serve.py +++ b/pulsar/util/pastescript/serve.py @@ -38,8 +38,10 @@ from gettext import gettext as _ from logging.config import fileConfig from typing import Optional -from .loadwsgi import loadapp, loadserver - +from .loadwsgi import ( + loadapp, + loadserver, +) difflib = None diff --git a/pulsar/web/framework.py b/pulsar/web/framework.py index c19da7b33e2e6192edfeb1610d6eb1d591d211f7..20fb7164f5b9310630ef358dc9e4a0109490849e 100644 --- a/pulsar/web/framework.py +++ b/pulsar/web/framework.py @@ -6,9 +6,11 @@ import inspect import re from os.path import exists -from webob import Request -from webob import Response -from webob import exc +from webob import ( + exc, + Request, + Response, +) from pulsar.client.util import json_dumps @@ -183,6 +185,6 @@ class FileIterator: def __next__(self): buffer = 
self.input.read(1024) - if(buffer == b""): + if buffer == b"": raise StopIteration return buffer diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py index 632dc8838fbdf71acb6cb4ab2c2955f703744ad8..4d6b075628a08c99ec9bed04ba8ff0e702e1f622 100644 --- a/pulsar/web/routes.py +++ b/pulsar/web/routes.py @@ -1,22 +1,21 @@ import logging import os - from json import loads from webob import exc -from pulsar.util import ( - copy_to_path, - copy_to_temp, -) -from pulsar.client.job_directory import verify_is_in_directory from pulsar.client.action_mapper import path_type +from pulsar.client.job_directory import verify_is_in_directory from pulsar.manager_endpoint_util import ( setup_job, status_dict, submit_job, ) from pulsar.manager_factory import DEFAULT_MANAGER_NAME +from pulsar.util import ( + copy_to_path, + copy_to_temp, +) from pulsar.web.framework import Controller log = logging.getLogger(__name__) diff --git a/pulsar/web/wsgi.py b/pulsar/web/wsgi.py index c3fa2370072a11e2ca5c21084a8080d323ccd7af..3bd7643cd6938a40305624456e61e6d301fbb1e7 100644 --- a/pulsar/web/wsgi.py +++ b/pulsar/web/wsgi.py @@ -2,12 +2,11 @@ import atexit import inspect import logging -from pulsar.main import load_app_configuration +import pulsar.web.routes from pulsar.core import PulsarApp +from pulsar.main import load_app_configuration from pulsar.web.framework import RoutingApp -import pulsar.web.routes - log = logging.getLogger(__name__) diff --git a/requirements.txt b/requirements.txt index 0737ad33939fee89c54ea5c03fbbeef712733f5e..803bd196ac7203ed52fa6f633d51cd192806b158 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ galaxy-objectstore>=19.9.0.dev1 galaxy-tool-util>=19.9.0.dev3 galaxy-util>=19.9.0.dev1 paramiko +typing-extensions ## Uncomment if using DRMAA queue manager. #drmaa diff --git a/setup.cfg b/setup.cfg index 9816801927789b6f7a0d820f9309d1af6a0b09f2..d5f9f897ee6c73d78bd3de73ea77b283cbdb4790 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,6 +12,8 @@ max-complexity = 14 exclude = pulsar/util/pastescript import-order-style = smarkets application-import-names = pulsar +# E203 is whitespace before ':'; we follow black's formatting here. See https://black.readthedocs.io/en/stable/faq.html#why-are-flake8-s-e203-and-w503-violated +extend-ignore = E203 [bdist_wheel] universal = 1
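
The new E203 exemption in setup.cfg pairs with code added earlier in this patch: black pads the slice colon when a bound is a compound expression, as in deduplicate_entries' obj_list[n + 1 :] in pykube_util.py, and flake8 would flag that padding as E203 ("whitespace before ':'"). A small illustration of the rule black follows:

items = [1, 2, 2, 3]
n = 1
head = items[:2]       # simple bound: the colon stays tight
tail = items[n + 1 :]  # compound bound: black pads it; flake8 E203 without the exemption
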