From e7f356b93c9782084955bbcc9a69ee5603f954c1 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Thu, 25 Apr 2019 12:45:40 -0400
Subject: [PATCH] Implement coexecution manager.

This manager is designed to handle just one job. It should be able to stage a job, write a command line to the job directory, and wait for an exit code to appear. It is meant to be used alongside a container that actually runs the resulting tool.
---
 pulsar/managers/base/directory.py |  12 +++
 pulsar/managers/queued.py         |   4 +-
 pulsar/managers/unqueued.py       | 138 +++++++++++++++++++++---------
 test/manager_coexecution_test.py  |  58 +++++++++++++
 test/test_utils.py                |   9 +-
 5 files changed, 177 insertions(+), 44 deletions(-)
 create mode 100644 test/manager_coexecution_test.py

diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py
index 71136995..4a463dcd 100644
--- a/pulsar/managers/base/directory.py
+++ b/pulsar/managers/base/directory.py
@@ -18,6 +18,7 @@ JOB_FILE_STANDARD_ERROR = "stderr"
 JOB_FILE_TOOL_ID = "tool_id"
 JOB_FILE_TOOL_VERSION = "tool_version"
 JOB_FILE_CANCELLED = "cancelled"
+JOB_FILE_COMMAND_LINE = "command_line"
 
 
 class DirectoryBaseManager(BaseManager):
@@ -35,6 +36,14 @@ class DirectoryBaseManager(BaseManager):
     def stderr_contents(self, job_id):
         return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, size=self.maximum_stream_size, default=b"")
 
+    def read_command_line(self, job_id):
+        command_line = self._read_job_file(job_id, JOB_FILE_COMMAND_LINE)
+        if command_line.startswith(b'"'):
+            # legacy JSON...
+            import json
+            command_line = json.loads(command_line)
+        return command_line
+
     def _stdout_path(self, job_id):
         return self._job_file(job_id, JOB_FILE_STANDARD_OUTPUT)
 
@@ -70,6 +79,9 @@ class DirectoryBaseManager(BaseManager):
         job_directory.store_metadata(JOB_FILE_TOOL_ID, tool_id)
         job_directory.store_metadata(JOB_FILE_TOOL_VERSION, tool_version)
 
+    def _write_command_line(self, job_id, command_line):
+        self._write_job_file(job_id, JOB_FILE_COMMAND_LINE, command_line)
+
     def enable_metadata_directory(self, job_id):
         self._job_directory(job_id).enable_metadata_directory()
 
diff --git a/pulsar/managers/queued.py b/pulsar/managers/queued.py
index 37518780..688392fd 100644
--- a/pulsar/managers/queued.py
+++ b/pulsar/managers/queued.py
@@ -54,13 +54,13 @@ class QueueManager(Manager):
             setup_params=setup_params
         )
         try:
-            self._job_directory(job_id).store_metadata(JOB_FILE_COMMAND_LINE, command_line)
+            self._write_command_line(job_id, command_line)
         except Exception:
             log.info("Failed to persist command line for job %s, will not be able to recover." % job_id)
         self.work_queue.put((RUN, (job_id, command_line)))
 
     def _recover_active_job(self, job_id):
-        command_line = self._job_directory(job_id).load_metadata(JOB_FILE_COMMAND_LINE, None)
+        command_line = self.read_command_line(job_id)
         if command_line:
             self.work_queue.put((RUN, (job_id, command_line)))
         else:
diff --git a/pulsar/managers/unqueued.py b/pulsar/managers/unqueued.py
index 5aba3619..8ebeaced 100644
--- a/pulsar/managers/unqueued.py
+++ b/pulsar/managers/unqueued.py
@@ -1,10 +1,11 @@
 import os
-from subprocess import Popen
+import platform
 try:
     import thread
 except ImportError:
     import _thread as thread  # Py3K changed it.
-import platform
+import time
+from subprocess import Popen
 
 from .util import kill_pid
 from pulsar.managers.base.directory import DirectoryBaseManager
@@ -17,6 +18,44 @@ JOB_FILE_SUBMITTED = "submitted"
 JOB_FILE_PID = "pid"
 
 
