Skip to content
Snippets Groups Projects

Resolve "Create Pulsar "pull" and "push" hooks that handle remote data management to a remote data management API service"

All threads resolved!
5 files
+ 93
11
Compare changes
  • Side-by-side
  • Inline
Files
5
@@ -8,30 +8,36 @@ from pulsar.client.staging import PulsarOutputs
from pulsar.client.staging.down import ResultsCollector
import logging
log = logging.getLogger(__name__)
import requests, sys, json
def postprocess(job_directory, action_executor, remote_data_broker_url="",
                refresh_token="", job_id=""):
    """Run staging-out for a finished Pulsar job.

    Loads the job's ``launch_config`` metadata (when present) to obtain the
    remote staging configuration and delegates output collection to
    ``__collect_outputs``.  A ``postprocessed`` marker file is always
    written — even when collection raises — so a job is never
    post-processed twice.

    :param job_directory: Pulsar job directory object (project type).
    :param action_executor: executor used to run staging actions.
    :param remote_data_broker_url: base URL of the remote data broker
        service; the default empty string disables remote upload.
    :param refresh_token: token forwarded to the remote data broker.
    :param job_id: dataset/job identifier passed to the broker.
    :returns: True if outputs were collected, False otherwise.
    """
    try:
        if job_directory.has_metadata("launch_config"):
            staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
        else:
            staging_config = None
        return __collect_outputs(job_directory, staging_config, action_executor,
                                 remote_data_broker_url, refresh_token, job_id)
    finally:
        # Marker file prevents re-running postprocessing for this job.
        job_directory.write_file("postprocessed", "")
def __collect_outputs(job_directory, staging_config, action_executor):
def __collect_outputs(job_directory, staging_config, action_executor, remote_data_broker_url,
refresh_token, job_id):
collected = True
if "action_mapper" in staging_config:
file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
client_outputs = staging.ClientOutputs.from_dict(staging_config["client_outputs"])
pulsar_outputs = __pulsar_outputs(job_directory)
output_collector = PulsarServerOutputCollector(job_directory, action_executor)
output_collector = PulsarServerOutputCollector(job_directory, action_executor,
remote_data_broker_url, refresh_token, job_id)
results_collector = ResultsCollector(output_collector, file_action_mapper, client_outputs, pulsar_outputs)
collection_failure_exceptions = results_collector.collect()
if collection_failure_exceptions:
@@ -57,9 +63,12 @@ def realized_dynamic_file_sources(job_directory):
class PulsarServerOutputCollector:
def __init__(self, job_directory, action_executor):
def __init__(self, job_directory, action_executor, remote_data_broker_url, refresh_token, job_id):
self.job_directory = job_directory
self.action_executor = action_executor
self.remote_data_broker_url = remote_data_broker_url
self.refresh_token = refresh_token
self.job_id = job_id
def collect_output(self, results_collector, output_type, action, name):
# Not using input path, this is because action knows it path
@@ -71,8 +80,9 @@ class PulsarServerOutputCollector:
# TODO: Would not work on Windows. Any use in allowing
# remote_transfer action for Windows?
name = os.path.basename(action.path)
pulsar_path = self.job_directory.calculate_path(name, output_type)
if output_type=="output":
output_to_remote_data(self.remote_data_broker_url, self.refresh_token, pulsar_path, self.job_id)
description = "staging out file {} via {}".format(pulsar_path, action)
self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description)
@@ -91,4 +101,28 @@ def __pulsar_outputs(job_directory):
)
def upload(broker_url: str, token: str, input_file: str, dataset_id: str) -> dict:
    """POST the path of ``input_file`` to the remote data broker's ``/upload`` endpoint.

    :param broker_url: base URL of the remote data broker service.
    :param token: refresh token authenticating the request.
    :param input_file: local path of the file to hand to the broker.
    :param dataset_id: dataset identifier the broker should associate the file with.
    :raises requests.HTTPError: when the broker responds with a non-200 status.
    :returns: the broker's JSON response annotated with Galaxy remote-file
        markers (``__galaxy_remote_file__`` and ``remote_broker``).
    """
    data = {"input_path": input_file, "token": token, "dataset": dataset_id}
    # Bound the request so a hung broker cannot stall postprocessing forever.
    response = requests.post(broker_url + "/upload", json=data, timeout=300)
    if response.status_code != 200:
        # Single formatted message instead of multiple positional args,
        # which would surface as an unreadable args tuple.
        raise requests.HTTPError(
            "wrong response %s: %s" % (response.status_code, response.text)
        )
    res = response.json()
    # Mark the payload so Galaxy recognizes this as a remotely managed file.
    res["__galaxy_remote_file__"] = 1
    res["remote_broker"] = broker_url
    return res
def output_to_remote_data(remote_data_broker_url, refresh_token, path, job_id):
    """Upload the output file at ``path`` to the remote data broker and
    replace the local file's contents with the broker's JSON descriptor.

    On upload failure the error is logged (with traceback) and the process
    exits via ``sys.exit`` — upload failure is treated as fatal.

    :param remote_data_broker_url: base URL of the remote data broker.
    :param refresh_token: token forwarded to the broker.
    :param path: local path of the output file to push.
    :param job_id: dataset/job identifier passed to the broker.
    """
    # Lazy %-style args instead of eager string interpolation.
    log.debug("uploading output to remote storage via %s", remote_data_broker_url)
    try:
        response = upload(remote_data_broker_url, refresh_token, path, job_id)
    except Exception as e:
        # Use the module logger (with traceback) rather than a bare print
        # to stderr, which loses the exception context.
        log.exception("cannot upload data via remote broker")
        sys.exit(str(e))
    # The local output file now only carries a pointer to the remote copy.
    with open(path, "w", encoding="utf-8") as f:
        f.write(json.dumps(response, indent=4, sort_keys=True))
# Public API of this module; the broker helpers (upload,
# output_to_remote_data) are internal implementation details.
__all__ = ('postprocess', 'realized_dynamic_file_sources')
Loading