Commit 0c8db446 authored by Gao, Shang's avatar Gao, Shang
Browse files

added basic Globus methods

parent be5da4da
Loading
Loading
Loading
Loading
+215 −0
Original line number Diff line number Diff line
from ckanapi import RemoteCKAN

class crossbowBase(object):
    '''
    oak ridge national labs cross-platform big-data operational workflow
    base class for crossbowMount and crossbowGlobus

    holds a ckanapi RemoteCKAN connection in self.ckan and implements the
    package (dataset) and resource metadata operations on top of it
    '''

    def __init__(self,api_key,CKAN_url="http://128.219.185.145:5000",
                 user_agent='crossbow'):
        '''
        open the connection to the CKAN webserver

        parameters:
          - api_key: string
            CKAN API key, passed to RemoteCKAN as apikey
          - CKAN_url: string (optional)
            address of the CKAN webserver
          - user_agent: string (optional)
            user agent passed to RemoteCKAN
        '''
        #connect to CKAN webserver
        self.ckan = RemoteCKAN(CKAN_url, apikey=api_key, user_agent=user_agent)

    def _find_resource(self,package,resource):
        '''
        look up the metadata of a single resource inside a package

        parameters:
          - package: string
            name of package
          - resource: string
            name of resource

        output: dictionary
            metadata of the first resource whose name matches

        raises:
            IndexError if the package has no resource with that name
            (same exception type as a failed [0] lookup, with a message)
        '''
        pkg_metadata = self.ckan.action.package_show(id=package)
        for rsc_metadata in pkg_metadata['resources']:
            if rsc_metadata['name'] == resource:
                return rsc_metadata
        raise IndexError("resource '%s' not found in package '%s'"
                         % (resource, package))

    def list_packages(self):
        '''
        returns all packages (datasets) available on CKAN

        output: list
            list of package names available on CKAN
        '''
        return self.ckan.action.package_list()

    def get_package_details(self,package):
        '''
        prints and returns details for a specific package

        parameters:
          - package: string
            name of package

        output: dictionary
            metadata for specific package
        '''
        metadata = self.ckan.action.package_show(id=package)

        #single-argument print(...) behaves the same as a statement and as a
        #function call, so the output does not depend on the interpreter
        print("package name: %s" % package)
        print("package title: %s" % metadata['title'])
        print("package description: %s" % metadata['notes'])
        print("package author: %s" % metadata['author'])
        print("author email: %s" % metadata['author_email'])
        print("package maintainer: %s" % metadata['maintainer'])
        print("maintainer email: %s" % metadata['maintainer_email'])
        print("version: %s" % metadata['version'])
        print("created: %s" % metadata['metadata_created'])
        print("last modified: %s" % metadata['metadata_modified'])
        print("resources:  %s" % (self.list_resources(package),))
        print("tags: %s" % (metadata['tags'],))

        return metadata

    def create_package(self,package,owner_org,title=None,description=None,
                       author=None,author_email=None,maintainer=None,
                       maintainer_email=None,version=None,tags=None):
        '''
        creates a new package (dataset)

        parameters:
          - package: string
            unique name for the package, should be lowercase with no spaces
          - owner_org: string
            name of organization that owns the package
          - title: string (optional)
            title of the package to be displayed on CKAN website
          - description: string (optional)
            description of package
          - author: string (optional)
            author of package
          - author_email: string (optional)
            email address of author
          - maintainer: string (optional)
            maintainer for package
          - maintainer_email: string (optional)
            email address of maintainer
          - version: string (optional)
            version of package
          - tags: list (optional)
            list of tag dictionaries to add to the package,
            an empty list is sent when omitted

        raises:
            Exception if package contains a space or an uppercase character
        '''
        if " " in package or any(x.isupper() for x in package):
            raise Exception('package must be lowercase and without spaces')

        #build a fresh list per call instead of sharing one default object
        if tags is None:
            tags = []

        self.ckan.action.package_create(
                name=package,owner_org=owner_org,title=title,author=author,
                author_email=author_email,maintainer=maintainer,
                maintainer_email=maintainer_email,notes=description,
                version=version,tags=tags)

    def edit_package(self,package,*args,**kwargs):
        '''
        edit an existing package (dataset)

        parameters:
          - package: string
            name of package
          see create_package() function for additional parameters;
          positional values are matched in the same order as in
          create_package() (owner_org, title, description, ...)

        raises:
            TypeError if more positional values are given than there are
            editable fields
        '''
        #add args to kwargs dic
        keywords = ['owner_org','title','notes','author','author_email',
                    'maintainer','maintainer_email','version','tags']
        if len(args) > len(keywords):
            raise TypeError(
                'edit_package() takes at most %d positional values after '
                'package (%d given)' % (len(keywords), len(args)))
        for key,arg in zip(keywords,args):
            kwargs[key] = arg

        #replace kwargs keywords that don't match ckanapi keywords
        if 'description' in kwargs:
            kwargs['notes'] = kwargs.pop('description')

        #get existing metadata and overwrite the edited fields
        metadata = self.ckan.action.package_show(id=package)
        metadata.update(kwargs)

        self.ckan.action.package_update(**metadata)

    def list_resources(self,package):
        '''
        returns all resources for a particular package

        parameters:
          - package: string
            name of package

        output: list
            names of available resources in package
        '''
        metadata = self.ckan.action.package_show(id=package)
        return [resource['name'] for resource in metadata['resources']]

    def list_dcd_resources(self,package):
        '''
        returns all .dcd resources for a particular package

        parameters:
          - package: string
            name of package

        output: list
            names of resources in package whose url ends in .dcd
        '''
        metadata = self.ckan.action.package_show(id=package)
        return [resource['name'] for resource in metadata['resources']
                if resource['url'].endswith(".dcd")]

    def list_non_dcd_resources(self,package):
        '''
        returns all non .dcd resources for a particular package

        parameters:
          - package: string
            name of package

        output: list
            names of resources in package whose url does not end in .dcd
        '''
        metadata = self.ckan.action.package_show(id=package)
        return [resource['name'] for resource in metadata['resources']
                if not resource['url'].endswith(".dcd")]

    def get_resource_details(self,package,resource):
        '''
        prints and returns details for a specific resource

        parameters:
          - package: string
            name of package
          - resource: string
            name of resource

        output: dictionary
            metadata for specific resource

        raises:
            IndexError if the package has no resource with that name
        '''
        rsc_metadata = self._find_resource(package,resource)
        print("package name: %s" % package)
        print("resource name: %s" % resource)
        print("resource description: %s" % rsc_metadata['description'])
        print("resource format: %s" % rsc_metadata['format'])
        print("resource size: %s" % rsc_metadata['size'])
        print("resource url: %s" % rsc_metadata['url'])
        print("created: %s" % rsc_metadata['created'])
        #read the timestamp from the matched resource itself
        print("last modified: %s" % rsc_metadata['last_modified'])

        return rsc_metadata

    def edit_resource(self,package,resource,new_name=None,new_description=None):
        '''
        edit the name or description for an existing resource

        parameters:
          - package: string
            name of package
          - resource: string
            name of resource
          - new_name: string (optional)
            new name for resource
          - new_description: string (optional)
            new description for resource

        raises:
            IndexError if the package has no resource with that name
        '''
        #get current metadata
        rsc_metadata = self._find_resource(package,resource)

        #update metadata with new edits (empty/None values leave the field as is)
        if new_name:
            rsc_metadata['name'] = new_name
        if new_description:
            rsc_metadata['description'] = new_description

        self.ckan.action.resource_update(**rsc_metadata)
+125 −0
Original line number Diff line number Diff line
from ckanapi import RemoteCKAN
from crossbowBase import crossbowBase
import globus_sdk
import json
import sys
import os

# presumably the Globus endpoint UUID for OLCF Atlas (going by the name) -- TODO confirm
# NOTE(review): not referenced by any code visible in this file
olcfatlas = 'ef1a9560-7ca1-11e5-992c-22000b96db58'

class crossbowGlobus(crossbowBase):
    '''
    oak ridge national labs cross-platform big-data operational workflow
    this version is for file transfers through Globus

    on top of the CKAN connection from crossbowBase this class keeps a
    globus_sdk TransferClient in self.transfer, authenticated through the
    Native App flow with tokens cached in a local json file
    '''

    def __init__(self,api_key,CKAN_url="http://128.219.185.145:5000",
                 user_agent='crossbow',token_file='refresh-tokens.json'):
        '''
        set up the Globus transfer client, then the CKAN connection

        parameters:
          - api_key: string
            CKAN API key, forwarded to crossbowBase
          - CKAN_url: string (optional)
            address of the CKAN webserver, forwarded to crossbowBase
          - user_agent: string (optional)
            user agent, forwarded to crossbowBase
          - token_file: string (optional)
            path of the json file used to load and save Globus tokens
        '''
        self.client_id = '18270179-b498-4032-9187-ca646d6e29af'
        self.cadesdtn = 'e6bcd4fa-ac1b-11e6-9aee-22000a1e3b52'

        try:
            # if we already have tokens, load and use them
            tokens = self._load_tokens_from_file(token_file)
            self.transfer = self._get_transfer_client(self.client_id,tokens)
            # probe request: a failure here falls through to re-authentication
            self.transfer.get_endpoint(self.cadesdtn)

        except Exception:
            # any failure with the saved tokens (missing/invalid file, rejected
            # request) starts the Native App authentication process;
            # KeyboardInterrupt and SystemExit are deliberately not caught
            tokens = self._native_app_authentication(self.client_id)
            self._save_tokens_to_file(token_file, tokens)
            self.transfer = self._get_transfer_client(self.client_id,tokens)

        super(crossbowGlobus, self).__init__(api_key,CKAN_url,user_agent)

    def _load_tokens_from_file(self,filepath):
        """Load a set of saved tokens from the json file at filepath."""
        with open(filepath, 'r') as f:
            return json.load(f)

    def _save_tokens_to_file(self,filepath, tokens):
        """Save a set of tokens as json to filepath for later use."""
        with open(filepath, 'w') as f:
            json.dump(tokens, f)

    def _native_app_authentication(self,client_id):
        """
        Does a Native App authentication flow and returns a
        dict of tokens keyed by service name.

        Interactive: prints the authorization URL and reads the resulting
        code from standard input.
        """
        client = globus_sdk.NativeAppAuthClient(client_id=client_id)
        # pass refresh_tokens=True to request refresh tokens
        client.oauth2_start_flow(refresh_tokens=True)

        authorize_url = client.oauth2_get_authorize_url()

        print('Please go to this URL and login: {0}'.format(authorize_url))

        # pick the non-evaluating line reader by name; looking it up on
        # __builtins__ is unreliable because that is a dict (not a module)
        # inside imported modules, which made the lookup fall back to input()
        try:
            get_input = raw_input
        except NameError:
            get_input = input
        auth_code = get_input(
            'Please enter the code you get after login here: ').strip()
        token_response = client.oauth2_exchange_code_for_tokens(auth_code)

        # return a set of tokens, organized by resource server name
        return token_response.by_resource_server

    def _get_transfer_client(self,client_id,tokens):
        """
        Build a TransferClient from the 'transfer.api.globus.org' entry of
        tokens, using a RefreshTokenAuthorizer.
        """
        transfer_tokens = tokens['transfer.api.globus.org']

        auth_client = globus_sdk.NativeAppAuthClient(client_id=client_id)

        authorizer = globus_sdk.RefreshTokenAuthorizer(
            transfer_tokens['refresh_token'],
            auth_client,
            access_token=transfer_tokens['access_token'],
            expires_at=transfer_tokens['expires_at_seconds'])

        return globus_sdk.TransferClient(authorizer=authorizer)

    def upload_resource(self):
        '''
        not implemented yet
        '''
        pass

    def download_resource(self,package,resource,dest_endpoint,dest_path):
        '''
        submit a Globus transfer of one resource from the cadesdtn endpoint

        parameters:
          - package: string
            name of package
          - resource: string
            name of resource
          - dest_endpoint: string
            Globus endpoint id to transfer to
          - dest_path: string
            destination directory, a trailing '/' is added when missing

        output: string
            task id of the submitted transfer

        raises:
            IndexError if the package has no resource with that name
        '''
        #check dest_path format (endswith also copes with an empty string)
        if not dest_path.endswith('/'):
            dest_path = dest_path + '/'

        #get resource info
        pkg_metadata = self.ckan.action.package_show(id=package)
        urls = [rsc['url'] for rsc in pkg_metadata['resources']
                if rsc['name']==resource]
        if not urls:
            raise IndexError("resource '%s' not found in package '%s'"
                             % (resource, package))
        resource_path = urls[0].replace("file://CROSSBOW_NFS/",
                                        '/data/cades-crossbow/')
        filename = os.path.basename(resource_path)

        #execute transfer
        tdata = globus_sdk.TransferData(self.transfer, self.cadesdtn, dest_endpoint,
                                    label="Crossbow Transfer", sync_level="checksum")
        tdata.add_item(resource_path,dest_path+filename)
        transfer_result = self.transfer.submit_transfer(tdata)
        task_id = transfer_result["task_id"]
        #single formatted argument prints the same text on python 2 and 3
        print("task_id = %s" % task_id)

        return task_id

    def delete_resource(self,path="/file/to/delete/file.txt"):
        '''
        submit a Globus delete task on the cadesdtn endpoint

        parameters:
          - path: string (optional)
            path on the endpoint to delete
            NOTE(review): the default is the placeholder path this method
            was first written with -- confirm the intended behaviour

        output: string
            task id of the submitted delete
        '''
        ddata = globus_sdk.DeleteData(self.transfer, self.cadesdtn)
        ddata.add_item(path)
        delete_result = self.transfer.submit_delete(ddata)
        task_id = delete_result["task_id"]
        print("task_id = %s" % task_id)

        return task_id

    def check_task_status(self):
        '''
        not implemented yet
        '''
        pass

