Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ThreadSchedulerMutexes.h 5.82 KiB
#ifndef MANTID_KERNEL_THREADSCHEDULERMUTEXES_H_
#define MANTID_KERNEL_THREADSCHEDULERMUTEXES_H_

#include "MantidKernel/DllConfig.h"
#include "MantidKernel/ThreadScheduler.h"
#include <set>

namespace Mantid {
namespace Kernel {

/** ThreadSchedulerMutexes : Version of a ThreadSchedulerLargestCost
 * that also makes sure to not try to schedule two tasks with the
 * same mutex at the same time.
 *
 * This scheduler also sorts by largest cost so as to optimize allocation
 * that way.
 *
 * NOTE: The performance of "pop"ping a task is much slower if you have a very
 * large number of different mutexes; this scheduler is better suited if you
 * only have a few (e.g. one for DiskIO, and NULL for calculations).
 * Popping a task scales with the N^2 where N is the number of different
 *mutexes.
 *
 * @author Janik Zikovsky
 * @date 2011-02-25 16:39:43.233991
 */
class DLLExport ThreadSchedulerMutexes : public ThreadScheduler {
public:
  ThreadSchedulerMutexes() : ThreadScheduler() {}

  ~ThreadSchedulerMutexes() override { clear(); }

  //-------------------------------------------------------------------------------
  void push(Task *newTask) override {
    // Cache the total cost
    Kernel::LockGuardMutex lock(m_queueLock);
    m_cost += newTask->cost();

    boost::shared_ptr<Mutex> mut = newTask->getMutex();
    m_supermap[mut].emplace(newTask->cost(), newTask);
  }

  //-------------------------------------------------------------------------------
  Task *pop(size_t threadnum) override {
    UNUSED_ARG(threadnum);

    Task *temp = NULL;

    Kernel::LockGuardMutex lock(m_queueLock);
    // Check the size within the same locking block; otherwise the size may
    // change before you get the next item.
    if (m_supermap.size() > 0) {
      // We iterate in reverse as to take the NULL mutex last, even if no mutex
      // is busy
      SuperMap::iterator it = m_supermap.begin();
      SuperMap::iterator it_end = m_supermap.end();
      for (; it != it_end; ++it) {
        // The key is the mutex associated with the inner map
        boost::shared_ptr<Mutex> mapMutex = it->first;
        if ((!mapMutex) || (m_mutexes.empty()) ||
            (m_mutexes.find(mapMutex) == m_mutexes.end())) {
          // The mutex of this map is free!
          InnerMap &map = it->second;

          if (map.size() > 0) {
            // Look for the largest cost item in it.
            InnerMap::iterator it2 = it->second.end();
            it2--;
            // Great, we found something.
            temp = it2->second;
            // Take it out of the map (popped)
            map.erase(it2);
            break;
          }
        }
      }
      if (temp == NULL) {
        // Nothing was found, meaning all mutexes are in use
        // Try the first non-empty map
        SuperMap::iterator it = m_supermap.begin();
        SuperMap::iterator it_end = m_supermap.end();
        for (; it != it_end; it++) {
          if (it->second.size() > 0) {
            InnerMap &map = it->second;
            // Use the first one
            temp = map.begin()->second;
            // And erase that item (pop it)
            map.erase(map.begin());
            break;
          }
        }
      }
      // If temp is still NULL, then no tasks are left.
    }

    // --- Add the mutex (if any) to the list of "busy" ones ---
    if (temp) {
      boost::shared_ptr<Mutex> mut = temp->getMutex();
      if (mut)
        m_mutexes.insert(mut);
    }

    return temp;
  }

  //-----------------------------------------------------------------------------------
  /** Signal to the scheduler that a task is complete.
   *
   * @param task :: the Task that was completed.
   * @param threadnum :: unused argument
   */
  void finished(Task *task, size_t threadnum) override {
    UNUSED_ARG(threadnum);
    boost::shared_ptr<Mutex> mut = task->getMutex();
    if (mut) {
      Kernel::LockGuardMutex lock(m_queueLock);
      // We take this mutex off the list of used ones.
      m_mutexes.erase(mut);
    }
  }

  //-------------------------------------------------------------------------------
  size_t size() override {
    Kernel::LockGuardMutex lock(m_queueLock);
    // Add up the sizes of all contained maps.
    size_t total = 0;
    SuperMap::iterator it = m_supermap.begin();
    SuperMap::iterator it_end = m_supermap.end();
    for (; it != it_end; it++)
      total += it->second.size();
    return total;
  }

  //-------------------------------------------------------------------------------
  /// @return true if the queue is empty
  bool empty() override {
    Kernel::LockGuardMutex lock(m_queueLock);
    SuperMap::iterator it = m_supermap.begin();
    SuperMap::iterator it_end = m_supermap.end();
    for (; it != it_end; it++)
      if (!it->second.empty())
        return false;
    return true;
  }

  //-------------------------------------------------------------------------------
  void clear() override {
    Kernel::LockGuardMutex lock(m_queueLock);

    // Empty out the queue and delete the pointers!
    SuperMap::iterator it = m_supermap.begin();
    SuperMap::iterator it_end = m_supermap.end();
    for (; it != it_end; it++) {
      InnerMap &map = it->second;
      InnerMap::iterator it2 = map.begin();
      InnerMap::iterator it2_end = map.end();
      for (; it2 != it2_end; it2++)
        delete it2->second;
      map.clear();
    }
    m_supermap.clear();
    m_cost = 0;
    m_costExecuted = 0;
  }

protected:
  /// Map to tasks, sorted by cost
  typedef std::multimap<double, Task *> InnerMap;
  /// Map to maps, sorted by Mutex*
  typedef std::map<boost::shared_ptr<Mutex>, InnerMap> SuperMap;

  /** A super map; first key = a Mutex *
   * Inside it: second key = the cost. */
  SuperMap m_supermap;

  /// Vector of currently used mutexes.
  std::set<boost::shared_ptr<Mutex>> m_mutexes;
};

} // namespace Mantid
} // namespace Kernel

#endif /* MANTID_KERNEL_THREADSCHEDULERMUTEXES_H_ */