Loading src/core/module/ModuleManager.cpp +26 −22 Original line number Diff line number Diff line Loading @@ -702,7 +702,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { // Push 128 events for each worker to maintain enough work auto max_queue_size = threads_num * 128; std::unique_ptr<ThreadPool> thread_pool = thread_pool_ = std::make_unique<ThreadPool>(threads_num, max_queue_size, max_buffer_size, initialize_function, finalize_function); // Record the run stage total time Loading @@ -720,15 +720,15 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { // Mark the first N events as completed for the thread pool. Since events start at one, always mark zero identifier as // completed for(size_t n = 0; n <= skip_events; n++) { thread_pool->markComplete(n); thread_pool_->markComplete(n); } LOG(STATUS) << "Starting event loop"; for(uint64_t i = 1 + skip_events; i <= number_of_events + skip_events; i++) { // Check if run was aborted and stop pushing extra events to the threadpool if(terminate_) { LOG(INFO) << "Interrupting event loop after " << i << " events because of request to terminate"; thread_pool->destroy(); LOG(INFO) << "Interrupting event loop after " << finished_events << " events because of request to terminate"; thread_pool_->destroy(); global_config.set<uint64_t>("number_of_events", finished_events); break; } Loading @@ -738,8 +738,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wstrict-overflow" auto event_function_with_module = [this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events, &thread_pool]( auto event_function_with_module = [this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events]( std::shared_ptr<Event> event, ModuleList::iterator module_iter, long double event_time, Loading Loading @@ -782,7 +781,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { // Run module bool stop = false; try { if(module->require_sequence() && event_num != thread_pool->minimumUncompleted()) { if(module->require_sequence() && event_num != thread_pool_->minimumUncompleted()) { stop = true; } else { module->run(event.get()); Loading Loading @@ -817,9 +816,9 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { event->store_random_engine_state(); // Reschedule the event: auto event_function = std::bind(self_func, event, module_iter, event_time, self_func); auto future = thread_pool->submit(event->number, event_function, false); assert(future.valid() || !thread_pool->valid()); auto buffered_events = thread_pool->bufferedQueueSize(); auto future = thread_pool_->submit(event->number, event_function, false); assert(future.valid() || !thread_pool_->valid()); auto buffered_events = thread_pool_->bufferedQueueSize(); LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Buffered " << buffered_events << ", finished " << finished_events << " of " << number_of_events << " events"; return; Loading @@ -830,9 +829,9 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { #pragma GCC diagnostic pop // All modules finished, mark as complete thread_pool->markComplete(event->number); thread_pool_->markComplete(event->number); auto buffered_events = thread_pool->bufferedQueueSize(); auto buffered_events = thread_pool_->bufferedQueueSize(); if(plot) { this->buffer_fill_level_->Fill(static_cast<double>(buffered_events)); event_time_->Fill(static_cast<double>(event_time)); Loading @@ -846,24 +845,27 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { auto event_function = std::bind(event_function_with_module, nullptr, modules_.begin(), 0, event_function_with_module); auto future = thread_pool->submit(event_function); assert(future.valid() || !thread_pool->valid()); thread_pool->checkException(); auto future = thread_pool_->submit(event_function); assert(future.valid() || !thread_pool_->valid()); thread_pool_->checkException(); } LOG(TRACE) << "All events have been initialized. Waiting for thread pool to finish..."; // Wait for workers to finish thread_pool->wait(); thread_pool_->wait(); // Check exception for last events thread_pool->checkException(); thread_pool_->checkException(); LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Finished run of " << finished_events << " events"; global_config.set<uint64_t>("number_of_events", finished_events); auto end_time = std::chrono::steady_clock::now(); total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count(); LOG(TRACE) << "Destroying thread pool"; thread_pool_.reset(); } static std::string seconds_to_time(long double seconds) { Loading Loading @@ -1018,5 +1020,7 @@ void ModuleManager::finalize() { * All modules in the event loop continue to finish the current event */ void ModuleManager::terminate() { terminate_ = true; if(!terminate_.exchange(true) && thread_pool_) { thread_pool_->destroy(); } } src/core/module/ModuleManager.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -170,6 +170,9 @@ namespace allpix { Messenger* messenger_{}; // The thread pool used in the run method std::unique_ptr<ThreadPool> thread_pool_{nullptr}; // User defined multithreading flag in configuration bool multithreading_flag_{false}; Loading Loading
src/core/module/ModuleManager.cpp +26 −22 Original line number Diff line number Diff line Loading @@ -702,7 +702,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { // Push 128 events for each worker to maintain enough work auto max_queue_size = threads_num * 128; std::unique_ptr<ThreadPool> thread_pool = thread_pool_ = std::make_unique<ThreadPool>(threads_num, max_queue_size, max_buffer_size, initialize_function, finalize_function); // Record the run stage total time Loading @@ -720,15 +720,15 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { // Mark the first N events as completed for the thread pool. Since events start at one, always mark zero identifier as // completed for(size_t n = 0; n <= skip_events; n++) { thread_pool->markComplete(n); thread_pool_->markComplete(n); } LOG(STATUS) << "Starting event loop"; for(uint64_t i = 1 + skip_events; i <= number_of_events + skip_events; i++) { // Check if run was aborted and stop pushing extra events to the threadpool if(terminate_) { LOG(INFO) << "Interrupting event loop after " << i << " events because of request to terminate"; thread_pool->destroy(); LOG(INFO) << "Interrupting event loop after " << finished_events << " events because of request to terminate"; thread_pool_->destroy(); global_config.set<uint64_t>("number_of_events", finished_events); break; } Loading @@ -738,8 +738,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wstrict-overflow" auto event_function_with_module = [this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events, &thread_pool]( auto event_function_with_module = [this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events]( std::shared_ptr<Event> event, ModuleList::iterator module_iter, long double event_time, Loading Loading @@ -782,7 +781,7 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { // Run module bool stop = false; try { if(module->require_sequence() && event_num != thread_pool->minimumUncompleted()) { if(module->require_sequence() && event_num != thread_pool_->minimumUncompleted()) { stop = true; } else { module->run(event.get()); Loading Loading @@ -817,9 +816,9 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { event->store_random_engine_state(); // Reschedule the event: auto event_function = std::bind(self_func, event, module_iter, event_time, self_func); auto future = thread_pool->submit(event->number, event_function, false); assert(future.valid() || !thread_pool->valid()); auto buffered_events = thread_pool->bufferedQueueSize(); auto future = thread_pool_->submit(event->number, event_function, false); assert(future.valid() || !thread_pool_->valid()); auto buffered_events = thread_pool_->bufferedQueueSize(); LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Buffered " << buffered_events << ", finished " << finished_events << " of " << number_of_events << " events"; return; Loading @@ -830,9 +829,9 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { #pragma GCC diagnostic pop // All modules finished, mark as complete thread_pool->markComplete(event->number); thread_pool_->markComplete(event->number); auto buffered_events = thread_pool->bufferedQueueSize(); auto buffered_events = thread_pool_->bufferedQueueSize(); if(plot) { this->buffer_fill_level_->Fill(static_cast<double>(buffered_events)); event_time_->Fill(static_cast<double>(event_time)); Loading @@ -846,24 +845,27 @@ void ModuleManager::run(RandomNumberGenerator& seeder) { auto event_function = std::bind(event_function_with_module, nullptr, modules_.begin(), 0, event_function_with_module); auto future = thread_pool->submit(event_function); assert(future.valid() || !thread_pool->valid()); thread_pool->checkException(); auto future = thread_pool_->submit(event_function); assert(future.valid() || !thread_pool_->valid()); thread_pool_->checkException(); } LOG(TRACE) << "All events have been initialized. Waiting for thread pool to finish..."; // Wait for workers to finish thread_pool->wait(); thread_pool_->wait(); // Check exception for last events thread_pool->checkException(); thread_pool_->checkException(); LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Finished run of " << finished_events << " events"; global_config.set<uint64_t>("number_of_events", finished_events); auto end_time = std::chrono::steady_clock::now(); total_time_ += static_cast<std::chrono::duration<long double>>(end_time - start_time).count(); LOG(TRACE) << "Destroying thread pool"; thread_pool_.reset(); } static std::string seconds_to_time(long double seconds) { Loading Loading @@ -1018,5 +1020,7 @@ void ModuleManager::finalize() { * All modules in the event loop continue to finish the current event */ void ModuleManager::terminate() { terminate_ = true; if(!terminate_.exchange(true) && thread_pool_) { thread_pool_->destroy(); } }
src/core/module/ModuleManager.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -170,6 +170,9 @@ namespace allpix { Messenger* messenger_{}; // The thread pool used in the run method std::unique_ptr<ThreadPool> thread_pool_{nullptr}; // User defined multithreading flag in configuration bool multithreading_flag_{false}; Loading