Commit 5fe13cbf authored by Yakubov, Sergey


Merge branch '1-create-pulsar-pull-and-push-hooks-that-handle-remote-data-management-to-a-remote-data-management' into 'dev'

Resolve "Create Pulsar "pull" and "push" hooks that handle remote data management to a remote data management API service"

Closes #1

See merge request !2
parents fa0c376b 94ef34c6
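
Overview of what the diff below adds: Pulsar's staging hooks gain a "push" (upload on postprocess) and a "pull" (download on preprocess) against a remote-data-broker HTTP API. A minimal sketch of the two calls, assuming a broker at the URL from the sample config below; the endpoint paths and JSON fields mirror the diff, while the token, paths, and ids are illustrative placeholders.

import requests

BROKER_URL = "http://localhost:8090"   # remote_data_broker_url from the sample config
TOKEN = "example-refresh-token"        # carried in the GALAXY_REFRESH_TOKEN job env var

# "push" (postprocess): hand a finished job output to the broker.
resp = requests.post(BROKER_URL + "/upload", json={
    "input_path": "/tmp/pulsar/staging/42/outputs/result.dat",  # illustrative path
    "token": TOKEN,
    "dataset": "42",  # the Pulsar job id doubles as the dataset id
})
resp.raise_for_status()
stub = resp.json()  # assumed to include a "uid" the pull side reads back

# "pull" (preprocess): ask the broker to materialize a remote dataset locally.
resp = requests.post(BROKER_URL + "/download", json={
    "uid": stub["uid"],
    "token": TOKEN,
    "output_path": "/tmp/pulsar/staging/43/inputs/input.dat",  # illustrative path
})
resp.raise_for_status()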
@@ -8,5 +8,6 @@ managers:
  shell_plugin: ParamikoShellWKey
  shell_port: 22
  type: queued_cli_token
+  remote_data_broker_url: http://localhost:8090
  message_queue_url: amqp://guest:guest@localhost:5672
@@ -68,6 +68,9 @@ def _get_default_options(conf):
            options[simple_key] = conf[simple_key]
    options["debug"] = conf.get("debug", False)
    maximum_stream_size = conf.get("maximum_stream_size", 1024 * 1024)
+    options["remote_data_broker_url"] = conf.get("remote_data_broker_url", "")
    if maximum_stream_size:
        options["maximum_stream_size"] = int(maximum_stream_size)
    # mode to create job directories with, if None just use
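
A standalone sketch of the default handling above, with hypothetical conf dicts: an app.yml without the key leaves the option as an empty string, which is what the managers receive.

# Hypothetical conf dicts; mirrors the conf.get(...) call above.
def get_broker_url(conf):
    return conf.get("remote_data_broker_url", "")

assert get_broker_url({"remote_data_broker_url": "http://localhost:8090"}) == "http://localhost:8090"
assert get_broker_url({}) == ""  # key absent in app.yml -> empty string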
@@ -8,30 +8,36 @@ from pulsar.client.staging import PulsarOutputs
from pulsar.client.staging.down import ResultsCollector

import logging
log = logging.getLogger(__name__)

+import requests, sys, json


-def postprocess(job_directory, action_executor):
+def postprocess(job_directory, action_executor, remote_data_broker_url="",
+                refresh_token="", job_id=""):
    # Returns True if outputs were collected.
    try:
        if job_directory.has_metadata("launch_config"):
            staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
        else:
            staging_config = None
-        collected = __collect_outputs(job_directory, staging_config, action_executor)
+        collected = __collect_outputs(job_directory, staging_config, action_executor,
+                                      remote_data_broker_url, refresh_token, job_id)
        return collected
    finally:
        job_directory.write_file("postprocessed", "")
    return False


-def __collect_outputs(job_directory, staging_config, action_executor):
+def __collect_outputs(job_directory, staging_config, action_executor, remote_data_broker_url,
+                      refresh_token, job_id):
    collected = True
    if "action_mapper" in staging_config:
        file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
        client_outputs = staging.ClientOutputs.from_dict(staging_config["client_outputs"])
        pulsar_outputs = __pulsar_outputs(job_directory)
-        output_collector = PulsarServerOutputCollector(job_directory, action_executor)
+        output_collector = PulsarServerOutputCollector(job_directory, action_executor,
+                                                       remote_data_broker_url, refresh_token, job_id)
        results_collector = ResultsCollector(output_collector, file_action_mapper, client_outputs, pulsar_outputs)
        collection_failure_exceptions = results_collector.collect()
        if collection_failure_exceptions:
@@ -57,9 +63,12 @@ def realized_dynamic_file_sources(job_directory):

class PulsarServerOutputCollector:

-    def __init__(self, job_directory, action_executor):
+    def __init__(self, job_directory, action_executor, remote_data_broker_url, refresh_token, job_id):
        self.job_directory = job_directory
        self.action_executor = action_executor
+        self.remote_data_broker_url = remote_data_broker_url
+        self.refresh_token = refresh_token
+        self.job_id = job_id

    def collect_output(self, results_collector, output_type, action, name):
        # Not using input path, this is because action knows its path
@@ -71,8 +80,9 @@ class PulsarServerOutputCollector:
        # TODO: Would not work on Windows. Any use in allowing
        # remote_transfer action for Windows?
        name = os.path.basename(action.path)
        pulsar_path = self.job_directory.calculate_path(name, output_type)
+        if output_type == "output":
+            output_to_remote_data(self.remote_data_broker_url, self.refresh_token, pulsar_path, self.job_id)
        description = "staging out file {} via {}".format(pulsar_path, action)
        self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description)
@@ -91,4 +101,28 @@ def __pulsar_outputs(job_directory):
    )


+def upload(broker_url: str, token: str, input_file: str, dataset_id: str):
+    data = {"input_path": input_file, "token": token, "dataset": dataset_id}
+    response = requests.post(broker_url + "/upload", json=data)
+    if response.status_code != 200:
+        raise requests.HTTPError("wrong response ", response.status_code, response.text)
+    res = response.json()
+    res["__galaxy_remote_file__"] = 1
+    res["remote_broker"] = broker_url
+    return res
+
+
+def output_to_remote_data(remote_data_broker_url, refresh_token, path, job_id):
+    log.debug("uploading output to remote storage via %s" % remote_data_broker_url)
+    try:
+        response = upload(remote_data_broker_url, refresh_token, path, job_id)
+    except Exception as e:
+        print("cannot upload data via remote broker", file=sys.stderr)
+        sys.exit(str(e))
+    with open(path, "w", encoding="utf-8") as f:
+        f.write(json.dumps(response, indent=4, sort_keys=True))


__all__ = ('postprocess', 'realized_dynamic_file_sources')
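
Net effect of the push path: the bytes go to the broker, and the file staged back to Galaxy is overwritten with a small JSON pointer. A sketch of such a stub, assuming the broker's /upload response carries a uid (fetch_remote_data() below requires one; the value here is illustrative):

import json

# Hypothetical stub as written by output_to_remote_data(): upload() adds the
# marker and broker fields; "uid" is assumed to come from the /upload response.
stub = {
    "__galaxy_remote_file__": 1,
    "remote_broker": "http://localhost:8090",
    "uid": "0123456789abcdef",
}
print(json.dumps(stub, indent=4, sort_keys=True))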
@@ -3,20 +3,49 @@
from pulsar.client.action_mapper import from_dict

import logging
+import requests, json

log = logging.getLogger(__name__)


-def preprocess(job_directory, setup_actions, action_executor, object_store=None):
+def preprocess(job_directory, setup_actions, action_executor, object_store=None,
+               remote_data_broker_url="",
+               refresh_token=""):
    for setup_action in setup_actions:
        name = setup_action["name"]
        input_type = setup_action["type"]
        action = from_dict(setup_action["action"])
        if getattr(action, "inject_object_store", False):
            action.object_store = object_store
        path = job_directory.calculate_path(name, input_type)
        description = "Staging {} '{}' via {} to {}".format(input_type, name, action, path)
        log.debug(description)
        action_executor.execute(lambda: action.write_to_path(path), "action[%s]" % description)
+        remote_dataset = action.source['object_store_ref'].get("remote_dataset", False)
+        if remote_dataset:
+            fetch_remote_data(remote_data_broker_url, refresh_token, path)
+
+
+def download(broker_url: str, token: str, remote_id: str, output_file: str):
+    data = {"uid": remote_id, "token": token, "output_path": output_file}
+    response = requests.post(broker_url + "/download", json=data)
+    if response.status_code != 200:
+        raise requests.HTTPError("wrong response ", response.status_code, response.text)
+
+
+def fetch_remote_data(remote_data_broker_url, refresh_token, path):
+    log.debug("remote dataset, fetching data from remote storage via %s" % remote_data_broker_url)
+    with open(path, 'r', encoding="utf-8") as f:
+        data = json.load(f)
+    assert data["__galaxy_remote_file__"] == 1
+    remote_id = data["uid"]
+    try:
+        download(remote_data_broker_url, refresh_token, remote_id, path)
+    except Exception as e:
+        raise Exception("cannot download data via remote broker") from e


__all__ = ('preprocess',)
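
The pull path is the mirror image: the staged-in "file" is such a JSON stub, and fetch_remote_data() has the broker replace it in place with the real bytes. A sketch of that sequence (path and uid illustrative; step 2 only succeeds against a live broker, and step 3 implies the broker can reach Pulsar's filesystem, since it is handed a local output_path):

import json
import tempfile

# 1. Write a stub like the one the push path produces (values illustrative).
with tempfile.NamedTemporaryFile("w", suffix=".dat", delete=False, encoding="utf-8") as f:
    json.dump({"__galaxy_remote_file__": 1,
               "remote_broker": "http://localhost:8090",
               "uid": "0123456789abcdef"}, f)
    path = f.name

# 2. fetch_remote_data(broker_url, refresh_token, path) loads the stub,
#    asserts __galaxy_remote_file__ == 1, and POSTs
#    {"uid": ..., "token": ..., "output_path": path} to <broker>/download.
# 3. The broker is then expected to overwrite `path` with the dataset bytes.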
@@ -37,6 +37,13 @@ ACTIVE_STATUS_LAUNCHED = "launched"

DEFAULT_MIN_POLLING_INTERVAL = 0.5


+def extract_refresh_token(launch_config):
+    refresh_token = ""
+    for var in launch_config.get("env", []):
+        if var['name'] == 'GALAXY_REFRESH_TOKEN':
+            refresh_token = var['value']
+    return refresh_token
class StatefulManagerProxy(ManagerProxy):
"""
@@ -52,6 +59,7 @@ class StatefulManagerProxy(ManagerProxy):
        self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
        self.active_jobs = ActiveJobs.from_manager(manager)
        self.__state_change_callback = self._default_status_change_callback
+        self.remote_data_broker_url = manager_options.get("remote_data_broker_url", "")
        self.__monitor = None

    def set_state_change_callback(self, state_change_callback):
@@ -106,7 +114,11 @@ class StatefulManagerProxy(ManagerProxy):
                for action in staging_config['setup']:
                    action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key'])
            setup_config = staging_config.get("setup", [])
-            preprocess(job_directory, setup_config, self.__preprocess_action_executor, object_store=self.object_store)
+            refresh_token = extract_refresh_token(launch_config)
+            preprocess(job_directory, setup_config, self.__preprocess_action_executor,
+                       object_store=self.object_store,
+                       remote_data_broker_url=self.remote_data_broker_url,
+                       refresh_token=refresh_token)
            self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING)

        new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False)
@@ -214,8 +226,11 @@ class StatefulManagerProxy(ManagerProxy):
        def do_postprocess():
            postprocess_success = False
            job_directory = self._proxied_manager.job_directory(job_id)
+            launch_config = job_directory.load_metadata("launch_config")
+            refresh_token = extract_refresh_token(launch_config)
            try:
-                postprocess_success = postprocess(job_directory, self.__postprocess_action_executor)
+                postprocess_success = postprocess(job_directory, self.__postprocess_action_executor,
+                                                  self.remote_data_broker_url, refresh_token, job_id)
            except Exception:
                log.exception("Failed to postprocess results for job id %s" % job_id)
            final_status = status.COMPLETE if postprocess_success else status.FAILED
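
Putting the proxy changes together, both lifecycle hooks now derive the token from the stored launch_config and forward the broker URL from manager options; a condensed, comment-only sketch of the call order (names from the diff, driver context illustrative):

# preprocess phase (inside do_preprocess):
#   refresh_token = extract_refresh_token(launch_config)
#   preprocess(job_directory, setup_config, executor,
#              object_store=..., remote_data_broker_url=..., refresh_token=...)
#     -> fetch_remote_data(...) for each input flagged remote_dataset   # "pull"
#
# postprocess phase (inside do_postprocess):
#   launch_config = job_directory.load_metadata("launch_config")
#   refresh_token = extract_refresh_token(launch_config)
#   postprocess(job_directory, executor, remote_data_broker_url, refresh_token, job_id)
#     -> output_to_remote_data(...) for each collected "output"         # "push"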