Newer
Older
from typing import (
Any,
Dict,
Optional,
)
from galaxy.util.bunch import Bunch
from pulsar.client.test.check import run
skip_unless_any_module,
skip_unless_environ,
TempDirectoryTestCase,
test_pulsar_app,
test_pulsar_server,
John Chilton
committed
John Chilton
committed
class BaseIntegrationTest(TempDirectoryTestCase):
    """Common plumbing for tests that drive ``run`` against a Pulsar app.

    Subclasses call :meth:`_run` with an app configuration, optional job
    manager properties, and any extra options to forward to the client check.
    """

    def _run(self, app_conf: Optional[Dict[str, Any]] = None, job_conf_props: Optional[Dict[str, str]] = None, **kwds):
        """Prepare configuration and dependencies, then execute the check.

        :param app_conf: Pulsar app configuration; copied before it is modified.
        :param job_conf_props: options written to the ``manager:_default_``
            section of a generated ``job_managers.ini``; copied before use.
        :param kwds: forwarded to :meth:`_run_in_app`; ``suppress_output``
            defaults to ``False`` when the caller does not supply it.
        """
        # Work on copies so the caller's dictionaries are never mutated.
        app_conf = (app_conf or {}).copy()
        job_conf_props = (job_conf_props or {}).copy()

        kwds.setdefault("suppress_output", False)

        self.__setup_job_properties(app_conf, job_conf_props)
        self.__setup_dependencies(app_conf)
        self._run_in_app(app_conf, **kwds)

    def _run_in_app(self, app_conf: Dict[str, Any], direct_interface: bool = False, inject_files_endpoint: bool = False, **kwds):
        """Run the check, optionally with a files server endpoint injected.

        A files server is started when either ``direct_interface`` or
        ``inject_files_endpoint`` is true; its (infrastructure-mapped) URL is
        passed on as ``files_endpoint``.
        """
        inject_files_endpoint = direct_interface or inject_files_endpoint
        if inject_files_endpoint:
            # Client directory hasn't been created yet, don't restrict where
            # test files are written.
            # Can only run tests using files_server if not constructing a test
            # server for Pulsar - webtest doesn't seem to like having two test
            # servers alive at same time.
            with files_server("/") as test_files_server:
                files_endpoint = to_infrastructure_uri(test_files_server.application_url)
                if direct_interface:
                    self._run_direct(app_conf, files_endpoint=files_endpoint, **kwds)
                else:
                    self._run_in_test_server(app_conf, files_endpoint=files_endpoint, **kwds)
        else:
            self._run_in_test_server(app_conf, **kwds)

    def _run_in_test_server(self, app_conf, **kwds):
        """Start a test Pulsar server and run the check against its URL."""
        with test_pulsar_server(app_conf=app_conf) as server:
            options = Bunch(url=server.application_url, **kwds)
            # TODO: sync Py 2 v 3 approach so following hack is unneeded.
            app = server.test_app
            if hasattr(app, "application"):
                app = app.application
            self._update_options_for_app(options, app, **kwds)
            run(options)

    def _run_direct(self, app_conf, **kwds):
        """Run the check against an in-process app, bypassing the web layer.

        The first job manager of the app and its file cache are handed to the
        check directly.
        """
        with test_pulsar_app({}, app_conf, {}) as app:
            options = Bunch(job_manager=next(iter(app.app.managers.values())), file_cache=app.app.file_cache, **kwds)
            self._update_options_for_app(options, app.app, **kwds)
            run(options)

    def _update_options_for_app(self, options, app, **kwds):
        """Set ``jobs_directory`` on ``options`` when ``local_setup`` is requested."""
        if kwds.get("local_setup", False):
            staging_directory = app.staging_directory
            if kwds.get("k8s_enabled"):
                # Fixed path used for the Kubernetes scenario instead of the
                # app's own staging directory.
                options["jobs_directory"] = "/pulsar_staging"
            else:
                options["jobs_directory"] = staging_directory

    def __setup_job_properties(self, app_conf, job_conf_props: Dict[str, str]):
        """Write ``job_conf_props`` to a job managers ini file and register it.

        Does nothing when ``job_conf_props`` is empty. Otherwise the options
        are written to the ``manager:_default_`` section of
        ``job_managers.ini`` in the temp directory and ``app_conf`` is pointed
        at that file via ``job_managers_config``.
        """
        if job_conf_props:
            job_conf = join(self.temp_directory, "job_managers.ini")
            config = configparser.ConfigParser()
            section_name = "manager:_default_"
            config.add_section(section_name)
            for key, value in job_conf_props.items():
                config.set(section_name, key, value)
            with open(job_conf, "w") as configf:
                config.write(configf)

            app_conf["job_managers_config"] = job_conf

    def __setup_dependencies(self, app_conf):
        """Create a ``dep1/1.1`` tool dependency and register its directory.

        An ``env.sh`` exporting ``MOO`` is written into the dependency
        directory and ``app_conf["tool_dependency_dir"]`` is set to the
        dependencies root.
        """
        dependencies_dir = join(self.temp_directory, "dependencies")
        dep1_directory = join(dependencies_dir, "dep1", "1.1")
        makedirs(dep1_directory)
        try:
            # Let external users read/execute this directory for run as user
            # test.
            system("chmod 755 %s" % self.temp_directory)
            system("chmod -R 755 %s" % dependencies_dir)
        except Exception as e:
            # Permission changes are best-effort: report the problem and carry
            # on rather than failing the test setup.
            print(e)
        env_file = join(dep1_directory, "env.sh")
        with open(env_file, "w") as env:
            env.write("MOO=moo_override; export MOO")
        app_conf["tool_dependency_dir"] = dependencies_dir


