Newer
Older
from os.path import join
from os import makedirs, system
from six import next, itervalues
from six.moves import configparser
John Chilton
committed
from .test_utils import TempDirectoryTestCase, skipUnlessExecutable, skipUnlessModule
John Chilton
committed
from galaxy.util.bunch import Bunch
John Chilton
committed
from .check import run
John Chilton
committed
class BaseIntegrationTest(TempDirectoryTestCase):
John Chilton
committed
John Chilton
committed
def _run(self, app_conf={}, job_conf_props={}, **kwds):
app_conf = app_conf.copy()
job_conf_props = job_conf_props.copy()
John Chilton
committed
if "suppress_output" not in kwds:
kwds["suppress_output"] = False
John Chilton
committed
self.__setup_job_properties(app_conf, job_conf_props)
self.__setup_dependencies(app_conf)
John Chilton
committed
def update_options_for_app(options, app):
if kwds.get("local_setup", False):
staging_directory = app.staging_directory
options["jobs_directory"] = staging_directory
John Chilton
committed
if kwds.get("direct_interface", None):
from .test_utils import test_app
with test_app({}, app_conf, {}) as app:
options = Bunch(job_manager=next(itervalues(app.app.managers)), file_cache=app.app.file_cache, **kwds)
update_options_for_app(options, app.app)
John Chilton
committed
run(options)
else:
from .test_utils import test_server
with test_server(app_conf=app_conf) as server:
options = Bunch(url=server.application_url, **kwds)
update_options_for_app(options, server.test_app.application)
John Chilton
committed
run(options)
def __setup_job_properties(self, app_conf, job_conf_props):
if job_conf_props:
job_conf = join(self.temp_directory, "job_managers.ini")
config = configparser.ConfigParser()
section_name = "manager:_default_"
config.add_section(section_name)
for key, value in job_conf_props.iteritems():
config.set(section_name, key, value)
with open(job_conf, "wb") as configf:
config.write(configf)
app_conf["job_managers_config"] = job_conf
def __setup_dependencies(self, app_conf):
dependencies_dir = join(self.temp_directory, "dependencies")
dep1_directory = join(dependencies_dir, "dep1", "1.1")
makedirs(dep1_directory)
try:
# Let external users read/execute this directory for run as user
# test.
system("chmod 755 %s" % self.temp_directory)
system("chmod -R 755 %s" % dependencies_dir)
except Exception as e:
print e
env_file = join(dep1_directory, "env.sh")
with open(env_file, "w") as env:
env.write("MOO=moo_override; export MOO")
app_conf["tool_dependency_dir"] = dependencies_dir
John Chilton
committed
class IntegrationTests(BaseIntegrationTest):
default_kwargs = dict(direct_interface=False, test_requirement=True)
def test_integration_no_requirement(self):
self._run(private_token=None, **self.default_kwargs)
John Chilton
committed
@skipUnlessModule("drmaa")
def test_integration_as_user(self):
job_props = {'type': 'queued_external_drmaa', "production": "false"}
self._run(job_conf_props=job_props, private_token=None, default_file_action="copy", user='u1', **self.default_kwargs)
# Still doesn't work because trying of inputs handling...
def test_integration_local_setup(self):
self._run(private_token=None, default_file_action="remote_copy", local_setup=True, **self.default_kwargs)
def test_integration_copy(self):
John Chilton
committed
self._run(private_token=None, default_file_action="copy", **self.default_kwargs)
John Chilton
committed
def test_integration_no_transfer(self):
John Chilton
committed
self._run(private_token=None, default_file_action="none", **self.default_kwargs)
John Chilton
committed
John Chilton
committed
def test_integration_cached(self):
John Chilton
committed
self._run(private_token=None, cache=True, **self.default_kwargs)
John Chilton
committed
def test_integration_default(self):
John Chilton
committed
self._run(private_token=None, **self.default_kwargs)
John Chilton
committed
@skipUnlessModule("pycurl")
John Chilton
committed
def test_integration_curl(self):
John Chilton
committed
self._run(private_token=None, transport="curl", **self.default_kwargs)
John Chilton
committed
def test_integration_token(self):
self._run(app_conf={"private_key": "testtoken"}, private_token="testtoken", **self.default_kwargs)
def test_integration_errors(self):
self._run(app_conf={"private_key": "testtoken"}, private_token="testtoken", test_errors=True, **self.default_kwargs)
John Chilton
committed
@skipUnlessModule("drmaa")
def test_integration_drmaa(self):
John Chilton
committed
self._run(app_conf={}, job_conf_props={'type': 'queued_drmaa'}, private_token=None, **self.default_kwargs)
John Chilton
committed
John Chilton
committed
@skipUnlessExecutable("condor_submit")
John Chilton
committed
def test_integration_condor(self):
John Chilton
committed
self._run(app_conf={}, job_conf_props={'type': 'queued_condor'}, private_token=None, **self.default_kwargs)
@skipUnlessExecutable("qsub")
John Chilton
committed
def test_integration_cli(self):
John Chilton
committed
self._run(app_conf={}, job_conf_props={'type': 'queued_cli', 'job_plugin': 'Torque'}, private_token=None, **self.default_kwargs)
John Chilton
committed
John Chilton
committed
class DirectIntegrationTests(IntegrationTests):
default_kwargs = dict(direct_interface=True, test_requirement=False)