Commit 480ce7a3 authored by Ferreira Da Silva, Rafael's avatar Ferreira Da Silva, Rafael
Browse files

fixing env vars issue

parent 19500b9e
Pipeline #268075 canceled with stage
......@@ -49,7 +49,7 @@ class ShellActivity(Activity):
if "env_vars" in kwargs:
self.env_vars = kwargs.get("env_vars")
else:
self.env_vars = None
self.env_vars = []
def generate_message(self) -> dict:
return {
......
......@@ -10,7 +10,6 @@ import logging
import zmq
import pickle
from .activities.abstract_activity import Activity
from zambeze.orchestration.agent.commands import agent_start
......@@ -35,16 +34,17 @@ class Campaign:
logger: Optional[logging.Logger] = None,
) -> None:
"""Create an object that represents a science campaign."""
self.logger: logging.Logger = (
self._logger: logging.Logger = (
logging.getLogger(__name__) if logger is None else logger
)
self.name: str = name
self.activities: list[Activity] = activities
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.REQ)
self.zmq_socket.connect("tcp://127.0.0.1:5555")
agent_start(self.logger)
self._zmq_context = zmq.Context()
self._zmq_socket = self._zmq_context.socket(zmq.REQ)
self._zmq_socket.connect("tcp://localhost:5555")
# agent_start(self._logger)
def add_activity(self, activity: Activity) -> None:
"""Add an activity to the campaign.
......@@ -52,17 +52,18 @@ class Campaign:
:param activity: An activity object.
:type activity: Activity
"""
self.logger.debug(f"Adding activity: {activity.name}")
self._logger.debug(f"Adding activity: {activity.name}")
self.activities.append(activity)
def dispatch(self) -> None:
"""Dispatch the set of current activities in the campaign."""
self.logger.info(f"Number of activities to dispatch: {len(self.activities)}")
self._logger.info(f"Number of activities to dispatch: {len(self.activities)}")
for activity in self.activities:
self.logger.debug(f"Running activity: {activity.name}")
self._logger.debug(f"Running activity: {activity.name}")
# Dump dict into string (.dumps) and serialize string
# as bytestring (.encode)
serial_activity = pickle.dumps(activity)
self.zmq_socket.send(serial_activity)
self.logger.info(f"REPLY: {self.zmq_socket.recv()}")
self._zmq_socket.send(serial_activity)
self._logger.info(f"REPLY: {self._zmq_socket.recv()}")
......@@ -14,9 +14,9 @@ import zmq
from typing import Optional
from uuid import uuid4
from zambeze.orchestration.processor import Processor, MessageType
from zambeze.campaign.activities.abstract_activity import Activity, ActivityStatus
from zambeze.settings import ZambezeSettings
from ..processor import Processor, MessageType
from ...campaign.activities.abstract_activity import Activity, ActivityStatus
from ...settings import ZambezeSettings
class Agent:
......@@ -38,31 +38,31 @@ class Agent:
logging.getLogger(__name__) if logger is None else logger
)
self.agent_id = uuid4()
self._agent_id = uuid4()
self._settings = ZambezeSettings(conf_file=conf_file, logger=self._logger)
self._processor = Processor(settings=self._settings, logger=self._logger)
self._processor.start()
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.REP)
self.zmq_socket.bind("tcp://*:5555")
self._zmq_context = zmq.Context()
self._zmq_socket = self._zmq_context.socket(zmq.REP)
self._zmq_socket.bind(self._settings.get_zmq_connection_uri())
while True:
self._logger.info("Waiting for new activities from campaign(s)...")
self.receive_activity_from_campaign()
self._receive_activity_from_campaign()
def receive_activity_from_campaign(self):
def _receive_activity_from_campaign(self) -> None:
"""
Receive activity messages via ZMQ
"""
# Receive and unwrap the activity message from ZMQ.
activity_message = pickle.loads((self.zmq_socket.recv()))
activity_message = pickle.loads((self._zmq_socket.recv()))
self._logger.info(f"Received message from campaign: {activity_message}")
# Dispatch the activity!
self.dispatch_activity(activity_message)
self.zmq_socket.send(b"Agent successfully dispatched task!")
self._zmq_socket.send(b"Agent successfully dispatched task!")
@property
def processor(self) -> Processor:
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Oak Ridge National Laboratory.
......@@ -101,6 +100,5 @@ class Shell(Plugin):
# (dev note 2: shell=False can lead to shell injection attacks
# if cmd coming from untrusted source. See:
# https://stackoverflow.com/questions/21009416/python-subprocess-security)
# shell_exec = subprocess.Popen(shell_cmd, shell=True, env=parent_env)
shell_exec = subprocess.Popen(shell_cmd, shell=True)
shell_exec = subprocess.Popen(shell_cmd, shell=True, env=parent_env)
shell_exec.wait()
......@@ -54,9 +54,10 @@ class ZambezeSettings:
# set default values
if not self.settings:
self.settings = {"nats": {}, "plugins": {}}
self.settings = {"nats": {}, "zmq": {}, "plugins": {}}
self.__set_default("host", "localhost", self.settings["nats"])
self.__set_default("port", 4222, self.settings["nats"])
self.__set_default("port", 5555, self.settings["zmq"])
self.__set_default("plugins", {}, self.settings)
self.__save()
......@@ -86,6 +87,16 @@ class ZambezeSettings:
port = self.settings["nats"]["port"]
return f"nats://{host}:{port}"
def get_zmq_connection_uri(self) -> str:
"""
Get the ZMQ connection URI.
:return: ZMQ connection URI
:rtype: str
"""
port = self.settings["zmq"]["port"]
return f"tcp://*:{port}"
def is_plugin_configured(self, plugin_name: str) -> bool:
"""
Check whether a plugin has been configured.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment