Commit ce7a84dc authored by Simon Spannagel's avatar Simon Spannagel
Browse files

ThreadPool: synchronize queue and thread poll using run_mutex_ and run_cnt_

(cherry picked from commit 8248d827)
parent 6c6976a1
Loading
Loading
Loading
Loading
+3 −5
Original line number Diff line number Diff line
@@ -86,7 +86,7 @@ void ThreadPool::checkException() {

void ThreadPool::wait() {
    std::unique_lock<std::mutex> lock{run_mutex_};
    run_condition_.wait(lock, [this]() { return exception_ptr_ != nullptr || (queue_.empty() && run_cnt_ == 0); });
    run_condition_.wait(lock, [this]() { return exception_ptr_ != nullptr || (run_cnt_ == 0); });
}

/**
@@ -106,18 +106,16 @@ void ThreadPool::worker(size_t min_thread_buffer,
            initialize_function();
        }

        // Increase the atomic run count and notify the master thread that we popped an event
        auto increase_run_cnt_func = [this]() noexcept { ++run_cnt_; };

        while(!done_) {
            Task task{nullptr};

            if(queue_.pop(task, increase_run_cnt_func, min_thread_buffer)) {
            if(queue_.pop(task, min_thread_buffer)) {
                // Execute task
                (*task)();
                // Fetch the future to propagate exceptions
                task->get_future().get();
                // Update the run count and propagate update
                std::unique_lock<std::mutex> lock{run_mutex_};
                --run_cnt_;
                run_condition_.notify_all();
            }
+1 −2
Original line number Diff line number Diff line
@@ -56,11 +56,10 @@ namespace allpix {
            /**
             * @brief Get the top value from the appropriate queue
             * @param out Reference where the value at the top of the queue will be written to
             * @param func Optional function to execute before releasing the queue mutex if pop was successful
             * @param buffer_left Optional number of jobs that should be left in priority buffer without stall on push
             * @return True if a task was acquired or false if pop was exited for another reason
             */
            bool pop(T& out, const std::function<void()>& func = nullptr, size_t buffer_left = 0);
            bool pop(T& out, size_t buffer_left = 0);

            /**
             * @brief Push a new value onto the standard queue, will block if queue is full
+4 −6
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ namespace allpix {
     */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstrict-overflow"
    template <typename T> bool ThreadPool::SafeQueue<T>::pop(T& out, const std::function<void()>& func, size_t buffer_left) {
    template <typename T> bool ThreadPool::SafeQueue<T>::pop(T& out, size_t buffer_left) {
        assert(buffer_left <= max_priority_size_);
        // Lock the mutex
        std::unique_lock<std::mutex> lock{mutex_};
@@ -52,11 +52,6 @@ namespace allpix {
            queue_.pop();
        }

        // Optionally execute the mutex protected function
        if(func != nullptr) {
            func();
        }

        // Notify possible pusher waiting to fill the queue
        lock.unlock();
        if(pop_priority) {
@@ -204,6 +199,9 @@ namespace allpix {
            } else {
                success = queue_.push(n, std::make_unique<std::packaged_task<void()>>(std::move(task_function)), false);
            }
            // Increment run count:
            std::unique_lock<std::mutex> lock{run_mutex_};
            ++run_cnt_;
        }
        if(success) {
            return future;