Loading .gitignore +1 −0 Original line number Diff line number Diff line refresh-tokens.json *.pyc tests/test_crossbowGlobus.py 0 → 100644 +134 −0 Original line number Diff line number Diff line from crossbow.crossbowGlobus import crossbowGlobus from ckanapi import RemoteCKAN import time import os ''' Note that tests assume crossbow nfs is mounted to /data ''' testing_endpoint = '6a3ced3a-0d9a-11e7-bb38-22000b9a448b' globus_crossbow_path = '/~/Desktop/crossbow/' def setup_module(): #create cbowMount class for testing global cbowMount cbowMount = crossbowGlobus( api_key="eaabd7d9-3cb4-4014-85fe-73736e658472", token_file='./crossbow/refresh-tokens.json') #create direct ckan connection for environment setup global ckan ckan = RemoteCKAN( "http://128.219.185.145:5000", apikey="eaabd7d9-3cb4-4014-85fe-73736e658472",user_agent="nosetests") #create package for testing if "nosetests1" in ckan.action.package_list(): ckan.action.dataset_purge(id="nosetests1") ckan.action.package_create( name="nosetests1",owner_org="test",title="nosetests1", author="nosetests1",notes="nosetests1") #create resource for testing ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.csv", name="testfile.csv",description="myresource1") ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.dcd", name="testfile.dcd",description="myresource2") #create temporary files for testing tempfile = open('/data/nosetests1/testfile.csv', 'w+') tempfile.close() tempfile = open('/data/nosetests1/testfile.dcd', 'w+') tempfile.close() tempfile = open('./testfile2.dcd', 'w+') tempfile.close() def test_list_packages(): assert "nosetests1" in cbowMount.list_packages() def test_get_package_details(): assert cbowMount.get_package_details("nosetests1")['title'] == "nosetests1" def test_create_package(): cbowMount.create_package( "nosetests2",owner_org="test",title="nosetests2",description="nosetests2", author="nosetests2") assert "nosetests2" in ckan.action.package_list() def test_edit_package(): cbowMount.edit_package("nosetests1","test",title="nosetests1b") metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert metadata["title"] == "nosetests1b" assert "testfile.csv" in resources def test_list_resources(): assert "testfile.csv" in cbowMount.list_resources("nosetests1") assert "testfile.dcd" in cbowMount.list_resources("nosetests1") def test_list_dcd_resources(): assert "testfile.dcd" in cbowMount.list_dcd_resources("nosetests1") def test_list_non_dcd_resources(): assert "testfile.csv" in cbowMount.list_non_dcd_resources("nosetests1") def test_get_resource_details(): assert cbowMount.get_resource_details("nosetests1","testfile.csv")['url'] \ == "file://CROSSBOW_NFS/nosetests1/testfile.csv" def test_upload_resource(): cbowMount.upload_resource( "nosetests1",globus_crossbow_path+"/testfile2.dcd", source_endpoint=testing_endpoint,description="myresource3") time.sleep(15) metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] resource_urls = [resource['url'] for resource in metadata['resources']] assert "testfile2.dcd" in resources assert "file://CROSSBOW_NFS/nosetests1/testfile2.dcd" in resource_urls assert os.path.isfile('/data/nosetests1/testfile2.dcd') def test_edit_resource_description(): cbowMount.edit_resource_description("nosetests1","testfile.csv", new_description="myresource1b") pkg_metadata = ckan.action.package_show(id="nosetests1") rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \ if metadata['name']=="testfile.csv"][0] assert rsc_metadata['description'] == "myresource1b" def test_download_resource(): cbowMount.download_resource("nosetests1","testfile.csv", dest_endpoint=testing_endpoint, dest_path=globus_crossbow_path) time.sleep(15) assert os.path.isfile('testfile.csv') def test_delete_resource(): cbowMount.delete_resource("nosetests1","testfile.csv") time.sleep(5) metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert "testfile.csv" not in resources assert not os.path.isfile('/data/nosetests1/testfile.csv') def teardown_module(): #purge all packages used for testing ckan.action.dataset_purge(id="nosetests1") if "nosetests2" in ckan.action.package_list(): ckan.action.dataset_purge(id="nosetests2") #delete temporary files if they exist if os.path.isfile('/data/nosetests1/testfile.csv'): os.remove('/data/nosetests1/testfile.csv') if os.path.isfile('/data/nosetests1/testfile.dcd'): os.remove('/data/nosetests1/testfile.dcd') if os.path.isfile('/data/nosetests1/testfile2.dcd'): os.remove('/data/nosetests1/testfile2.dcd') if os.path.isfile('testfile.csv'): os.remove('testfile.csv') if os.path.isfile('testfile2.dcd'): os.remove('testfile2.dcd') tests/test_crossbow.py→tests/test_crossbowMount.py +18 −15 Original line number Diff line number Diff line from crossbow.crossbowMount import crossbowMount from crossbow.crossbowGlobus import crossbowGlobus from ckanapi import RemoteCKAN import os ''' Note that tests assume crossbow nfs is mounted to /data ''' def setup_module(): #create cbowMount class for testing global cbowMount Loading @@ -26,10 +29,10 @@ def setup_module(): #create resource for testing ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.csv", name="myresource1",description="myresource1") name="testfile.csv",description="myresource1") ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.dcd", name="myresource2",description="myresource2") name="testfile.dcd",description="myresource2") #create temporary files for testing tempfile = open('/data/nosetests1/testfile.csv', 'w+') Loading @@ -56,20 +59,20 @@ def test_edit_package(): metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert metadata["title"] == "nosetests1b" assert "myresource1" in resources assert "testfile.csv" in resources def test_list_resources(): assert "myresource1" in cbowMount.list_resources("nosetests1") assert "myresource2" in cbowMount.list_resources("nosetests1") assert "testfile.csv" in cbowMount.list_resources("nosetests1") assert "testfile.dcd" in cbowMount.list_resources("nosetests1") def test_list_dcd_resources(): assert "myresource2" in cbowMount.list_dcd_resources("nosetests1") assert "testfile.dcd" in cbowMount.list_dcd_resources("nosetests1") def test_list_non_dcd_resources(): assert "myresource1" in cbowMount.list_non_dcd_resources("nosetests1") assert "testfile.csv" in cbowMount.list_non_dcd_resources("nosetests1") def test_get_resource_details(): assert cbowMount.get_resource_details("nosetests1","myresource1")['url'] \ assert cbowMount.get_resource_details("nosetests1","testfile.csv")['url'] \ == "file://CROSSBOW_NFS/nosetests1/testfile.csv" def test_upload_resource(): Loading @@ -83,26 +86,26 @@ def test_upload_resource(): assert os.path.isfile('/data/nosetests1/testfile2.dcd') def test_edit_resource_description(): cbowMount.edit_resource_description("nosetests1","myresource1", cbowMount.edit_resource_description("nosetests1","testfile.csv", new_description="myresource1b") pkg_metadata = ckan.action.package_show(id="nosetests1") rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \ if metadata['name']=="myresource1"][0] if metadata['name']=="testfile.csv"][0] assert rsc_metadata['description'] == "myresource1b" def test_get_resource_path(): assert cbowMount.get_resource_path("nosetests1","myresource1") \ assert cbowMount.get_resource_path("nosetests1","testfile.csv") \ == "/data/nosetests1/testfile.csv" def test_download_resource(): cbowMount.download_resource("nosetests1","myresource1") cbowMount.download_resource("nosetests1","testfile.csv") assert os.path.isfile('testfile.csv') def test_delete_resource(): cbowMount.delete_resource("nosetests1","myresource1") cbowMount.delete_resource("nosetests1","testfile.csv") metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert "myresource1" not in resources assert "testfile.csv" not in resources assert not os.path.isfile('/data/nosetests1/testfile.csv') def teardown_module(): Loading Loading
tests/test_crossbowGlobus.py 0 → 100644 +134 −0 Original line number Diff line number Diff line from crossbow.crossbowGlobus import crossbowGlobus from ckanapi import RemoteCKAN import time import os ''' Note that tests assume crossbow nfs is mounted to /data ''' testing_endpoint = '6a3ced3a-0d9a-11e7-bb38-22000b9a448b' globus_crossbow_path = '/~/Desktop/crossbow/' def setup_module(): #create cbowMount class for testing global cbowMount cbowMount = crossbowGlobus( api_key="eaabd7d9-3cb4-4014-85fe-73736e658472", token_file='./crossbow/refresh-tokens.json') #create direct ckan connection for environment setup global ckan ckan = RemoteCKAN( "http://128.219.185.145:5000", apikey="eaabd7d9-3cb4-4014-85fe-73736e658472",user_agent="nosetests") #create package for testing if "nosetests1" in ckan.action.package_list(): ckan.action.dataset_purge(id="nosetests1") ckan.action.package_create( name="nosetests1",owner_org="test",title="nosetests1", author="nosetests1",notes="nosetests1") #create resource for testing ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.csv", name="testfile.csv",description="myresource1") ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.dcd", name="testfile.dcd",description="myresource2") #create temporary files for testing tempfile = open('/data/nosetests1/testfile.csv', 'w+') tempfile.close() tempfile = open('/data/nosetests1/testfile.dcd', 'w+') tempfile.close() tempfile = open('./testfile2.dcd', 'w+') tempfile.close() def test_list_packages(): assert "nosetests1" in cbowMount.list_packages() def test_get_package_details(): assert cbowMount.get_package_details("nosetests1")['title'] == "nosetests1" def test_create_package(): cbowMount.create_package( "nosetests2",owner_org="test",title="nosetests2",description="nosetests2", author="nosetests2") assert "nosetests2" in ckan.action.package_list() def test_edit_package(): cbowMount.edit_package("nosetests1","test",title="nosetests1b") metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert metadata["title"] == "nosetests1b" assert "testfile.csv" in resources def test_list_resources(): assert "testfile.csv" in cbowMount.list_resources("nosetests1") assert "testfile.dcd" in cbowMount.list_resources("nosetests1") def test_list_dcd_resources(): assert "testfile.dcd" in cbowMount.list_dcd_resources("nosetests1") def test_list_non_dcd_resources(): assert "testfile.csv" in cbowMount.list_non_dcd_resources("nosetests1") def test_get_resource_details(): assert cbowMount.get_resource_details("nosetests1","testfile.csv")['url'] \ == "file://CROSSBOW_NFS/nosetests1/testfile.csv" def test_upload_resource(): cbowMount.upload_resource( "nosetests1",globus_crossbow_path+"/testfile2.dcd", source_endpoint=testing_endpoint,description="myresource3") time.sleep(15) metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] resource_urls = [resource['url'] for resource in metadata['resources']] assert "testfile2.dcd" in resources assert "file://CROSSBOW_NFS/nosetests1/testfile2.dcd" in resource_urls assert os.path.isfile('/data/nosetests1/testfile2.dcd') def test_edit_resource_description(): cbowMount.edit_resource_description("nosetests1","testfile.csv", new_description="myresource1b") pkg_metadata = ckan.action.package_show(id="nosetests1") rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \ if metadata['name']=="testfile.csv"][0] assert rsc_metadata['description'] == "myresource1b" def test_download_resource(): cbowMount.download_resource("nosetests1","testfile.csv", dest_endpoint=testing_endpoint, dest_path=globus_crossbow_path) time.sleep(15) assert os.path.isfile('testfile.csv') def test_delete_resource(): cbowMount.delete_resource("nosetests1","testfile.csv") time.sleep(5) metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert "testfile.csv" not in resources assert not os.path.isfile('/data/nosetests1/testfile.csv') def teardown_module(): #purge all packages used for testing ckan.action.dataset_purge(id="nosetests1") if "nosetests2" in ckan.action.package_list(): ckan.action.dataset_purge(id="nosetests2") #delete temporary files if they exist if os.path.isfile('/data/nosetests1/testfile.csv'): os.remove('/data/nosetests1/testfile.csv') if os.path.isfile('/data/nosetests1/testfile.dcd'): os.remove('/data/nosetests1/testfile.dcd') if os.path.isfile('/data/nosetests1/testfile2.dcd'): os.remove('/data/nosetests1/testfile2.dcd') if os.path.isfile('testfile.csv'): os.remove('testfile.csv') if os.path.isfile('testfile2.dcd'): os.remove('testfile2.dcd')
tests/test_crossbow.py→tests/test_crossbowMount.py +18 −15 Original line number Diff line number Diff line from crossbow.crossbowMount import crossbowMount from crossbow.crossbowGlobus import crossbowGlobus from ckanapi import RemoteCKAN import os ''' Note that tests assume crossbow nfs is mounted to /data ''' def setup_module(): #create cbowMount class for testing global cbowMount Loading @@ -26,10 +29,10 @@ def setup_module(): #create resource for testing ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.csv", name="myresource1",description="myresource1") name="testfile.csv",description="myresource1") ckan.action.resource_create( package_id="nosetests1",url="file://CROSSBOW_NFS/nosetests1/testfile.dcd", name="myresource2",description="myresource2") name="testfile.dcd",description="myresource2") #create temporary files for testing tempfile = open('/data/nosetests1/testfile.csv', 'w+') Loading @@ -56,20 +59,20 @@ def test_edit_package(): metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert metadata["title"] == "nosetests1b" assert "myresource1" in resources assert "testfile.csv" in resources def test_list_resources(): assert "myresource1" in cbowMount.list_resources("nosetests1") assert "myresource2" in cbowMount.list_resources("nosetests1") assert "testfile.csv" in cbowMount.list_resources("nosetests1") assert "testfile.dcd" in cbowMount.list_resources("nosetests1") def test_list_dcd_resources(): assert "myresource2" in cbowMount.list_dcd_resources("nosetests1") assert "testfile.dcd" in cbowMount.list_dcd_resources("nosetests1") def test_list_non_dcd_resources(): assert "myresource1" in cbowMount.list_non_dcd_resources("nosetests1") assert "testfile.csv" in cbowMount.list_non_dcd_resources("nosetests1") def test_get_resource_details(): assert cbowMount.get_resource_details("nosetests1","myresource1")['url'] \ assert cbowMount.get_resource_details("nosetests1","testfile.csv")['url'] \ == "file://CROSSBOW_NFS/nosetests1/testfile.csv" def test_upload_resource(): Loading @@ -83,26 +86,26 @@ def test_upload_resource(): assert os.path.isfile('/data/nosetests1/testfile2.dcd') def test_edit_resource_description(): cbowMount.edit_resource_description("nosetests1","myresource1", cbowMount.edit_resource_description("nosetests1","testfile.csv", new_description="myresource1b") pkg_metadata = ckan.action.package_show(id="nosetests1") rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \ if metadata['name']=="myresource1"][0] if metadata['name']=="testfile.csv"][0] assert rsc_metadata['description'] == "myresource1b" def test_get_resource_path(): assert cbowMount.get_resource_path("nosetests1","myresource1") \ assert cbowMount.get_resource_path("nosetests1","testfile.csv") \ == "/data/nosetests1/testfile.csv" def test_download_resource(): cbowMount.download_resource("nosetests1","myresource1") cbowMount.download_resource("nosetests1","testfile.csv") assert os.path.isfile('testfile.csv') def test_delete_resource(): cbowMount.delete_resource("nosetests1","myresource1") cbowMount.delete_resource("nosetests1","testfile.csv") metadata = ckan.action.package_show(id="nosetests1") resources = [resource['name'] for resource in metadata['resources']] assert "myresource1" not in resources assert "testfile.csv" not in resources assert not os.path.isfile('/data/nosetests1/testfile.csv') def teardown_module(): Loading