Unverified commit 96c9be32 authored by Marius van den Beek, committed by GitHub

Merge pull request #18342 from mvdbeek/fix_copying_purged_files

[24.0] Do not copy purged outputs to object store
parents d7dad3a7 f0e09cc8
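In short: every code path that copies, sizes, or sets metadata on a job output now checks `dataset.purged` first, and data a job may have written straight into the object store for an already-purged output is deleted rather than counted toward quota. A minimal sketch of that pattern, using simplified stand-in names rather than Galaxy's actual classes:

```python
# Minimal sketch of the guard applied throughout this PR.
# `output`, `object_store`, and `finalize_output` are illustrative stand-ins,
# not Galaxy's real API; the attribute/method names mirror those in the diff.
def finalize_output(output, object_store, tmp_path):
    if output.purged:
        # The user purged the output while the job was still running:
        # skip the copy and metadata, and remove anything the job may
        # have written directly to the object store.
        output.full_delete()
        return 0  # contributes nothing to quota accounting
    object_store.update_from_file(output, file_name=tmp_path, create=True)
    output.set_meta()
    return output.set_total_size()  # bytes counted toward the user's quota
```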
+10 −2
@@ -521,6 +521,7 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
                outdata.designation = designation
                outdata.dataset.external_filename = None  # resets filename_override
                # Move data from temp location to dataset location
+                if not outdata.dataset.purged:
+                    job_context.object_store.update_from_file(outdata.dataset, file_name=filename, create=True)
                primary_output_assigned = True
                continue
@@ -554,6 +555,7 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
                dataset_attributes=new_primary_datasets_attributes,
                creating_job_id=job_context.get_job_id() if job_context else None,
                storage_callbacks=storage_callbacks,
+                purged=outdata.dataset.purged,
            )
            # Associate new dataset with job
            job_context.add_output_dataset_association(f"__new_primary_file_{name}|{designation}__", primary_data)
@@ -563,7 +565,13 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
        if primary_output_assigned:
            outdata.name = new_outdata_name
            outdata.init_meta()
+            if not outdata.dataset.purged:
+                try:
+                    outdata.set_meta()
+                except Exception:
+                    # We don't want to fail here on a single "bad" discovered dataset
+                    log.debug("set meta failed for %s", outdata, exc_info=True)
+                    outdata.state = HistoryDatasetAssociation.states.FAILED_METADATA
            outdata.set_peek()
            outdata.discovered = True
            sa_session = job_context.sa_session
+7 −3
@@ -2000,10 +2000,14 @@ class MinimalJobWrapper(HasResourceParameters):
        quota_source_info = None
        # Once datasets are collected, set the total dataset size (includes extra files)
        for dataset_assoc in job.output_datasets:
-            if not dataset_assoc.dataset.dataset.purged:
+            dataset = dataset_assoc.dataset.dataset
+            if not dataset.purged:
                # assume all datasets in a job get written to the same objectstore
-                quota_source_info = dataset_assoc.dataset.dataset.quota_source_info
-                collected_bytes += dataset_assoc.dataset.set_total_size()
+                quota_source_info = dataset.quota_source_info
+                collected_bytes += dataset.set_total_size()
+            else:
+                # Purge, in case job wrote directly to object store
+                dataset.full_delete()

        user = job.user
        if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
+5 −1
@@ -293,7 +293,11 @@ def __copy_if_exists_command(work_dir_output):
    source_file, destination = work_dir_output
    if "?" in source_file or "*" in source_file:
        source_file = source_file.replace("*", '"*"').replace("?", '"?"')
-    return f'\nif [ -f "{source_file}" ] ; then cp "{source_file}" "{destination}" ; fi'
+    # Check if source and destination exist.
+    # Users can purge outputs before the job completes,
+    # in that case we don't want to copy the output to a purged path.
+    # Static, non work_dir_output files are handled in job_finish code.
+    return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'


class CommandsBuilder:
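For the command-factory change above: the emitted fragment now requires both the work-dir source and the destination to exist before copying, so a destination purged mid-job is simply skipped. A rough rendering of the string the helper returns (the function name and paths below are illustrative, not the real call site):

```python
# Illustrative stand-in for __copy_if_exists_command, showing the shape of
# the shell fragment after this change; the example paths are made up.
def render_copy_if_exists(source_file: str, destination: str) -> str:
    # Copy only if both the work-dir source and the destination still exist on disk.
    return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'

print(render_copy_if_exists("galaxy.json", "/galaxy/database/files/dataset_1.dat"))
```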
+24 −4
@@ -380,6 +380,13 @@ class BaseJobRunner:
        job_tool = job_wrapper.tool
        for joda, dataset in self._walk_dataset_outputs(job):
            if joda and job_tool:
+                if dataset.dataset.purged:
+                    log.info(
+                        "Output dataset %s for job %s purged before job completed, skipping output collection.",
+                        joda.name,
+                        job.id,
+                    )
+                    continue
                hda_tool_output = job_tool.find_output_def(joda.name)
                if hda_tool_output and hda_tool_output.from_work_dir:
                    # Copy from working dir to HDA.
@@ -618,10 +625,23 @@ class BaseJobRunner:

            tool_stdout_path = os.path.join(outputs_directory, "tool_stdout")
            tool_stderr_path = os.path.join(outputs_directory, "tool_stderr")
+            try:
+                with open(tool_stdout_path, "rb") as stdout_file:
+                    tool_stdout = self._job_io_for_db(stdout_file)
+                with open(tool_stderr_path, "rb") as stderr_file:
+                    tool_stderr = self._job_io_for_db(stderr_file)
+            except FileNotFoundError:
+                if job.state in (model.Job.states.DELETING, model.Job.states.DELETED):
+                    # We killed the job, so we may not even have the tool stdout / tool stderr
+                    tool_stdout = ""
+                    tool_stderr = "Job cancelled"
+                else:
+                    # Should we instead just move on ?
+                    # In the end the only consequence here is that we won't be able to determine
+                    # if the job failed for known tool reasons (check_tool_output).
+                    # OTOH I don't know if this can even be reached
+                    # Deal with it if we ever get reports about this.
+                    raise

            check_output_detected_state = job_wrapper.check_tool_output(
                tool_stdout,
+1 −1
@@ -118,7 +118,7 @@ class MetadataCollectionStrategy(metaclass=abc.ABCMeta):
            rstring = f"Metadata results could not be read from '{filename_results_code}'"

        if not rval:
            log.debug(f"setting metadata externally failed for {dataset.__class__.__name__} {dataset.id}: {rstring}")
            log.warning(f"setting metadata externally failed for {dataset.__class__.__name__} {dataset.id}: {rstring}")
        return rval

