Commit 2ac5a8af authored by Brown, Joshua's avatar Brown, Joshua
Browse files

Merge

parents 89b09763 a3239a42
......@@ -6,6 +6,7 @@ import os
import pytest
import random
import socket
import time
GITLAB_RUNNER_UUIDs = ["f4e5e85c-3a35-455f-9d91-1ee3a0564935"]
......@@ -167,7 +168,7 @@ def test_globus_basic1():
assert globus_plugin.configured
assert len(globus_plugin.supportedActions) == 1
assert len(globus_plugin.supportedActions) == 2
# We assume that we at least have access to the globus cloud
assert "transfer" in globus_plugin.supportedActions
......@@ -214,7 +215,7 @@ def test_globus_basic2():
globus_plugin.configure(configuration)
assert len(globus_plugin.supportedActions) == 3
assert len(globus_plugin.supportedActions) == 4
@pytest.mark.globus
......@@ -288,7 +289,6 @@ def test_globus_move_check():
]
checked_actions = globus_plugin.check(package)
print(checked_actions)
assert checked_actions["move_to_globus_collection"][0]
......@@ -606,7 +606,121 @@ def test_globus_process_from_esnet():
# This test is designed to move a file to the globus endpoint
# So before we get started we are going to make sure that a file
# does not already exist at that location
abs_path_destination = path_to_endpoint + sub_folder + "/1M.dat"
abs_path_destination = (
path_to_endpoint
+ sub_folder
+ '/1M.dat'
)
if os.path.exists(abs_path_destination):
os.remove(abs_path_destination)
if globus_plugin.check(package):
result = globus_plugin.process(package)
# After processing we should verify that the file exists at the final location
assert os.path.exists(abs_path_destination)
def test_globus_process_async():
required_env_variables = [
"ZAMBEZE_CI_TEST_GLOBUS_CLIENT_ID",
"ZAMBEZE_CI_TEST_GLOBUS_APP_KEY",
"ZAMBEZE_CI_TEST_GLOBUS_COLLECTION_UUID",
"ZAMBEZE_CI_TEST_GLOBUS_COLLECTION_SHARED_UUID",
]
for env_var in required_env_variables:
if env_var not in os.environ:
raise Exception(
"Globus test cannot be run if the env variable"
f" {env_var} is not defined and a local "
"globus-connect-server and endpoint have not been"
" set up."
)
path_to_endpoint = "/home/cades/Collections/default"
path_to_endpoint_shared = "/home/cades/Collections/default/shared"
configuration = {
"client_id": os.getenv(required_env_variables[0]),
"authentication_flow": {
"type": "client credential",
"secret": os.getenv(required_env_variables[1]),
},
"collections": [
{
"UUID": os.getenv(required_env_variables[2]),
"path": path_to_endpoint,
"type": "mapped",
},
{
"UUID": os.getenv(required_env_variables[3]),
"path": path_to_endpoint_shared,
"type": "guest",
},
],
}
globus_plugin = globus.Globus()
globus_plugin.configure(configuration)
# Create a file on the local posix system
file_name = "demofile_for_globus1.txt"
f = open(file_name, "w")
original_number = random.randint(0, 100000000000)
f.write(str(original_number))
f.close()
current_valid_path = os.getcwd()
file_path = current_valid_path + "/" + file_name
relative_destination_file_path = "/"
sub_folder = ""
if os.getenv(required_env_variables[0]) in GITLAB_RUNNER_UUIDs:
sub_folder = "runner/"
# action items in the list should be executed in order
package = [
{
"move_to_globus_collection": {
"destination_collection_UUID": os.getenv(required_env_variables[2]),
"source_host_name": socket.gethostname(),
"items": [
{
"source": {"type": "posix absolute", "path": file_path},
"destination": {
"type": "globus relative",
"path": relative_destination_file_path + sub_folder,
},
}
],
}
},
{
"transfer": {
"source_collection_UUID": os.getenv(required_env_variables[2]),
"destination_collection_UUID": os.getenv(required_env_variables[3]),
"type": "asynchronous",
"items": [
{
"source": {
"type": "globus relative",
"path": "/" + sub_folder + file_name,
},
"destination": {
"type": "globus relative",
"path": relative_destination_file_path
+ sub_folder
+ file_name,
},
}
],
}
},
]
abs_path_destination = path_to_endpoint + sub_folder + file_name
if os.path.exists(abs_path_destination):
os.remove(abs_path_destination)
......@@ -620,3 +734,35 @@ def test_globus_process_from_esnet():
# After processing we should verify that the file exists at the final location
assert os.path.exists(abs_path_destination)
abs_path_destination = (
path_to_endpoint
+ relative_destination_file_path
+ sub_folder
+ os.path.basename(file_path)
)
# After it gets transferred using globus it should end up moving to the subfolder
abs_path_destination_shared = (
path_to_endpoint
+ relative_destination_file_path
+ "shared/"
+ sub_folder
+ os.path.basename(file_path)
)
if os.path.exists(abs_path_destination):
os.remove(abs_path_destination)
if os.path.exists(abs_path_destination_shared):
os.remove(abs_path_destination_shared)
if globus_plugin.check(package):
result = globus_plugin.process(package)
result = globus_plugin.process([result["transfer"]["callback"]])
while result["get_task_status"]["result"]["status"] != "SUCCEEDED":
print("waiting...")
time.sleep(1)
result = globus_plugin.process([result["get_task_status"]["callback"]])
print("complete")
print(result["get_task_status"])
# After processing we should verify that the file exists at the final location
assert os.path.exists(abs_path_destination_shared)
......@@ -222,6 +222,7 @@ class Globus(Plugin):
"transfer": False,
"move_to_globus_collection": False,
"move_from_globus_collection": False,
"get_task_status": False,
}
pass
......@@ -285,6 +286,7 @@ class Globus(Plugin):
# possible
if self.__access_to_globus_cloud:
self.__supported_actions["transfer"] = True
self.__supported_actions["get_task_status"] = True
# If we have no errors at this point then and there is at least one collection
# then we can move to and from them
if len(self.__collections):
......@@ -352,31 +354,36 @@ class Globus(Plugin):
def __runTransfer(self, transfer: dict):
"""transfer dict must have the following format
{
"source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"destination_collection_UUID": "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
"type": "synchronous",
"items": [
{ "source": {
"type": "globus relative",
"path": "/file1.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file1.txt"
}
},
{ "source": {
"type": "globus relative",
"path": "/file2.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file2.txt"
}
}
]
}
:Example:
>>> {
>>> "source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
>>> "destination_collection_UUID": "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
>>> "type": "synchronous",
>>> "items": [
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
If the type is asynchrouns a runTransfer will return a callback action
that can be executed to check the status of the generated task
"""
tdata = globus_sdk.TransferData(
self.__tc,
......@@ -391,13 +398,37 @@ class Globus(Plugin):
clean_destination_path = re.sub("/+", "/", item["destination"]["path"])
tdata.add_item(clean_source_path, clean_destination_path)
transfer_result = self.__tc.submit_transfer(tdata)
transfer_result = {}
if "synchronous" == transfer["type"].lower():
transfer_result = self.__tc.submit_transfer(tdata)
task_id = transfer_result["task_id"]
while not self.__tc.task_wait(task_id, timeout=60):
print("Another minute went by without {0} terminating".format(task_id))
elif "asynchronous" == transfer["type"].lower():
result = self.__tc.submit_transfer(tdata)
transfer_result = {
"callback": {"get_task_status": {"task_id": result["task_id"]}},
"result": {"status": result["code"], "message": result["message"]},
}
return transfer_result
def __runGetTaskStatus(self, action_package: dict):
"""Method will check the status of a task
:Example:
>>> action_package = {
>>> "task_id": result["task_id"]
>>> }
"""
result = self.__tc.get_task(action_package["task_id"])
get_status_result = {
"callback": {"get_task_status": {"task_id": result["task_id"]}},
"result": {"status": result["status"], "message": result["nice_status"]},
}
return get_status_result
def __runMoveToGlobusCollection(self, action_package: dict):
"""Method is designed to move a local file to a Globus collection
......@@ -405,32 +436,32 @@ class Globus(Plugin):
"action_package" dict must have the following format
action_package = {
"source_host_name": "",
"destination_collection_UUID": ""
"items": [
{
"source": {
"type": "posix users home",
"path": "/file1.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file1.txt"
}
},
{
"source": {
"type": "posix users home",
"path": "/file2.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file2.txt"
}
}
]
}
>>> action_package = {
>>> "source_host_name": "",
>>> "destination_collection_UUID": ""
>>> "items": [
>>> {
>>> "source": {
>>> "type": "posix users home",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> {
>>> "source": {
>>> "type": "posix users home",
>>> "path": "/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
"""
endpoint_path = ""
for endpoint in self.__collections:
......@@ -454,6 +485,38 @@ class Globus(Plugin):
shutil.copyfile(source, destination)
def __runTransferSanityCheck(self, action_package: dict) -> (bool, str):
"""Checks to ensure that the action_package has the right format and
checks for errors.
:Example:
>>> {
>>> "source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
>>> "destination_collection_UUID": "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
>>> "type": "synchronous",
>>> "items": [
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> { "source": {
>>> "type": "globus relative",
>>> "path": "/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
"""
# Any agent with the globus plugin can submit a job to globus if it
# has access to the globus cloud
if not self.__access_to_globus_cloud:
......@@ -463,6 +526,16 @@ class Globus(Plugin):
return False, "'source_collection_UUID' missing from 'transfer'."
if "destination_collection_UUID" not in action_package:
return False, "'destination_collection_UUID' missing from 'transfer'."
if "type" not in action_package:
return False, "'type' missing from 'transfer'."
if action_package["type"] != "synchronous":
if action_package["type"] != "asynchronous":
return (
False,
"Unsupported 'type' detected. Supported types are \
synchronous and asynchronous you have specified {action_package['type']}",
)
if "items" not in action_package:
return False, "'items' missing from 'transfer'"
......@@ -513,6 +586,26 @@ class Globus(Plugin):
supported_destination_path_types,
)
def __runGetTaskStatusSanityCheck(self, action_package: dict) -> (bool, str):
"""Checks that the get_task_status action is correctly configured
:Example:
>>> action_package = {
>>> "task_id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
>>> }
"""
if "task_id" not in action_package:
return (False, "Missing 'task_id' in get_task_status action")
if not validUUID(action_package["task_id"]):
return (
False,
f"Invalid 'task_id' detected in 'get_task_status': \
{action_package['task_id']}",
)
return (True, "")
def __runMoveFromGlobusSanityCheck(self, action_package: dict) -> (bool, str):
"""Run a sanity check for the action "move_from_globus_collection"
......@@ -631,7 +724,6 @@ class Globus(Plugin):
if self.__flow == "native":
self.__nativeAuthFlow()
elif self.__flow == "client credential":
print("Client credential authorization")
self.__clientCredentialAuthFlow(config)
self.__access_to_globus_cloud = True
......@@ -767,6 +859,10 @@ class Globus(Plugin):
checks[action] = self.__runMoveFromGlobusSanityCheck(
package[index][action]
)
elif action == "get_task_status":
checks[action] = self.__runGetTaskStatusSanityCheck(
package[index][action]
)
else:
checks[action] = (False, "Unrecognized action keyworkd")
return checks
......@@ -779,115 +875,118 @@ class Globus(Plugin):
Example 1
arguments = [
"transfer":
{
"source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"destination_collection_UUID":
"YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
"items": [
{
"source": {
"type": "globus relative",
"path": "/file1.txt"
},
"destination": {
"type": globus relative",
"path": "dest/file1.txt"
}
},
{
"source": {
"type": "globus relative",
"path": "/file2.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file2.txt"
}
}
]
}
}
]
>>> arguments = [
>>> "transfer":
>>> {
>>> "source_collection_UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
>>> "destination_collection_UUID":
>>> "YYYYYYYY-YYYY-YYYY-YYYY-YYYYYYYYYYYY",
>>> "items": [
>>> {
>>> "source": {
>>> "type": "globus relative",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> {
>>> "source": {
>>> "type": "globus relative",
>>> "path": "/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
>>> }
>>> ]
Example 2
arguments = [
"move_to_globus_collection": {
"source_host_name": "",
"destination_collection_UUID": "",
"items": [
{
"source": {
"type": "posix user home",
"path": "/file1.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file1.txt"
}
},
{
"source": {
"type": "posix absolute",
"path": "/home/cades/file2.txt"
},
"destination": {
"type": "globus relative",
"path": "dest/file2.txt"
}
}
]
}
]
>>> arguments = [
>>> "move_to_globus_collection": {
>>> "source_host_name": "",
>>> "destination_collection_UUID": "",
>>> "items": [
>>> {
>>> "source": {
>>> "type": "posix user home",
>>> "path": "/file1.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file1.txt"
>>> }
>>> },
>>> {
>>> "source": {
>>> "type": "posix absolute",
>>> "path": "/home/cades/file2.txt"
>>> },
>>> "destination": {
>>> "type": "globus relative",
>>> "path": "dest/file2.txt"
>>> }
>>> }
>>> ]
>>> }
>>> ]
Example 3
arguments = [
"move_from_globus_collection": {
"source_host_name": "",
"destination_collection_UUID": "",
"items": [
{
"source": {
"type": "globus relative",
"path": "dest/file1.txt"
},
"destination": {
"type": "posix user home",
"path": "/file1.txt"
}
},
{
"source": {
"type": "globus relative",
"path": "dest/file2.txt"
},
"destination": {
"type": "posix user home",
"path": "/file2.txt"
}
}
]
}
]
>>> arguments = [
>>> "move_from_globus_collection": {