Skip to content
Snippets Groups Projects
client_test.py 6.57 KiB
Newer Older
John Chilton's avatar
John Chilton committed
import tempfile
import os

from six import text_type, binary_type

from pulsar.client.client import JobClient
from pulsar.client.manager import HttpPulsarInterface
from pulsar.client.transport import Urllib2Transport
from pulsar.client.decorators import retry, MAX_RETRY_COUNT


def test_with_retry():
    i = []

    @retry()
    def func():
        i.append(0)
        raise Exception
    exception_raised = False
    try:
        func()
    except Exception:
        exception_raised = True
    assert exception_raised
    assert len(i) == MAX_RETRY_COUNT, len(i)
John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
class FakeResponse(object):
    """ Object meant to simulate a Response object as returned by
    urllib.open """

    def __init__(self, body):
        self.body = body
        self.first_read = True

John Chilton's avatar
John Chilton committed
    def read(self, bytes=1024):
John Chilton's avatar
John Chilton committed
        if self.first_read:
            result = self.body
        else:
John Chilton's avatar
John Chilton committed
        self.first_read = False
        return result

John Chilton's avatar
John Chilton committed

class TestTransport(Urllib2Transport):
    """ Implements mock of HTTP transport layer for TestClient tests."""

    def __init__(self, test_client):
        self.test_client = test_client

    def _url_open(self, request, data):
        (checker, response) = self.test_client.expects.pop()
        checker(request, data)
        return FakeResponse(response)


class TestClient(JobClient):
John Chilton's avatar
John Chilton committed
    """ A dervative of the Client class that replaces the url_open
    method so that requests can be inspected and responses faked."""

John Chilton's avatar
John Chilton committed
    def __init__(self):
        JobClient.__init__(self, {}, "543", HttpPulsarInterface({"url": "http://test:803/"}, TestTransport(self)))
John Chilton's avatar
John Chilton committed

    def expect_open(self, checker, response):
        self.expects.appendleft((checker, response))
John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
class RequestChecker(object):
    """ Class that tests request objects produced by the Client class.
    """
John Chilton's avatar
John Chilton committed
    def __init__(self, action, args={}, data=None):
John Chilton's avatar
John Chilton committed
        args['job_id'] = "543"
        self.action = action
        self.expected_args = args
        self.data = data
        self.called = False

    def check_url(self, opened_url):
        expected_url_prefix = "http://test:803/%s?" % self.action
        assert opened_url.startswith(expected_url_prefix)
        url_suffix = opened_url[len(expected_url_prefix):]
        actual_args = dict([key_val_combo.split("=") for key_val_combo in url_suffix.split("&")])
        statement = "Expected args %s, obtained %s" % (self.expected_args, actual_args)
        assert self.expected_args == actual_args, statement

    def check_data(self, data):
        if data is None:
            assert self.data is None
        elif type(data) in (binary_type, text_type):
            assert self.data == data
John Chilton's avatar
John Chilton committed
        else:
            data_read = data.read(1024)
            assert data_read == self.data, "data_read %s is not expected data %s" % (data_read, self.data)
John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
    def __call__(self, request, data=None):
John Chilton's avatar
John Chilton committed
        self.called = True
        self.check_url(request.get_full_url())
        self.check_data(data)

    def assert_called(self):
        assert self.called

John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
def test_setup():
    """ Test the setup method of Client """
    client = TestClient()
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs")
    response_json = b'{"working_directory":"C:\\\\home\\\\dir","outputs_directory" : "C:\\\\outputs","path_separator" : "\\\\"}'
John Chilton's avatar
John Chilton committed
    client.expect_open(request_checker, response_json)
    setup_response = client.setup()
    request_checker.assert_called()
    assert setup_response['working_directory'] == "C:\\home\\dir"
    assert setup_response['outputs_directory'] == "C:\\outputs"
    assert setup_response['path_separator'] == '\\'
John Chilton's avatar
John Chilton committed


John Chilton's avatar
John Chilton committed
def test_launch():
    """ Test the launch method of client. """
    client = TestClient()
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543/submit", {"command_line": "python"})
John Chilton's avatar
John Chilton committed
    client.expect_open(request_checker, 'OK')
John Chilton's avatar
John Chilton committed
    client.launch("python")
    request_checker.assert_called()

John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
def __test_upload(upload_type):
    client = TestClient()
    (temp_fileno, temp_file_path) = tempfile.mkstemp()
    temp_file = os.fdopen(temp_fileno, 'w')
John Chilton's avatar
John Chilton committed
    try:
John Chilton's avatar
John Chilton committed
    finally:
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543/files", {"name": os.path.basename(temp_file_path), "type": upload_type}, b"Hello World!")
    client.expect_open(request_checker, b'{"path" : "C:\\\\tools\\\\foo"}')
John Chilton's avatar
John Chilton committed

        upload_result = client.put_file(temp_file_path, 'tool')
John Chilton's avatar
John Chilton committed
    else:
        upload_result = client.put_file(temp_file_path, 'input')
John Chilton's avatar
John Chilton committed

    request_checker.assert_called()
    assert upload_result["path"] == "C:\\tools\\foo"

John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
def test_upload_tool():
John Chilton's avatar
John Chilton committed


John Chilton's avatar
John Chilton committed
def test_upload_input():
John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed

def test_upload_config():
    client = TestClient()
    (temp_fileno, temp_file_path) = tempfile.mkstemp()
    temp_file = os.fdopen(temp_fileno, 'w')
    try:
        temp_file.write("Hello World!")
    finally:
        temp_file.close()
John Chilton's avatar
John Chilton committed
    modified_contents = b"Hello World! <Modified>"
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543/files", {"name": os.path.basename(temp_file_path), "type": "config"}, modified_contents)
    client.expect_open(request_checker, b'{"path" : "C:\\\\tools\\\\foo"}')
    upload_result = client.put_file(temp_file_path, 'config', contents=modified_contents)
    request_checker.assert_called()
    assert upload_result["path"] == "C:\\tools\\foo"


John Chilton's avatar
John Chilton committed
def test_download_output():
    """ Test the download output method of Client. """
    client = TestClient()
    temp_file = tempfile.NamedTemporaryFile()
    temp_file.close()
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543/files", {"name": os.path.basename(temp_file.name), "type": "output"})
    client.expect_open(request_checker, b"test output contents")
    client._fetch_output(temp_file.name)
John Chilton's avatar
John Chilton committed

    with open(temp_file.name, "r") as f:
        contents = f.read(1024)
        assert contents == "test output contents", "Unxpected contents %s" % contents
John Chilton's avatar
John Chilton committed


def test_get_status_queued():
    client = TestClient()
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543/status")
    client.expect_open(request_checker, b'{"complete": "false", "status" : "queued"}')
    assert client.get_status() == "queued"
    request_checker.assert_called()


John Chilton's avatar
John Chilton committed
def test_kill():
    client = TestClient()
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543/cancel")
John Chilton's avatar
John Chilton committed
    client.expect_open(request_checker, 'OK')
John Chilton's avatar
John Chilton committed
    client.kill()
    request_checker.assert_called()

John Chilton's avatar
John Chilton committed

John Chilton's avatar
John Chilton committed
def test_clean():
    client = TestClient()
John Chilton's avatar
John Chilton committed
    request_checker = RequestChecker("jobs/543")
John Chilton's avatar
John Chilton committed
    client.expect_open(request_checker, 'OK')
John Chilton's avatar
John Chilton committed
    client.clean()
    request_checker.assert_called()