//----------------------------------------------------------------------
// Includes
//----------------------------------------------------------------------
#include "MantidKernel/MultiThreaded.h"
#include "MantidKernel/ThreadPool.h"
#include "MantidKernel/ThreadPoolRunnable.h"
#include "MantidKernel/Task.h"
#include <algorithm>
#include <cfloat>
#include <cmath>
#include <sstream>
#include <stdexcept>
#include <Poco/Mutex.h>
#include <Poco/Runnable.h>
#include <Poco/Thread.h>

namespace Mantid
{
namespace Kernel
{

  //--------------------------------------------------------------------------------
  /** Constructor
   *
   * @param scheduler :: an instance of a ThreadScheduler to schedule tasks.
   *        NOTE: The ThreadPool destructor will delete this.
   * @param numThreads :: number of cores to use; default = 0, meaning auto-detect all
   *        available physical cores.
   * @throw std::invalid_argument if scheduler is NULL.
   */
  ThreadPool::ThreadPool( ThreadScheduler * scheduler, size_t numThreads)
    : m_scheduler(scheduler), m_started(false)
  {
    // A pool without a scheduler cannot accept or hand out tasks.
    if (!m_scheduler)
      throw std::invalid_argument("NULL ThreadScheduler passed to ThreadPool constructor.");

    if (numThreads == 0)
    {
      // Uses OpenMP (via getNumPhysicalCores) to find how many cores there are.
      m_numThreads = getNumPhysicalCores();
    }
    else
      m_numThreads = numThreads;
  }

  //--------------------------------------------------------------------------------
  /** Destructor. Deletes the ThreadScheduler.
   */
  ThreadPool::~ThreadPool()
  {
    // The pool owns the scheduler it was constructed with.
    // Deleting a null pointer is a no-op, so no explicit check is needed.
    delete m_scheduler;
  }

  //--------------------------------------------------------------------------------
  /** Return the number of physical cores available on the system.
   * NOTE: Uses OpenMP getMaxThreads to find the number.
   * @return how many cores are present. 1 if no OpenMP is installed.
   */
  size_t ThreadPool::getNumPhysicalCores()
  {
    // PARALLEL_GET_MAX_THREADS is a macro (presumably from MultiThreaded.h);
    // its value is converted to size_t for the caller.
    const size_t maxThreads = PARALLEL_GET_MAX_THREADS;
    return maxThreads;
  }



  //--------------------------------------------------------------------------------
  /** Start the threads and begin looking for tasks.
   * @throw runtime_error if called when it has already started.
   */
  void ThreadPool::start()
  {
    if (m_started)
      throw std::runtime_error("Threads have already started.");

    // Now, launch that many threads and let them wait for new tasks.
    m_threads.clear();
    for (size_t i = 0; i < m_numThreads; i++)
    {
      // Make a descriptive name
      std::ostringstream name;
      name << "Thread" << i;
      // Create the thread
      Poco::Thread * thread = new Poco::Thread(name.str());
      m_threads.push_back(thread);

      // Make the runnable object and run it
      ThreadPoolRunnable * runnable = new ThreadPoolRunnable(i, m_scheduler);
      thread->start(*runnable);
    }
    // Yep, all the threads are running.
    m_started = true;
  }

  //--------------------------------------------------------------------------------
  /** Schedule a task for later execution. If the threadpool is running,
   * it will be picked up by the next available thread.
Janik Zikovsky's avatar
Janik Zikovsky committed
   *
   * @param task :: pointer to a Task object to run.
   * @param start :: start the thread at the same time; default false
  void ThreadPool::schedule(Task * task, bool start)
Janik Zikovsky's avatar
Janik Zikovsky committed
  {
    if (task)
      m_scheduler->push(task);
      // Start all the threads if they were not already.
      if (start && !m_started)
        this->start();
Janik Zikovsky's avatar
Janik Zikovsky committed
  }



  //--------------------------------------------------------------------------------
  /** Method to perform sorting of task lists.
   * This prioritizes long tasks, so they end up at start of the list.
   *
   * @param lhs :: Task*
   * @param rhs :: Task*
   * @return true if lhs < rhs (aka lhs should be first)
   */
  bool compareTasks(Task * lhs, Task * rhs)
  {
    // The more expensive task is ordered first, so a descending-cost
    // ordering results when this is used as a sort predicate.
    if (lhs->cost() > rhs->cost())
      return true;
    return false;
  }


  //--------------------------------------------------------------------------------
  /** Begin the execution of all scheduled tasks.
   * TODO: Make it parallel! For now, serial execution.
   *
   */
Janik Zikovsky's avatar
Janik Zikovsky committed
  void ThreadPool::joinAll()
    // Start all the threads if they were not already.
    if (!m_started)
      this->start();
    // Sequentially join all the threads.
    for (size_t i=0; i < m_threads.size(); i++)
    {
      m_threads[i]->join();
      // Delete the old thread
      delete m_threads[i];
    // Clear the vectors (the threads are deleted now).
    m_threads.clear();
    for (size_t i=0; i < m_runnables.size(); i++)
      delete m_runnables[i];
    m_runnables.clear();

    // Did one of the threads abort or throw an exception?
    if (m_scheduler->getAborted())
    {
      // Re-raise the error
      throw m_scheduler->getAbortException();
    }
Janik Zikovsky's avatar
Janik Zikovsky committed
  }



} // namespace Kernel
} // namespace Mantid