#add model api later
#add scheduling api later (for both filters and models)

if __name__ == "__main__":
    # smoke test: show details of the first package and its first resource
    # NOTE(review): the fallback API key below is committed in source; it
    # should be rotated and supplied only through CROSSBOW_API_KEY
    api_key = os.environ.get("CROSSBOW_API_KEY",
                             "eaabd7d9-3cb4-4014-85fe-73736e658472")
    cbow = crossbowGlobus(api_key=api_key)
    packages = cbow.list_packages()
    # guard the [0] lookups so an empty CKAN instance or package is not a crash
    if packages:
        pkg_meta = cbow.get_package_details(packages[0])
        resources = cbow.list_resources(packages[0])
        if resources:
            rsc_meta = cbow.get_resource_details(packages[0],resources[0])
+5 −229
Original line number Diff line number Diff line
from ckanapi import RemoteCKAN
from crossbowBase import crossbowBase
from progressbar import ProgressBar, Bar, Percentage
from clint.textui import progress
import os

class crossbowBase(object):
    '''
    oak ridge national labs cross-platform big-data operational workflow
    base class for crossbowMount and crossbowGlobus
    '''

    def __init__(self,api_key,CKAN_url="http://128.219.185.145:5000",
                 user_agent='crossbow'):

        #connect to CKAN webserver
        self.ckan = RemoteCKAN(CKAN_url, apikey=api_key, user_agent=user_agent)

    def list_packages(self):
        '''
        returns all packages (datasets) available on CKAN

        output: list
            list of package names available on CKAN
        '''
        return self.ckan.action.package_list()

    def get_package_details(self,package):
        '''
        prints and returns details for a specific package

        parameters:
          - package: string
            name of package

        output: dictionary
            metadata for specific package
        '''
        metadata = self.ckan.action.package_show(id=package)
        print "package name: %s" % package
        print "package title: %s" % metadata['title']
        print "package description: %s" % metadata['notes']
        print "package author: %s" % metadata['author']
        print "author email: %s" % metadata['author_email']
        print "package maintainer: %s" % metadata['maintainer']
        print "maintainer email: %s" % metadata['maintainer_email']
        print "version: %s" % metadata['version']
        print "created: %s" % metadata['metadata_created']
        print "last modified: %s" % metadata['metadata_modified']
        print "resources: ", self.list_resources(package)
        print "tags: %s" % metadata['tags']

        return metadata

    def create_package(self,package,owner_org,title=None,description=None,
                       author=None,author_email=None,maintainer=None,
                       maintainer_email=None,version=None,tags=[]):
        '''
        creates a new package (dataset)

        parameters:
          - package: string
            unique name for the package, should be lowercase with no spaces
          - owner_org: string
            name of organization that owns the package 
          - title: string (optional)
            title of the package to be displayed on CKAN website
          - description: string (optional)
            description of package
          - author: string (optional)
            author of package
          - author_email: string (optional)
            email address of author
          - maintainer: string (optional)
            maintainer for package
          - maintainer_email: string (optional)
            email address of maintainer
          - version: string (optional)
            version of package
          - tags: list (optional)
            list of tag dictionaries to add to the package
        '''
        if " " in package or any(x.isupper() for x in package):
            raise Exception('package must be lowercase and without spaces')
        self.ckan.action.package_create(
                name=package,owner_org=owner_org,title=title,author=author,
                author_email=author_email,maintainer=maintainer,
                maintainer_email=maintainer_email,notes=description,
                version=version,tags=tags)

    def edit_package(self,package,*args,**kwargs):
        '''
        edit an existing package (dataset)

        parameters:
          - package: string
            name of package
          see create_package() function for additional parameters
        '''
        #add args to kwargs dic
        keywords = ['owner_org','title','notes','author','author_email',\
                    'maintainer','maintainer_email','version','tags']
        for idx,arg in enumerate(args):
            kwargs[keywords[idx]] = arg
        
        #replace kwargs keywords that don't match ckanapi keywords 
        if 'description' in kwargs:
            kwargs['notes'] = kwargs['description']
            del kwargs['description']

        #get existing metadata
        metadata = self.ckan.action.package_show(id=package)

        #replace with new edits
        for key,val in kwargs.iteritems():
            metadata[key] = val

        self.ckan.action.package_update(**metadata)

    def list_resources(self,package):
        '''
        returns all resources for a particular package

        parameters:
          - package: string
            name of package

        output: list
            names of available resources in package
        '''
        metadata = self.ckan.action.package_show(id=package)
        resources = [resource['name'] for resource in metadata['resources']]
        return resources

    def list_dcd_resources(self,package):
        '''
        returns all .dcd resources for a particular package

        parameters:
          - package: string
            name of package

        output: list
            names of available resources in package
        '''
        metadata = self.ckan.action.package_show(id=package)
        resources = [resource['name'] for resource in metadata['resources'] \
                     if resource['url'][-4:]==".dcd"]
        return resources

    def list_non_dcd_resources(self,package):
        '''
        returns all non .dcd resources for a particular package

        parameters:
          - package: string
            name of package

        output: list
            names of available resources in package
        '''
        metadata = self.ckan.action.package_show(id=package)
        resources = [resource['name'] for resource in metadata['resources'] \
                     if resource['url'][-4:]!=".dcd"]
        return resources

    def get_resource_details(self,package,resource):
        '''
        prints and returns details for a specific resource

        parameters:
          - package: string
            name of package
          - resource: string
            name of resource

        output: dictionary
            metadata for specific resource
        '''
        pkg_metadata = self.ckan.action.package_show(id=package)
        rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \
                        if metadata['name']==resource][0]
        print "package name: %s" % package
        print "resource name: %s" % resource
        print "resource description: %s" % rsc_metadata['description']
        print "resource format: %s" % rsc_metadata['format']
        print "resource size: %s" % rsc_metadata['size']
        print "resource url: %s" % rsc_metadata['url']
        print "created: %s" % rsc_metadata['created']
        print "last modified: %s" % metadata['last_modified']

        return rsc_metadata

class crossbowMount(crossbowBase):
    '''
    oak ridge national labs cross-platform big-data operational workflow
@@ -313,6 +127,10 @@ class crossbowMount(crossbowBase):
        if os.path.isfile(nfs_path):
            raise Exception("File already exists on NFS, try using a different filename")

        #create directory if necessary
        if not os.path.exists(self.NFS_path + package):
            os.makedirs(self.NFS_path + package)

        #copy file to crossbow NFS
        prog = ProgressBar(widgets=[Percentage(), Bar()], maxval=100).start()
        self._copy_with_progress(
@@ -325,33 +143,6 @@ class crossbowMount(crossbowBase):
                package_id=package,url=url,name=resource,description=description,
                format=format,size=size)

    def edit_resource(self,package,resource,new_name=None,new_description=None):
        '''
        edit the name or description for an existing resource

        parameters:
          - package: string
            name of package
          - resource: string
            name of resource
          - new_name: string (optional)
            new name for resource
          - new_description: string (optional)
            new description for resource
        '''
        #get current metadata
        pkg_metadata = self.ckan.action.package_show(id=package)
        rsc_metadata = [metadata for metadata in pkg_metadata['resources'] \
                        if metadata['name']==resource][0]

        #update metadata with new edits
        if new_name:
            rsc_metadata['name'] = new_name
        if new_description:
            rsc_metadata['description'] = new_description

        self.ckan.action.resource_update(**rsc_metadata)

    def get_resource_path(self,package,resource):
         '''
         return the path to a resource on the local NFS mount