+class BaseUnqueuedManager(DirectoryBaseManager):
+
+    def _record_submission(self, job_id):
+        self._job_directory(job_id).store_metadata(JOB_FILE_SUBMITTED, 'true')
+
+    def _get_status(self, job_id):
+        job_directory = self._job_directory(job_id)
+        if self._was_cancelled(job_id):
+            job_status = status.CANCELLED
+        elif job_directory.has_metadata(JOB_FILE_PID):
+            job_status = status.RUNNING
+        elif job_directory.has_metadata(JOB_FILE_SUBMITTED):
+            job_status = status.QUEUED
+        else:
+            job_status = status.COMPLETE
+        return job_status
+
+    def _finish_execution(self, job_id):
+        self._job_directory(job_id).remove_metadata(JOB_FILE_SUBMITTED)
+
+    def _prepare_run(self, job_id, command_line, dependencies_description, env, setup_params=None):
+        self._check_execution_with_tool_file(job_id, command_line)
+        self._record_submission(job_id)
+        if platform.system().lower() == "windows":
+            # TODO: Don't ignore requirements and env without warning. Ideally
+            # process them or at least warn about them being ignored.
+            command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory)
+        else:
+            command_line = self._setup_job_file(
+                job_id,
+                command_line,
+                dependencies_description=dependencies_description,
+                env=env,
+                setup_params=setup_params
+            )
+        return command_line
+
+
 # Job Locks (for status updates). Following methods are locked.
 #    _finish_execution(self, job_id)
 #    _get_status(self, job_id)
@@ -24,7 +63,7 @@ JOB_FILE_PID = "pid"
 #    _record_pid(self, job_id, pid)
 #    _get_pid_for_killing_or_cancel(self, job_id)
 #
