Commit d0bd9aad authored by Simon Spannagel's avatar Simon Spannagel
Browse files

ModuleManager: move initialization logic of threading to init()

parent ada13cc1
Loading
Loading
Loading
Loading
+54 −54
Original line number Diff line number Diff line
@@ -540,7 +540,6 @@ void ModuleManager::initialize() {

    Configuration& global_config = conf_manager_->getGlobalConfiguration();
    LOG(TRACE) << "Register number of workers for possible multithreading";
    unsigned int threads_num = 0;
    if(multithreading_flag_ && can_parallelize_) {
        // Try to fetch a suitable number of workers if multithreading is enabled
        auto available_hardware_concurrency = std::thread::hardware_concurrency();
@@ -548,19 +547,52 @@ void ModuleManager::initialize() {
            // Try to be graceful and leave one core out if the number of workers was not specified
            available_hardware_concurrency -= 1u;
        }
        threads_num = global_config.get<unsigned int>("workers", std::max(available_hardware_concurrency, 1u));
        if(threads_num < 2) {
        number_of_threads_ = global_config.get<unsigned int>("workers", std::max(available_hardware_concurrency, 1u));
        if(number_of_threads_ < 2) {
            throw InvalidValueError(global_config, "workers", "number of workers should be larger than one");
        }

        if(threads_num > std::thread::hardware_concurrency()) {
            LOG(WARNING) << "Using more workers (" << threads_num << ") than supported concurrent threads on this system ("
                         << std::thread::hardware_concurrency() << ") may impact simulation performance";
        if(number_of_threads_ > std::thread::hardware_concurrency()) {
            LOG(WARNING) << "Using more workers (" << number_of_threads_
                         << ") than supported concurrent threads on this system (" << std::thread::hardware_concurrency()
                         << ") may impact simulation performance";
        }

        LOG(STATUS) << "Multithreading enabled, processing events in parallel on " << number_of_threads_
                    << " worker threads";

        // Adjust the modules buffer size according to the number of threads used
        max_buffer_size_ = global_config.get<size_t>("buffer_per_worker", 256) * number_of_threads_;
        if(max_buffer_size_ < number_of_threads_) {
            throw InvalidValueError(global_config, "buffer_per_worker", "buffer per worker should be larger than one");
        }
        LOG(STATUS) << "Allocating a total of " << max_buffer_size_ << " event slots for buffered modules";
    } else {
        // Issue a warning in case MT was requested but we can't actually run in MT
        if(multithreading_flag_ && !can_parallelize_) {
            global_config.set<bool>("multithreading", false);
            LOG(ERROR) << "Multithreading disabled since the current module configuration does not support it";
        } else {
            LOG(STATUS) << "Multithreading disabled";
        }
    }

    // Store final number of threads to the config for later reference
    global_config.set<size_t>("workers", number_of_threads_, true);

    // Initialize the thread pool with the number of threads
    if(number_of_threads_ > 0) {
        ThreadPool::registerThreadCount(number_of_threads_);
    }
    global_config.set<size_t>("workers", threads_num);
    if(threads_num > 0) {
        ThreadPool::registerThreadCount(threads_num);

    // Book global performance histograms
    if(global_config.get<bool>("performance_plots")) {
        buffer_fill_level_ = CreateHistogram<TH1D>("buffer_fill_level",
                                                   "Buffer fill level;# buffered events;# events",
                                                   static_cast<int>(max_buffer_size_),
                                                   0,
                                                   static_cast<double>(max_buffer_size_));
        event_time_ = CreateHistogram<TH1D>("event_time", "processing time per event;time [s];# events", 1000, 0, 10);
    }

    auto start_time = std::chrono::steady_clock::now();
@@ -611,6 +643,15 @@ void ModuleManager::initialize() {
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module.get()] += static_cast<std::chrono::duration<long double>>(end - start).count();

        // Book per-module performance plots
        if(global_config.get<bool>("performance_plots")) {
            auto identifier = module->get_identifier().getIdentifier();
            auto name = (identifier.empty() ? module->get_configuration().getName() : identifier);
            auto title = module->get_configuration().getName() + " event processing time " +
                         (!identifier.empty() ? "for " + identifier : "") + ";time [s];# events";
            module_event_time_.emplace(module.get(), CreateHistogram<TH1D>(name.c_str(), title.c_str(), 1000, 0, 1));
        }
    }
    LOG_PROGRESS(STATUS, "INIT_LOOP") << "Initialized " << modules_.size() << " module instantiations";
    auto end_time = std::chrono::steady_clock::now();
@@ -626,49 +667,8 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
    Configuration& global_config = conf_manager_->getGlobalConfiguration();
    auto plot = global_config.get<bool>("performance_plots");

    // Default to no additional thread without multithreading
    auto threads_num = global_config.get<unsigned int>("workers");
    size_t max_buffer_size = 1;

    // See if we can run in parallel with how many workers
    if(multithreading_flag_ && can_parallelize_) {
        LOG(STATUS) << "Multithreading enabled, processing events in parallel on " << threads_num << " worker threads";

        // Adjust the modules buffer size according to the number of threads used
        max_buffer_size = global_config.get<size_t>("buffer_per_worker", 256) * threads_num;
        if(max_buffer_size < threads_num) {
            throw InvalidValueError(global_config, "buffer_per_worker", "buffer per worker should be larger than one");
        }
        LOG(STATUS) << "Allocating a total of " << max_buffer_size << " event slots for buffered modules";
    } else {
        // Issue a warning in case MT was requested but we can't actually run in MT
        if(multithreading_flag_ && !can_parallelize_) {
            global_config.set<bool>("multithreading", false);
            LOG(ERROR) << "Multithreading disabled since the current module configuration does not support it";
        } else {
            LOG(STATUS) << "Multithreading disabled";
        }
    }

    // Book performance histograms
    if(global_config.get<bool>("performance_plots")) {
        buffer_fill_level_ = CreateHistogram<TH1D>("buffer_fill_level",
                                                   "Buffer fill level;# buffered events;# events",
                                                   static_cast<int>(max_buffer_size),
                                                   0,
                                                   static_cast<double>(max_buffer_size));
        event_time_ = CreateHistogram<TH1D>("event_time", "processing time per event;time [s];# events", 1000, 0, 10);
        for(auto& module : modules_) {
            auto identifier = module->get_identifier().getIdentifier();
            auto name = (identifier.empty() ? module->get_configuration().getName() : identifier);
            auto title = module->get_configuration().getName() + " event processing time " +
                         (!identifier.empty() ? "for " + identifier : "") + ";time [s];# events";
            module_event_time_.emplace(module.get(), CreateHistogram<TH1D>(name.c_str(), title.c_str(), 1000, 0, 1));
        }
    }

    // Creates the thread pool
    LOG(TRACE) << "Initializing thread pool with " << threads_num << " threads";
    LOG(TRACE) << "Initializing thread pool with " << number_of_threads_ << " threads";
    auto initialize_function =
        [log_level = Log::getReportingLevel(), log_format = Log::getFormat(), modules_list = modules_]() {
            // Initialize the threads to the same log level and format as the master setting
@@ -705,9 +705,9 @@ void ModuleManager::run(RandomNumberGenerator& seeder) {
    };

    // Push 128 events for each worker to maintain enough work
    auto max_queue_size = threads_num * 128;
    std::unique_ptr<ThreadPool> thread_pool =
        std::make_unique<ThreadPool>(threads_num, max_queue_size, max_buffer_size, initialize_function, finalize_function);
    auto max_queue_size = number_of_threads_ * 128;
    std::unique_ptr<ThreadPool> thread_pool = std::make_unique<ThreadPool>(
        number_of_threads_, max_queue_size, max_buffer_size_, initialize_function, finalize_function);

    // Record the run stage total time
    auto start_time = std::chrono::steady_clock::now();
+3 −1
Original line number Diff line number Diff line
@@ -171,8 +171,10 @@ namespace allpix {

        Messenger* messenger_{};

        // User defined multithreading flag in configuration
        // User defined multithreading flags and parameters from configuration
        bool multithreading_flag_{false};
        unsigned int number_of_threads_{0};
        size_t max_buffer_size_{1};

        // Possibility of running loaded modules in parallel
        bool can_parallelize_{true};