Commit 42ee3d54 authored by Paul Schütze's avatar Paul Schütze
Browse files

Merge branch 'backport_threadpool' into 'v2.2-stable'

[v2.2-stable] Fix ThreadPool/Queue Synchronization

See merge request allpix-squared/allpix-squared!740
parents 9935ef2f a8aa2c24
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -729,7 +729,6 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
        if(terminate_) {
            LOG(INFO) << "Interrupting event loop after " << finished_events << " events because of request to terminate";
            thread_pool_->destroy();
            global_config.set<uint64_t>("number_of_events", finished_events);
            break;
        }

@@ -810,7 +809,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
                }

                if(stop) {
                    LOG(TRACE) << "Event " << event->number
                    LOG(DEBUG) << "Event " << event->number
                               << " was interrupted because of missing dependencies, rescheduling...";
                    // Store state of PRNG engine:
                    event->store_random_engine_state();
@@ -830,6 +829,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {

            // All modules finished, mark as complete
            thread_pool_->markComplete(event->number);
            LOG(INFO) << "Finished event " << event_num << " with seed " << event_seed;

            auto buffered_events = thread_pool_->bufferedQueueSize();
            if(plot) {
+11 −7
Original line number Diff line number Diff line
@@ -86,7 +86,7 @@ void ThreadPool::checkException() {

// Block the calling thread until a worker has stored an exception or the pool reports no outstanding work.
void ThreadPool::wait() {
    // run_mutex_ is the mutex paired with run_condition_; the worker loop and destroy() lock it before notifying
    std::unique_lock<std::mutex> lock{run_mutex_};
    // NOTE(review): this listing is a rendered diff hunk (-86,7 +86,7), so the two statements below look like the
    // replaced and the replacing form of one and the same wait - confirm against the checked-out file.
    // First predicate: wake on a stored exception, or once the queue is empty and no task is counted as running.
    run_condition_.wait(lock, [this]() { return exception_ptr_ != nullptr || (queue_.empty() && run_cnt_ == 0); });
    // Second predicate: wake on a stored exception, when run_cnt_ has dropped to zero, or when done_ has been set.
    run_condition_.wait(lock, [this]() { return exception_ptr_ != nullptr || (run_cnt_ == 0 || done_ == true); });
}

/**
@@ -106,22 +106,21 @@ void ThreadPool::worker(size_t min_thread_buffer,
            initialize_function();
        }

        // Increase the atomic run count and notify the master thread that we popped an event
        auto increase_run_cnt_func = [this]() noexcept { ++run_cnt_; };

        while(!done_) {
            Task task{nullptr};

            if(queue_.pop(task, increase_run_cnt_func, min_thread_buffer)) {
            if(queue_.pop(task, min_thread_buffer)) {
                // Execute task
                (*task)();
                // Fetch the future to propagate exceptions
                task->get_future().get();
                // Update the run count and propagate update
                --run_cnt_;
                std::unique_lock<std::mutex> lock{run_mutex_};
                if(--run_cnt_ == 0) {
                    run_condition_.notify_all();
                }
            }
        }

        // Execute the cleanup function at the end of run
        if(finalize_function) {
@@ -129,6 +128,7 @@ void ThreadPool::worker(size_t min_thread_buffer,
        }
    } catch(...) {
        // Check if the first exception thrown
        std::unique_lock<std::mutex> lock{run_mutex_};
        if(!has_exception_.test_and_set()) {
            // Save the first exception
            exception_ptr_ = std::current_exception();
@@ -141,8 +141,12 @@ void ThreadPool::worker(size_t min_thread_buffer,
}

void ThreadPool::destroy() {
    // Lock run mutex to synchronize with queue
    std::unique_lock<std::mutex> lock{run_mutex_};
    done_ = true;
    queue_.invalidate();
    run_condition_.notify_all();
    lock.unlock();

    for(auto& thread : threads_) {
        if(thread.joinable()) {
+1 −2
Original line number Diff line number Diff line
@@ -56,11 +56,10 @@ namespace allpix {
            /**
             * @brief Get the top value from the appropriate queue
             * @param out Reference where the value at the top of the queue will be written to
             * @param func Optional function to execute before releasing the queue mutex if pop was successful
             * @param buffer_left Optional number of jobs that should be left in the priority buffer without stalling on
             *        push; must not exceed max_priority_size_ (asserted in the implementation)
             * @return True if a task was acquired or false if pop was exited for another reason
             *
             * NOTE(review): the two declarations below appear to be the pre- and post-change signature of this diff
             * hunk (-56,11 +56,10); the func parameter exists only on the first one - confirm against the header.
             */
            bool pop(T& out, const std::function<void()>& func = nullptr, size_t buffer_left = 0);
            bool pop(T& out, size_t buffer_left = 0);

            /**
             * @brief Push a new value onto the standard queue, will block if queue is full
+4 −6
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ namespace allpix {
     */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstrict-overflow"
    template <typename T> bool ThreadPool::SafeQueue<T>::pop(T& out, const std::function<void()>& func, size_t buffer_left) {
    template <typename T> bool ThreadPool::SafeQueue<T>::pop(T& out, size_t buffer_left) {
        assert(buffer_left <= max_priority_size_);
        // Lock the mutex
        std::unique_lock<std::mutex> lock{mutex_};
@@ -52,11 +52,6 @@ namespace allpix {
            queue_.pop();
        }

        // Optionally execute the mutex protected function
        if(func != nullptr) {
            func();
        }

        // Notify possible pusher waiting to fill the queue
        lock.unlock();
        if(pop_priority) {
@@ -204,6 +199,9 @@ namespace allpix {
            } else {
                success = queue_.push(n, std::make_unique<std::packaged_task<void()>>(std::move(task_function)), false);
            }
            // Increment run count:
            std::unique_lock<std::mutex> lock{run_mutex_};
            ++run_cnt_;
        }
        if(success) {
            return future;