Newer
Older
#ifndef THREADPOOLTEST_H_
#define THREADPOOLTEST_H_
#include <cxxtest/TestSuite.h>
#include <MantidKernel/Timer.h>
#include <MantidKernel/FunctionTask.h>
Janik Zikovsky
committed
#include <MantidKernel/ProgressText.h>
#include "MantidKernel/MultiThreaded.h"
#include <MantidKernel/ThreadPool.h>
#include "MantidKernel/ThreadScheduler.h"
Janik Zikovsky
committed
#include "MantidKernel/ThreadSchedulerMutexes.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iomanip>
Janik Zikovsky
committed
#include <cstdlib>
using namespace Mantid::Kernel;
//#include <boost/thread.hpp>
Janik Zikovsky
committed
//=======================================================================================
/** Helper for the thread pool tests: burns CPU time on request and keeps a
 *  running total that can be added to under a mutex. */
class TimeWaster
{
public:
  /// Start the running total at zero so add_to_number() accumulates from a known value.
  TimeWaster() : total(0)
  {
  }

  /** Spin (using the CPU rather than sleeping) until the timer reports that
   *  the requested time has elapsed.
   * @param seconds :: how long to keep spinning
   * @return the number of passes of the outer loop that were completed
   */
  static size_t waste_time(double seconds)
  {
    // Waste time, but use up the CPU!
    std::size_t num = 0;
    Mantid::Kernel::Timer time;
    while (time.elapsed_no_reset() < seconds)
    {
      double x = 1.1;
      for (int j=0; j < 100000; j++)
      {
        x = x * x;
        x = x + x;
        x = x / 1.1;
      }
      num += 1;
    }
    return num;
  }

  /** Announce the delay on std::cout (holding the mutex only while printing),
   *  then waste the requested time.
   * @param seconds :: how long to keep spinning
   */
  void waste_time_with_lock(double seconds)
  {
    {
      Mutex::ScopedLock lock(m_mutex);
      std::cout << "waste_time for " << seconds << " seconds." << std::endl;
    }
    waste_time(seconds);
  }

  /** Add a number but use a lock to avoid contention
   * @param adding :: amount added to total
   */
  void add_to_number(size_t adding)
  {
    {
      Mutex::ScopedLock lock(m_mutex);
      total += adding;
    }
  }

private:
  /// Guards the console output and the updates of total
  Mutex m_mutex;

public:
  /// Running sum of everything passed to add_to_number()
  size_t total;
};
Janik Zikovsky
committed
//=======================================================================================
/// Flag written by threadpooltest_function() so a test can tell whether it ran.
int threadpooltest_check = 0;

/** Minimal task body: stamps threadpooltest_check with the value 12. */
void threadpooltest_function()
{
  const int ranMarker = 12;
  threadpooltest_check = ranMarker;
}
/// Values recorded by threadpooltest_adding_stuff(), in call order.
std::vector<int> threadpooltest_vec;

/** Task body that appends its argument to threadpooltest_vec.
 * @param val :: value to record
 */
void threadpooltest_adding_stuff(int val)
{
  //TODO: Mutex (the push_back below is not protected against concurrent callers)
  threadpooltest_vec.push_back(val);
}
Janik Zikovsky
committed
// Counter for the test: incremented by every TaskThatAddsTasks that runs at the deepest level.
size_t TaskThatAddsTasks_counter;
// Mutex held while TaskThatAddsTasks_counter is incremented.
Mutex TaskThatAddsTasks_mutex;
//=======================================================================================
/** Class that adds tasks to its scheduler.
 *  A task whose depth is below 4 pushes ten new tasks one level deeper;
 *  a task at the deepest level increments TaskThatAddsTasks_counter instead. */
class TaskThatAddsTasks : public Task
{
public:
  /** ctor
   * @param scheduler :: scheduler onto which the child tasks are pushed
   * @param depth :: nesting level of this task
   */
  TaskThatAddsTasks(ThreadScheduler * scheduler, size_t depth)
  : Task(), m_scheduler(scheduler), depth(depth)
  {
    // Use a randomized cost function; this will have an effect on the sorted schedulers.
    m_cost = rand();
  }

  /// Run the task
  void run()
  {
    if (depth < 4)
    {
      // Add ten tasks (one level deeper)
      for (size_t i=0; i<10; i++)
      {
        m_scheduler->push(new TaskThatAddsTasks(m_scheduler, depth+1));
      }
    }
    else
    {
      // Lock to ensure you don't step on yourself.
      Mutex::ScopedLock lock(TaskThatAddsTasks_mutex);
      // Increment the counter only at the lowest level.
      TaskThatAddsTasks_counter += 1;
    }
  }

private:
  /// Scheduler that receives the child tasks
  ThreadScheduler * m_scheduler;
  /// Nesting level of this task
  size_t depth;
};
/// Number of times TaskThatThrows::run() has been entered; reset by test_TaskThatThrows().
int ThreadPoolTest_TaskThatThrows_counter = 0;
Janik Zikovsky
committed
Janik Zikovsky
committed
//=======================================================================================
class ThreadPoolTest : public CxxTest::TestSuite
{
public:
/** Test that shows that OPENMP does not use a thread pool idea to optimally allocate threads
* (unless you use schedule(dynamic) )! */
void xtestOpenMP()
{
Timer overall;
int num = 16;
//PARALLEL_FOR_NO_WSP_CHECK()
//#pragma omp parallel for schedule(dynamic)
for (int i=0; i<num; i++)
{
double delay = num-i;
PARALLEL_CRITICAL(test1)
std::cout << std::setw(5) << i << ": Thread " << PARALLEL_THREAD_NUMBER << " will delay for " << delay << " seconds." << std::endl;
TimeWaster::waste_time(delay);
PARALLEL_CRITICAL(test1)
std::cout << std::setw(5) << i << ": is done." << std::endl;
}
std::cout << overall.elapsed() << " secs total.\n";
}
Janik Zikovsky
committed
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/** Make it waste time, 0 to 16 seconds
* DISABLED because it is (intentionally) slow. */
void xtest_Scheduler_LargestCostFirst_wastetime()
{
ThreadPool p(new ThreadSchedulerFIFO(), 0);
threadpooltest_vec.clear();
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
TimeWaster mywaster;
for (int i=0; i< 16; i++)
{
double cost = i; // time is exactly i
// Bind to a member function of mywaster
p.schedule( new FunctionTask( boost::bind(&TimeWaster::waste_time_with_lock, &mywaster, i), cost ) );
}
Timer overall;
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
std::cout << overall.elapsed() << " secs total." << std::endl;
}
/** Speed comparison of test
* DISABLED: because it is not necessary
*/
void xtest_compare()
{
size_t total=0;
ThreadScheduler * sched = new ThreadSchedulerLargestCost();
for (size_t i=0; i<100000; i++)
{
total += 1;
sched->push(new FunctionTask( boost::bind(TimeWaster::waste_time, i*1.0), i*1.0 ));
}
size_t other = total;
//std::cout << total << std::endl;
}
//
// void xtest_Boost_single_threads()
// {
// Mantid::Kernel::Timer overall;
// double time;
// size_t num = 10000;
//
// for (size_t i=0; i < num; i++)
// {
// DoNothingBoost myDoNothing;
// boost::thread workerThread(myDoNothing);
// workerThread.join();
// }
// time = overall.elapsed();
// std::cout << "Boost: " <<std::setw(15) << time << " secs total = " << std::setw(15) << (num*1.0/time) << " per second" << std::endl;
// }
void test_Constructor()
{
}
void test_schedule()
{
ThreadPool p;
TS_ASSERT_EQUALS( threadpooltest_check, 0);
p.schedule( new FunctionTask( threadpooltest_function ) );
TS_ASSERT_EQUALS( threadpooltest_check, 0);
TS_ASSERT_EQUALS( threadpooltest_check, 12);
}
Janik Zikovsky
committed
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
//=======================================================================================
//=======================================================================================
/** Class for debugging progress reporting */
class MyTestProgress : public ProgressBase
{
public:
MyTestProgress(double start,double end, int numSteps, ThreadPoolTest * myParent)
: ProgressBase(start,end, numSteps), parent(myParent)
{
}
void doReport(const std::string& msg = "")
{
parent->last_report_message = msg;
parent->last_report_counter = m_i;
double p = m_start + m_step*(m_i - m_ifirst);
parent->last_report_value = p;
}
public:
ThreadPoolTest * parent;
};
/// Index that was last set at doReport
int last_report_counter;
/// Progress value computed by the most recent MyTestProgress::doReport() call
double last_report_value;
/// Message passed to the most recent MyTestProgress::doReport() call
std::string last_report_message;
void test_with_progress_reporting()
{
last_report_counter = 0;
ThreadPool p(new ThreadSchedulerFIFO(), 0, new MyTestProgress(0.0, 1.0, 10, this));
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new FunctionTask( threadpooltest_function, cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
// The test reporter was called
TS_ASSERT_EQUALS( last_report_counter, 10);
}
/// Disabled because it has std output
void xtest_with_progress_reporting2()
{
ThreadPool p(new ThreadSchedulerFIFO(), 0, new ProgressText(0.0, 1.0, 50));
for (int i=0; i< 50; i++)
{
double cost = i;
p.schedule( new FunctionTask( threadpooltest_function, cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
}
//=======================================================================================
/** We schedule a task, run the threads, but don't abort them.
* Then we re-schedule stuff, and re-join.
*/
void test_schedule_resume_tasks()
{
ThreadPool p; // Makes a default scheduler
threadpooltest_check = 0;
p.schedule( new FunctionTask( threadpooltest_function ) );
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
// Ok, the task did execute.
TS_ASSERT_EQUALS( threadpooltest_check, 12);
// Now we reset.
threadpooltest_check = 0;
p.schedule( new FunctionTask( threadpooltest_function ) );
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
TS_ASSERT_EQUALS( threadpooltest_check, 12);
}
/** With a single worker, a FIFO scheduler runs the tasks in the order they were added. */
void test_Scheduler_FIFO()
{
  // Only use one core, it'll make things simpler
  ThreadPool p(new ThreadSchedulerFIFO(), 1);
  // Start from an empty record so the test does not depend on what ran before it
  threadpooltest_vec.clear();
  TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
  for (int i=0; i< 10; i++)
  {
    double cost = i;
    p.schedule( new FunctionTask( boost::bind(threadpooltest_adding_stuff, i), cost ) );
  }
  TS_ASSERT_THROWS_NOTHING( p.joinAll() );
  TS_ASSERT_EQUALS( threadpooltest_vec.size(), 10);
  // The first ones added are the first ones run.
  TS_ASSERT_EQUALS( threadpooltest_vec[0], 0);
  TS_ASSERT_EQUALS( threadpooltest_vec[1], 1);
  TS_ASSERT_EQUALS( threadpooltest_vec[2], 2);
}
void test_Scheduler_LIFO()
{
ThreadPool p(new ThreadSchedulerLIFO(), 1);
threadpooltest_vec.clear();
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new FunctionTask( boost::bind(threadpooltest_adding_stuff, i), cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 10);
// The last ones added are the first ones run.
TS_ASSERT_EQUALS( threadpooltest_vec[0], 9);
TS_ASSERT_EQUALS( threadpooltest_vec[1], 8);
TS_ASSERT_EQUALS( threadpooltest_vec[2], 7);
}
void test_Scheduler_LargestCostFirst()
{
// Only use one core, it'll make things simpler
ThreadPool p(new ThreadSchedulerLargestCost(), 1);
threadpooltest_vec.clear();
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 0);
for (int i=0; i< 10; i++)
{
double cost = i;
p.schedule( new FunctionTask( boost::bind(threadpooltest_adding_stuff, i), cost ) );
}
TS_ASSERT_THROWS_NOTHING( p.joinAll() );
TS_ASSERT_EQUALS( threadpooltest_vec.size(), 10);
// The first ones added are the first ones run.
TS_ASSERT_EQUALS( threadpooltest_vec[0], 9);
TS_ASSERT_EQUALS( threadpooltest_vec[1], 8);
TS_ASSERT_EQUALS( threadpooltest_vec[2], 7);
}
//--------------------------------------------------------------------
/** Perform a stress test on the given scheduler.
 * This runs a large number of super-short tasks; enough that the
 * queue locking is tested against simultaneous access. A segfault
 * results if the queue is improperly accessed.
 * @param sched :: scheduler handed to the ThreadPool under test
 */
void do_StressTest_scheduler(ThreadScheduler * sched)
{
  ThreadPool p(sched, 0);
  TimeWaster mywaster;
  // Make sure the running total starts at zero before any task adds to it
  mywaster.total = 0;
  size_t num = 30000;
  // NOTE(review): the mutexes created below are never deleted in this function —
  // confirm whether the task or the scheduler takes ownership of them.
  Mutex * lastMutex = NULL;
  for (size_t i=0; i<=num; i++)
  {
    Task * task = new FunctionTask( boost::bind(&TimeWaster::add_to_number, &mywaster, i), i*1.0 );
    // Create a new mutex every 1000 tasks. This is more relevant to the ThreadSchedulerMutexes; others ignore it.
    if (i % 1000 == 0)
      lastMutex = new Mutex();
    task->setMutex(lastMutex);
    p.schedule( task );
  }
  Timer overall;
  TS_ASSERT_THROWS_NOTHING( p.joinAll() );
  //std::cout << overall.elapsed() << " secs total." << std::endl;
  // Expected total: the sum 0 + 1 + ... + num
  size_t expected = (num * num + num) / 2;
  TS_ASSERT_EQUALS( mywaster.total, expected);
}
void test_StressTest_ThreadSchedulerFIFO()
{
do_StressTest_scheduler(new ThreadSchedulerFIFO());
}
void test_StressTest_ThreadSchedulerLIFO()
{
do_StressTest_scheduler(new ThreadSchedulerLIFO());
}
void test_StressTest_ThreadSchedulerLargestCost()
{
do_StressTest_scheduler(new ThreadSchedulerLargestCost());
}
Janik Zikovsky
committed
void test_StressTest_ThreadSchedulerMutexes()
{
do_StressTest_scheduler(new ThreadSchedulerMutexes());
}
Janik Zikovsky
committed
//--------------------------------------------------------------------
/** Perform a stress test on the given scheduler.
 * This one creates tasks that create new tasks; e.g. 10 tasks each add
 * 10 tasks, and so on (up to a certain depth).
 * So it tests against possible segfaults of one task
 * accessing the queue while another thread is popping it.
 * @param sched :: scheduler handed to the ThreadPool and to the tasks themselves
 */
void do_StressTest_TasksThatCreateTasks(ThreadScheduler * sched)
{
  // Reset the total before anything is queued, so no task can run ahead of the reset
  TaskThatAddsTasks_counter = 0;
  ThreadPool * p = new ThreadPool(sched, 0);
  // Create the first task, depth 0, that will recursively create 10000
  // lowest-level tasks (ten children per task, four levels down).
  TaskThatAddsTasks * task = new TaskThatAddsTasks(sched, 0);
  p->schedule( task );
  TS_ASSERT_THROWS_NOTHING( p->joinAll() );
  // Expected total = the number of lowest level entries
  TS_ASSERT_EQUALS( TaskThatAddsTasks_counter, 10000);
  delete p;
}
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerFIFO()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerFIFO());
}
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerLIFO()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerLIFO());
}
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerLargestCost()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerLargestCost());
}
Janik Zikovsky
committed
void test_StressTest_TasksThatCreateTasks_ThreadSchedulerMutexes()
{
do_StressTest_TasksThatCreateTasks(new ThreadSchedulerMutexes());
}
Janik Zikovsky
committed
//=======================================================================================
/** Task that throws an exception */
class TaskThatThrows : public Task
{
void run()
{
ThreadPoolTest_TaskThatThrows_counter++;
Janik Zikovsky
committed
throw Mantid::Kernel::Exception::NotImplementedError("Test exception from TaskThatThrows.");
}
};
//--------------------------------------------------------------------
/** Queue ten throwing tasks on a single-core pool: joinAll() must rethrow,
 *  and only the first task must have been started. */
void test_TaskThatThrows()
{
  ThreadPool p(new ThreadSchedulerFIFO(),1); // one core
  ThreadPoolTest_TaskThatThrows_counter = 0;
  for (int i=0; i< 10; i++)
  {
    p.schedule( new TaskThatThrows());
  }
  // joinAll rethrows
  TS_ASSERT_THROWS( p.joinAll() , std::runtime_error);
  // And only one of the tasks actually ran (since we're on one core)
  TS_ASSERT_EQUALS(ThreadPoolTest_TaskThatThrows_counter, 1);
}
Janik Zikovsky
committed