class IntegrationTests(BaseIntegrationTest):
    """Integration scenarios driven through ``_run``.

    Every test forwards ``default_kwargs`` plus its scenario-specific options.
    Each scenario lives in its own test method so a failure points at exactly
    one configuration.
    """

    default_kwargs = dict(direct_interface=False, test_requirement=True, test_unicode=True, test_env=True, test_rewrite_action=True)

    def test_integration_no_requirement(self):
        self._run(private_token=None, **self.default_kwargs)

    @integration_test
    def test_integration_maximum_stream_size(self):
        app_conf = dict(maximum_stream_size=4)
        self._run(app_conf=app_conf, private_token=None, maximum_stream_size=4, **self.default_kwargs)

    def test_integration_as_user(self):
        job_props = {'type': 'queued_external_drmaa', "production": "false"}
        self._run(job_conf_props=job_props, private_token=None, default_file_action="copy", user='u1', **self.default_kwargs)

    def test_integration_local_setup(self):
        self._run(private_token=None, default_file_action="remote_copy", local_setup=True, **self.default_kwargs)

    # Separate scenario: same local setup, but routed through an in-memory
    # message queue. Guarded on kombu like the other memory:// scenarios below.
    @skip_unless_module("kombu")
    def test_integration_message_queue(self):
        self._run(
            app_conf=dict(message_queue_url="memory://test1"),
            private_token=None,
            default_file_action="remote_copy",
            local_setup=True,
            manager_url="memory://test1",
            **self.default_kwargs
        )

    @skip_unless_environ("PULSAR_TEST_KEY")
    @skip_unless_module("kombu")
    @integration_test
    def test_integration_scp(self):
        self._run(
            app_conf=dict(message_queue_url="memory://test2"),
            private_token=None,
            default_file_action="remote_scp_transfer",
            local_setup=True,
            **self.default_kwargs
        )

    @skip_unless_environ("PULSAR_TEST_KEY")
    @skip_unless_module("kombu")
    @integration_test
    def test_integration_rsync(self):
        self._run(
            app_conf=dict(message_queue_url="memory://test3"),
            private_token=None,
            default_file_action="remote_rsync_transfer",
            local_setup=True,
            **self.default_kwargs
        )

    def test_integration_copy(self):
        self._run(private_token=None, default_file_action="copy", **self.default_kwargs)

    def test_integration_no_transfer(self):
        self._run(private_token=None, default_file_action="none", **self.default_kwargs)

    def test_integration_cached(self):
        self._run(private_token=None, cache=True, **self.default_kwargs)

    @integration_test
    def test_integration_legacy_galaxy_json(self):
        self._run(private_token=None, legacy_galaxy_json=True, **self.default_kwargs)

    def test_integration_default(self):
        self._run(private_token=None, **self.default_kwargs)

    def test_integration_curl(self):
        self._run(private_token=None, transport="curl", **self.default_kwargs)

    @integration_test
    def test_integration_explicit_tool_directory_includes(self):
        self._run(private_token=None, explicit_tool_declarations=True, **self.default_kwargs)

    def test_integration_token(self):
        self._run(app_conf={"private_token": "testtoken"}, private_token="testtoken", **self.default_kwargs)

    def test_integration_errors(self):
        self._run(app_conf={"private_token": "testtoken"}, private_token="testtoken", test_errors=True, **self.default_kwargs)

    def test_integration_drmaa(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_drmaa'}, private_token=None, **self.default_kwargs)

    def test_integration_condor(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_condor'}, private_token=None, **self.default_kwargs)

    # Torque gets its own test, parallel to the Slurm CLI test, instead of
    # piggybacking on the Condor scenario.
    def test_integration_cli_torque(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_cli', 'job_plugin': 'Torque'}, private_token=None, **self.default_kwargs)

    def test_integration_cli_slurm(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_cli', 'job_plugin': 'Slurm'}, private_token=None, **self.default_kwargs)


class ExternalQueueIntegrationTests(IntegrationTests):
    """Scenarios that talk to an externally provided message queue.

    The broker URL is read from ``PULSAR_RABBIT_MQ_CONNECTION``; both tests
    are skipped by their decorators when that variable is not set.
    """

    default_kwargs = {
        "direct_interface": False,
        "test_requirement": False,
        "test_unicode": True,
        "test_env": True,
        "test_rewrite_action": True,
    }

    @integration_test
    @skip_unless_environ("PULSAR_RABBIT_MQ_CONNECTION")
    def test_integration_external_rabbit(self):
        # e.g. amqp://guest:guest@localhost:5672//
        # TODO: nc docker.for.mac.localhost 5679
        broker_url = environ.get("PULSAR_RABBIT_MQ_CONNECTION")
        scenario = {
            "app_conf": {"message_queue_url": broker_url},
            "private_token": None,
            "local_setup": True,
            "default_file_action": "remote_transfer",
            "manager_url": broker_url,
            "inject_files_endpoint": True,
        }
        self._run(**scenario, **self.default_kwargs)

    # Example environment for the Kubernetes scenario:
    # PULSAR_RABBIT_MQ_CONNECTION="amqp://guest:guest@localhost:5672"
    # PULSAR_TEST_INFRASTRUCTURE_HOST="docker.for.mac.localhost"
    # Setup MQ and expose it on 0.0.0.0 by setting NODE_IP_ADDRESS= to empty string
    @integration_test
    @skip_unless_environ("PULSAR_RABBIT_MQ_CONNECTION")
    def test_integration_kubernetes(self):
        broker_url = environ.get("PULSAR_RABBIT_MQ_CONNECTION")
        # Configuration handed to the remote Pulsar app; its broker URL is
        # remapped to the infrastructure host.
        remote_conf = {
            "staging_directory": "/pulsar_staging/",
            "message_queue_url": to_infrastructure_uri(broker_url),
            "manager": {
                "type": "coexecution",
            },
        }
        scenario = {
            "app_conf": {"message_queue_url": broker_url},
            "private_token": None,
            "local_setup": True,
            "default_file_action": "remote_transfer",
            "manager_url": broker_url,
            "inject_files_endpoint": True,
            "k8s_enabled": True,
            "remote_pulsar_app_config": remote_conf,
        }
        self._run(**scenario, **self.default_kwargs)


class DirectIntegrationTests(IntegrationTests):
    """Re-run the inherited scenarios with ``direct_interface=True``."""

    default_kwargs = {"direct_interface": True, "test_requirement": False}

    @skip_unless_any_module(["pycurl", "poster", "requests_toolbelt"])
    def test_integration_remote_transfer(self):
        scenario = {
            "private_token": None,
            "local_setup": True,
            "default_file_action": "remote_transfer",
        }
        self._run(**scenario, **self.default_kwargs)


def to_infrastructure_uri(uri):
    """Point a loopback MQ or file server URI at the test infrastructure host.

    When ``PULSAR_TEST_INFRASTRUCTURE_HOST`` is set and the host of ``uri`` is
    ``0.0.0.0``, ``localhost`` or ``127.0.0.1``, the host is replaced by that
    value. Credentials, port, path, query and fragment are left alone.

    :param uri: URI string to remap.
    :returns: the remapped URI, or ``uri`` unchanged when the environment
        variable is unset/empty or the host is not a loopback name.
    """
    from urllib.parse import urlsplit, urlunsplit

    infrastructure_host = environ.get("PULSAR_TEST_INFRASTRUCTURE_HOST")
    if not infrastructure_host:
        return uri

    loopback_hosts = ("0.0.0.0", "localhost", "127.0.0.1")

    try:
        parts = urlsplit(uri)
        hostname = parts.hostname
    except ValueError:
        # Malformed netloc (e.g. a broken IPv6 literal): treat as unparseable.
        hostname = None

    if hostname is None:
        # No recognisable host component (e.g. "localhost:5672" without a
        # scheme): keep the historical substring replacement, first match wins.
        for candidate in loopback_hosts:
            if candidate in uri:
                return uri.replace(candidate, infrastructure_host)
        return uri

    if hostname not in loopback_hosts:
        return uri

    # Rebuild only the host part of the netloc: [userinfo@]host[:port].
    userinfo, at, hostport = parts.netloc.rpartition("@")
    _, colon, port = hostport.partition(":")
    new_netloc = userinfo + at + infrastructure_host + colon + port
    return urlunsplit(parts._replace(netloc=new_netloc))