Skip to content
Snippets Groups Projects
integration_test.py 11.9 KiB
Newer Older
import configparser
from os import environ, makedirs, system
from os.path import join
from typing import (
    Any,
    Dict,
    Optional,
)

from galaxy.util.bunch import Bunch

from pulsar.client.test.check import run
from .test_utils import (
    files_server,
    integration_test,
    skip_unless_any_module,
    skip_unless_environ,
    skip_unless_executable,
    skip_unless_module,
    skip_without_drmaa,
    TempDirectoryTestCase,
    test_pulsar_app,
    test_pulsar_server,
class BaseIntegrationTest(TempDirectoryTestCase):
    def _run(self, app_conf: Optional[Dict[str, Any]] = None, job_conf_props: Optional[Dict[str, str]] = None, **kwds):
        app_conf = app_conf or {}
        job_conf_props = job_conf_props or {}

        app_conf = app_conf.copy()
        job_conf_props = job_conf_props.copy()
            kwds["suppress_output"] = False
        self.__setup_job_properties(app_conf, job_conf_props)
        self.__setup_dependencies(app_conf)
    def _run_in_app(self, app_conf: Dict[str, Any], direct_interface: bool = False, inject_files_endpoint: bool = False, **kwds):
        inject_files_endpoint = direct_interface or inject_files_endpoint
        if inject_files_endpoint:
            # Client directory hasn't bee created yet, don't restrict where
            # test files written.
            # Can only run tests using files_server if not constructing a test
            # server for Pulsar - webtest doesn't seem to like having two test
            # servers alive at same time.
            with files_server("/") as test_files_server:
                files_endpoint = to_infrastructure_uri(test_files_server.application_url)
                if direct_interface:
                    self._run_direct(app_conf, files_endpoint=files_endpoint, **kwds)
                else:
                    self._run_in_test_server(app_conf, files_endpoint=files_endpoint, **kwds)
            self._run_in_test_server(app_conf, **kwds)

    def _run_in_test_server(self, app_conf, **kwds):
        with test_pulsar_server(app_conf=app_conf) as server:
            options = Bunch(url=server.application_url, **kwds)
John Chilton's avatar
John Chilton committed
            # TODO: sync Py 2 v 3 approach so following hack is unneeded.
            app = server.test_app
            if hasattr(app, "application"):
                app = app.application
            self._update_options_for_app(options, app, **kwds)
            run(options)

    def _run_direct(self, app_conf, **kwds):
        with test_pulsar_app({}, app_conf, {}) as app:
            options = Bunch(job_manager=next(iter(app.app.managers.values())), file_cache=app.app.file_cache, **kwds)
            self._update_options_for_app(options, app.app, **kwds)
            run(options)

    def _update_options_for_app(self, options, app, **kwds):
        if kwds.get("local_setup", False):
            staging_directory = app.staging_directory
            if kwds.get("k8s_enabled"):
                options["jobs_directory"] = "/pulsar_staging"
            else:
                options["jobs_directory"] = staging_directory
    def __setup_job_properties(self, app_conf, job_conf_props: Dict[str, str]):
        if job_conf_props:
            job_conf = join(self.temp_directory, "job_managers.ini")
            config = configparser.ConfigParser()
            section_name = "manager:_default_"
            config.add_section(section_name)
John Chilton's avatar
John Chilton committed
            for key, value in job_conf_props.items():
                config.set(section_name, key, value)
John Chilton's avatar
John Chilton committed
            with open(job_conf, "w") as configf:
                config.write(configf)

            app_conf["job_managers_config"] = job_conf

    def __setup_dependencies(self, app_conf):
        dependencies_dir = join(self.temp_directory, "dependencies")
        dep1_directory = join(dependencies_dir, "dep1", "1.1")
        makedirs(dep1_directory)
        try:
            # Let external users read/execute this directory for run as user
            # test.
            system("chmod 755 %s" % self.temp_directory)
            system("chmod -R 755 %s" % dependencies_dir)
        except Exception as e:
John Chilton's avatar
John Chilton committed
            print(e)
        env_file = join(dep1_directory, "env.sh")
        with open(env_file, "w") as env:
            env.write("MOO=moo_override; export MOO")
        app_conf["tool_dependency_dir"] = dependencies_dir

    default_kwargs = dict(direct_interface=False, test_requirement=True, test_unicode=True, test_env=True, test_rewrite_action=True)
    @integration_test
    def test_integration_no_requirement(self):
        self._run(private_token=None, **self.default_kwargs)
    @integration_test
    def test_integration_maximum_stream_size(self):
        app_conf = dict(maximum_stream_size=4)
        self._run(app_conf=app_conf, private_token=None, maximum_stream_size=4, **self.default_kwargs)

    @skip_without_drmaa
    @integration_test
    def test_integration_as_user(self):
        job_props = {'type': 'queued_external_drmaa', "production": "false"}
        self._run(job_conf_props=job_props, private_token=None, default_file_action="copy", user='u1', **self.default_kwargs)
    @integration_test
    def test_integration_local_setup(self):
        self._run(private_token=None, default_file_action="remote_copy", local_setup=True, **self.default_kwargs)
    @skip_unless_module("kombu")
    @integration_test
    def test_message_queue(self):
John Chilton's avatar
John Chilton committed
        self._run(
            app_conf=dict(message_queue_url="memory://test1"),
            private_token=None,
            default_file_action="remote_copy",
            local_setup=True,
            manager_url="memory://test1",
            **self.default_kwargs
        )
    @skip_unless_environ("PULSAR_TEST_KEY")
    @skip_unless_module("kombu")
    @integration_test
    def test_integration_scp(self):
        self._run(
            app_conf=dict(message_queue_url="memory://test2"),
            private_token=None,
            default_file_action="remote_scp_transfer",
            local_setup=True,
            manager_url="memory://test2",
            **self.default_kwargs
        )

    @skip_unless_environ("PULSAR_TEST_KEY")
    @skip_unless_module("kombu")
    @integration_test
    def test_integration_rsync(self):
        self._run(
            app_conf=dict(message_queue_url="memory://test3"),
            private_token=None,
            default_file_action="remote_rsync_transfer",
            local_setup=True,
            manager_url="memory://test3",
    @integration_test
    def test_integration_copy(self):
        self._run(private_token=None, default_file_action="copy", **self.default_kwargs)
    @integration_test
    def test_integration_no_transfer(self):
        self._run(private_token=None, default_file_action="none", **self.default_kwargs)
    @integration_test
        self._run(private_token=None, cache=True, **self.default_kwargs)
    @integration_test
    def test_integration_legacy_galaxy_json(self):
        self._run(private_token=None, legacy_galaxy_json=True, **self.default_kwargs)

    @integration_test
        self._run(private_token=None, **self.default_kwargs)
    @skip_unless_module("pycurl")
    @integration_test
        self._run(private_token=None, transport="curl", **self.default_kwargs)
    @integration_test
    def test_integration_explicit_tool_directory_includes(self):
        self._run(private_token=None, explicit_tool_declarations=True, **self.default_kwargs)

    @integration_test
    def test_integration_token(self):
        self._run(app_conf={"private_token": "testtoken"}, private_token="testtoken", **self.default_kwargs)
    @integration_test
    def test_integration_errors(self):
        self._run(app_conf={"private_token": "testtoken"}, private_token="testtoken", test_errors=True, **self.default_kwargs)
    @skip_without_drmaa
    @integration_test
    def test_integration_drmaa(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_drmaa'}, private_token=None, **self.default_kwargs)
    @skip_unless_executable("condor_submit")
    @integration_test
        self._run(app_conf={}, job_conf_props={'type': 'queued_condor'}, private_token=None, **self.default_kwargs)
    @skip_unless_executable("qsub")
    @integration_test
John Chilton's avatar
John Chilton committed
    def test_integration_cli_torque(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_cli', 'job_plugin': 'Torque'}, private_token=None, **self.default_kwargs)
    @skip_unless_executable("sbatch")
    @integration_test
John Chilton's avatar
John Chilton committed
    def test_integration_cli_slurm(self):
        self._run(app_conf={}, job_conf_props={'type': 'queued_cli', 'job_plugin': 'Slurm'}, private_token=None, **self.default_kwargs)

class ExternalQueueIntegrationTests(IntegrationTests):
    default_kwargs = dict(direct_interface=False, test_requirement=False, test_unicode=True, test_env=True, test_rewrite_action=True)

    @integration_test
    @skip_unless_environ("PULSAR_RABBIT_MQ_CONNECTION")
    def test_integration_external_rabbit(self):
        # e.g. amqp://guest:guest@localhost:5672//
        # TODO: nc docker.for.mac.localhost 5679
        message_queue_url = environ.get("PULSAR_RABBIT_MQ_CONNECTION")
        self._run(
            app_conf=dict(message_queue_url=message_queue_url),
            private_token=None,
            local_setup=True,
            default_file_action="remote_transfer",
            manager_url=message_queue_url,
            inject_files_endpoint=True,
            **self.default_kwargs
        )

    # PULSAR_RABBIT_MQ_CONNECTION="amqp://guest:guest@localhost:5672"
    # PULSAR_TEST_INFRASTRUCTURE_HOST="docker.for.mac.localhost"
    # Setup MQ and expose it on 0.0.0.0 by setting NODE_IP_ADDRESS= to empty string
    @integration_test
    @skip_unless_environ("PULSAR_RABBIT_MQ_CONNECTION")
    def test_integration_kubernetes(self):
        message_queue_url = environ.get("PULSAR_RABBIT_MQ_CONNECTION")
        remote_pulsar_app_config = {
            "staging_directory": "/pulsar_staging/",
            "message_queue_url": to_infrastructure_uri(message_queue_url),
            "manager": {
                "type": "coexecution",
            }
        }
        self._run(
            app_conf=dict(message_queue_url=message_queue_url),
            private_token=None,
            local_setup=True,
            default_file_action="remote_transfer",
            manager_url=message_queue_url,
            inject_files_endpoint=True,
            k8s_enabled=True,
            container="conda/miniconda3",
            remote_pulsar_app_config=remote_pulsar_app_config,
            **self.default_kwargs
        )


class DirectIntegrationTests(IntegrationTests):
    default_kwargs = dict(direct_interface=True, test_requirement=False)
    @skip_unless_any_module(["pycurl", "poster", "requests_toolbelt"])
    @integration_test
    def test_integration_remote_transfer(self):
        self._run(
            private_token=None,
            local_setup=True,
            default_file_action="remote_transfer",
            **self.default_kwargs
        )


def to_infrastructure_uri(uri):
    # remap MQ or file server URI hostnames for in-container versions, this is sloppy
    # should actually parse the URI and rebuild with correct host
    infrastructure_host = environ.get("PULSAR_TEST_INFRASTRUCTURE_HOST")
    infrastructure_uri = uri
    if infrastructure_host:
        if "0.0.0.0" in infrastructure_uri:
            infrastructure_uri = infrastructure_uri.replace("0.0.0.0", infrastructure_host)
        elif "localhost" in infrastructure_uri:
            infrastructure_uri = infrastructure_uri.replace("localhost", infrastructure_host)
        elif "127.0.0.1" in infrastructure_uri:
            infrastructure_uri = infrastructure_uri.replace("127.0.0.1", infrastructure_host)
    return infrastructure_uri