Commit 9aec2bd2 authored by Gao, Shang's avatar Gao, Shang
Browse files

added tests for crossbowGlobus class

parent 4d87f0ed
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
refresh-tokens.json
*.pyc
+134 −0
Original line number Diff line number Diff line
from crossbow.crossbowGlobus import crossbowGlobus
from ckanapi import RemoteCKAN
import time
import os

'''
Note that tests assume crossbow nfs is mounted to /data
'''

testing_endpoint = '6a3ced3a-0d9a-11e7-bb38-22000b9a448b'
globus_crossbow_path = '/~/Desktop/crossbow/'

def setup_module():
    #create cbowMount class for testing
    global cbowMount
    cbowMount = crossbowGlobus(
            api_key="eaabd7d9-3cb4-4014-85fe-73736e658472",
            token_file='./crossbow/refresh-tokens.json')
    
    #create direct ckan connection for environment setup
    global ckan 
    ckan = RemoteCKAN(
            "http://128.219.185.145:5000",
            apikey="eaabd7d9-3cb4-4014-85fe-73736e658472",user_agent="nosetests")

    #create package for testing
    if "nosetests1" in ckan.action.package_list():
        ckan.action.dataset_purge(id="nosetests1")
    ckan.action.package_create(
            name="nosetests1",owner_org="test",title="nosetests1",
            author="nosetests1",notes="nosetests1")
    
    #create resource for testing
    ckan.action.resource_create(
            package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.csv",
            name="testfile.csv",description="myresource1")
    ckan.action.resource_create(
            package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.dcd",
            name="testfile.dcd",description="myresource2")

    #create temporary files for testing
    tempfile = open('/data/nosetests1/testfile.csv', 'w+')
    tempfile.close()
    tempfile = open('/data/nosetests1/testfile.dcd', 'w+')
    tempfile.close()
    tempfile = open('./testfile2.dcd', 'w+')
    tempfile.close()

def test_list_packages():
    assert "nosetests1" in cbowMount.list_packages()

def test_get_package_details():
    assert cbowMount.get_package_details("nosetests1")['title'] == "nosetests1"

def test_create_package():
    cbowMount.create_package(
            "nosetests2",owner_org="test",title="nosetests2",description="nosetests2",
            author="nosetests2")
    assert "nosetests2" in ckan.action.package_list()

def test_edit_package():
    cbowMount.edit_package("nosetests1","test",title="nosetests1b")
    metadata = ckan.action.package_show(id="nosetests1")
    resources = [resource['name'] for resource in metadata['resources']]
    assert metadata["title"] == "nosetests1b"
    assert "testfile.csv" in resources

def test_list_resources():
    assert "testfile.csv" in cbowMount.list_resources("nosetests1")
    assert "testfile.dcd" in cbowMount.list_resources("nosetests1")

def test_list_dcd_resources():
    assert "testfile.dcd" in cbowMount.list_dcd_resources("nosetests1")

def test_list_non_dcd_resources():
    assert "testfile.csv" in cbowMount.list_non_dcd_resources("nosetests1")

def test_get_resource_details():
    assert cbowMount.get_resource_details("nosetests1","testfile.csv")['url'] \
           == "file://CROSSBOW_NFS/nosetests1/testfile.csv"

def test_upload_resource():
    cbowMount.upload_resource(
            "nosetests1",globus_crossbow_path+"/testfile2.dcd",
            source_endpoint=testing_endpoint,description="myresource3")
    time.sleep(15)
    metadata = ckan.action.package_show(id="nosetests1")
    resources = [resource['name'] for resource in metadata['resources']]
    resource_urls = [resource['url'] for resource in metadata['resources']]
    assert "testfile2.dcd" in resources
    assert "file://CROSSBOW_NFS/nosetests1/testfile2.dcd" in resource_urls
    assert os.path.isfile('/data/nosetests1/testfile2.dcd')

def test_edit_resource_description():
    cbowMount.edit_resource_description("nosetests1","testfile.csv",
                                        new_description="myresource1b")
    pkg_metadata = ckan.action.package_show(id="nosetests1")
    rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \
                    if metadata['name']=="testfile.csv"][0]
    assert rsc_metadata['description'] == "myresource1b"

def test_download_resource():
    cbowMount.download_resource("nosetests1","testfile.csv",
                                dest_endpoint=testing_endpoint,
                                dest_path=globus_crossbow_path)
    
    time.sleep(15)
    assert os.path.isfile('testfile.csv')

