Commit 5f080ec6 authored by JoshuaSBrown's avatar JoshuaSBrown
Browse files

Simplify functions

parent f24c9a25
Pipeline #266962 passed with stage
in 43 seconds
......@@ -19,8 +19,10 @@ agent_app = typer.Typer()
def start(
log_path: str = typer.Option("", help="Path to logs on disk."),
debug: bool = typer.Option(False, help="If debug logs are enabled."),
zmq_activity_port: int = typer.Option(0, help="Port at which activities are received."),
zmq_heartbeat_port: int = typer.Option(0, help="Port for agent heartbeat communication."),
zmq_activity_port: int = typer.Option(
0, help="Port at which activities are received."),
zmq_heartbeat_port: int = typer.Option(
0, help="Port for agent heartbeat communication."),
config_path: str = typer.Option("", help="Path to config.")):
"""
Start the agent (set logger and ZMQ ports).
......
......@@ -20,6 +20,43 @@ from typing import Optional
import logging
def check_path(path_name: str, dest_path: str, msg: str, success: bool) -> (bool, str):
if not dest_path:
msg = msg + f"\nError {path_name} path cannot be empty"
return False, msg
if dest_path.endswith("/"):
msg = msg + f"\nError {path_name} path must end with a filename"
return False, msg
return success, msg
def contains_subkey(action_name, obj, key, subkey, is_success, msg) -> (bool, str):
"""Checks if the the object contains a subkey
:param obj: The object being checked
:type obj: The object is a dict
:param key: The objects key, where we will be checking for a subkey
:type key: str
:param subkey: The subkey of the object that is being checked
:type subkey: str
:param is_success: The parameter that determines if previous checks have
been successful.
:type is_success: bool
:param msg: The error messages that have been accumulated.
:type msg: str
This function checks to see if a subkey exists, if no problems are detected
then the original 'is_success' and 'msg' will be returned, if problems
are detected, then an error message will be appeneded to 'msg' and a 'False'
value will be returned.
"""
if subkey not in obj[key]:
msg = msg + f"\n'{subkey}' key not found in '{key}' in {action_name} action"
return False, msg
return is_success, msg
def checkEndpoint(endpoint_obj: dict, allowed_types) -> (bool, str):
"""Will make sure that the endpoint is a valid type and checks the path if
possible.
......@@ -30,6 +67,7 @@ def checkEndpoint(endpoint_obj: dict, allowed_types) -> (bool, str):
>>> }
"""
class Git(Plugin):
def __init__(self, logger: Optional[logging.Logger] = None) -> None:
self.__name = "git"
......@@ -225,45 +263,21 @@ class Git(Plugin):
'download' action.",
)
check_success = True
if "path" not in action_obj["source"]:
msg = msg + "\n'path' key not found in 'source' in 'download' action"
check_success = False
if "type" not in action_obj["source"]:
msg = msg + "\n'type' key not found in 'source' in 'download' action"
check_success = False
success = True
keys = ["source", "source", "destination", "destination", "credentials"]
subkeys = ["path", "type", "path", "type", "access_token"]
for (key, subkey) in zip(keys, subkeys):
success, msg = contains_subkey("download action", action_obj, key,
subkey, success, msg)
if "path" not in action_obj["destination"]:
msg = msg + "\n'path' key not found in 'destination' in 'download' action"
check_success = False
if "type" not in action_obj["destination"]:
msg = msg + "\n'type' key not found in 'destination' in 'download' action"
check_success = False
if "access_token" not in action_obj["credentials"]:
msg = (
msg
+ "\n'access_token' key not found in 'credentials' in \
'download' action"
)
check_success = False
if check_success:
if success:
owner_exists, error_msg = self.__checkRepoOwnerExists(action_obj["owner"])
if not owner_exists:
msg = msg + error_msg
check_success = False
success = False
source_path = action_obj["source"]["path"]
if not source_path:
msg = msg + "\nError source path cannot be empty"
check_success = False
if source_path.endswith("/"):
msg = msg + "\nError source path must end with a filename"
check_success = False
success, msg = check_path("source", source_path, msg, success)
# Only run these checks if previous checks have all passed
token = action_obj["credentials"]["access_token"]
......@@ -277,9 +291,9 @@ class Git(Plugin):
+ f" \nUnable to verify the existance of the 'repo':\
{action_obj['repo']} in 'download' action"
)
check_success = False
success = False
return check_success, msg
return success, msg
def __checkCommit(self, action_obj: dict) -> (bool, str):
"""Function ensures that the action_obj is provided with the right fields
......@@ -342,47 +356,19 @@ class Git(Plugin):
'commit' action.",
)
check_success = True
if "path" not in action_obj["source"]:
msg = msg + "\n'path' key not found in 'source' in 'commit' action"
check_success = False
if "type" not in action_obj["source"]:
msg = msg + "\n'type' key not found in 'source' in 'commit' action"
check_success = False
if "path" not in action_obj["destination"]:
msg = msg + "\n'path' key not found in 'destination' in 'commit' action"
check_success = False
if "type" not in action_obj["destination"]:
msg = msg + "\n'type' key not found in 'destination' in 'commit' action"
check_success = False
if "user_name" not in action_obj["credentials"]:
msg = msg + "\n'user_name' key not found in 'credentials' in "
msg = msg + "'commit' action"
check_success = False
success = True
keys = ["source", "source", "destination", "destination", "credentials",
"credentials", "credentials"]
subkeys = ["path", "type", "path", "type", "user_name", "access_token",
"email"]
for (key, subkey) in zip(keys, subkeys):
success, msg = contains_subkey("commit action", action_obj, key,
subkey, success, msg)
access_token = None
if "access_token" not in action_obj["credentials"]:
msg = (
msg
+ "\n'access_token' key not found in 'credentials' in \
'commit' action"
)
check_success = False
else:
if success:
access_token = action_obj["credentials"]["access_token"]
if "email" not in action_obj["credentials"]:
msg = (
msg
+ "'access_token' key not found in 'credentials' in \
'commit' action"
)
check_success = False
if "branch" in action_obj:
# Check if branch exists
branch_exists, error_msg = self.__checkBranchExists(
......@@ -398,24 +384,19 @@ class Git(Plugin):
+ f"'branch' {action_obj['branch']} does not exist on GitHub repo "
+ f"{action_obj['repo']} for owner {action_obj['owner']} "
)
check_success = False
success = False
if check_success:
if success:
owner_exists, error_msg = self.__checkRepoOwnerExists(
action_obj["owner"], access_token)
action_obj["owner"], access_token
)
if not owner_exists:
msg = msg + error_msg
check_success = False
success = False
dest_path = action_obj["destination"]["path"]
if not dest_path:
msg = msg + "\nError destionation path cannot be empty"
check_success = False
if dest_path.endswith("/"):
msg = msg + "\nError destination path must end with a filename"
check_success = False
success, msg = check_path("destination", dest_path, msg, success)
# Only run these checks if previous checks have all passed
repo_exists, error_msg = self.__checkRepoExists(
......@@ -428,9 +409,9 @@ class Git(Plugin):
+ f" \nUnable to verify the existance of the 'repo':\
{action_obj['repo']} in 'commit' action"
)
check_success = False
success = False
return check_success, msg
return success, msg
def __fileExistsOnRepo(self, action_obj, file_obj) -> (bool, dict):
"""Function for getting file if it exists
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment