Commit f24c9a25 authored by JoshuaSBrown's avatar JoshuaSBrown
Browse files

Merge branch 'main' into JoshuaSBrown-git

parents bf247d2c 63d4a26f
......@@ -11,18 +11,51 @@ import logging
import pathlib
import typer
from zambeze.orchestration.agent import Agent, ZambezeSettings
from zambeze.orchestration.agent.agent import ZambezeSettings
from zambeze.orchestration.agent.commands import agent_start, agent_stop, agent_get_status
# Typer configuration (including 'agent' sub-app).
agent_app = typer.Typer()
app = typer.Typer()
app.add_typer(agent_app, name="agent")
# Logging and state configuration.
logger = logging.getLogger()
state = {"conf_file": None}
app = typer.Typer()
@app.command()
def agent():
logger.info("Starting Zambeze Agent")
agent = Agent(conf_file=state["conf_file"], logger=logger)
agent.processor.join()
@agent_app.command()
def start():
    """
    Launch the Zambeze agent as a separate background subprocess.

    Delegates to ``agent_start``, which spawns the ``zambeze-agent`` utility,
    points its log file at the user's ~/.zambeze/logs directory and passes the
    activity (data) and heartbeat ports on the command line.

    The spawned command resembles the following:
    # zambeze-agent --log-path ~/.zambeze/logs/<timestamp> --debug
    #     --zmq-heartbeat-port 5556 --zmq-activity-port 5555
    """
    agent_start(logger)
@agent_app.command()
def stop():
    """
    Shut down the user's running Zambeze agent.

    Delegates to ``agent_stop`` using the module-level logger.
    """
    agent_stop(logger)
@agent_app.command()
def status():
    """
    Report the status of a user's Zambeze agent.
    """
    # Typer surfaces the docstring above as the command's --help text, so it
    # has to describe 'status' (it delegates to agent_get_status, which only
    # reads and logs the saved agent state).
    agent_get_status(logger)
@app.command()
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Oak Ridge National Laboratory.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the MIT License.
import logging
import typer
from zambeze.orchestration.agent.agent import Agent
agent_state = {"conf_file": None}
agent_app = typer.Typer()
@agent_app.command()
def start(
    log_path: str = typer.Option("", help="Path to logs on disk."),
    debug: bool = typer.Option(False, help="If debug logs are enabled."),
    zmq_activity_port: int = typer.Option(
        0, help="Port at which activities are received."
    ),
    zmq_heartbeat_port: int = typer.Option(
        0, help="Port for agent heartbeat communication."
    ),
    config_path: str = typer.Option("", help="Path to config."),
):
    """
    Start the agent (set logger and ZMQ ports).
    """
    print(f"Log path: {log_path}")

    agent_logger = logging.getLogger("agent")
    agent_logger.setLevel(logging.DEBUG if debug else logging.INFO)

    # The option defaults to "", and logging.FileHandler("") resolves to the
    # current working directory and fails to open it. Only log to a file when
    # a path was actually supplied; otherwise fall back to a stream handler.
    if log_path:
        handler = logging.FileHandler(log_path)
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("[Zambeze Agent] [%(levelname)s] %(asctime)s - %(message)s")
    )
    agent_logger.addHandler(handler)

    # Log the effective configuration inside a banner so each start-up is
    # easy to spot in a long-lived log file.
    banner = "*****************************************************"
    agent_logger.info(banner)
    agent_logger.info(banner)
    agent_logger.info("Creating Zambeze agent subprocess with configuration:")
    agent_logger.info(banner)
    agent_logger.info(f"Log Path:\t\t{log_path}")
    agent_logger.info(f"Config Path:\t\t{config_path}")
    agent_logger.info(f"Debug Logs:\t\t{debug}")
    agent_logger.info(f"ZMQ activity port:\t{zmq_activity_port}")
    agent_logger.info(f"ZMQ heartbeat port:\t{zmq_heartbeat_port}")
    agent_logger.info(banner)
    agent_logger.info(banner)

    # NOTE(review): the two ZMQ port options are only logged here; they are
    # not forwarded to Agent, which receives just the config file and logger.

    # Typer hands over "" when --config-path is omitted; Agent expects None
    # in that case.
    Agent(conf_file=config_path or None, logger=agent_logger)