def test_delete_resource():
    cbowMount.delete_resource("nosetests1","testfile.csv")
    time.sleep(5)
    metadata = ckan.action.package_show(id="nosetests1")
    resources = [resource['name'] for resource in metadata['resources']]
    assert "testfile.csv" not in resources
    assert not os.path.isfile('/data/nosetests1/testfile.csv')

def teardown_module():
    #purge all packages used for testing
    ckan.action.dataset_purge(id="nosetests1")
    if "nosetests2" in ckan.action.package_list():
        ckan.action.dataset_purge(id="nosetests2")

    #delete temporary files if they exist
    if os.path.isfile('/data/nosetests1/testfile.csv'):
       os.remove('/data/nosetests1/testfile.csv')
    if os.path.isfile('/data/nosetests1/testfile.dcd'):
       os.remove('/data/nosetests1/testfile.dcd')
    if os.path.isfile('/data/nosetests1/testfile2.dcd'):
       os.remove('/data/nosetests1/testfile2.dcd')
    if os.path.isfile('testfile.csv'):
       os.remove('testfile.csv')
    if os.path.isfile('testfile2.dcd'):
       os.remove('testfile2.dcd')
+18 −15
Original line number Diff line number Diff line
from crossbow.crossbowMount import crossbowMount
from crossbow.crossbowGlobus import crossbowGlobus
from ckanapi import RemoteCKAN
import os

'''
Note that tests assume crossbow nfs is mounted to /data
'''

def setup_module():
    #create cbowMount class for testing
    global cbowMount
@@ -26,10 +29,10 @@ def setup_module():
    #create resource for testing
    ckan.action.resource_create(
            package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.csv",
            name="myresource1",description="myresource1")
            name="testfile.csv",description="myresource1")
    ckan.action.resource_create(
            package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.dcd",
            name="myresource2",description="myresource2")
            name="testfile.dcd",description="myresource2")

    #create temporary files for testing
    tempfile = open('/data/nosetests1/testfile.csv', 'w+')
@@ -56,20 +59,20 @@ def test_edit_package():
    metadata = ckan.action.package_show(id="nosetests1")
    resources = [resource['name'] for resource in metadata['resources']]
    assert metadata["title"] == "nosetests1b"
    assert "myresource1" in resources
    assert "testfile.csv" in resources

def test_list_resources():
    assert "myresource1" in cbowMount.list_resources("nosetests1")
    assert "myresource2" in cbowMount.list_resources("nosetests1")
    assert "testfile.csv" in cbowMount.list_resources("nosetests1")
    assert "testfile.dcd" in cbowMount.list_resources("nosetests1")

def test_list_dcd_resources():
    assert "myresource2" in cbowMount.list_dcd_resources("nosetests1")
    assert "testfile.dcd" in cbowMount.list_dcd_resources("nosetests1")

def test_list_non_dcd_resources():
    assert "myresource1" in cbowMount.list_non_dcd_resources("nosetests1")
    assert "testfile.csv" in cbowMount.list_non_dcd_resources("nosetests1")

def test_get_resource_details():
    assert cbowMount.get_resource_details("nosetests1","myresource1")['url'] \
    assert cbowMount.get_resource_details("nosetests1","testfile.csv")['url'] \
           == "file://CROSSBOW_NFS/nosetests1/testfile.csv"

def test_upload_resource():
@@ -83,26 +86,26 @@ def test_upload_resource():
    assert os.path.isfile('/data/nosetests1/testfile2.dcd')

def test_edit_resource_description():
    cbowMount.edit_resource_description("nosetests1","myresource1",
    cbowMount.edit_resource_description("nosetests1","testfile.csv",
                                        new_description="myresource1b")
    pkg_metadata = ckan.action.package_show(id="nosetests1")
    rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \
                    if metadata['name']=="myresource1"][0]
                    if metadata['name']=="testfile.csv"][0]
    assert rsc_metadata['description'] == "myresource1b"

def test_get_resource_path():
     assert cbowMount.get_resource_path("nosetests1","myresource1") \
     assert cbowMount.get_resource_path("nosetests1","testfile.csv") \
            == "/data/nosetests1/testfile.csv"

def test_download_resource():
    cbowMount.download_resource("nosetests1","myresource1")
    cbowMount.download_resource("nosetests1","testfile.csv")
    assert os.path.isfile('testfile.csv')

def test_delete_resource():
    cbowMount.delete_resource("nosetests1","myresource1")
    cbowMount.delete_resource("nosetests1","testfile.csv")
    metadata = ckan.action.package_show(id="nosetests1")
    resources = [resource['name'] for resource in metadata['resources']]
    assert "myresource1" not in resources
    assert "testfile.csv" not in resources
    assert not os.path.isfile('/data/nosetests1/testfile.csv')

def teardown_module():