//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <stack>
#include <thread>
#include <utility>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

#if LLVM_ENABLE_THREADS

namespace llvm {
namespace parallel {
namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  // Enqueue \p func to be run asynchronously at some later point.
  virtual void add(std::function<void()> func) = 0;

  // Lazily creates and returns the process-wide shared executor instance.
  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
40
41
42
///   in filo order.
class ThreadPoolExecutor : public Executor {
public:
43
44
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
45
46
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
47
48
49
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
50
51
52
    Threads[0] = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
53
54
        if (Stop)
          break;
55
      }
56
      ThreadsCreated.set_value();
57
      work(S, 0);
58
    });
59
60
  }

61
62
63
64
65
66
67
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
68
    Cond.notify_all();
69
    ThreadsCreated.get_future().wait();
70
71
  }

72
73
74
75
76
77
78
79
80
81
  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

82
83
84
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
85
86
87
88
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

89
  void add(std::function<void()> F) override {
90
91
92
93
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push(F);
    }
94
95
96
97
    Cond.notify_one();
  }

private:
98
99
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    S.apply_thread_strategy(ThreadID);
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = WorkStack.top();
      WorkStack.pop();
      Lock.unlock();
      Task();
    }
  }

  std::atomic<bool> Stop{false};
  std::stack<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
116
117
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
118
119
120
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown(), which allows a "clean" fast exit, e.g. via _exit(). That
  // stops the thread pool and waits for any worker thread creation to
  // complete, but does not wait for the workers themselves to finish. Waiting
  // for worker thread creation to complete matters: it prevents intermittent
  // crashes on Windows caused by a race between thread creation and process
  // exit.
  //
  // The ThreadPoolExecutor object itself is only destroyed when the static
  // unique_ptr below is destroyed, i.e. on a normal full exit. Its destructor
  // makes sure the pool has been stopped and then waits for the worker
  // threads to finish; that wait likewise prevents intermittent crashes on
  // Windows when the process does a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes
  // and are more frequent with the debug static runtime.
  //
  // This arrangement also prevents intermittent deadlocks on exit with the
  // MinGW runtime.
  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedPool;
  static std::unique_ptr<ThreadPoolExecutor> Owner(&(*ManagedPool));
  return Owner.get();
}
} // namespace
// Count of live TaskGroups; used to detect (and serialize) nested groups.
static std::atomic<int> TaskGroupInstances;

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent
// that deadlock, only the first TaskGroup is allowed to run its tasks in
// parallel. In the scenario of nested parallel_for_each() calls, only the
// outermost one runs in parallel.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing
  // the instances count; otherwise a new TaskGroup could observe a zero count
  // and run in parallel while our tasks are still executing.
  L.sync();
  --TaskGroupInstances;
}

// Run \p F on the default executor when this is the outermost TaskGroup;
// otherwise run it inline (see the TaskGroup constructor for why nested
// groups must not run in parallel).
void TaskGroup::spawn(std::function<void()> F) {
  if (Parallel) {
    L.inc();
    // Init-capture moves F into the closure: copying a std::function may
    // heap-allocate, and the caller's F is dead after this call anyway.
    Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
      F();
      L.dec();
    });
  } else {
    F();
  }
}

} // namespace detail
} // namespace parallel
} // namespace llvm
#endif // LLVM_ENABLE_THREADS

void llvm::parallelForEachN(size_t Begin, size_t End,
                            llvm::function_ref<void(size_t)> Fn) {
  // Spinning up a task group is surprisingly expensive, and because task
  // groups do not support nested parallelism, a single-entry group can block
  // parallel execution underneath it. So with zero or one items we skip the
  // group entirely and run inline.
#if LLVM_ENABLE_THREADS
  size_t Count = End - Begin;
  if (Count > 1 && parallel::strategy.ThreadsRequested != 1) {
    // Cap the number of spawned tasks at MaxTasksPerGroup to bound the job
    // scheduling overhead on large inputs.
    size_t Stride = Count / parallel::detail::MaxTasksPerGroup;
    if (Stride == 0)
      Stride = 1;

    parallel::detail::TaskGroup TG;
    size_t I = Begin;
    for (; I + Stride < End; I += Stride) {
      // Each task handles the half-open chunk [I, I + Stride).
      TG.spawn([=, &Fn] {
        for (size_t J = I, ChunkEnd = I + Stride; J != ChunkEnd; ++J)
          Fn(J);
      });
    }
    // Run the trailing partial chunk on this thread.
    for (; I != End; ++I)
      Fn(I);
    return;
  }
#endif

  // Serial fallback: threads disabled, explicitly single-threaded, or the
  // range is trivially small.
  for (; Begin != End; ++Begin)
    Fn(Begin);
}