Commit e425f3ec authored by JoshuaSBrown's avatar JoshuaSBrown
Browse files

Initial adjustment to allow async transfers with Globus

parent c55f328d
Pipeline #267425 failed with stage
in 0 seconds
......@@ -347,31 +347,36 @@ class Globus(Plugin):
def __runTransfer(self, transfer: dict):
"""transfer dict must have the following format
{
"source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"destination_collection_UUID": "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
"type": "synchronous",
"items": [
{ "source": {
"type": "globus relative",
"path": "/file1.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file1.txt"
}
},
{ "source": {
"type": "globus relative",
"path": "/file2.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file2.txt"
}
}
]
}
:Example:
>>> {
>>> "source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
>>> "destination_collection_UUID": "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
>>> "type": "synchronous",
>>> "items": [
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
If the type is asynchrouns a runTransfer will return a callback action
that can be executed to check the status of the generated task
"""
tdata = globus_sdk.TransferData(
self.__tc,
......@@ -386,12 +391,16 @@ class Globus(Plugin):
clean_destination_path = re.sub("/+", "/", item["destination"]["path"])
tdata.add_item(clean_source_path, clean_destination_path)
transfer_result = self.__tc.submit_transfer(tdata)
transfer_result = None
if "synchronous" == transfer["type"].lower():
transfer_result = self.__tc.submit_transfer(tdata)
task_id = transfer_result["task_id"]
while not self.__tc.task_wait(task_id, timeout=60):
print("Another minute went by without {0} terminating".format(task_id))
elif "asynchronous" == transfer["type"].lower():
transfer_result["callback"]["get_task_status"] = \
self.__tc.submit_transfer(tdata)
return transfer_result
def __runMoveToGlobusCollection(self, action_package: dict):
......@@ -452,6 +461,38 @@ class Globus(Plugin):
shutil.copyfile(source, destination)
def __runTransferSanityCheck(self, action_package: dict) -> (bool, str):
"""Checks to ensure that the action_package has the right format and
checks for errors.
:Example:
>>> {
>>> "source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
>>> "destination_collection_UUID": "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
>>> "type": "synchronous",
>>> "items": [
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
"""
# Any agent with the globus plugin can submit a job to globus if it
# has access to the globus cloud
if not self.__access_to_globus_cloud:
......@@ -461,6 +502,13 @@ class Globus(Plugin):
return False, "'source_collection_UUID' missing from 'transfer'."
if "destination_collection_UUID" not in action_package:
return False, "'destination_collection_UUID' missing from 'transfer'."
if "type" not in action_package:
return False, "'type' missing from 'transfer'."
if action_package["type"] != "synchronous":
if action_package["type"] != "asynchronous":
return False, "Unsupported 'type' detected. Supported types are \
synchronous and asynchronous you have specified {action_package['type']}"
if "items" not in action_package:
return False, "'items' missing from 'transfer'"
......@@ -883,20 +931,20 @@ class Globus(Plugin):
print("Running Globus plugin")
return_values = {}
for action_obj in arguments:
# Make sure that the action is supported
for key in action_obj:
print(key)
if key not in self.__supported_actions:
raise Exception(f"{key} is not supported.")
if key == "transfer":
self.__runTransfer(action_obj[key])
return_values[key] = self.__runTransfer(action_obj[key])
elif key == "move_to_globus_collection":
self.__runMoveToGlobusCollection(action_obj[key])
elif key == "move_from_globus_collection":
self.__runMoveFromGlobusCollection(action_obj[key])
return {}
return return_values
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment