Unverified Commit ecaa7471 authored by davelopez's avatar davelopez
Browse files

Move quota check to job enqueue

We don't have the object_store_id until we get to this point, and we need the object_store_id to check whether the target object store is subject to quota or not.
parent 31a3d48f
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -1510,7 +1510,7 @@ class MinimalJobWrapper(HasResourceParameters):
            job = self.get_job()
        if message is None:
            message = "Execution of this dataset's job is paused"
        if job.state == job.states.NEW:
        if job.state in (job.states.NEW, job.states.QUEUED):
            for dataset_assoc in job.output_datasets + job.output_library_datasets:
                dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
                dataset_assoc.dataset.info = message
@@ -1610,10 +1610,18 @@ class MinimalJobWrapper(HasResourceParameters):
        self.set_job_destination(self.job_destination, None, flush=False, job=job)
        # Set object store after job destination so can leverage parameters...
        self._set_object_store_ids(job)
        # Now that we have the object store id, check if we are over the limit
        self._pause_job_if_over_quota(job)
        with transaction(self.sa_session):
            self.sa_session.commit()
        return True

    def _pause_job_if_over_quota(self, job):
        """Pause *job* if its user is over their disk quota.

        Called after the job's object store id has been assigned, because
        whether quota applies can depend on the target object store
        (per the commit message accompanying this change).

        :param job: the job model object to check and, if over quota, pause.
        """
        if self.app.quota_agent.is_over_quota(self.app, job, self.job_destination):
            # Lazy %-style logging args: the message is only built if INFO is enabled.
            log.info("(%d) User (%s) is over quota: job paused", job.id, job.user_id)
            message = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
            self.pause(job, message)

    def set_job_destination(self, job_destination, external_id=None, flush=True, job=None):
        """Subclasses should implement this to persist a destination, if necessary.

        No-op in this base class.

        :param job_destination: the destination to persist.
        :param external_id: presumably a runner-assigned external job id — confirm in subclasses.
        :param flush: whether subclasses should flush the session after persisting — TODO confirm.
        :param job: the job to persist the destination for; semantics when None are subclass-defined.
        """

+3 −11
Original line number Diff line number Diff line
@@ -555,18 +555,12 @@ class JobHandlerQueue(BaseJobHandlerQueue):
                    log.info("(%d) Job deleted by user while still queued" % job.id)
                elif job_state == JOB_ADMIN_DELETED:
                    log.info("(%d) Job deleted by admin while still queued" % job.id)
                elif job_state in (JOB_USER_OVER_QUOTA, JOB_USER_OVER_TOTAL_WALLTIME):
                    if job_state == JOB_USER_OVER_QUOTA:
                        log.info("(%d) User (%s) is over quota: job paused" % (job.id, job.user_id))
                        what = "your disk quota"
                    else:
                elif job_state == JOB_USER_OVER_TOTAL_WALLTIME:
                    log.info("(%d) User (%s) is over total walltime limit: job paused" % (job.id, job.user_id))
                        what = "your total job runtime"

                    job.set_state(model.Job.states.PAUSED)
                    for dataset_assoc in job.output_datasets + job.output_library_datasets:
                        dataset_assoc.dataset.dataset.state = model.Dataset.states.PAUSED
                        dataset_assoc.dataset.info = f"Execution of this dataset's job is paused because you were over {what} at the time it was ready to run"
                        dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your total job runtime at the time it was ready to run"
                        self.sa_session.add(dataset_assoc.dataset.dataset)
                    self.sa_session.add(job)
                elif job_state == JOB_ERROR:
@@ -740,8 +734,6 @@ class JobHandlerQueue(BaseJobHandlerQueue):

        if state == JOB_READY:
            state = self.__check_user_jobs(job, job_wrapper)
        if state == JOB_READY and self.app.quota_agent.is_over_quota(self.app, job, job_destination):
            return JOB_USER_OVER_QUOTA, job_destination
        # Check total walltime limits
        if state == JOB_READY and "delta" in self.app.job_config.limits.total_walltime:
            jobs_to_check = self.sa_session.query(model.Job).filter(