-class Manager(DirectoryBaseManager):
+class Manager(BaseUnqueuedManager):
     """
     A simple job manager that just directly runs jobs as given (no
     queueing). Preserved for compatibilty with older versions of Pulsar
@@ -37,9 +76,6 @@ class Manager(DirectoryBaseManager):
     def __init__(self, name, app, **kwds):
         super(Manager, self).__init__(name, app, **kwds)
 
-    def _record_submission(self, job_id):
-        self._job_directory(job_id).store_metadata(JOB_FILE_SUBMITTED, 'true')
-
     def __get_pid(self, job_id):
         pid = None
         try:
@@ -87,21 +123,12 @@ class Manager(DirectoryBaseManager):
 
     # with job lock
     def _finish_execution(self, job_id):
-        self._job_directory(job_id).remove_metadata(JOB_FILE_SUBMITTED)
+        super(Manager, self)._finish_execution(job_id)
         self._job_directory(job_id).remove_metadata(JOB_FILE_PID)
 
     # with job lock
     def _get_status(self, job_id):
-        job_directory = self._job_directory(job_id)
-        if self._was_cancelled(job_id):
-            job_status = status.CANCELLED
-        elif job_directory.has_metadata(JOB_FILE_PID):
-            job_status = status.RUNNING
-        elif job_directory.has_metadata(JOB_FILE_SUBMITTED):
-            job_status = status.QUEUED
-        else:
-            job_status = status.COMPLETE
-        return job_status
+        return super(Manager, self)._get_status(job_id)
 
     # with job lock
     def _was_cancelled(self, job_id):
@@ -127,6 +154,16 @@ class Manager(DirectoryBaseManager):
         with self._get_job_lock(job_id):
             if self._was_cancelled(job_id):
                 return
+
+        proc, stdout, stderr = self._proc_for_job_id(job_id, command_line)
+        with self._get_job_lock(job_id):
+            self._record_pid(job_id, proc.pid)
+        if background:
+            thread.start_new_thread(self._monitor_execution, (job_id, proc, stdout, stderr))
+        else:
+            self._monitor_execution(job_id, proc, stdout, stderr)
+
+    def _proc_for_job_id(self, job_id, command_line):
         job_directory = self.job_directory(job_id)
         working_directory = job_directory.working_directory()
         stdout = self._open_standard_output(job_id)
@@ -135,33 +172,56 @@ class Manager(DirectoryBaseManager):
                        working_directory=working_directory,
                        stdout=stdout,
                        stderr=stderr)
-        with self._get_job_lock(job_id):
-            self._record_pid(job_id, proc.pid)
-        if background:
-            thread.start_new_thread(self._monitor_execution, (job_id, proc, stdout, stderr))
-        else:
-            self._monitor_execution(job_id, proc, stdout, stderr)
+        return proc, stdout, stderr
 
     def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None):
         command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env, setup_params=setup_params)
         self._run(job_id, command_line)
 
-    def _prepare_run(self, job_id, command_line, dependencies_description, env, setup_params=None):
-        self._check_execution_with_tool_file(job_id, command_line)
-        self._record_submission(job_id)
-        if platform.system().lower() == "windows":
-            # TODO: Don't ignore requirements and env without warning. Ideally
-            # process them or at least warn about them being ignored.
-            command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory)
-        else:
-            command_line = self._setup_job_file(
-                job_id,
-                command_line,
-                dependencies_description=dependencies_description,
-                env=env,
-                setup_params=setup_params
-            )
-        return command_line
+
+class CoexecutionManager(BaseUnqueuedManager):
+    """Manager that manages one job in a pod-like environment.
+
+    Assume some process in another container will execute the command.
+    """
+    manager_type = "coexecution"
+
+    def __init__(self, name, app, **kwds):
+        super(CoexecutionManager, self).__init__(name, app, **kwds)
+        self.singleton_job_id = "0"
+
+    def setup_job(self, input_job_id, tool_id, tool_version):
+        return self._setup_job_for_job_id(self.singleton_job_id, tool_id, tool_version)
+
+    def get_status(self, job_id):
+        return self._get_status(job_id)
+
+    def kill(self, job_id):
+        log.info("Attempting to kill job with job_id %s - unimplemented in CoexecutionManager..." % job_id)
+
+    def _monitor_execution(self, job_id):
+        return_code_path = self._return_code_path(job_id)
+        try:
+            while not os.path.exists(return_code_path):
+                time.sleep(0.1)
+                print("monitoring for %s" % return_code_path)
+                continue
+            print("found return code path...")
+            time.sleep(1)
+        finally:
+            self._finish_execution(job_id)
+
+    def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None):
+        command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env, setup_params=setup_params)
+        job_directory = self.job_directory(job_id)
+        working_directory = job_directory.working_directory()
+        command_line += " > '%s' 2> '%s'" % (
+            self._stdout_path(job_id),
+            self._stderr_path(job_id),
+        )
+        command_line = "cd '%s'; sh %s" % (working_directory, command_line)
+        self._write_command_line(job_id, command_line)
+        self._monitor_execution(job_id)
 
 
 def execute(command_line, working_directory, stdout, stderr):
diff --git a/test/manager_coexecution_test.py b/test/manager_coexecution_test.py
new file mode 100644
index 00000000..4e48d02d
--- /dev/null
+++ b/test/manager_coexecution_test.py
@@ -0,0 +1,58 @@
+import subprocess
+import threading
+
+from .test_utils import (
+    BaseManagerTestCase,
+)
+
+from pulsar.managers.unqueued import CoexecutionManager
+
+
+class Coexecutor(object):
+    """Mimic shell script in other container of coexecutor pod-like environment."""
+
+    def __init__(self, manager):
+        self.manager = manager
+        self.has_command_line = False
+        self.command_line = None
+
+    def monitor(self):
+        singleton_job_id = "0"
+
+        while not self.has_command_line:
+            try:
+                command_line = self.manager.read_command_line(singleton_job_id)
+            except (IOError, ValueError):
+                continue
+            if not command_line:
+                # might be partially written... need to make this atomic I think.
+                continue
+            self.command_line = command_line
+            self.has_command_line = True
+
+        subprocess.call(command_line, shell=True)
+        # we are ignoring this exit code and just trusting the one in the job script...
+        # I'm not sure what to do about that.
+
+
+class CoexecutionManagerTest(BaseManagerTestCase):
+
+    def setUp(self):
+        super(CoexecutionManagerTest, self).setUp()
+        self._set_manager()
+
+    def tearDown(self):
+        super(CoexecutionManagerTest, self).setUp()
+
+    def _set_manager(self, **kwds):
+        self.manager = CoexecutionManager('_default_', self.app, **kwds)
+
+    def test_simple_execution(self):
+        coexecutor = Coexecutor(self.manager)
+        t = threading.Thread(target=coexecutor.monitor)
+        t.start()
+        try:
+            self._test_simple_execution(self.manager, timeout=5)
+        finally:
+            coexecutor.has_command_line = True
+            t.join(2)
diff --git a/test/test_utils.py b/test/test_utils.py
index 6cbbde6f..a5603e52 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -165,12 +165,15 @@ class BaseManagerTestCase(TestCase):
         rmtree(self.staging_directory)
 
     @nottest
-    def _test_simple_execution(self, manager):
-        command = """python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stderr.write(\'moo\')" """
+    def _test_simple_execution(self, manager, timeout=None):
+        command = """python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stdout.flush(); sys.stderr.write(\'moo\'); sys.stderr.flush()" """
         job_id = manager.setup_job("123", "tool1", "1.0.0")
         manager.launch(job_id, command)
+
+        time_end = None if timeout is None else time.time() + timeout
         while manager.get_status(job_id) not in ['complete', 'cancelled']:
-            pass
+            if time_end and time.time() > time_end:
+                raise Exception("Timeout.")
         self.assertEqual(manager.stderr_contents(job_id), b'moo')
         self.assertEqual(manager.stdout_contents(job_id), b'Hello World!')
         self.assertEqual(manager.return_code(job_id), 0)
-- 
GitLab