Commit 595265b8 authored by Stephan Lachnit's avatar Stephan Lachnit
Browse files

Merge branch 'b-locking' into 'v2.3-stable'

[v2.3-stable] Improve Locking

See merge request allpix-squared/allpix-squared!897
parents 6ab301b5 8ff1f6ab
Loading
Loading
Loading
Loading
+38 −35
Original line number Diff line number Diff line
@@ -343,7 +343,7 @@ std::pair<ModuleIdentifier, Module*> ModuleManager::create_unique_modules(void*
    set_module_after(old_settings);
    // Update execution time
    auto end = std::chrono::steady_clock::now();
    module_execution_time_[module] += static_cast<std::chrono::duration<long double>>(end - start).count();
    module_execution_time_[module] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();

    // Set the module directory afterwards to catch invalid access in constructor
    module->get_configuration().set<std::string>("_output_dir", output_dir);
@@ -456,7 +456,7 @@ std::vector<std::pair<ModuleIdentifier, Module*>> ModuleManager::create_detector
        set_module_after(old_settings);
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module] += static_cast<std::chrono::duration<long double>>(end - start).count();
        module_execution_time_[module] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();

        // Set the module directory afterwards to catch invalid access in constructor
        module->get_configuration().set<std::string>("_output_dir", output_dir);
@@ -624,11 +624,12 @@ void ModuleManager::initialize() {
        set_module_after(old_settings);
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module.get()] += static_cast<std::chrono::duration<long double>>(end - start).count();
        module_execution_time_[module.get()] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
    }
    LOG_PROGRESS(STATUS, "INIT_LOOP") << "Initialized " << modules_.size() << " module instantiations";
    auto end_time = std::chrono::steady_clock::now();
    total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count();
    initialize_time_ =
        static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
}

