Commit 8404aeb5 authored by Alexandre Ganea's avatar Alexandre Ganea
Browse files

[Support] On Windows, ensure hardware_concurrency() extends to all CPU sockets and all NUMA groups

The goal of this patch is to maximize CPU utilization on multi-socket or high core count systems, so that parallel computations such as LLD/ThinLTO can use all hardware threads in the system. Before this patch, on Windows, a maximum of 64 hardware threads could be used at most, in some cases dispatched only on one CPU socket.

== Background ==
Windows doesn't have a flat cpu_set_t like Linux. Instead, it projects hardware CPUs (or NUMA nodes) to applications through a concept of "processor groups". A "processor" is the smallest unit of execution on a CPU, that is, an hyper-thread if SMT is active; a core otherwise. There's a limit of 32-bit processors on older 32-bit versions of Windows, which later was raised to 64-processors with 64-bit versions of Windows. This limit comes from the affinity mask, which historically is represented by the sizeof(void*). Consequently, the concept of "processor groups" was introduced for dealing with systems with more than 64 hyper-threads.

By default, the Windows OS assigns only one "processor group" to each starting application, in a round-robin manner. If the application wants to use more processors, it needs to programmatically enable it, by assigning threads to other "processor groups". This also means that affinity cannot cross "processor group" boundaries; one can only specify a "preferred" group on start-up, but the application is free to allocate more groups if it wants to.

This creates a peculiar situation, where newer CPUs like the AMD EPYC 7702P (64-cores, 128-hyperthreads) are projected by the OS as two (2) "processor groups". This means that by default, an application can only use half of the cores. This situation could only get worse in the years to come, as dies with more cores will appear on the market.

== The problem ==
The heavyweight_hardware_concurrency() API was introduced so that only *one hardware thread per core* was used. Once that API returns, that original intention is lost, only the number of threads is retained. Consider a situation, on Windows, where the system has 2 CPU sockets, 18 cores each, each core having 2 hyper-threads, for a total of 72 hyper-threads. Both heavyweight_hardware_concurrency() and hardware_concurrency() currently return 36, because on Windows they are simply wrappers over std::thread::hardware_concurrency() -- which can only return processors from the current "processor group".

== The changes in this patch ==
To solve this situation, we capture (and retain) the initial intention until the point of usage, through a new ThreadPoolStrategy class. The number of threads to use is deferred as late as possible, until the moment where the std::threads are created (ThreadPool in the case of ThinLTO).

When using hardware_concurrency(), setting ThreadCount to 0 now means to use all the possible hardware CPU (SMT) threads. Providing a ThreadCount above to the maximum number of threads will have no effect, the maximum will be used instead.
The heavyweight_hardware_concurrency() is similar to hardware_concurrency(), except that only one thread per hardware *core* will be used.

When LLVM_ENABLE_THREADS is OFF, the threading APIs will always return 1, to ensure any caller loops will be exercised at least once.