if __name__ == "__main__":
agent_app()
Getting Started
===============
In order to set up and start using Zambeze, you will need the following dependencies:
* A running NATS server (see the `NATS documentation <https://docs.nats.io>`_).
* Python >= 3.7. We suggest using `Anaconda <https://www.anaconda.com>`_.
* (Optional) ImageMagick to try a first mock example use-case. It is available `here <https://imagemagick.org/>`_.
NATS Server Setup
------------------
.. code-block:: console
$ curl -L https://github.com/nats-io/nats-server/releases/download/v2.8.4/nats-server-v2.8.4-linux-amd64.zip -o nats-server.zip
$ unzip nats-server.zip
$ sudo cp nats-server /usr/bin/
Start the nats-server as follows:
.. code-block:: console
$ nats-server
If all goes well, a message similar to the following should appear:
.. code-block:: console
[26982] 2022/07/26 23:42:39.928152 [INF] Starting nats-server
[26982] 2022/07/26 23:42:39.928370 [INF] Version: 2.8.4
[26982] 2022/07/26 23:42:39.928378 [INF] Git: [66524ed]
[26982] 2022/07/26 23:42:39.928385 [INF] Name: NCVC6Y3EYJEFPMO7IJGFB2ZMXMMNTPKNFRMR57FKA7FDV46R2VFWDNCF
[26982] 2022/07/26 23:42:39.928391 [INF] ID: NCVC6Y3EYJEFPMO7IJGFB2ZMXMMNTPKNFRMR57FKA7FDV46R2VFWDNCF
[26982] 2022/07/26 23:42:39.930477 [INF] Listening for client connections on 0.0.0.0:4222
[26982] 2022/07/26 23:42:39.931565 [INF] Server is ready
Installing Zambeze
-------------------
......@@ -14,5 +14,7 @@ support@zambeze.org.
:maxdepth: 1
:hidden:
getting_started
user_api_reference
dev_api_reference
......@@ -43,6 +43,8 @@ activity = ShellActivity(
"a.gif",
],
logger=logger,
# Only needed on an M1 Mac (adds /opt/homebrew/bin to PATH); remove otherwise.
env_vars=[("PATH", "$PATH:/opt/homebrew/bin")],
)
campaign.add_activity(activity)
......
......@@ -3,5 +3,6 @@ nats-py>=2.1.0
requests
pytest>=6.2.4
pyyaml>=5.4.1
pyzmq>=23.2.0
setuptools>=49.3.1
typer>=0.4.2
......@@ -42,5 +42,5 @@ setup(
"Topic :: System :: Distributed Computing",
],
python_requires=">=3.7",
scripts=["bin/zambeze"],
scripts=["bin/zambeze", "bin/zambeze-agent"],
)
......@@ -4,7 +4,7 @@ import nats
async def main():
# Connect to NATS!
nc = await nats.connect("172.22.1.67")
nc = await nats.connect("0.0.0.0")
# Receive messages on 'foo'
sub = await nc.subscribe("foo")
......
......@@ -6,10 +6,10 @@
# This program is free software: you can redistribute it and/or modify
# it under the terms of the MIT License.
from .version import __version__
from .version import __version__ # noqa: F401
from .campaign import Activity, Campaign
from .campaign.activities import ShellActivity
from .campaign import Activity, Campaign # noqa: F401
from .campaign.activities import ShellActivity # noqa: F401
__author__ = "https://zambeze.org"
__credits__ = "Oak Ridge National Laboratory"
......
......@@ -6,5 +6,5 @@
# This program is free software: you can redistribute it and/or modify
# it under the terms of the MIT License.
from .activities.abstract_activity import Activity
from .campaign import Campaign
from .activities.abstract_activity import Activity # noqa: F401
from .campaign import Campaign # noqa: F401
......@@ -6,5 +6,5 @@
# This program is free software: you can redistribute it and/or modify
# it under the terms of the MIT License.
from .abstract_activity import Activity
from .shell import ShellActivity
from .abstract_activity import Activity # noqa: F401
from .shell import ShellActivity # noqa: F401
......@@ -43,6 +43,7 @@ class Activity(ABC):
command: Optional[str] = None,
arguments: Optional[list[str]] = [],
logger: Optional[logging.Logger] = None,
**kwargs
) -> None:
"""Create an object that represents a science campaign activity."""
self.logger: logging.Logger = (
......@@ -53,6 +54,7 @@ class Activity(ABC):
self.command: str = command
self.arguments: list[str] = arguments
self.status: ActivityStatus = ActivityStatus.CREATED
self.__dict__.update(kwargs)
def add_files(self, files: list[str]) -> None:
"""Add a list of files to the dataset.
......
......@@ -16,12 +16,16 @@ class ShellActivity(Activity):
:param name: Campaign activity name.
:type name: str
:param files: List of file URIs.
:type files: Optional[list[str]]
:param command: Action's command.
:type command: Optional[str]
:param arguments: List of arguments.
:type arguments: Optional[list[str]]
:param logger: The logger where to log information/warning or errors.
:type logger: Optional[logging.Logger]
"""
......@@ -33,6 +37,7 @@ class ShellActivity(Activity):
command: Optional[str] = None,
arguments: Optional[list[str]] = [],
logger: Optional[logging.Logger] = None,
**kwargs
) -> None:
"""Create an object of a unix shell activity."""
super().__init__(name, files, command, arguments, logger)
......@@ -40,9 +45,21 @@ class ShellActivity(Activity):
logger if logger else logging.getLogger(__name__)
)
# Pull out environment variables, IF users submitted them.
if "env_vars" in kwargs:
self.env_vars = kwargs.get("env_vars")
else:
self.env_vars = None
def generate_message(self) -> dict:
    """
    Build the message describing this shell activity.

    The message names the "shell" plugin, lists the activity's files and
    carries a single "bash" command entry made of the program, its
    arguments and the optional environment variables.

    :return: The message dictionary with keys "plugin", "files" and "cmd".
    :rtype: dict
    """
    # The "cmd" key must appear exactly once: a repeated key in a dict
    # literal silently discards the earlier value.
    return {
        "plugin": "shell",
        "files": self.files,
        "cmd": {
            "bash": {
                "program": self.command,
                "args": self.arguments,
                "env_vars": self.env_vars,
            }
        },
    }
