Newer
Older
//----------------------------------------------------------------------
// Includes
//----------------------------------------------------------------------
#include "MantidKernel/ThreadPool.h"
#include "MantidKernel/ConfigService.h"
#include <sstream>
#include <Poco/Environment.h>
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
namespace Mantid {
namespace Kernel {
//--------------------------------------------------------------------------------
/** Construct a thread pool around an existing scheduler.
 *
 * @param scheduler :: ThreadScheduler instance used to queue and hand out
 *        tasks. Must not be NULL. The ThreadPool destructor deletes it.
 * @param numThreads :: number of worker threads to use. Passing 0 asks the
 *        pool to pick the count itself via getNumPhysicalCores().
 * @param prog :: optional Progress reporter. It is handed to every worker
 *        created by start(). The ThreadPool destructor deletes it.
 *
 * @throw std::invalid_argument if scheduler is NULL. NOTE: because the
 *        destructor does not run for a failed construction, neither pointer
 *        is deleted by this class in that case.
 */
ThreadPool::ThreadPool(ThreadScheduler *scheduler, size_t numThreads,
                       ProgressBase *prog)
    : m_scheduler(scheduler), m_started(false), m_prog(prog) {
  // A pool without a scheduler cannot accept tasks, so refuse it up front.
  if (!m_scheduler)
    throw std::invalid_argument(
        "NULL ThreadScheduler passed to ThreadPool constructor.");

  // Zero is the "auto-detect" sentinel; anything else is taken literally.
  m_numThreads = (numThreads == 0) ? getNumPhysicalCores() : numThreads;
}
//--------------------------------------------------------------------------------
/** Destructor.
 *
 * Releases the two objects whose ownership was handed over in the
 * constructor: the ThreadScheduler first, then the progress reporter.
 */
ThreadPool::~ThreadPool() {
  // Deleting a null pointer is a no-op, so no explicit guards are required.
  delete m_scheduler;
  delete m_prog;
}
//--------------------------------------------------------------------------------
/** Return the number of cores the pool should use.
 *
 * The "MultiThreaded.MaxCores" configuration value takes precedence when it
 * can be read and is positive; otherwise the count reported by
 * Poco::Environment::processorCount() is used.
 *
 * @return how many cores to use.
 */
size_t ThreadPool::getNumPhysicalCores() {
  int configuredCores(0);
  int lookupStatus = Kernel::ConfigService::Instance().getValue(
      "MultiThreaded.MaxCores", configuredCores);

  // Fall back to the hardware count unless a usable override was configured.
  const bool haveOverride = (lookupStatus > 0) && (configuredCores > 0);
  if (!haveOverride)
    return static_cast<size_t>(Poco::Environment::processorCount());

  return static_cast<size_t>(configuredCores);
}
//--------------------------------------------------------------------------------
/** Start the threads and begin looking for tasks.
 *
 * Any Poco::Thread / ThreadPoolRunnable objects left over from a previous
 * run are deleted first, then m_numThreads new workers are created and
 * launched.
 *
 * @param waitSec :: how many seconds each thread is allowed to wait (with no
 *        tasks scheduled to it) before exiting. Default 0.0 (exit right
 *        away). This allows you to start a ThreadPool before you start
 *        adding tasks. You still need to call joinAll() after you've
 *        finished!
 *
 * @throw runtime_error if called when it has already started.
 */
void ThreadPool::start(double waitSec) {
  if (m_started)
    throw std::runtime_error("Threads have already started.");

  // Delete any old threads (they should NOT be running!)
  for (size_t i = 0; i < m_threads.size(); ++i)
    delete m_threads[i];
  for (size_t i = 0; i < m_runnables.size(); ++i)
    delete m_runnables[i];

  // Drop the now-dangling pointers before refilling the vectors.
  m_threads.clear();
  m_runnables.clear();

  // Now, launch that many threads and let them wait for new tasks.
  for (size_t i = 0; i < m_numThreads; i++) {
    // Make a descriptive name
    std::ostringstream name;
    name << "Thread" << i;

    // Create the thread
    Poco::Thread *thread = new Poco::Thread(name.str());
    m_threads.push_back(thread);

    // Make the runnable object and run it
    ThreadPoolRunnable *runnable =
        new ThreadPoolRunnable(i, m_scheduler, m_prog, waitSec);
    m_runnables.push_back(runnable);

    thread->start(*runnable);
  }

  // Yep, all the threads are running.
  m_started = true;
}
//--------------------------------------------------------------------------------
/** Schedule a task for later execution. If the threadpool is running,
 * it will be picked up by the next available thread.
 *
 * A NULL task is ignored: nothing is pushed and no threads are started.
 *
 * @param task :: pointer to a Task object to run.
 * @param start :: start the threads at the same time (only has an effect if
 *        they are not already started); default false
 */
void ThreadPool::schedule(Task *task, bool start) {
  if (task) {
    m_scheduler->push(task);
    // Start all the threads if they were not already.
    if (start && !m_started)
      this->start();
  }
}
//--------------------------------------------------------------------------------
/** Ordering predicate for sorting task lists.
 * This prioritizes long tasks, so they end up at the start of the list.
 *
 * @param lhs :: Task*
 * @param rhs :: Task*
 * @return true if lhs has a strictly higher cost than rhs (i.e. lhs should
 *         be placed first)
 */
bool compareTasks(Task *lhs, Task *rhs) {
  // Descending by cost: the more expensive task sorts ahead of the cheaper.
  return lhs->cost() > rhs->cost();
}
//--------------------------------------------------------------------------------
/** Wait for all threads that have started to finish. If the
 * threads had not started, start them first.
 *
 * After joining, all thread and runnable objects are deleted and the pool
 * is marked as not started, so it can be started again later.
 *
 * @throw std::runtime_error rethrown if one of the tasks threw something.
 *        E.g. CancelException in the case of aborting an algorithm. Any
 *        exception gets downgraded to runtime_error.
 */
void ThreadPool::joinAll() {
  // Are the threads REALLY started, or did they exit due to lack of tasks?
  if (m_started) {
    m_started = false;
    // If any of the threads are running, then YES, it is really started.
    for (size_t i = 0; i < m_threads.size(); ++i)
      m_started = m_started || m_threads[i]->isRunning();
  }

  // Start all the threads if they were not already.
  if (!m_started)
    this->start();

  // Clear any wait times so that the threads stop waiting for new tasks.
  for (size_t i = 0; i < m_runnables.size(); i++)
    m_runnables[i]->clearWait();

  // Sequentially join all the threads.
  for (size_t i = 0; i < m_threads.size(); i++) {
    m_threads[i]->join();
    delete m_threads[i];
  }

  // Clear the vectors (the threads are deleted now).
  m_threads.clear();

  // Get rid of the runnables too
  for (size_t i = 0; i < m_runnables.size(); i++)
    delete m_runnables[i];
  m_runnables.clear();

  // This will make threads restart
  m_started = false;

  // Did one of the threads abort or throw an exception?
  if (m_scheduler->getAborted()) {
    // Re-raise the error
    throw m_scheduler->getAbortException();
  }
}
} // namespace Kernel
} // namespace Mantid