Loading crossbow/crossbowGlobus.py +62 −6 Original line number Diff line number Diff line Loading @@ -66,6 +66,9 @@ class crossbowGlobus(crossbowBase): return token_response.by_resource_server def _get_transfer_client(self,client_id,tokens): """ returns an authenticated Globus transfer client """ transfer_tokens = tokens['transfer.api.globus.org'] auth_client = globus_sdk.NativeAppAuthClient(client_id=client_id) Loading @@ -83,8 +86,21 @@ class crossbowGlobus(crossbowBase): def upload_resource(self): pass def download_resource(self,package,resource,dest_endpoint,dest_path): def download_resource(self,package,resource,dest_endpoint=olcfatlas, dest_path='\~\'): ''' use globus to download the resource to a destination endpoint parameters: - package: string name of package - resource: string name of resource - dest_endpoint: string (default: olcf - 'ef1a9560-7ca1-11e5-992c-22000b96db58') globus endpoint id of destination endpoint - dest_path: string (default: "/~/") directory on endpoint to download resource into ''' #check dest_path format if dest_path[-1] != '/': dest_path = dest_path + '/' Loading @@ -105,14 +121,54 @@ class crossbowGlobus(crossbowBase): return transfer_result["task_id"] def delete_resource(self): def delete_resource(self,package,resource): ''' delete a resource from a package (Note: your api key must belong to a sysadmin or the owner of the resource to delete a resource) parameters: - package: string name of package - resource: string name of resource - delete_from_nfs: boolean (default: False) if True, also delete the resource file from NFS ''' pkg_metadata = self.ckan.action.package_show(id=package) resource_id = [metadata['id'] for metadata in pkg_metadata['resources'] \ if metadata['name']==resource][0] #delete from ckan self.ckan.action.resource_delete(id=resource_id) #delete file using globus resource_path = [metadata['url'] for metadata in pkg_metadata['resources'] \ if metadata['name']==resource][0] resource_path = resource_path.replace("file://CROSSBOW_NFS/", '/data/cades-crossbow/') ddata = globus_sdk.DeleteData(self.transfer, self.cadesdtn) ddata.add_item("/file/to/delete/file.txt") ddata.add_item(resource_path) delete_result = self.transfer.submit_delete(ddata) print("task_id =", delete_result["task_id"]) def check_task_status(self): pass return delete_result["task_id"] def check_task_status(self,task_id): ''' check on the status of a globus task parameters: - task_id: string id of task to check ''' r = self.transfer.get_task(submit_result[task_id]) print ("Label:", r["label"]) print ("Status:", r["status"]) print ("Transfer: {} -> {}".format(r["source_endpoint_display_name"], r["destination_endpoint_display_name"])) if r.data["status"] == "SUCCEEDED": print ("Bytes transferred:", r["bytes_transferred"]) print ("Files transferred:", r["files_transferred"]) print ("Transfer rate:", r["effective_bytes_per_second"], "bps") #add model api later #add scheduling api later (for both filters and models) Loading Loading
crossbow/crossbowGlobus.py +62 −6 Original line number Diff line number Diff line Loading @@ -66,6 +66,9 @@ class crossbowGlobus(crossbowBase): return token_response.by_resource_server def _get_transfer_client(self,client_id,tokens): """ returns an authenticated Globus transfer client """ transfer_tokens = tokens['transfer.api.globus.org'] auth_client = globus_sdk.NativeAppAuthClient(client_id=client_id) Loading @@ -83,8 +86,21 @@ class crossbowGlobus(crossbowBase): def upload_resource(self): pass def download_resource(self,package,resource,dest_endpoint,dest_path): def download_resource(self,package,resource,dest_endpoint=olcfatlas, dest_path='\~\'): ''' use globus to download the resource to a destination endpoint parameters: - package: string name of package - resource: string name of resource - dest_endpoint: string (default: olcf - 'ef1a9560-7ca1-11e5-992c-22000b96db58') globus endpoint id of destination endpoint - dest_path: string (default: "/~/") directory on endpoint to download resource into ''' #check dest_path format if dest_path[-1] != '/': dest_path = dest_path + '/' Loading @@ -105,14 +121,54 @@ class crossbowGlobus(crossbowBase): return transfer_result["task_id"] def delete_resource(self): def delete_resource(self,package,resource): ''' delete a resource from a package (Note: your api key must belong to a sysadmin or the owner of the resource to delete a resource) parameters: - package: string name of package - resource: string name of resource - delete_from_nfs: boolean (default: False) if True, also delete the resource file from NFS ''' pkg_metadata = self.ckan.action.package_show(id=package) resource_id = [metadata['id'] for metadata in pkg_metadata['resources'] \ if metadata['name']==resource][0] #delete from ckan self.ckan.action.resource_delete(id=resource_id) #delete file using globus resource_path = [metadata['url'] for metadata in pkg_metadata['resources'] \ if metadata['name']==resource][0] resource_path = resource_path.replace("file://CROSSBOW_NFS/", '/data/cades-crossbow/') ddata = globus_sdk.DeleteData(self.transfer, self.cadesdtn) ddata.add_item("/file/to/delete/file.txt") ddata.add_item(resource_path) delete_result = self.transfer.submit_delete(ddata) print("task_id =", delete_result["task_id"]) def check_task_status(self): pass return delete_result["task_id"] def check_task_status(self,task_id): ''' check on the status of a globus task parameters: - task_id: string id of task to check ''' r = self.transfer.get_task(submit_result[task_id]) print ("Label:", r["label"]) print ("Status:", r["status"]) print ("Transfer: {} -> {}".format(r["source_endpoint_display_name"], r["destination_endpoint_display_name"])) if r.data["status"] == "SUCCEEDED": print ("Bytes transferred:", r["bytes_transferred"]) print ("Files transferred:", r["files_transferred"]) print ("Transfer rate:", r["effective_bytes_per_second"], "bps") #add model api later #add scheduling api later (for both filters and models) Loading