Commit 217bdd4b authored by Simon Spannagel's avatar Simon Spannagel
Browse files

Merge branch 'p/stat_locking' into 'master'

Some Improvements on Locking

See merge request allpix-squared/allpix-squared!885
parents 9af073ad 36dda535
Loading
Loading
Loading
Loading
+37 −35
Original line number Diff line number Diff line
@@ -348,7 +348,7 @@ std::pair<ModuleIdentifier, Module*> ModuleManager::create_unique_modules(void*
    set_module_after(old_settings);
    // Update execution time
    auto end = std::chrono::steady_clock::now();
    module_execution_time_[module] += static_cast<std::chrono::duration<long double>>(end - start).count();
    module_execution_time_[module] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();

    // Set the module directory afterwards to catch invalid access in constructor
    module->get_configuration().set<std::string>("_output_dir", output_dir);
@@ -461,7 +461,7 @@ std::vector<std::pair<ModuleIdentifier, Module*>> ModuleManager::create_detector
        set_module_after(old_settings);
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module] += static_cast<std::chrono::duration<long double>>(end - start).count();
        module_execution_time_[module] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();

        // Set the module directory afterwards to catch invalid access in constructor
        module->get_configuration().set<std::string>("_output_dir", output_dir);
@@ -661,7 +661,7 @@ void ModuleManager::initialize() {
        set_module_after(old_settings);
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module.get()] += static_cast<std::chrono::duration<long double>>(end - start).count();
        module_execution_time_[module.get()] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();

        // Book per-module performance plots
        if(global_config.get<bool>("performance_plots")) {
@@ -674,7 +674,8 @@ void ModuleManager::initialize() {
    }
    LOG_PROGRESS(STATUS, "INIT_LOOP") << "Initialized " << modules_.size() << " module instantiations";
    auto end_time = std::chrono::steady_clock::now();
    total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count();
    initialize_time_ =
        static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
}

/**
@@ -765,7 +766,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
            [this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events, &aborted_events](
                std::shared_ptr<Event> event,
                ModuleList::iterator module_iter,
                long double event_time,
                int64_t event_time,
                auto&& self_func) mutable -> void {
            // The RNG to be used by all events running on this thread
            static thread_local RandomNumberGenerator random_engine;
@@ -827,14 +828,15 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {

                // Update execution time
                auto end = std::chrono::steady_clock::now();
                std::lock_guard<std::mutex> stat_lock{event->stats_mutex_};

                auto duration = static_cast<std::chrono::duration<long double>>(end - start).count();
                event_time += duration;
                auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
                // Note: we do not need to lock a mutex because the std::map is not altered and its values are atomic.
                this->module_execution_time_[module.get()] += duration;

                if(plot) {
                    this->module_event_time_[module.get()]->Fill(static_cast<double>(duration));
                    std::lock_guard<std::mutex> stat_lock{event->stats_mutex_};
                    event_time += duration;
                    this->module_event_time_[module.get()]->Fill(
                        std::chrono::duration<double>(std::chrono::nanoseconds(duration)).count());
                }

                if(abort) {
@@ -869,7 +871,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
            auto buffered_events = thread_pool_->bufferedQueueSize();
            if(plot) {
                this->buffer_fill_level_->Fill(static_cast<double>(buffered_events));
                event_time_->Fill(static_cast<double>(event_time));
                event_time_->Fill(static_cast<double>(event_time) * 1e-9);
            }

            finished_events++;
@@ -901,14 +903,14 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
    }

    auto end_time = std::chrono::steady_clock::now();
    total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count();
    run_time_ = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());

    LOG(TRACE) << "Destroying thread pool";
    thread_pool_.reset();
}

static std::string seconds_to_time(long double seconds) {
    auto duration = std::chrono::duration<long long>(static_cast<long long>(std::round(seconds)));
static std::string nanoseconds_to_time(uint64_t nanoseconds) {
    auto duration = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(nanoseconds));

    std::string time_str;
    auto hours = std::chrono::duration_cast<std::chrono::hours>(duration);
@@ -955,7 +957,7 @@ void ModuleManager::finalize() {
        set_module_after(old_settings);
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module.get()] += static_cast<std::chrono::duration<long double>>(end - start).count();
        module_execution_time_[module.get()] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
    }

    // Store performance plots
@@ -992,7 +994,9 @@ void ModuleManager::finalize() {
    modules_file_->Close();
    LOG_PROGRESS(STATUS, "FINALIZE_LOOP") << "Finalization completed";
    auto end_time = std::chrono::steady_clock::now();
    total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count();
    finalize_time_ =
        static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
    auto total_time = initialize_time_ + run_time_ + finalize_time_;

    // Check for unused configuration keys:
    auto unused_keys = global_config.getUnusedKeys();
@@ -1023,35 +1027,33 @@ void ModuleManager::finalize() {
    }

    // Find the slowest module, and accumulate the total run-time for all modules
    long double slowest_time = 0, total_module_time = 0;
    int64_t slowest_time = 0, total_module_time = 0;
    std::string slowest_module;
    for(auto& module_time : module_execution_time_) {
        total_module_time += module_time.second;
        if(module_time.second > slowest_time) {
            slowest_time = module_time.second;
            slowest_module = module_time.first->getUniqueName();
    for(auto& module_exec_time : module_execution_time_) {
        total_module_time += module_exec_time.second;
        if(module_exec_time.second > slowest_time) {
            slowest_time = module_exec_time.second;
            slowest_module = module_exec_time.first->getUniqueName();
        }
    }
    LOG(STATUS) << "Executed " << modules_.size() << " instantiations in " << seconds_to_time(total_time_) << ", spending "
                << std::round((100 * slowest_time) / std::max(1.0l, total_module_time))
    LOG(STATUS) << "Executed " << modules_.size() << " instantiations in " << nanoseconds_to_time(total_time)
                << ", spending " << std::round((100 * slowest_time) / std::max(int64_t(1), total_module_time))
                << "% of time in slowest instantiation " << slowest_module;
    for(auto& module : modules_) {
        LOG(INFO) << " Module " << module->getUniqueName() << " took " << module_execution_time_[module.get()] << " seconds";
    }

    long double processing_time = 0;
    auto total_events = global_config.get<uint64_t>("number_of_events");
    if(total_events > 0) {
        processing_time = std::round((1000 * total_time_) / total_events);
        LOG(INFO) << " Module " << module->getUniqueName() << " took "
                  << Units::display(module_execution_time_[module.get()].load(), {"s", "ms"});
    }

    LOG(STATUS) << "Average processing time is \x1B[1m" << processing_time << " ms/event\x1B[0m, event generation at \x1B[1m"
                << std::round(global_config.get<double>("number_of_events") / total_time_) << " Hz\x1B[0m";
    auto processing_time = std::round(run_time_ / std::max(uint64_t(1), global_config.get<uint64_t>("number_of_events")));
    LOG(STATUS) << "Average processing time is \x1B[1m" << Units::display(processing_time, {"ms", "us"})
                << "/event\x1B[0m, event generation at \x1B[1m"
                << std::round(global_config.get<double>("number_of_events") / Units::convert(run_time_, "s"))
                << " Hz\x1B[0m";

    if(global_config.get<unsigned int>("workers") > 0) {
        auto event_processing_time = std::round(processing_time * global_config.get<unsigned int>("workers"));
        LOG(STATUS) << "This corresponds to a processing time of \x1B[1m" << event_processing_time
                    << " ms/event\x1B[0m per worker";
        LOG(STATUS) << "This corresponds to a processing time of \x1B[1m"
                    << Units::display(event_processing_time, {"ms", "us"}) << "/event\x1B[0m per worker";
    }
}

+4 −2
Original line number Diff line number Diff line
@@ -158,12 +158,14 @@ namespace allpix {

        std::unique_ptr<TFile> modules_file_;

        std::map<Module*, long double> module_execution_time_;
        // Duration in ns
        std::map<Module*, std::atomic_int64_t> module_execution_time_;
        std::map<Module*, Histogram<TH1D>> module_event_time_;
        Histogram<TH1D> event_time_;
        Histogram<TH1D> buffer_fill_level_;

        long double total_time_{};
        // Durations in ns
        uint64_t initialize_time_{}, run_time_{}, finalize_time_{};

        std::map<std::string, void*> loaded_libraries_;

+1 −0
Original line number Diff line number Diff line
@@ -127,6 +127,7 @@ namespace allpix {
            uint64_t current_id_{0};
            using PQValue = std::pair<uint64_t, T>;
            std::priority_queue<PQValue, std::vector<PQValue>, std::greater<>> priority_queue_;
            std::atomic_size_t priority_queue_size_{0};
            std::condition_variable push_condition_;
            std::condition_variable pop_condition_;
            const size_t max_standard_size_;
+4 −4
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ namespace allpix {
            // Priority queue is missing a pop returning a non-const reference, so need to apply a const_cast
            out = std::move(const_cast<PQValue&>(priority_queue_.top())).second; // NOLINT
            priority_queue_.pop();
            priority_queue_size_--;
        } else { // pop_standard
            out = std::move(queue_.front());
            queue_.pop();
@@ -111,6 +112,7 @@ namespace allpix {

        // Push a new element to the queue and notify possible consumer
        priority_queue_.emplace(n, std::move(value));
        priority_queue_size_++;
        lock.unlock();
        pop_condition_.notify_one();
        return true;
@@ -153,10 +155,7 @@ namespace allpix {
        return queue_.size() + priority_queue_.size();
    }

    template <typename T> size_t ThreadPool::SafeQueue<T>::prioritySize() const {
        std::lock_guard<std::mutex> lock{mutex_};
        return priority_queue_.size();
    }
    template <typename T> size_t ThreadPool::SafeQueue<T>::prioritySize() const { return priority_queue_size_; }

    /*
     * Used to ensure no conditions are being waited for in pop when a thread or the application is trying to exit. The queue
@@ -165,6 +164,7 @@ namespace allpix {
    template <typename T> void ThreadPool::SafeQueue<T>::invalidate() {
        std::unique_lock<std::mutex> lock{mutex_};
        std::priority_queue<PQValue, std::vector<PQValue>, std::greater<>>().swap(priority_queue_);
        priority_queue_size_ = 0;
        std::queue<T>().swap(queue_);
        valid_ = false;
        lock.unlock();