@@ -420,21 +211,6 @@ class crossbowMount(crossbowBase):
        resource_path = resource_path.replace("file://CROSSBOW_NFS/", self.NFS_path)
        os.remove(resource_path)

class crossbowGlobus(crossbowBase):
    '''
    oak ridge national labs cross-platform big-data operational workflow
    this version is for file transfers through Globus
    '''

    def upload_resource(self):
        pass
    def edit_resource(self):
        pass
    def download_resource(self):
        pass
    def delete_resource(self):
        pass

#add model api later
#add scheduling api later (for both filters and models)

+1 −1
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ wget https://repo.continuum.io/archive/Anaconda2-4.2.0-Linux-x86_64.sh
sudo bash Anaconda2-4.2.0-Linux-x86_64.sh
    #install path: /usr/share/anaconda2
    #don't add to path, manually add later
export PATH="/usr/share/anaconda2/bin:$PATH"
export PATH="/usr/share/anaconda2/bin:/usr/local/anaconda2/lib/python2.7/site-packages:$PATH"
vi ~/.bashrc
    export PATH="/usr/share/anaconda2/bin:/usr/local/anaconda2/lib/python2.7/site-packages:$PATH"
sudo chmod -R 777 /usr/share/anaconda2
+11 −0
Original line number Diff line number Diff line
# Setup notes: install Anaconda2 and Theano in a home directory, then submit a batch job.
# NOTE(review): module names and the /ccs/home/iamshang paths are site/user specific.

# swap the PE-intel module for PE-gnu and load the cudatoolkit module
module unload PE-intel/14.0.4-1.8.4
module load PE-gnu/4.7.1
module load cudatoolkit

# download and run the Anaconda2 4.3.0 installer
wget https://repo.continuum.io/archive/Anaconda2-4.3.0-Linux-x86_64.sh
bash Anaconda2-4.3.0-Linux-x86_64.sh 
# put the Anaconda directories at the front of PATH
# NOTE(review): site-packages holds Python modules rather than executables, so PATH probably does not need it -- confirm
export PATH=/ccs/home/iamshang/anaconda2/bin:/ccs/home/iamshang/anaconda2/lib/python2.7/site-packages:$PATH
# install theano with pip, then update all conda packages
pip install theano
conda update --all

# submit the job script (presumably a PBS script, going by the file name)
qsub pbs_file