#ifndef THREADPOOLTEST_H_
#define THREADPOOLTEST_H_

#include <cxxtest/TestSuite.h>

#include "MantidKernel/Exception.h"
#include "MantidKernel/FunctionTask.h"
#include "MantidKernel/ProgressText.h"
#include "MantidKernel/ThreadPool.h"
#include "MantidKernel/ThreadScheduler.h"
#include "MantidKernel/ThreadSchedulerMutexes.h"
#include "MantidKernel/Timer.h"

#include <Poco/Thread.h>

#include <boost/bind.hpp>
#include <boost/make_shared.hpp>

#include <cstdlib>
#include <iostream>
#include <mutex>
#include <vector>

using namespace Mantid::Kernel;

//=======================================================================================

class TimeWaster {
public:
  static size_t waste_time(double seconds) {
    // Waste time, but use up the CPU!
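    // (A busy loop rather than a sleep, so the worker threads genuinely
    // compete for CPU time while the test runs.)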
    std::size_t num = 0;
    Mantid::Kernel::Timer time;
    while (time.elapsed(false) < seconds) {
      double x = 1.1;
      for (int j = 0; j < 100000; j++) {
        x = x * x;
        x = x + x;
        x = x / 1.1;
      }
      num += 1;
    }
    return num;
  }

  void waste_time_with_lock(double seconds) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      std::cout << "waste_time for " << seconds << " seconds.\n";
    }
    waste_time(seconds);
  }

  /** Add a number but use a lock to avoid contention */
  void add_to_number(size_t adding) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      total += adding;
    }
  }

private:
  std::mutex m_mutex;

public:
  size_t total;
};

//=======================================================================================

int threadpooltest_check = 0;

void threadpooltest_function() { threadpooltest_check = 12; }

std::vector<int> threadpooltest_vec;
void threadpooltest_adding_stuff(int val) {
  // TODO: Mutex
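  // (std::vector::push_back is not thread-safe; the scheduler-ordering tests
  // below run with a single worker thread, so this is safe in practice.)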
  threadpooltest_vec.push_back(val);
}

// Counter for the test.
size_t TaskThatAddsTasks_counter;
std::mutex TaskThatAddsTasks_mutex;

//=======================================================================================
/** Class that adds tasks to its scheduler */
class TaskThatAddsTasks : public Task {
public:
  // ctor
  TaskThatAddsTasks(ThreadScheduler *scheduler, size_t depth)
      : Task(), m_scheduler(scheduler), depth(depth) {
    // Use a randomized cost function; this will have an effect on the sorted
    // schedulers.
    m_cost = rand();
  }

  // Run the task
  void run() override {
    if (depth < 4) {
      // Add ten tasks (one level deeper)
      for (size_t i = 0; i < 10; i++) {
        m_scheduler->push(new TaskThatAddsTasks(m_scheduler, depth + 1));
      }
    } else {
      // Lock so that concurrent tasks don't race on the shared counter.
      std::lock_guard<std::mutex> lock(TaskThatAddsTasks_mutex);
      // Increment the counter only at the lowest level.
      TaskThatAddsTasks_counter += 1;
    }
  }

private:
  ThreadScheduler *m_scheduler;
  size_t depth;
};

int ThreadPoolTest_TaskThatThrows_counter = 0;

//=======================================================================================
class ThreadPoolTest : public CxxTest::TestSuite {
public:
  /** Make it waste time, 0 to 16 seconds
   * DISABLED because it is (intentionally) slow. */
  void xtest_Scheduler_LargestCostFirst_wastetime() {
    ThreadPool p(new ThreadSchedulerFIFO(), 0);
    threadpooltest_vec.clear();
    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 0);
    TimeWaster mywaster;

    for (int i = 0; i < 16; i++) {
      double cost = i; // time is exactly i
      // Bind to a member function of mywaster
      p.schedule(new FunctionTask(
          boost::bind(&TimeWaster::waste_time_with_lock, &mywaster, i), cost));
    }

    Timer overall;

    TS_ASSERT_THROWS_NOTHING(p.joinAll());

    std::cout << overall.elapsed() << " secs total.\n";
  }

  void test_Constructor() { ThreadPool p; }

  void test_schedule() {
    ThreadPool p;
    TS_ASSERT_EQUALS(threadpooltest_check, 0);
    p.schedule(new FunctionTask(threadpooltest_function));
    TS_ASSERT_EQUALS(threadpooltest_check, 0);
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    TS_ASSERT_EQUALS(threadpooltest_check, 12);
  }

  //=======================================================================================
  //=======================================================================================
  /** Class for debugging progress reporting */
  class MyTestProgress : public ProgressBase {
  public:
    MyTestProgress(double start, double end, int64_t numSteps,
                   ThreadPoolTest *myParent)
        : ProgressBase(start, end, numSteps), parent(myParent) {}

    void doReport(const std::string &msg = "") override {
      parent->last_report_message = msg;
      parent->last_report_counter = m_i;
      double p = m_start + m_step * double(m_i - m_ifirst);
      parent->last_report_value = p;
    }

  public:
    ThreadPoolTest *parent;
  };

  /// Index that was last set at doReport
  int64_t last_report_counter;
  double last_report_value;
  std::string last_report_message;

  void test_with_progress_reporting() {
    last_report_counter = 0;
    ThreadPool p(new ThreadSchedulerFIFO(), 1,
                 new MyTestProgress(0.0, 1.0, 10, this));
    for (int i = 0; i < 10; i++) {
      double cost = i;
      p.schedule(new FunctionTask(threadpooltest_function, cost));
    }
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    // The test reporter was called
    TS_ASSERT_EQUALS(last_report_counter, 10);
  }

  //=======================================================================================

  /** Start a threadpool before adding tasks
   * DISABLED because the timing issues make it somewhat unreliable under heavy
   * loads. */
  void xtest_start_and_wait() {
    ThreadPool p; // Makes a default scheduler
    threadpooltest_check = 0;

    // Start and allow it to wait for 1 second
    p.start(1.0);

    // Simulate doing some work
    Poco::Thread::sleep(40);

    // Now you add the task
    p.schedule(new FunctionTask(threadpooltest_function));

    // Simulate doing more work (this allows the task to run)
    Poco::Thread::sleep(40);

    // The task ran before we called joinAll(). Magic!
    TS_ASSERT_EQUALS(threadpooltest_check, 12);

    // Reset and try again; the threads are still waiting because less than
    // 1 second has passed.
    threadpooltest_check = 0;
    p.schedule(new FunctionTask(threadpooltest_function));
    Poco::Thread::sleep(40);
    TS_ASSERT_EQUALS(threadpooltest_check, 12);

    // You still need to call joinAll() to clean up everything.
    p.joinAll();

    // Ok, the task did execute.
    TS_ASSERT_EQUALS(threadpooltest_check, 12);
  }

  /** Start a threadpool before adding tasks. But the wait time was too short!
   * DISABLED because the timing issues make it somewhat unreliable under heavy
   * loads. */
  void xtest_start_and_wait_short_wait_time() {
    ThreadPool p; // Makes a default scheduler
    threadpooltest_check = 0;

    // Start and allow it to wait for a very short time
    p.start(0.03);

    // But it takes too long before the task is actually added
    Poco::Thread::sleep(100);
    p.schedule(new FunctionTask(threadpooltest_function));
    Poco::Thread::sleep(30);
    // So the task has not run, since the threads exited before!
    TS_ASSERT_EQUALS(threadpooltest_check, 0);

    // But you can still call joinAll() to run the task that is waiting.
    p.joinAll();
    // Ok, the task did execute.
    TS_ASSERT_EQUALS(threadpooltest_check, 12);
  }

  //=======================================================================================

  /** Schedule a task and run the threads, but don't abort them.
   * Then schedule more tasks and join again.
   */
  void test_schedule_resume_tasks() {
    ThreadPool p; // Makes a default scheduler
    threadpooltest_check = 0;
    p.schedule(new FunctionTask(threadpooltest_function));
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    // Ok, the task did execute.
    TS_ASSERT_EQUALS(threadpooltest_check, 12);

    // Now we reset.
    threadpooltest_check = 0;
    p.schedule(new FunctionTask(threadpooltest_function));
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    TS_ASSERT_EQUALS(threadpooltest_check, 12);
  }

  void test_Scheduler_FIFO() {
    // Only use one core, it'll make things simpler
    ThreadPool p(new ThreadSchedulerFIFO(), 1);
    threadpooltest_vec.clear();

    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 0);
    for (int i = 0; i < 10; i++) {
      double cost = i;
      p.schedule(
          new FunctionTask(boost::bind(threadpooltest_adding_stuff, i), cost));
    }
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 10);
    if (threadpooltest_vec.size() < 10)
      return;
    // The first ones added are the first ones run.
    TS_ASSERT_EQUALS(threadpooltest_vec[0], 0);
    TS_ASSERT_EQUALS(threadpooltest_vec[1], 1);
    TS_ASSERT_EQUALS(threadpooltest_vec[2], 2);
  }

  void test_Scheduler_LIFO() {
    ThreadPool p(new ThreadSchedulerLIFO(), 1);
    threadpooltest_vec.clear();
    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 0);
    for (int i = 0; i < 10; i++) {
      double cost = i;
      p.schedule(
          new FunctionTask(boost::bind(threadpooltest_adding_stuff, i), cost));
    }
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 10);
    // The last ones added are the first ones run.
    TS_ASSERT_EQUALS(threadpooltest_vec[0], 9);
    TS_ASSERT_EQUALS(threadpooltest_vec[1], 8);
    TS_ASSERT_EQUALS(threadpooltest_vec[2], 7);
  }

  void test_Scheduler_LargestCostFirst() {
    // Only use one core, it'll make things simpler
    ThreadPool p(new ThreadSchedulerLargestCost(), 1);
    threadpooltest_vec.clear();
    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 0);
    for (int i = 0; i < 10; i++) {
      double cost = i;
      p.schedule(
          new FunctionTask(boost::bind(threadpooltest_adding_stuff, i), cost));
    }
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    TS_ASSERT_EQUALS(threadpooltest_vec.size(), 10);
    // The highest-cost tasks are run first.
    TS_ASSERT_EQUALS(threadpooltest_vec[0], 9);
    TS_ASSERT_EQUALS(threadpooltest_vec[1], 8);
    TS_ASSERT_EQUALS(threadpooltest_vec[2], 7);
  }

  //--------------------------------------------------------------------
  /** Perform a stress test on the given scheduler.
   * This runs a large number of super-short tasks; enough that the
   * queue locking is tested against simultaneous access. A segfault
   * results if the queue is improperly accessed.
   */
  void do_StressTest_scheduler(ThreadScheduler *sched) {
    ThreadPool p(sched, 0);
    TimeWaster mywaster;
    size_t num = 30000;
    mywaster.total = 0;
    boost::shared_ptr<std::mutex> lastMutex;
    for (size_t i = 0; i <= num; i++) {
      Task *task = new FunctionTask(
          boost::bind(&TimeWaster::add_to_number, &mywaster, i),
          static_cast<double>(i));
      // Create a new mutex every 1000 tasks. This is more relevant to the
      // ThreadSchedulerMutexes; others ignore it.
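      // (ThreadSchedulerMutexes is expected to avoid running two tasks that
      // share the same mutex at the same time.)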
      if (i % 1000 == 0)
        lastMutex = boost::make_shared<std::mutex>();
      task->setMutex(lastMutex);
      p.schedule(task);
    }

    Timer overall;
    TS_ASSERT_THROWS_NOTHING(p.joinAll());
    // std::cout << overall.elapsed() << " secs total.\n";

    // Expected total: 0 + 1 + ... + num = (num*num + num) / 2
    size_t expected = (num * num + num) / 2;
    TS_ASSERT_EQUALS(mywaster.total, expected);
  }

  void test_StressTest_ThreadSchedulerFIFO() {
    do_StressTest_scheduler(new ThreadSchedulerFIFO());
  }

  void test_StressTest_ThreadSchedulerLIFO() {
    do_StressTest_scheduler(new ThreadSchedulerLIFO());
  }

  void test_StressTest_ThreadSchedulerLargestCost() {
    do_StressTest_scheduler(new ThreadSchedulerLargestCost());
  }

  void test_StressTest_ThreadSchedulerMutexes() {
    do_StressTest_scheduler(new ThreadSchedulerMutexes());
  }

  //--------------------------------------------------------------------
  /** Perform a stress test on the given scheduler.
   * This one creates tasks that create new tasks; e.g. 10 tasks each add
   * 10 tasks, and so on (up to a certain depth).
   * This checks that one task can push onto the queue while another
   * thread is popping from it, without a segfault.
   */
  void do_StressTest_TasksThatCreateTasks(ThreadScheduler *sched) {
    ThreadPool *p = new ThreadPool(sched, 0);
    // Create the first task, depth 0, that will recursively create
    // 10^4 = 10000 lowest-level tasks.
    TaskThatAddsTasks *task = new TaskThatAddsTasks(sched, 0);
    p->schedule(task);

    // Reset the total
    TaskThatAddsTasks_counter = 0;
    TS_ASSERT_THROWS_NOTHING(p->joinAll());

    // Expected total = the number of lowest level entries
    TS_ASSERT_EQUALS(TaskThatAddsTasks_counter, 10000);
    delete p;
  }

  void test_StressTest_TasksThatCreateTasks_ThreadSchedulerFIFO() {
    do_StressTest_TasksThatCreateTasks(new ThreadSchedulerFIFO());
  }

  void test_StressTest_TasksThatCreateTasks_ThreadSchedulerLIFO() {
    do_StressTest_TasksThatCreateTasks(new ThreadSchedulerLIFO());
  }

  void test_StressTest_TasksThatCreateTasks_ThreadSchedulerLargestCost() {
    do_StressTest_TasksThatCreateTasks(new ThreadSchedulerLargestCost());
  }

  void test_StressTest_TasksThatCreateTasks_ThreadSchedulerMutexes() {
    do_StressTest_TasksThatCreateTasks(new ThreadSchedulerMutexes());
  }

  //=======================================================================================
  /** Task that throws an exception */
  class TaskThatThrows : public Task {
    void run() override {
      ThreadPoolTest_TaskThatThrows_counter++;
      throw Mantid::Kernel::Exception::NotImplementedError(
          "Test exception from TaskThatThrows.");
    }
  };

  //--------------------------------------------------------------------
  void test_TaskThatThrows() {
    ThreadPool p(new ThreadSchedulerFIFO(), 1); // one core
    ThreadPoolTest_TaskThatThrows_counter = 0;
    for (int i = 0; i < 10; i++) {
      p.schedule(new TaskThatThrows());
    }
    // joinAll rethrows
    TS_ASSERT_THROWS(p.joinAll(), std::runtime_error);
    // And only one of the tasks actually ran (since we're on one core)
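    // (Presumably the first exception aborts the scheduler, so the remaining
    // queued tasks are discarded rather than run.)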
    TS_ASSERT_EQUALS(ThreadPoolTest_TaskThatThrows_counter, 1);
  }
};

#endif