diff --git a/.coveragerc b/.coveragerc index 2875d8da6a21cab6a7544517a3a9f1aeee55fdf7..6ab516934e50469064878604ec48628b7b01f2d2 100644 --- a/.coveragerc +++ b/.coveragerc @@ -11,4 +11,4 @@ exclude_lines = ignore_errors = True [html] -directory = coverage_html_report \ No newline at end of file +directory = coverage_html_report diff --git a/lwr/lwr_client/client.py b/lwr/lwr_client/client.py index bd717e0db3d35d57dd46e4fe7c290dc6e5b29640..bf3f629157a30b2b34f03e82e83d931e5b486e98 100644 --- a/lwr/lwr_client/client.py +++ b/lwr/lwr_client/client.py @@ -278,17 +278,17 @@ class InputCachingClient(Client): input_path = None return self._raw_execute(action, args, contents, input_path) else: + event_holder = self.client_manager.event_manager.acquire_event(input_path) cache_required = self.cache_required(input_path) if cache_required: self.client_manager.queue_transfer(self, input_path) while True: - # Use Conditions to make sleep a timed wait. available = self.file_available(input_path) if available['ready']: token = available['token'] args["cache_token"] = token return self._raw_execute(action, args) - sleep(CACHE_WAIT_SECONDS) + event_holder.event.wait(30) @parseJson() def cache_required(self, path): diff --git a/lwr/lwr_client/manager.py b/lwr/lwr_client/manager.py index 1f50bf42ff2b3f122c256dbef85f10e73e4f08ef..957a4c1a490a3e170cd300c2828689eb88035036 100644 --- a/lwr/lwr_client/manager.py +++ b/lwr/lwr_client/manager.py @@ -3,6 +3,7 @@ from threading import Thread from .client import Client, InputCachingClient from .transport import get_transport +from .util import TransferEventManager DEFAULT_TRANSFER_THREADS = 2 @@ -14,6 +15,7 @@ class ClientManager(object): """ def __init__(self, **kwds): self.transport = get_transport(kwds.get('transport_type', None)) + self.event_manager = TransferEventManager() cache = kwds.get('cache', False) if cache: self.client_class = InputCachingClient @@ -33,7 +35,9 @@ class ClientManager(object): def __perform_transfer(self, transfer_info): (client, path) = transfer_info + event_holder = self.event_manager.acquire_event(path, force_clear=True) client.cache_insert(path) + event_holder.event.set() def __init_transfer_threads(self, num_transfer_threads): self.transfer_queue = Queue() diff --git a/lwr/lwr_client/util.py b/lwr/lwr_client/util.py index fdd6e64320b06fc3bcc224b562630b07847b920e..82a46da89b96e672587c0f8749f8376f6c6f35f5 100644 --- a/lwr/lwr_client/util.py +++ b/lwr/lwr_client/util.py @@ -1,40 +1,37 @@ -from threading import Lock, Condition +from threading import Lock, Event -class ConditionManager(object): +class TransferEventManager(object): def __init__(self): - self.__conditions = dict() - self.__conditions_count = dict() - self.__conditions_lock = Lock() - - def acquire_condition(self, path): - with self.__conditions_lock: - if path in self.__conditions: - condition = self.__conditions[path] - cur_count = self.__conditions_count[path] - self.__conditions_count[path] = cur_count + 1 + self.events = dict() + self.events_lock = Lock() + + def acquire_event(self, path, force_clear=False): + with self.events_lock: + if path in self.events: + event_holder = self.events[path] else: - condition = Condition() - self.__conditions[path] = condition - self.__conditions_count[path] = 1 - return condition + event_holder = EventHolder(Event(), path, self) + self.events[path] = event_holder + if force_clear: + event_holder.event.clear() + return event_holder - def release_condition(self, path): - with self.__conditions_lock: - cur_count = self.__conditions_count[path] - self.__conditions_count[path] = cur_count - 1 - if cur_count == 0: - del self.__conditions_count[path] - del self.__conditions[path] + def free_event(self, path): + with self.events_lock: + del self.events[path] -class ConditionHolder(object): +class EventHolder(object): - def __init__(self, condition, path, condition_manager): - self.condition = condition + def __init__(self, event, path, condition_manager): + self.event = event self.path = path self.condition_manager = condition_manager + def release(self): + self.event.set() + def __del__(self): - self.condition_manager.release_condition(self.path) + self.condition_manager.free_event(self.path) diff --git a/run_client_tests.py b/run_client_tests.py index 77efdc99e1cddfb68229b0221aeaae19329a139c..82fc4f1f99302b7d92bf25a302bcbef48bceeb45 100644 --- a/run_client_tests.py +++ b/run_client_tests.py @@ -1,102 +1,4 @@ -import shutil -import tempfile -import os -import optparse -import traceback - -from lwr.lwr_client import ClientManager -from lwr.lwr_client import FileStager - - -class MockTool(object): - - def __init__(self, tool_dir): - self.id = "client_test" - self.version = "1.0" - self.tool_dir = tool_dir - - -def main(): - """ Exercises a running lwr server application with the lwr client. """ - parser = optparse.OptionParser() - parser.add_option('--url', dest='url', default='http://localhost:8913/') - parser.add_option('--private_token', default=None) - parser.add_option('--transport', default=None) # set to curl to use pycurl - parser.add_option('--cache', default=False, action="store_true") - parser.add_option('--test_errors', default=False, action="store_true") - (options, args) = parser.parse_args() - - try: - temp_directory = tempfile.mkdtemp() - temp_work_dir = os.path.join(temp_directory, "w") - temp_tool_dir = os.path.join(temp_directory, "t") - for dir in [temp_tool_dir, temp_work_dir]: - os.makedirs(dir) - - temp_input_path = os.path.join(temp_directory, "input.txt") - temp_config_path = os.path.join(temp_work_dir, "config.txt") - temp_tool_path = os.path.join(temp_directory, "t", "script.py") - temp_output_path = os.path.join(temp_directory, "output") - - temp_input_file = open(temp_input_path, "w") - temp_config_file = open(temp_config_path, "w") - temp_tool_file = open(temp_tool_path, "w") - try: - temp_input_file.write("Hello world input!!@!") - temp_config_file.write("hello world output") - temp_tool_file.write(""" -import sys -output = open(sys.argv[3], 'w') -input_input = open(sys.argv[2], 'r') -config_input = open(sys.argv[1], 'r') -try: - assert input_input.read() == "Hello world input!!@!" - contents = config_input.read(1024) - output.write(contents) -finally: - output.close() - config_input.close() -""") - finally: - temp_input_file.close() - temp_tool_file.close() - temp_config_file.close() - - command_line = 'python %s "%s" "%s" "%s"' % (temp_tool_path, temp_config_path, temp_input_path, temp_output_path) - config_files = [temp_config_path] - input_files = [temp_input_path] - output_files = [temp_output_path] - - manager_args = {} - if options.cache: - manager_args['cache'] = True - if options.transport: - manager_args['transport'] = options.transport - - client = ClientManager(**manager_args).get_client({"url": options.url, "private_token": options.private_token}, "123456") - stager = FileStager(client, MockTool(temp_tool_dir), command_line, config_files, input_files, output_files, temp_work_dir) - new_command = stager.get_rewritten_command_line() - client.launch(new_command) - client.wait() - client.download_output(temp_output_path, temp_directory) - if options.test_errors: - try: - client.download_output(temp_output_path + "x", temp_directory) - except BaseException, e: - traceback.print_exc(e) - output_file = open(temp_output_path, 'r') - try: - output_contents = output_file.read() - assert output_contents == "hello world output", "Invalid output_contents: %s" % output_contents - print 'Test Successful!' - finally: - output_file.close() - except BaseException, e: - print "Exception: %s\n" % str(e) - traceback.print_exc(e) - finally: - shutil.rmtree(temp_directory) - # client.clean() +from test.check import main if __name__ == "__main__": main() diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/test/app_test.py b/test/app_test.py index 04d2ce0dea2c0472437d999d778f16743b857dfb..024c433ca803a4866ec39c828ea5e40da489dcd7 100644 --- a/test/app_test.py +++ b/test/app_test.py @@ -1,23 +1,17 @@ -from webtest import TestApp -from lwr.app import app_factory -import tempfile import os -import shutil import simplejson import urllib import time -def test_app(): +def test_standard_requests(): """ Tests app controller methods. These tests should be compartmentalized. Also these methods should be made to not retest the behavior of the associated Manager class. """ - - staging_directory = tempfile.mkdtemp() - try: - app = app_factory({}, staging_directory=staging_directory) - test_app = TestApp(app, extra_environ={"REMOTE_ADDR": "127.101.101.98"}) - setup_response = test_app.get("/setup?job_id=12345") + from test_utils import test_app + with test_app(test_conf={"extra_environ": {"REMOTE_ADDR": "127.101.101.98"}}) as app: + staging_directory = app.app.staging_directory + setup_response = app.get("/setup?job_id=12345") setup_config = simplejson.loads(setup_response.body) assert setup_config["working_directory"].startswith(staging_directory) outputs_directory = setup_config["outputs_directory"] @@ -27,7 +21,7 @@ def test_app(): def test_upload(upload_type): url = "/upload_%s?job_id=%s&name=input1" % (upload_type, job_id) - upload_input_response = test_app.post(url, "Test Contents") + upload_input_response = app.post(url, "Test Contents") upload_input_config = simplejson.loads(upload_input_response.body) staged_input_path = upload_input_config["path"] staged_input = open(staged_input_path, "r") @@ -43,37 +37,30 @@ def test_app(): test_output.write("Hello World!") finally: test_output.close() - download_response = test_app.get("/download_output?job_id=%s&name=test_output" % job_id) + download_response = app.get("/download_output?job_id=%s&name=test_output" % job_id) assert download_response.body == "Hello World!" try: - test_app.get("/download_output?job_id=%s&name=test_output2" % job_id) + app.get("/download_output?job_id=%s&name=test_output2" % job_id) assert False # Should throw exception except: pass command_line = urllib.quote("""python -c "import sys; sys.stdout.write('test_out')" """) - launch_response = test_app.get("/launch?job_id=%s&command_line=%s" % (job_id, command_line)) + launch_response = app.get("/launch?job_id=%s&command_line=%s" % (job_id, command_line)) assert launch_response.body == 'OK' - time.sleep(2) + time.sleep(1) - check_response = test_app.get("/check_complete?job_id=%s" % job_id) + check_response = app.get("/check_complete?job_id=%s" % job_id) check_config = simplejson.loads(check_response.body) assert check_config['returncode'] == 0 assert check_config['stdout'] == "test_out" assert check_config['stderr'] == "" - kill_response = test_app.get("/kill?job_id=%s" % job_id) + kill_response = app.get("/kill?job_id=%s" % job_id) assert kill_response.body == 'OK' - clean_response = test_app.get("/clean?job_id=%s" % job_id) + clean_response = app.get("/clean?job_id=%s" % job_id) assert clean_response.body == 'OK' assert os.listdir(staging_directory) == [] - - finally: - try: - app.shutdown() - except: - pass - shutil.rmtree(staging_directory) diff --git a/test/check.py b/test/check.py new file mode 100644 index 0000000000000000000000000000000000000000..093dd9bb2a1f56aa7656e19251f060ab52e73ff4 --- /dev/null +++ b/test/check.py @@ -0,0 +1,111 @@ +import shutil +import tempfile +import os +import optparse +import traceback + +from lwr.lwr_client import ClientManager +from lwr.lwr_client import FileStager + + +class MockTool(object): + + def __init__(self, tool_dir): + self.id = "client_test" + self.version = "1.0" + self.tool_dir = tool_dir + + +def run(options): + try: + temp_directory = tempfile.mkdtemp() + temp_work_dir = os.path.join(temp_directory, "w") + temp_tool_dir = os.path.join(temp_directory, "t") + for dir in [temp_tool_dir, temp_work_dir]: + os.makedirs(dir) + + temp_input_path = os.path.join(temp_directory, "input.txt") + temp_config_path = os.path.join(temp_work_dir, "config.txt") + temp_tool_path = os.path.join(temp_directory, "t", "script.py") + temp_output_path = os.path.join(temp_directory, "output") + + temp_input_file = open(temp_input_path, "w") + temp_config_file = open(temp_config_path, "w") + temp_tool_file = open(temp_tool_path, "w") + try: + temp_input_file.write("Hello world input!!@!") + temp_config_file.write("hello world output") + temp_tool_file.write(""" +import sys +output = open(sys.argv[3], 'w') +input_input = open(sys.argv[2], 'r') +config_input = open(sys.argv[1], 'r') +try: + assert input_input.read() == "Hello world input!!@!" + contents = config_input.read(1024) + output.write(contents) + open("workdir_output", "w").write("WORK DIR OUTPUT") +finally: + output.close() + config_input.close() +""") + finally: + temp_input_file.close() + temp_tool_file.close() + temp_config_file.close() + + command_line = 'python %s "%s" "%s" "%s"' % (temp_tool_path, temp_config_path, temp_input_path, temp_output_path) + config_files = [temp_config_path] + input_files = [temp_input_path] + output_files = [temp_output_path] + + manager_args = {} + if options.cache: + manager_args['cache'] = True + if options.transport: + manager_args['transport_type'] = options.transport + + client = ClientManager(**manager_args).get_client({"url": options.url, "private_token": options.private_token}, "123456") + stager = FileStager(client, MockTool(temp_tool_dir), command_line, config_files, input_files, output_files, temp_work_dir) + new_command = stager.get_rewritten_command_line() + client.launch(new_command) + client.wait() + client.download_output(temp_output_path, temp_directory) + if options.test_errors: + try: + client.download_output(temp_output_path + "x", temp_directory) + except BaseException, e: + traceback.print_exc(e) + output_file = open(temp_output_path, 'r') + try: + output_contents = output_file.read() + assert output_contents == "hello world output", "Invalid output_contents: %s" % output_contents + print 'Test Successful!' + finally: + output_file.close() + #if os.path.exists("workdir_output"): + # os.remove("workdir_output") + #client.download_work_dir_output("workdir_output") + #assert os.path.exists("workdir_output") + except BaseException, e: + print "Exception: %s\n" % str(e) + traceback.print_exc(e) + raise e + finally: + shutil.rmtree(temp_directory) + # client.clean() + + +def main(): + """ Exercises a running lwr server application with the lwr client. """ + parser = optparse.OptionParser() + parser.add_option('--url', dest='url', default='http://localhost:8913/') + parser.add_option('--private_token', default=None) + parser.add_option('--transport', default=None) # set to curl to use pycurl + parser.add_option('--cache', default=False, action="store_true") + parser.add_option('--test_errors', default=False, action="store_true") + (options, args) = parser.parse_args() + run(options) + +if __name__ == "__main__": + main() diff --git a/test/integration_test.py b/test/integration_test.py new file mode 100644 index 0000000000000000000000000000000000000000..ddadf7ee7ef50f4b98ed1f350945cea90c961255 --- /dev/null +++ b/test/integration_test.py @@ -0,0 +1,22 @@ +from unittest import TestCase + +from lwr.util import Bunch +from .check import run + + +class IntegrationTest(TestCase): + + def test_integration_cached(self): + self.__run(private_token=None, transport=None, cache=True, test_errors=False) + + def test_integration_default(self): + self.__run(private_token=None, transport=None, cache=False, test_errors=False) + + def test_integration_curl(self): + self.__run(private_token=None, transport="curl", cache=False, test_errors=False) + + def __run(self, **kwds): + from test_utils import test_server + with test_server() as server: + options = Bunch(url=server.application_url, **kwds) + run(options) diff --git a/test/test_utils.py b/test/test_utils.py index 4a0f81246f7367187cd7c0a0993c25356540c820..e3f2377827949edf9c31860fd473e53f2be8b3e4 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,15 +1,19 @@ +from contextlib import contextmanager from os import pardir from os.path import join, dirname - -from unittest import TestCase from tempfile import mkdtemp from shutil import rmtree +from unittest import TestCase -from contextlib import contextmanager +from webtest import TestApp +from webtest.http import StopableWSGIServer from lwr.tools import ToolBox from lwr.util import JobDirectory +TEST_DIR = dirname(__file__) +ROOT_DIR = join(TEST_DIR, pardir) + class TempDirectoryTestCase(TestCase): @@ -80,6 +84,43 @@ class TestAuthorization(object): raise Exception +@contextmanager +def test_server(global_conf={}, app_conf={}, test_conf={}): + with test_app(global_conf, app_conf, test_conf) as app: + from paste.exceptions.errormiddleware import ErrorMiddleware + error_app = ErrorMiddleware(app.app, debug=True, error_log="errors") + server = StopableWSGIServer.create(error_app) + try: + server.wait() + yield server + finally: + server.shutdown() + + +@contextmanager +def test_app(global_conf={}, app_conf={}, test_conf={}): + staging_directory = mkdtemp() + cache_directory = mkdtemp() + try: + app_conf["staging_directory"] = staging_directory + app_conf["file_cache_dir"] = cache_directory + from lwr.app import app_factory + + app = app_factory(global_conf, **app_conf) + test_app = TestApp(app, **test_conf) + yield test_app + finally: + try: + app.shutdown() + except: + pass + for directory in [staging_directory, cache_directory]: + try: + rmtree(directory) + except: + pass + + class TestAuthorizer(object): def __init__(self):