Newer
Older
#ifndef THREADPOOLTEST_H_
#define THREADPOOLTEST_H_
#include <cxxtest/TestSuite.h>
#include <MantidKernel/Timer.h>
#include <MantidKernel/FunctionTask.h>
#include "MantidKernel/MultiThreaded.h"
#include <MantidKernel/ThreadPool.h>
#include "MantidKernel/ThreadScheduler.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iomanip>
Janik Zikovsky
committed
#include <cstdlib>
using namespace Mantid::Kernel;
//#include <boost/thread.hpp>
Janik Zikovsky
committed
//=======================================================================================
class TimeWaster
{
public:
static size_t waste_time(double seconds)
{
// Waste time, but use up the CPU!
std::size_t num = 0;
Mantid::Kernel::Timer time;
while (time.elapsed_no_reset() < seconds)
{
double x = 1.1;
for (int j=0; j < 100000; j++)
{
x = x * x;
x = x + x;
x = x / 1.1;
}
num += 1;
}
return num;
}
void waste_time_with_lock(double seconds)
{
{
Mutex::ScopedLock lock(m_mutex);
std::cout << "waste_time for " << seconds << " seconds." << std::endl;
}
waste_time(seconds);
}
/** Add a number but use a lock to avoid contention */
void add_to_number(size_t adding)
{
{
Mutex::ScopedLock lock(m_mutex);
total += adding;
}
}
private:
Mutex m_mutex;
public:
size_t total;
};
Janik Zikovsky
committed
//=======================================================================================
int threadpooltest_check = 0;
void threadpooltest_function()
{
threadpooltest_check = 12;
}
std::vector<int> threadpooltest_vec;
void threadpooltest_adding_stuff(int val)
//TODO: Mutex
threadpooltest_vec.push_back(val);
}
Janik Zikovsky
committed
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Counter for the test.
size_t TaskThatAddsTasks_counter;
Mutex TaskThatAddsTasks_mutex;
//=======================================================================================
/** Class that adds tasks to its scheduler */
class TaskThatAddsTasks : public Task
{
public:
// ctor
TaskThatAddsTasks(ThreadScheduler * scheduler, size_t depth)
: m_scheduler(scheduler), depth(depth)
{
// Use a randomized cost function; this will have an effect on the sorted schedulers.
m_cost = rand();
}
// Run the task
void run()
{
if (depth < 4)
{
// Add ten tasks (one level deeper)
for (size_t i=0; i<10; i++)
{
m_scheduler->push(new TaskThatAddsTasks(m_scheduler, depth+1));
}
}
else
{
// Lock to ensure you don't step on yourself.
Mutex::ScopedLock lock(TaskThatAddsTasks_mutex);
// Increment the counter only at the lowest level.
TaskThatAddsTasks_counter += 1;
}
}
private:
ThreadScheduler * m_scheduler;
size_t depth;
};
int ThreadPoolTest_TaskThatThrows_counter = 0;
Janik Zikovsky
committed
Janik Zikovsky
committed
//=======================================================================================
class ThreadPoolTest : public CxxTest::TestSuite
{
public:
/** Test that shows that OPENMP does not use a thread pool idea to optimally allocate threads
* (unless you use schedule(dynamic) )! */
void xtestOpenMP()
{
Timer overall;
int num = 16;
//PARALLEL_FOR_NO_WSP_CHECK()
//#pragma omp parallel for schedule(dynamic)
for (int i=0; i<num; i++)
{
double delay = num-i;
PARALLEL_CRITICAL(test1)
std::cout << std::setw(5) << i << ": Thread " << PARALLEL_THREAD_NUMBER << " will delay for " << delay << " seconds." << std::endl;
TimeWaster::waste_time(delay);
PARALLEL_CRITICAL(test1)
std::cout << std::setw(5) << i << ": is done." << std::endl;
}
std::cout << overall.elapsed() << " secs total.\n";
}
Janik Zikovsky
committed
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/** Make it waste time, 0 to 16 seconds
* DISABLED because it is (intentionally) slow. */
void xtest_Scheduler_LargestCostFirst_wastetime()
{
ThreadPool p(new ThreadSchedulerFIFO(), 0);
threadpooltest_vec.clear();
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
TimeWaster mywaster;
for (int i=0; i< 16; i++)
{
double cost = i; // time is exactly i
// Bind to a member function of mywaster
p.schedule( new FunctionTask( boost::bind(&TimeWaster::waste_time_with_lock, &mywaster, i), cost ) );
}
Timer overall;
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
std::cout << overall.elapsed() << " secs total." << std::endl;
}
/** Speed comparison of test
* DISABLED: because it is not necessary
*/
void xtest_compare()
{
size_t total=0;
ThreadScheduler * sched = new ThreadSchedulerLargestCost();
for (size_t i=0; i<100000; i++)
{
total += 1;
sched->push(new FunctionTask( boost::bind(TimeWaster::waste_time, i*1.0), i*1.0 ));
}
size_t other = total;
//std::cout << total << std::endl;
}
//
// void xtest_Boost_single_threads()
// {
// Mantid::Kernel::Timer overall;
// double time;
// size_t num = 10000;
//
// for (size_t i=0; i < num; i++)
// {
// DoNothingBoost myDoNothing;
// boost::thread workerThread(myDoNothing);
// workerThread.join();
// }
// time = overall.elapsed();
// std::cout << "Boost: " <<std::setw(15) << time << " secs total = " << std::setw(15) << (num*1.0/time) << " per second" << std::endl;
// }
void test_Constructor()
{
}
void test_schedule()
{
ThreadPool p;
TS_ASSERT_EQUALS( threadpooltest_check, 0);
p.schedule( new FunctionTask( threadpooltest_function ) );
TS_ASSERT_EQUALS( threadpooltest_check, 0);
TS_ASSERT_EQUALS( threadpooltest_check, 12);
}
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
void test_Scheduler_FIFO()
{
// Only use one core, it'll make things simpler
ThreadPool p(new ThreadSchedulerFIFO(), 1);
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new FunctionTask( boost::bind(threadpooltest_adding_stuff, i), cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 10);
// The first ones added are the first ones run.
TS_ASSERT_EQUALS( threadpooltest_vec[0], 0);
TS_ASSERT_EQUALS( threadpooltest_vec[1], 1);
TS_ASSERT_EQUALS( threadpooltest_vec[2], 2);
}
void test_Scheduler_LIFO()
{
ThreadPool p(new ThreadSchedulerLIFO(), 1);
threadpooltest_vec.clear();
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new FunctionTask( boost::bind(threadpooltest_adding_stuff, i), cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 10);
// The last ones added are the first ones run.
TS_ASSERT_EQUALS( threadpooltest_vec[0], 9);
TS_ASSERT_EQUALS( threadpooltest_vec[1], 8);
TS_ASSERT_EQUALS( threadpooltest_vec[2], 7);
}
void test_Scheduler_LargestCostFirst()
{
// Only use one core, it'll make things simpler
ThreadPool p(new ThreadSchedulerLargestCost(), 1);
threadpooltest_vec.clear();
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new FunctionTask( boost::bind(threadpooltest_adding_stuff, i), cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 10);
// The first ones added are the first ones run.
TS_ASSERT_EQUALS( threadpooltest_vec[0], 9);
TS_ASSERT_EQUALS( threadpooltest_vec[1], 8);
TS_ASSERT_EQUALS( threadpooltest_vec[2], 7);
}
//--------------------------------------------------------------------
/** Perform a stress test on the given scheduler.
* This runs a large number of super-short tasks; enough that the
* queue locking is tested against simultaneous access. A segfault
* results if the queue is improperly accessed.
*/
void do_StressTest_scheduler(ThreadScheduler * sched)
{
ThreadPool p(sched, 0);
TimeWaster mywaster;
Janik Zikovsky
committed
size_t num = 30000;
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
mywaster.total = 0;
for (size_t i=0; i<=num; i++)
{
p.schedule( new FunctionTask( boost::bind(&TimeWaster::add_to_number, &mywaster, i), i*1.0 ) );
}
Timer overall;
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
//std::cout << overall.elapsed() << " secs total." << std::endl;
// Expected total
size_t expected = (num * num + num) / 2;
TS_ASSERT_EQUALS( mywaster.total, expected);
}
void test_StressTest_ThreadSchedulerFIFO()
{
do_StressTest_scheduler(new ThreadSchedulerFIFO());
}
void test_StressTest_ThreadSchedulerLIFO()
{
do_StressTest_scheduler(new ThreadSchedulerLIFO());
}
void test_StressTest_ThreadSchedulerLargestCost()
{
do_StressTest_scheduler(new ThreadSchedulerLargestCost());
}
Janik Zikovsky
committed
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
//--------------------------------------------------------------------
/** Perform a stress test on the given scheduler.
* This one creates tasks that create new tasks; e.g. 10 tasks each add
* 10 tasks, and so on (up to a certain depth)
*/
void do_StressTest_TasksThatCreateTasks(ThreadScheduler * sched)
{
ThreadPool * p = new ThreadPool(sched, 0);
// Create the first task, depth 0, that will recursively create 100000
TaskThatAddsTasks * task = new TaskThatAddsTasks(sched, 0);
p->schedule( task );
//Reset the total
TaskThatAddsTasks_counter = 0;
TS_ASSERT_THROWS_NOTHING( p->joinAll() );
// Expected total = the number of lowest level entries
TS_ASSERT_EQUALS( TaskThatAddsTasks_counter, 10000);
delete p;
}
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerFIFO()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerFIFO());
}
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerLIFO()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerLIFO());
}
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerLargestCost()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerLargestCost());
}
Janik Zikovsky
committed
//=======================================================================================
/** Task that throws an exception */
class TaskThatThrows : public Task
{
void run()
{
ThreadPoolTest_TaskThatThrows_counter++;
Janik Zikovsky
committed
throw Mantid::Kernel::Exception::NotImplementedError("Test exception from TaskThatThrows.");
}
};
//--------------------------------------------------------------------
void test_TaskThatThrows()
{
ThreadPool p(new ThreadSchedulerFIFO(),1); // one core
ThreadPoolTest_TaskThatThrows_counter = 0;
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new TaskThatThrows());
}
// joinAll rethrows
TS_ASSERT_THROWS( p.joinAll() , std::runtime_error);
// And only one of the tasks actually ran (since we're on one core)
TS_ASSERT_EQUALS(ThreadPoolTest_TaskThatThrows_counter, 1);
}
Janik Zikovsky
committed