Newer
Older
import subprocess
import threading
from .test_utils import (
BaseManagerTestCase,
)
from pulsar.managers.unqueued import CoexecutionManager
class Coexecutor(object):
    """Mimic shell script in other container of coexecutor pod-like environment.

    The object polls a manager for a job's command line and, once one is
    available, runs it through the shell. It is meant to be driven from a
    background thread via :meth:`monitor`.
    """

    def __init__(self, manager):
        """Store the manager to poll; no command line has been seen yet.

        :param manager: object exposing ``read_command_line(job_id)``.
        """
        self.manager = manager
        # Doubles as the stop flag for monitor(): it is set there once a
        # command line has been read, and another thread may set it to make
        # the polling loop exit early.
        self.has_command_line = False
        self.command_line = None

    def monitor(self):
        """Poll for the job's command line and execute it once available.

        Loops until ``has_command_line`` becomes true. ``IOError`` and
        ``ValueError`` from ``read_command_line`` and empty results are
        treated as "not ready yet" and retried. If the loop is ended
        externally before any command line was read, nothing is executed.
        """
        # NOTE(review): this loop retries immediately with no sleep or
        # back-off, so it busy-polls the manager until it succeeds or is
        # stopped.
        while not self.has_command_line:
            try:
                # "123" is hard-coded; presumably the job id used by the
                # test helper that drives this class - TODO confirm.
                command_line = self.manager.read_command_line("123")
            except (IOError, ValueError):
                continue
            if not command_line:
                # might be partially written... need to make this atomic I think.
                continue
            self.command_line = command_line
            self.has_command_line = True

        if self.command_line is None:
            # The flag was flipped from outside before a command line was
            # ever captured; there is nothing to run.
            return

        subprocess.call(self.command_line, shell=True)
        # we are ignoring this exit code and just trusting the one in the job script...
        # I'm not sure what to do about that.
class CoexecutionManagerTest(BaseManagerTestCase):
    """Exercise ``CoexecutionManager`` with a thread standing in for the coexecutor."""

    def setUp(self):
        """Run the base fixture setup, then build the manager under test."""
        super(CoexecutionManagerTest, self).setUp()
        self._set_manager()

    def tearDown(self):
        """Run the base fixture teardown.

        This must delegate to ``tearDown`` (not ``setUp``) so whatever the
        base class created in ``setUp`` is released rather than built a
        second time.
        """
        super(CoexecutionManagerTest, self).tearDown()

    def _set_manager(self, **kwds):
        """Create the manager under test, forwarding ``kwds`` to its constructor."""
        # self.app is not assigned in this class; presumably provided by the
        # base class setUp - TODO confirm.
        self.manager = CoexecutionManager('_default_', self.app, **kwds)

    def test_simple_execution(self):
        """Run the shared simple-execution scenario with a coexecutor thread polling."""
        coexecutor = Coexecutor(self.manager)
        t = threading.Thread(target=coexecutor.monitor)
        t.start()
        try:
            # _test_simple_execution is not defined in this class; presumably
            # inherited from the base test case - TODO confirm.
            self._test_simple_execution(self.manager, timeout=5)
        finally:
            # Setting the flag ends the monitor loop even if it never saw a
            # command line; the join is bounded so a stuck thread cannot
            # hang the test run.
            coexecutor.has_command_line = True
            t.join(2)