Commit c8551c16 authored by Gao, Shang's avatar Gao, Shang
Browse files

added endpoint autoactivation to globus transfers

parent aa8e9583
Loading
Loading
Loading
Loading
+42 −1
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import globus_sdk
import json
import sys
import os
import time

olcfatlas = 'ef1a9560-7ca1-11e5-992c-22000b96db58'

@@ -91,6 +92,9 @@ class crossbowGlobus(crossbowBase):
            self._save_tokens_to_file(token_file, tokens)
            self.transfer = self._get_transfer_client(self.client_id,tokens)

        #make sure endpoints are activated
        self._activate_endpoint(self.cadesdtn)

        super(crossbowGlobus, self).__init__(api_key,CKAN_url,user_agent)

    def _load_tokens_from_file(self,filepath):
@@ -125,6 +129,18 @@ class crossbowGlobus(crossbowBase):
        # return a set of tokens, organized by resource server name
        return token_response.by_resource_server
        
    def _activate_endpoint(self,ep_id):
        """
        function for manually activating an endpoint
        """
        r = self.transfer.endpoint_autoactivate(ep_id, if_expires_in=3600)
        while (r["code"] == "AutoActivationFailed"):
            print "Endpoint requires manual activation, please open the following \
                   URL in a browser to activate the endpoint:"
            print "https://www.globus.org/app/endpoints/%s/activate" % ep_id
            raw_input("Press ENTER after activating the endpoint:")
            r = self.transfer.endpoint_autoactivate(ep_id, if_expires_in=3600)
        
    def _get_transfer_client(self,client_id,tokens):
        """
        returns an authenticated Globus transfer client
@@ -181,6 +197,10 @@ class crossbowGlobus(crossbowBase):
        url = "file://CROSSBOW_NFS/%s/%s" % (package,filename)
        size = r['DATA'][0]['size']

        #make sure endpoints are activated
        self._activate_endpoint(source_endpoint)
        self._activate_endpoint(self.cadesdtn)

        #execute transfer
        tdata = globus_sdk.TransferData(self.transfer, source_endpoint, self.cadesdtn,
                                    label="Crossbow Transfer", sync_level="checksum")
@@ -196,7 +216,7 @@ class crossbowGlobus(crossbowBase):
        return transfer_result["task_id"]

    def download_resource(self,package,resource,dest_endpoint=olcfatlas,
                          dest_path='/~/'):
                          dest_path='/~/',wait_for_download=False,timeout=600):
        '''
        use globus to download the resource to a destination endpoint
        
@@ -209,6 +229,10 @@ class crossbowGlobus(crossbowBase):
            globus endpoint id of destination endpoint 
          - dest_path: string (default: "/~/")
            directory on endpoint to download resource into
          - wait_for_download: boolean (default: False)
            if set to true, script will pause until download completes
          - timeout: int (default: 600)
            if wait_for_download is set to True, the timeout for the download
        '''
        #check dest_path format
        if dest_path[-1] != '/':
@@ -221,6 +245,10 @@ class crossbowGlobus(crossbowBase):
        resource_path = resource_path.replace("file://CROSSBOW_NFS/", '/data/cades-crossbow/')
        filename = os.path.basename(resource_path)

        #make sure endpoints are activated
        self._activate_endpoint(dest_endpoint)
        self._activate_endpoint(self.cadesdtn)

        #execute transfer
        tdata = globus_sdk.TransferData(self.transfer, self.cadesdtn, dest_endpoint,
                                    label="Crossbow Transfer", sync_level="checksum")
@@ -228,6 +256,16 @@ class crossbowGlobus(crossbowBase):
        transfer_result = self.transfer.submit_transfer(tdata)
        print "task_id =", transfer_result["task_id"]
        
        #wait for download if enabled
        if wait_for_download:
            start = time.time()
            print "initializing file transfer"
            while self.check_task_status(transfer_result["task_id"]) != "SUCCEEDED":
                time.sleep(5)
                if (time.time() - start) > timeout:
                    raise Exception("file transfer timed out")
            print "transfer complete"
        
        return transfer_result["task_id"]

    def delete_resource(self,package,resource):
@@ -247,6 +285,9 @@ class crossbowGlobus(crossbowBase):
        resource_id = [metadata['id'] for metadata in pkg_metadata['resources'] \
                       if metadata['name']==resource][0]

        #make sure endpoints are activated
        self._activate_endpoint(self.cadesdtn)

        #delete from ckan
        self.ckan.action.resource_delete(id=resource_id)

+13 −0
Original line number Diff line number Diff line
import paramiko
import getpass

olcfdtn = 'dtn.ccs.ornl.gov'

username = raw_input("Please enter your OLCF username: ")
password = getpass.getpass("Please enter your PIN + TOKEN: ")

ssh = paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(olcfdtn, username=username, password=password)
ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command("touch test.txt")
+2 −3
Original line number Diff line number Diff line
@@ -102,9 +102,8 @@ def test_edit_resource_description():
def test_download_resource():
    cbowMount.download_resource("nosetests1","testfile.csv",
                                dest_endpoint=testing_endpoint,
                                dest_path=globus_crossbow_path)
    
    time.sleep(15)
                                dest_path=globus_crossbow_path,
                                wait_for_download=True,timeout=15)
    assert os.path.isfile('testfile.csv')

def test_delete_resource():
+2 −11
Original line number Diff line number Diff line
@@ -16,14 +16,5 @@ if 'yelp_academic_dataset_review.json' not in resources:
endpoint = 'ef1a9560-7ca1-11e5-992c-22000b96db58'
path = '/~/crossbow/yelp_example'
dl_id = cbow.download_resource('yelp','yelp_academic_dataset_review.json',
                               dest_endpoint=endpoint,dest_path=path)
                               
#wait for transfer to complete
start = time.time()
timeout = 600
print "initializing file transfer"
while cbow.check_task_status(dl_id) != "SUCCEEDED":
    time.sleep(10)
    if (time.time() - start) > timeout:
        raise Exception("file transfer timed out")
print "transfer complete"
                               dest_endpoint=endpoint,dest_path=path,
                               wait_for_download=True,timeout=600)
Loading