......@@ -7,9 +7,12 @@
# it under the terms of the MIT License.
import logging
import zmq
import pickle
from .activities.abstract_activity import Activity
from ..orchestration.agent import Agent
from zambeze.orchestration.agent.commands import agent_start
from typing import Optional
......@@ -37,7 +40,11 @@ class Campaign:
)
self.name: str = name
self.activities: list[Activity] = activities
self.agent = Agent(logger=self.logger)
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.REQ)
self.zmq_socket.connect("tcp://127.0.0.1:5555")
agent_start(self.logger)
def add_activity(self, activity: Activity) -> None:
"""Add an activity to the campaign.
......@@ -50,6 +57,12 @@ class Campaign:
def dispatch(self) -> None:
"""Dispatch the set of current activities in the campaign."""
self.logger.info(f"Number of activities to dispatch: {len(self.activities)}")
for activity in self.activities:
self.logger.debug(f"Running activity: {activity.name}")
self.agent.dispatch_activity(activity)
# Serialize the whole activity object into a bytestring with pickle
# so that it can be sent over the ZMQ socket.
serial_activity = pickle.dumps(activity)
self.zmq_socket.send(serial_activity)
self.logger.info(f"REPLY: {self.zmq_socket.recv()}")
......@@ -9,11 +9,14 @@
import asyncio
import logging
import pathlib
import pickle
import zmq
from typing import Optional
from .processor import Processor, MessageType
from ..campaign.activities.abstract_activity import Activity, ActivityStatus
from ..settings import ZambezeSettings
from uuid import uuid4
from zambeze.orchestration.processor import Processor, MessageType
from zambeze.campaign.activities.abstract_activity import Activity, ActivityStatus
from zambeze.settings import ZambezeSettings
class Agent:
......@@ -34,12 +37,35 @@ class Agent:
self._logger: logging.Logger = (
logging.getLogger(__name__) if logger is None else logger
)
self.agent_id = uuid4()
self._settings = ZambezeSettings(conf_file=conf_file, logger=self._logger)
self._processor = Processor(settings=self._settings, logger=self._logger)
self._processor.start()
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.REP)
self.zmq_socket.bind("tcp://*:5555")
while True:
self._logger.info("Waiting for new activities from campaign(s)...")
self.receive_activity_from_campaign()
def receive_activity_from_campaign(self):
    """
    Receive one pickled activity via ZMQ, dispatch it and reply.

    The agent socket is created as a REP socket, so each received request
    has to be answered before the next one can be received. A reply is
    therefore sent on the failure path as well as on the success path;
    otherwise the sender would keep waiting for an answer.

    :raises Exception: Whatever unpickling or dispatching raised, re-raised
        after the failure reply has been sent.
    """
    raw_message = self.zmq_socket.recv()
    try:
        # SECURITY: pickle.loads can execute arbitrary code while decoding,
        # and the agent socket is bound on all interfaces ("tcp://*").
        # Only safe while every sender is trusted; consider a data-only
        # format such as JSON for this message.
        activity_message = pickle.loads(raw_message)
        self._logger.info(f"Received message from campaign: {activity_message}")

        # Dispatch the activity!
        self.dispatch_activity(activity_message)
    except Exception:
        self._logger.exception("Failed to dispatch activity from campaign")
        self.zmq_socket.send(b"Agent failed to dispatch task!")
        raise
    self.zmq_socket.send(b"Agent successfully dispatched task!")
@property
def processor(self) -> None:
def processor(self) -> Processor:
return self._processor
def dispatch_activity(self, activity: Activity) -> None:
......@@ -49,6 +75,7 @@ class Agent:
:param activity: An activity object.
:type activity: Activity
"""
self._logger.error("Received activity for dispatch...")
asyncio.run(
self.processor.send(MessageType.COMPUTE.value, activity.generate_message())
)
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Oak Ridge National Laboratory.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the MIT License.
import json
import logging
import os
import pathlib
import subprocess
import zmq
from datetime import datetime
from signal import SIGKILL
# Pass stdout/stderr to devnull (in subprocesses) to avoid memory issues.
devnull = open(os.devnull, "wb")
zambeze_base_dir = pathlib.Path.home().joinpath(".zambeze")
state_path = zambeze_base_dir.joinpath("agent.state")
def agent_start(logger: logging.Logger) -> None:
    """
    Start the agent via the local zambeze-agent utility and save initial state.

    Nothing is started when the saved state file reports an agent that is
    already RUNNING, or when the log directory cannot be created.

    :param logger: The agent logger that writes to ~/.zambeze/logs
    :type logger: logging.Logger
    """
    logger.info("Initializing Zambeze Agent...")

    # First check to make sure no agent is already running. Starting a second
    # one would overwrite the saved PID and leave the first agent unmanaged.
    if state_path.is_file():
        with state_path.open("r") as f:
            old_state = json.load(f)
        if old_state.get("status") == "RUNNING":
            logger.info(
                "[ERROR] Agent already running. "
                "Please stop agent before running a new one!"
            )
            return

    # Ensure that we have a folder in which to write logs. parents=True also
    # creates the ~/.zambeze base directory on a first run.
    logs_base_dir = zambeze_base_dir.joinpath("logs")
    try:
        logs_base_dir.mkdir(parents=True, exist_ok=True)
    except OSError:
        logger.error(f"Failed to create log directory: {logs_base_dir}")
        return

    # Name the log file after the current UTC timestamp (trimmed to
    # milliseconds). Users can sort by name to see which one is latest.
    zambeze_log_path = logs_base_dir.joinpath(
        datetime.utcnow().strftime("%Y_%m_%d-%H_%M_%S_%f")[:-3]
    )

    # Placeholder for automatic port selection: the intent is to bind each
    # socket to a random free port (binding the first makes it unavailable
    # when choosing the second) and hand both ports to the agent.
    # Closing and re-binding in the agent leaves a small race window.
    hb_socket = zmq.Context().socket(zmq.REP)
    data_socket = zmq.Context().socket(zmq.REP)
    data_socket.close()
    hb_socket.close()

    # TODO: Use ZMQ utilities to auto-find port.
    data_port = 5555
    hb_port = 5556

    arg_list = [
        "zambeze-agent",
        "--log-path",
        str(zambeze_log_path.resolve()),
        "--debug",
        "--zmq-heartbeat-port",
        str(hb_port),
        "--zmq-activity-port",
        str(data_port),
    ]
    logger.info(f"Command: {' '.join(arg_list)}")

    # Open the subprocess and save the process state to file (for future
    # access). Output is discarded so the child never blocks on a full pipe.
    proc = subprocess.Popen(
        arg_list, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    logger.info(f"Started agent with PID: {proc.pid}")

    agent_state = {
        "pid": proc.pid,
        "log_path": str(zambeze_log_path.resolve()),
        "zmq_heartbeat_port": hb_port,
        "zmq_activity_port": data_port,
        "status": "RUNNING",
    }
    with state_path.open("w") as f:
        json.dump(agent_state, f)
def agent_stop(logger: logging.Logger) -> None:
    """
    Stop the agent by killing its system process and updating the state file.

    Returns without doing anything when there is no state file (no agent was
    ever started) or when the saved status is already STOPPED/INITIALIZED.

    :param logger: The agent logger that writes to ~/.zambeze/logs
    :type logger: logging.Logger
    """
    logger.info("Received stop signal.")

    # Without a state file there is no PID to kill and no state to update.
    if not state_path.is_file():
        logger.info("No agent state found: nothing to stop. Exiting...")
        return

    # Check to make sure agent is *supposed to be* running.
    with state_path.open("r") as f:
        old_state = json.load(f)
    if old_state["status"] in ["STOPPED", "INITIALIZED"]:
        logger.info("Agent is already STOPPED. Exiting...")
        return

    # Send the kill signal and wait for the process to die.
    logger.info(f"Killing the agent with PID: {old_state['pid']}")
    try:
        os.kill(old_state["pid"], SIGKILL)
        try:
            os.waitpid(old_state["pid"], 0)
        except ChildProcessError:
            # Nothing left to wait for: the process is already gone or is
            # not a child of this process.
            pass
    except ProcessLookupError:
        logger.debug(
            "Process ID does not exist: agent already terminated. Cleaning up..."
        )

    # Reset state to be correct.
    old_state["status"] = "STOPPED"
    old_state["pid"] = None
    old_state["zmq_heartbeat_port"] = None
    old_state["zmq_activity_port"] = None

    # Flush state to disk.
    with state_path.open("w") as f:
        json.dump(old_state, f)
    logger.info("Agent successfully stopped!\n")
def agent_get_status(logger: logging.Logger) -> None:
    """
    Get the status of the user's local agent and print it to the console
    (and do nothing else).

    :param logger: The agent logger that writes to ~/.zambeze/logs
    :type logger: logging.Logger
    """
    if not state_path.is_file():
        logger.info(
            "Agent does not exist. You can start an agent with 'zambeze agent start'."
        )
        # Nothing to read: stop here instead of opening the missing file.
        return

    with state_path.open("r") as f:
        old_state = json.load(f)

    logger.info(f"Agent Status: {old_state['status']}.")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Oak Ridge National Laboratory.
......@@ -7,6 +8,7 @@
# it under the terms of the MIT License.
import logging
import os
import subprocess
# Local imports
......@@ -78,6 +80,27 @@ class Shell(Plugin):
for data in arguments:
cmd = data["bash"]["args"]
cmd.insert(0, data["bash"]["program"])
self._logger.debug(f"Running SHELL command: {' '.join(cmd)}")
shell_exec = subprocess.Popen(cmd)
# Take an image of the parent environment
# -- then add the environment variables to it.
parent_env = os.environ.copy()
try:
# env_tup : (key, value)
for env_tup in data["bash"]["env_vars"]:
parent_env[env_tup[0]] = env_tup[1]
except ValueError as e:
self._logger.error(f"Caught ValueError: {e}")
shell_cmd = " ".join(cmd)