Unverified Commit c3b6cbfa authored by Marius van den Beek's avatar Marius van den Beek Committed by GitHub
Browse files

Merge pull request #19578 from davelopez/24.1_fix_job_paused_on_user_defined_object_store

[24.1] Fix job paused on user defined object store
parents 8c986bc8 adef4f8f
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -1510,7 +1510,7 @@ class MinimalJobWrapper(HasResourceParameters):
            job = self.get_job()
        if message is None:
            message = "Execution of this dataset's job is paused"
        if job.state == job.states.NEW:
        if job.state in (job.states.NEW, job.states.QUEUED):
            for dataset_assoc in job.output_datasets + job.output_library_datasets:
                dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
                dataset_assoc.dataset.info = message
@@ -1610,10 +1610,18 @@ class MinimalJobWrapper(HasResourceParameters):
        self.set_job_destination(self.job_destination, None, flush=False, job=job)
        # Set object store after job destination so can leverage parameters...
        self._set_object_store_ids(job)
        # Now that we have the object store id, check if we are over the limit
        self._pause_job_if_over_quota(job)
        with transaction(self.sa_session):
            self.sa_session.commit()
        return True

    def _pause_job_if_over_quota(self, job):
        if self.app.quota_agent.is_over_quota(self.app, job, self.job_destination):
            log.info("(%d) User (%s) is over quota: job paused" % (job.id, job.user_id))
            message = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
            self.pause(job, message)

    def set_job_destination(self, job_destination, external_id=None, flush=True, job=None):
        """Subclasses should implement this to persist a destination, if necessary."""

+3 −11
Original line number Diff line number Diff line
@@ -555,18 +555,12 @@ class JobHandlerQueue(BaseJobHandlerQueue):
                    log.info("(%d) Job deleted by user while still queued" % job.id)
                elif job_state == JOB_ADMIN_DELETED:
                    log.info("(%d) Job deleted by admin while still queued" % job.id)
                elif job_state in (JOB_USER_OVER_QUOTA, JOB_USER_OVER_TOTAL_WALLTIME):
                    if job_state == JOB_USER_OVER_QUOTA:
                        log.info("(%d) User (%s) is over quota: job paused" % (job.id, job.user_id))
                        what = "your disk quota"
                    else:
                elif job_state == JOB_USER_OVER_TOTAL_WALLTIME:
                    log.info("(%d) User (%s) is over total walltime limit: job paused" % (job.id, job.user_id))
                        what = "your total job runtime"

                    job.set_state(model.Job.states.PAUSED)
                    for dataset_assoc in job.output_datasets + job.output_library_datasets:
                        dataset_assoc.dataset.dataset.state = model.Dataset.states.PAUSED
                        dataset_assoc.dataset.info = f"Execution of this dataset's job is paused because you were over {what} at the time it was ready to run"
                        dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your total job runtime at the time it was ready to run"
                        self.sa_session.add(dataset_assoc.dataset.dataset)
                    self.sa_session.add(job)
                elif job_state == JOB_ERROR:
@@ -740,8 +734,6 @@ class JobHandlerQueue(BaseJobHandlerQueue):

        if state == JOB_READY:
            state = self.__check_user_jobs(job, job_wrapper)
        if state == JOB_READY and self.app.quota_agent.is_over_quota(self.app, job, job_destination):
            return JOB_USER_OVER_QUOTA, job_destination
        # Check total walltime limits
        if state == JOB_READY and "delta" in self.app.job_config.limits.total_walltime:
            jobs_to_check = self.sa_session.query(model.Job).filter(
+3 −2
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ from sqlalchemy.sql import text

import galaxy.util
from galaxy.model.base import transaction
from galaxy.objectstore import is_user_object_store

log = logging.getLogger(__name__)

@@ -378,8 +379,8 @@ WHERE default_quota_association.type = :default_type
                self.sa_session.commit()

    def is_over_quota(self, app, job, job_destination):
        # Doesn't work because job.object_store_id until inside handler :_(
        # quota_source_label = job.quota_source_label
        if is_user_object_store(job.object_store_id):
            return False  # User object stores are not subject to quotas
        if job_destination is not None:
            object_store_id = job_destination.params.get("object_store_id", None)
            object_store = app.object_store
+66 −0
Original line number Diff line number Diff line
@@ -409,3 +409,69 @@ class TestPerUserObjectStoreUpgradesWithSecretsIntegration(
            assert object_store_json["template_version"] == 2
            assert "sec1" not in secrets
            assert "sec2" in secrets


class TestPerUserObjectStoreQuotaIntegration(BaseUserObjectStoreIntegration):
    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        cls._write_template_and_object_store_config(config, LIBRARY_2)
        config["enable_quotas"] = True

    def test_user_object_store_does_not_pause_jobs_over_quota(self):
        object_store_id = self._create_simple_object_store()
        with self.dataset_populator.test_history() as history_id:

            def _run_tool(tool_id, inputs, preferred_object_store_id=None):
                response = self.dataset_populator.run_tool(
                    tool_id,
                    inputs,
                    history_id,
                    preferred_object_store_id=preferred_object_store_id,
                )
                self.dataset_populator.wait_for_history(history_id)
                return response

            # Create one dataset in the default object store
            hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3\n4 5 6\n7 8 9\n", wait=True)
            storage_info = self.dataset_populator.dataset_storage_info(hda1["id"])
            assert storage_info["object_store_id"] == "default"

            # Set a quota of 1 byte so running a tool will pause the job
            self._define_quota_in_bytes(1)

            # Run a tool
            hda1_input = {"src": "hda", "id": hda1["id"]}
            response = _run_tool("multi_data_param", {"f1": hda1_input, "f2": hda1_input})
            paused_job_id = response["jobs"][0]["id"]
            storage_info, _ = self._storage_info_for_job_output(response)
            assert storage_info["object_store_id"] == "default"

            # The job should be paused because the default object store is over quota
            state = self.dataset_populator.wait_for_job(paused_job_id)
            assert state == "paused"

            # Set the user object store as the preferred object store
            self.dataset_populator.set_user_preferred_object_store_id(object_store_id)

            # Run the tool again
            response = _run_tool("multi_data_param", {"f1": hda1_input, "f2": hda1_input})
            job_id = response["jobs"][0]["id"]
            storage_info, _ = self._storage_info_for_job_output(response)
            assert storage_info["object_store_id"] == object_store_id

            # The job should not be paused because the user object store is not subject to quotas
            state = self.dataset_populator.wait_for_job(job_id)
            assert state == "ok"

    def _define_quota_in_bytes(self, bytes: int):
        quotas = self.dataset_populator.get_quotas()
        assert len(quotas) == 0

        payload = {
            "name": "defaultquota1",
            "description": "first default quota",
            "amount": f"{bytes} bytes",
            "operation": "=",
            "default": "registered",
        }
        self.dataset_populator.create_quota(payload)