/**
@@ -758,7 +759,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
        auto event_function_with_module = [this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events](
                                              std::shared_ptr<Event> event,
                                              ModuleList::iterator module_iter,
                                              long double event_time,
                                              int64_t event_time,
                                              auto&& self_func) mutable -> void {
            // The RNG to be used by all events running on this thread
            static thread_local RandomNumberGenerator random_engine;
@@ -816,14 +817,15 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {

                // Update execution time
                auto end = std::chrono::steady_clock::now();
                std::lock_guard<std::mutex> stat_lock{event->stats_mutex_};

                auto duration = static_cast<std::chrono::duration<long double>>(end - start).count();
                event_time += duration;
                auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
                // Note: we do not need to lock a mutex because the std::map is not altered and its values are atomic.
                this->module_execution_time_[module.get()] += duration;

                if(plot) {
                    this->module_event_time_[module.get()]->Fill(static_cast<double>(duration));
                    std::lock_guard<std::mutex> stat_lock{event->stats_mutex_};
                    event_time += duration;
                    this->module_event_time_[module.get()]->Fill(
                        std::chrono::duration<double>(std::chrono::nanoseconds(duration)).count());
                }

                if(stop) {
@@ -852,7 +854,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
            auto buffered_events = thread_pool_->bufferedQueueSize();
            if(plot) {
                this->buffer_fill_level_->Fill(static_cast<double>(buffered_events));
                event_time_->Fill(static_cast<double>(event_time));
                event_time_->Fill(static_cast<double>(event_time) / 1e9);
            }

            finished_events++;
@@ -880,14 +882,14 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
    global_config.set<uint64_t>("number_of_events", finished_events);

    auto end_time = std::chrono::steady_clock::now();
    total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count();
    run_time_ = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());

    LOG(TRACE) << "Destroying thread pool";
    thread_pool_.reset();
}

static std::string seconds_to_time(long double seconds) {
    auto duration = std::chrono::duration<long long>(static_cast<long long>(std::round(seconds)));
static std::string nanoseconds_to_time(uint64_t nanoseconds) {
    auto duration = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(nanoseconds));

    std::string time_str;
    auto hours = std::chrono::duration_cast<std::chrono::hours>(duration);
@@ -934,7 +936,7 @@ void ModuleManager::finalize() {
        set_module_after(old_settings);
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module.get()] += static_cast<std::chrono::duration<long double>>(end - start).count();
        module_execution_time_[module.get()] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
    }

    // Store performance plots
@@ -971,7 +973,9 @@ void ModuleManager::finalize() {
    modules_file_->Close();
    LOG_PROGRESS(STATUS, "FINALIZE_LOOP") << "Finalization completed";
    auto end_time = std::chrono::steady_clock::now();
    total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count();
    finalize_time_ =
        static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
    auto total_time = initialize_time_ + run_time_ + finalize_time_;

    // Check for unused configuration keys:
    auto unused_keys = global_config.getUnusedKeys();
@@ -1002,35 +1006,34 @@ void ModuleManager::finalize() {
    }

    // Find the slowest module, and accumulate the total run-time for all modules
    long double slowest_time = 0, total_module_time = 0;
    int64_t slowest_time = 0, total_module_time = 0;
    std::string slowest_module;
    for(auto& module_time : module_execution_time_) {
        total_module_time += module_time.second;
        if(module_time.second > slowest_time) {
            slowest_time = module_time.second;
            slowest_module = module_time.first->getUniqueName();
    for(auto& module_exec_time : module_execution_time_) {
        total_module_time += module_exec_time.second;
        if(module_exec_time.second > slowest_time) {
            slowest_time = module_exec_time.second;
            slowest_module = module_exec_time.first->getUniqueName();
        }
    }
    LOG(STATUS) << "Executed " << modules_.size() << " instantiations in " << seconds_to_time(total_time_) << ", spending "
                << std::round((100 * slowest_time) / std::max(1.0l, total_module_time))
    LOG(STATUS) << "Executed " << modules_.size() << " instantiations in " << nanoseconds_to_time(total_time)
                << ", spending " << std::round((100 * slowest_time) / std::max(int64_t(1), total_module_time))
                << "% of time in slowest instantiation " << slowest_module;
    for(auto& module : modules_) {
        LOG(INFO) << " Module " << module->getUniqueName() << " took " << module_execution_time_[module.get()] << " seconds";
    }

    long double processing_time = 0;
    auto total_events = global_config.get<uint64_t>("number_of_events");
    if(total_events > 0) {
        processing_time = std::round((1000 * total_time_) / total_events);
        LOG(INFO) << " Module " << module->getUniqueName() << " took "
                  << Units::display(module_execution_time_[module.get()].load(), {"s", "ms"});
    }

    LOG(STATUS) << "Average processing time is \x1B[1m" << processing_time << " ms/event\x1B[0m, event generation at \x1B[1m"
                << std::round(global_config.get<double>("number_of_events") / total_time_) << " Hz\x1B[0m";
    auto processing_time =
        std::round((1000ul * run_time_) / std::max(uint64_t(1), global_config.get<uint64_t>("number_of_events")));
    LOG(STATUS) << "Average processing time is \x1B[1m" << Units::display(processing_time, {"ms", "us"})
                << "/event\x1B[0m, event generation at \x1B[1m"
                << std::round(global_config.get<double>("number_of_events") / static_cast<double>(run_time_))
                << " Hz\x1B[0m";

    if(global_config.get<unsigned int>("workers") > 0) {
        auto event_processing_time = std::round(processing_time * global_config.get<unsigned int>("workers"));
        LOG(STATUS) << "This corresponds to a processing time of \x1B[1m" << event_processing_time
                    << " ms/event\x1B[0m per worker";
        LOG(STATUS) << "This corresponds to a processing time of \x1B[1m"
                    << Units::display(event_processing_time, {"ms", "us"}) << "/event\x1B[0m per worker";
    }
}

+2 −2
Original line number Diff line number Diff line
@@ -156,12 +156,12 @@ namespace allpix {

        std::unique_ptr<TFile> modules_file_;

        std::map<Module*, long double> module_execution_time_;
        std::map<Module*, std::atomic_int64_t> module_execution_time_;
        std::map<Module*, Histogram<TH1D>> module_event_time_;
        Histogram<TH1D> event_time_;
        Histogram<TH1D> buffer_fill_level_;

        long double total_time_{};
        uint64_t initialize_time_{}, run_time_{}, finalize_time_{};

        std::map<std::string, void*> loaded_libraries_;

+1 −0
Original line number Diff line number Diff line
@@ -126,6 +126,7 @@ namespace allpix {
            uint64_t current_id_{0};
            using PQValue = std::pair<uint64_t, T>;
            std::priority_queue<PQValue, std::vector<PQValue>, std::greater<>> priority_queue_;
            std::atomic_size_t priority_queue_size_{0};
            std::condition_variable push_condition_;
            std::condition_variable pop_condition_;
            const size_t max_standard_size_;
+4 −4
Original line number Diff line number Diff line
@@ -47,6 +47,7 @@ namespace allpix {
            // Priority queue is missing a pop returning a non-const reference, so need to apply a const_cast
            out = std::move(const_cast<PQValue&>(priority_queue_.top())).second; // NOLINT
            priority_queue_.pop();
            priority_queue_size_--;
        } else { // pop_standard
            out = std::move(queue_.front());
            queue_.pop();
@@ -110,6 +111,7 @@ namespace allpix {

        // Push a new element to the queue and notify possible consumer
        priority_queue_.emplace(n, std::move(value));
        priority_queue_size_++;
        lock.unlock();
        pop_condition_.notify_one();
        return true;
@@ -152,10 +154,7 @@ namespace allpix {
        return queue_.size() + priority_queue_.size();
    }

    template <typename T> size_t ThreadPool::SafeQueue<T>::prioritySize() const {
        std::lock_guard<std::mutex> lock{mutex_};
        return priority_queue_.size();
    }
    template <typename T> size_t ThreadPool::SafeQueue<T>::prioritySize() const { return priority_queue_size_; }

    /*
     * Used to ensure no conditions are being waited for in pop when a thread or the application is trying to exit. The queue
@@ -164,6 +163,7 @@ namespace allpix {
    template <typename T> void ThreadPool::SafeQueue<T>::invalidate() {
        std::unique_lock<std::mutex> lock{mutex_};
        std::priority_queue<PQValue, std::vector<PQValue>, std::greater<>>().swap(priority_queue_);
        priority_queue_size_ = 0;
        std::queue<T>().swap(queue_);
        valid_ = false;
        lock.unlock();