Differential Revision: https://reviews.llvm.org/D71775
parent d9049e87
......@@ -268,8 +268,7 @@ int main(int argc, const char **argv) {
Error = false;
llvm::sys::Mutex IndexMutex;
// ExecutorConcurrency is a flag exposed by AllTUsExecution.h
llvm::ThreadPool Pool(ExecutorConcurrency == 0 ? llvm::hardware_concurrency()
: ExecutorConcurrency);
llvm::ThreadPool Pool(llvm::hardware_concurrency(ExecutorConcurrency));
for (auto &Group : USRToBitcode) {
Pool.async([&]() {
std::vector<std::unique_ptr<doc::Info>> Infos;
......
......@@ -842,13 +842,7 @@ std::string renderTUAction(const TUAction &Action) {
} // namespace
unsigned getDefaultAsyncThreadsCount() {
unsigned HardwareConcurrency = llvm::heavyweight_hardware_concurrency();
// heavyweight_hardware_concurrency may fall back to hardware_concurrency.
// C++ standard says that hardware_concurrency() may return 0; fallback to 1
// worker thread in that case.
if (HardwareConcurrency == 0)
return 1;
return HardwareConcurrency;
return llvm::heavyweight_hardware_concurrency().compute_thread_count();
}
FileStatus TUStatus::render(PathRef File) const {
......
......@@ -148,9 +148,10 @@ BackgroundIndex::BackgroundIndex(
CDB.watch([&](const std::vector<std::string> &ChangedFiles) {
enqueue(ChangedFiles);
})) {
assert(ThreadPoolSize > 0 && "Thread pool size can't be zero.");
assert(Rebuilder.TUsBeforeFirstBuild > 0 &&
"Thread pool size can't be zero.");
assert(this->IndexStorageFactory && "Storage factory can not be null!");
for (unsigned I = 0; I < ThreadPoolSize; ++I) {
for (unsigned I = 0; I < Rebuilder.TUsBeforeFirstBuild; ++I) {
ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1), [this] {
WithContext Ctx(this->BackgroundContext.clone());
Queue.work([&] { Rebuilder.idle(); });
......
......@@ -135,7 +135,7 @@ public:
Context BackgroundContext, const FileSystemProvider &,
const GlobalCompilationDatabase &CDB,
BackgroundIndexStorage::Factory IndexStorageFactory,
size_t ThreadPoolSize = llvm::heavyweight_hardware_concurrency(),
size_t ThreadPoolSize = 0, // 0 = use all hardware threads
std::function<void(BackgroundQueue::Stats)> OnProgress = nullptr);
~BackgroundIndex(); // Blocks while the current task finishes.
......
......@@ -49,7 +49,9 @@ class BackgroundIndexRebuilder {
public:
BackgroundIndexRebuilder(SwapIndex *Target, FileSymbols *Source,
unsigned Threads)
: TUsBeforeFirstBuild(Threads), Target(Target), Source(Source) {}
: TUsBeforeFirstBuild(llvm::heavyweight_hardware_concurrency(Threads)
.compute_thread_count()),
Target(Target), Source(Source) {}
// Called to indicate a TU has been indexed.
// May rebuild, if enough TUs have been indexed.
......
......@@ -114,8 +114,7 @@ llvm::Error AllTUsToolExecutor::execute(
auto &Action = Actions.front();
{
llvm::ThreadPool Pool(ThreadCount == 0 ? llvm::hardware_concurrency()
: ThreadCount);
llvm::ThreadPool Pool(llvm::hardware_concurrency(ThreadCount));
for (std::string File : Files) {
Pool.async(
[&](std::string Path) {
......
......@@ -106,7 +106,8 @@ DependencyScanningFilesystemSharedCache::
// sharding gives a performance edge by reducing the lock contention.
// FIXME: A better heuristic might also consider the OS to account for
// the different cost of lock contention on different OSes.
NumShards = std::max(2u, llvm::hardware_concurrency() / 4);
NumShards =
std::max(2u, llvm::hardware_concurrency().compute_thread_count() / 4);
CacheShards = std::make_unique<CacheShard[]>(NumShards);
}
......
......@@ -485,15 +485,9 @@ int main(int argc, const char **argv) {
DependencyScanningService Service(ScanMode, Format, ReuseFileManager,
SkipExcludedPPRanges);
#if LLVM_ENABLE_THREADS
unsigned NumWorkers =
NumThreads == 0 ? llvm::hardware_concurrency() : NumThreads;
#else
unsigned NumWorkers = 1;
#endif
llvm::ThreadPool Pool(NumWorkers);
llvm::ThreadPool Pool(llvm::hardware_concurrency(NumThreads));
std::vector<std::unique_ptr<DependencyScanningTool>> WorkerTools;
for (unsigned I = 0; I < NumWorkers; ++I)
for (unsigned I = 0; I < Pool.getThreadCount(); ++I)
WorkerTools.push_back(std::make_unique<DependencyScanningTool>(Service));
std::vector<SingleCommandCompilationDatabase> Inputs;
......@@ -508,9 +502,9 @@ int main(int argc, const char **argv) {
if (Verbose) {
llvm::outs() << "Running clang-scan-deps on " << Inputs.size()
<< " files using " << NumWorkers << " workers\n";
<< " files using " << Pool.getThreadCount() << " workers\n";
}
for (unsigned I = 0; I < NumWorkers; ++I) {
for (unsigned I = 0; I < Pool.getThreadCount(); ++I) {
Pool.async([I, &Lock, &Index, &Inputs, &HadErrors, &FD, &WorkerTools,
&DependencyOS, &Errs]() {
llvm::StringSet<> AlreadySeenModules;
......
......@@ -2747,8 +2747,8 @@ createSymbols(ArrayRef<std::vector<GdbIndexSection::NameAttrEntry>> nameAttrs,
size_t numShards = 32;
size_t concurrency = 1;
if (threadsEnabled)
concurrency =
std::min<size_t>(PowerOf2Floor(hardware_concurrency()), numShards);
concurrency = std::min<size_t>(
hardware_concurrency().compute_thread_count(), numShards);
// A sharded map to uniquify symbols by name.
std::vector<DenseMap<CachedHashStringRef, size_t>> map(numShards);
......@@ -3191,8 +3191,8 @@ void MergeNoTailSection::finalizeContents() {
// operations in the following tight loop.
size_t concurrency = 1;
if (threadsEnabled)
concurrency =
std::min<size_t>(PowerOf2Floor(hardware_concurrency()), numShards);
concurrency = std::min<size_t>(
hardware_concurrency().compute_thread_count(), numShards);
// Add section pieces to the builders.
parallelForEachN(0, concurrency, [&](size_t threadId) {
......
......@@ -227,7 +227,8 @@ using ThinBackend = std::function<std::unique_ptr<ThinBackendProc>(
AddStreamFn AddStream, NativeObjectCache Cache)>;
/// This ThinBackend runs the individual backend jobs in-process.
ThinBackend createInProcessThinBackend(unsigned ParallelismLevel);
/// The default value means to use one job per hardware core (not hyper-thread).
ThinBackend createInProcessThinBackend(unsigned ParallelismLevel = 0);
/// This ThinBackend writes individual module indexes to files, instead of
/// running the individual backend jobs. This backend is for distributed builds
......
......@@ -13,7 +13,9 @@
#ifndef LLVM_SUPPORT_THREAD_POOL_H
#define LLVM_SUPPORT_THREAD_POOL_H
#include "llvm/ADT/BitVector.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"
#include <future>
......@@ -38,12 +40,11 @@ public:
using TaskTy = std::function<void()>;
using PackagedTaskTy = std::packaged_task<void()>;
/// Construct a pool with the number of threads found by
/// hardware_concurrency().
ThreadPool();
/// Construct a pool of \p ThreadCount threads
ThreadPool(unsigned ThreadCount);
/// Construct a pool using the hardware strategy \p S for mapping hardware
/// execution resources (threads, cores, CPUs)
/// Defaults to using the maximum execution resources in the system, but
/// excluding any resources contained in the affinity mask.
ThreadPool(ThreadPoolStrategy S = hardware_concurrency());
/// Blocking destructor: the pool will wait for all the threads to complete.
~ThreadPool();
......@@ -68,6 +69,8 @@ public:
/// It is an error to try to add new tasks while blocking on this call.
void wait();
unsigned getThreadCount() const { return ThreadCount; }
private:
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
......@@ -94,6 +97,8 @@ private:
/// Signal for the destruction of the pool, asking thread to exit.
bool EnableFlag;
#endif
unsigned ThreadCount;
};
}
......
......@@ -14,6 +14,7 @@
#ifndef LLVM_SUPPORT_THREADING_H
#define LLVM_SUPPORT_THREADING_H
#include "llvm/ADT/BitVector.h"
#include "llvm/ADT/FunctionExtras.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/Config/llvm-config.h" // for LLVM_ON_UNIX
......@@ -143,20 +144,52 @@ void llvm_execute_on_thread_async(
#endif
}
/// Get the amount of currency to use for tasks requiring significant
/// memory or other resources. Currently based on physical cores, if
/// available for the host system, otherwise falls back to
/// thread::hardware_concurrency().
/// Returns 1 when LLVM is configured with LLVM_ENABLE_THREADS=OFF
unsigned heavyweight_hardware_concurrency();
/// Get the number of threads that the current program can execute
/// concurrently. On some systems std::thread::hardware_concurrency() returns
/// the total number of cores, without taking affinity into consideration.
/// Returns 1 when LLVM is configured with LLVM_ENABLE_THREADS=OFF.
/// Fallback to std::thread::hardware_concurrency() if sched_getaffinity is
/// not available.
unsigned hardware_concurrency();
/// This tells how a thread pool will be used
class ThreadPoolStrategy {
public:
// The default value (0) means all available threads should be used,
// excluding affinity mask. If set, this value only represents a suggested
// high bound, the runtime might choose a lower value (not higher).
unsigned ThreadsRequested = 0;
// If SMT is active, use hyper threads. If false, there will be only one
// std::thread per core.
bool UseHyperThreads = true;
/// Retrieves the max available threads for the current strategy. This
/// accounts for affinity masks and takes advantage of all CPU sockets.
unsigned compute_thread_count() const;
/// Assign the current thread to an ideal hardware CPU or NUMA node. In a
/// multi-socket system, this ensures threads are assigned to all CPU
/// sockets. \p ThreadPoolNum represents a number bounded by [0,
/// compute_thread_count()).
void apply_thread_strategy(unsigned ThreadPoolNum) const;
};
/// Returns a thread strategy for tasks requiring significant memory or other
/// resources. To be used for workloads where hardware_concurrency() proves to
/// be less efficient. Avoid this strategy if doing lots of I/O. Currently
/// based on physical cores, if available for the host system, otherwise falls
/// back to hardware_concurrency(). Returns 1 when LLVM is configured with
/// LLVM_ENABLE_THREADS = OFF
inline ThreadPoolStrategy
heavyweight_hardware_concurrency(unsigned ThreadCount = 0) {
ThreadPoolStrategy S;
S.UseHyperThreads = false;
S.ThreadsRequested = ThreadCount;
return S;
}
/// Returns a default thread strategy where all available hardware ressources
/// are to be used, except for those initially excluded by an affinity mask.
/// This function takes affinity into consideration. Returns 1 when LLVM is
/// configured with LLVM_ENABLE_THREADS=OFF.
inline ThreadPoolStrategy hardware_concurrency(unsigned ThreadCount = 0) {
ThreadPoolStrategy S;
S.ThreadsRequested = ThreadCount;
return S;
}
/// Return the current thread id, as used in various OS system calls.
/// Note that not all platforms guarantee that the value returned will be
......@@ -184,6 +217,14 @@ void llvm_execute_on_thread_async(
/// the operation succeeded or failed is returned.
void get_thread_name(SmallVectorImpl<char> &Name);
/// Returns a mask that represents on which hardware thread, core, CPU, NUMA
/// group, the calling thread can be executed. On Windows, threads cannot
/// cross CPU boundaries.
llvm::BitVector get_thread_affinity_mask();
/// Returns how many physical CPUs or NUMA groups the system has.
unsigned get_cpus();
enum class ThreadPriority {
Background = 0,
Default = 1,
......
......@@ -51,7 +51,7 @@ std::unique_ptr<Module> llvm::splitCodeGen(
// Create ThreadPool in nested scope so that threads will be joined
// on destruction.
{
ThreadPool CodegenThreadPool(OSs.size());
ThreadPool CodegenThreadPool(hardware_concurrency(OSs.size()));
int ThreadCount = 0;
SplitModule(
......
......@@ -2446,7 +2446,7 @@ bool DWARFLinker::link() {
}
EmitLambda();
} else {
ThreadPool Pool(2);
ThreadPool Pool(hardware_concurrency(2));
Pool.async(AnalyzeAll);
Pool.async(CloneAll);
Pool.wait();
......
......@@ -445,7 +445,7 @@ Error DwarfTransformer::convert(uint32_t NumThreads) {
// Now parse all DIEs in case we have cross compile unit references in a
// thread pool.
ThreadPool pool(NumThreads);
ThreadPool pool(hardware_concurrency(NumThreads));
for (const auto &CU : DICtx.compile_units())
pool.async([&CU]() { CU->getUnitDIE(false /*CUDieOnly*/); });
pool.wait();
......
......@@ -157,7 +157,8 @@ LLJIT::LLJIT(LLJITBuilderState &S, Error &Err)
if (S.NumCompileThreads > 0) {
TransformLayer->setCloneToNewContextOnEmit(true);
CompileThreads = std::make_unique<ThreadPool>(S.NumCompileThreads);
CompileThreads =
std::make_unique<ThreadPool>(hardware_concurrency(S.NumCompileThreads));
ES->setDispatchMaterialization(
[this](JITDylib &JD, std::unique_ptr<MaterializationUnit> MU) {
// FIXME: Switch to move capture once we have c++14.
......
......@@ -477,8 +477,7 @@ LTO::RegularLTOState::RegularLTOState(unsigned ParallelCodeGenParallelismLevel,
LTO::ThinLTOState::ThinLTOState(ThinBackend Backend)
: Backend(Backend), CombinedIndex(/*HaveGVs*/ false) {
if (!Backend)
this->Backend =
createInProcessThinBackend(llvm::heavyweight_hardware_concurrency());
this->Backend = createInProcessThinBackend();
}
LTO::LTO(Config Conf, ThinBackend Backend,
......@@ -1095,7 +1094,8 @@ public:
const StringMap<GVSummaryMapTy> &ModuleToDefinedGVSummaries,
AddStreamFn AddStream, NativeObjectCache Cache)
: ThinBackendProc(Conf, CombinedIndex, ModuleToDefinedGVSummaries),
BackendThreadPool(ThinLTOParallelismLevel),
BackendThreadPool(
heavyweight_hardware_concurrency(ThinLTOParallelismLevel)),
AddStream(std::move(AddStream)), Cache(std::move(Cache)) {
for (auto &Name : CombinedIndex.cfiFunctionDefs())
CfiFunctionDefs.insert(
......
......@@ -375,7 +375,8 @@ void codegen(const Config &Conf, TargetMachine *TM, AddStreamFn AddStream,
void splitCodeGen(const Config &C, TargetMachine *TM, AddStreamFn AddStream,
unsigned ParallelCodeGenParallelismLevel,
std::unique_ptr<Module> Mod) {
ThreadPool CodegenThreadPool(ParallelCodeGenParallelismLevel);
ThreadPool CodegenThreadPool(
heavyweight_hardware_concurrency(ParallelCodeGenParallelismLevel));
unsigned ThreadCount = 0;
const Target *T = &TM->getTarget();
......
......@@ -80,8 +80,8 @@ extern cl::opt<std::string> RemarksFormat;
namespace {
static cl::opt<int>
ThreadCount("threads", cl::init(llvm::heavyweight_hardware_concurrency()));
// Default to using one job per hardware core in the system
static cl::opt<int> ThreadCount("threads", cl::init(0));
// Simple helper to save temporary files for debug.
static void saveTempBitcode(const Module &TheModule, StringRef TempDir,
......@@ -1042,7 +1042,7 @@ void ThinLTOCodeGenerator::run() {
// Parallel optimizer + codegen
{
ThreadPool Pool(ThreadCount);
ThreadPool Pool(heavyweight_hardware_concurrency(ThreadCount));
for (auto IndexCount : ModulesOrdering) {
auto &Mod = Modules[IndexCount];
Pool.async([&](int count) {
......
......@@ -1266,7 +1266,7 @@ StringRef sys::getHostCPUName() { return "generic"; }
// On Linux, the number of physical cores can be computed from /proc/cpuinfo,
// using the number of unique physical/core id pairs. The following
// implementation reads the /proc/cpuinfo format on an x86_64 system.
static int computeHostNumPhysicalCores() {
int computeHostNumPhysicalCores() {
// Read /proc/cpuinfo as a stream (until EOF reached). It cannot be
// mmapped because it appears to have 0 size.
llvm::ErrorOr<std::unique_ptr<llvm::MemoryBuffer>> Text =
......@@ -1312,7 +1312,7 @@ static int computeHostNumPhysicalCores() {
#include <sys/sysctl.h>
// Gets the number of *physical cores* on the machine.
static int computeHostNumPhysicalCores() {
int computeHostNumPhysicalCores() {
uint32_t count;
size_t len = sizeof(count);
sysctlbyname("hw.physicalcpu", &count, &len, NULL, 0);
......@@ -1326,6 +1326,9 @@ static int computeHostNumPhysicalCores() {
}
return count;
}
#elif defined(_WIN32)
// Defined in llvm/lib/Support/Windows/Threading.inc
int computeHostNumPhysicalCores();
#else
// On other systems, return -1 to indicate unknown.
static int computeHostNumPhysicalCores() { return -1; }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment