Commit 8bfa6aaf authored by Alvarez, Gonzalo's avatar Alvarez, Gonzalo
Browse files

PthreadsNg: setAffinities now disabled

parent 66aa64fa
Loading
Loading
Loading
Loading
+89 −0
Original line number Diff line number Diff line
/*
Copyright (c) 2009-2017, UT-Battelle, LLC
All rights reserved

[PsimagLite, Version 1.]

*********************************************************
THE SOFTWARE IS SUPPLIED BY THE COPYRIGHT HOLDERS AND
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.

Please see full open source license included in file LICENSE.
*********************************************************

*/
#include "Concurrency.h"
#include <iostream>
#include <cstdlib>
#define USE_PTHREADS_OR_NOT_NG
#include "Parallelizer.h"

class MyHelper {

	typedef PsimagLite::Concurrency ConcurrencyType;
	typedef PsimagLite::Vector<SizeType>::Type VectorSizeType;

public:

	MyHelper(SizeType ntasks, SizeType nthreads)
	    : ntasks_(ntasks), x_(nthreads,0)
	{}

	SizeType tasks() const { return ntasks_; }

	int result() const
	{
		return x_[0];
	}

	void doTask(SizeType taskNumber, SizeType threadNum)
	{
		x_[threadNum] += taskNumber;
	}

	void sync()
	{
		for (SizeType i = 1; i < x_.size(); ++i)
			x_[0] += x_[i];
	}

private:

	SizeType ntasks_;
	VectorSizeType x_;
}; // class MyHelper


int main(int argc,char *argv[])
{
	typedef PsimagLite::Concurrency ConcurrencyType;


	if (argc!=3) {
		std::cout<<"USAGE: "<<argv[0]<<" nthreads ntasks\n";
		return 1;
	}

	SizeType nthreads  = atoi(argv[1]);
	SizeType ntasks = atoi(argv[2]);

	ConcurrencyType concurrency(&argc,&argv,nthreads);

	typedef MyHelper HelperType;
	typedef PsimagLite::Parallelizer<HelperType> ParallelizerType;

	PsimagLite::CodeSectionParams csp(nthreads, true);
	ParallelizerType threadObject(csp);

	HelperType helper(ntasks, nthreads);

	std::cout<<"Using "<<threadObject.name();
	std::cout<<" with "<<threadObject.threads()<<" threads.\n";
	threadObject.loopCreate(helper);
	helper.sync();
	std::cout<<"Sum of all tasks= "<<helper.result()<<"\n";
}
+1 −1
Original line number Diff line number Diff line
@@ -44,7 +44,7 @@ sub createMakefile
	continuedFractionCollection range kernelPolynomial fit
	linearPrediction options randomTest svd testLapack threads loadImbalance testIsClass
	testMemResolv1 sumDecomposition calculator closuresTest base64test checkRunId
	testLanczos testExcitedLanczos testLanczosMatrixInFile nested testIoNg testIoNgBoolean);
	testLanczos testExcitedLanczos testLanczosMatrixInFile nested testIoNg testIoNgBoolean affinityTest);

	my %args;
	$args{"code"} = "PsimagLite/drivers";
+75 −23
Original line number Diff line number Diff line
@@ -144,19 +144,17 @@ void *thread_function_wrapper(void *dummyPtr)
template<typename PthreadFunctionHolderType, typename LoadBalancerType=LoadBalancerDefault>
class PthreadsNg  {

	static const int MAX_CPUS = 1024;

public:

	typedef LoadBalancerDefault::VectorSizeType VectorSizeType;

	PthreadsNg(const CodeSectionParams& codeSectionParams)
	    : nthreads_(codeSectionParams.npthreads),
	      cores_(1),
	      setAffinities_(codeSectionParams.setAffinities),
	      stackSize_(codeSectionParams.stackSize)
	{
		int cores = sysconf(_SC_NPROCESSORS_ONLN);
		cores_ = (cores > 0) ? cores : 1;
	}
	{}

	bool affinities() const { return setAffinities_; }

@@ -207,6 +205,15 @@ public:
		pthread_attr_t** attr = new pthread_attr_t*[nthreads_];
		SizeType ntasks = pfh.tasks();

#ifndef __APPLE__
//		cpu_set_t cpuset;

//		if (setAffinities_) {
//			pid_t pid = getpid(); // always successfull
//			getPidAffinity(&cpuset, pid);
//		}
#endif

		for (SizeType j=0; j <nthreads_; j++) {
			pfs[j].pfh = &pfh;
			pfs[j].loadBalancer = &loadBalancer;
@@ -229,8 +236,8 @@ public:
			ret = pthread_attr_init(attr[j]);
			checkForError(ret);

			if (setAffinities_)
				setAffinity(attr[j],j,cores_);
			//if (setAffinities_)
			//	setAffinity(attr[j], &cpuset, j);

			ret = pthread_create(&thread_id[j],
			                     attr[j],
@@ -260,34 +267,79 @@ public:

private:

	void setAffinity(pthread_attr_t* attr,
	                 SizeType threadNum,
	                 SizeType cores) const
	void setAffinity(pthread_attr_t* attr, cpu_set_t* cpuset, SizeType threadNum) const
	{
		// pick a cpu from cpuset
		int chosenCpu = getOneCpuFromSet(cpuset);
		if (chosenCpu < 0) {
			std::cerr<<"setAffinity: no cpus left in set for thread "<<threadNum<<"\n";
			return;
		}

		cpu_set_t mynewset;
		CPU_ZERO(&mynewset);
		CPU_SET(chosenCpu, &mynewset); // add cpu to new set
		int ret = pthread_attr_setaffinity_np(attr, sizeof(cpu_set_t), &mynewset);
		checkForError(ret);
		if (ret != 0)
			return;

		std::cerr<<"Threadnum "<<threadNum<<" cpu=" << chosenCpu<<"\n";
		std::cerr.flush();
		// remove cpu from set
		CPU_CLR(chosenCpu, cpuset);
	}

	void getPidAffinity(cpu_set_t* cpuset, pid_t pid) const
	{
#ifndef __APPLE__
		cpu_set_t* cpuset = new cpu_set_t;
		int cpu = threadNum % cores;
		CPU_ZERO(cpuset);
		CPU_SET(cpu,cpuset);
		std::size_t cpusetsize = sizeof(cpu_set_t);
		int ret = pthread_attr_setaffinity_np(attr,cpusetsize,cpuset);
		int ret = sched_getaffinity(pid, sizeof(cpu_set_t), cpuset);
		checkForError(ret);
		// clean up
		delete cpuset;
		cpuset = 0;
#endif
		if (ret != 0) {
			CPU_ZERO(cpuset);
			return;
		}

		int count = CPU_COUNT(cpuset);
		std::cout<<"Pid "<<pid<<": "<<count<<" in sched_getaffinity\n";

		std::vector<SizeType> cpus(MAX_CPUS);
		int total = 0;
		for (int i = 0; i < MAX_CPUS; ++i) {
			if (CPU_ISSET(i, cpuset) == 0) continue;
			cpus[total++] = i;
			if (total == count) break;
		}

		std::cout<<"Pid "<<pid<<": ";
		for (int i = 0; i < total; ++i)
			std::cout<<cpus[i]<<" ";
		std::cout<<"\n";
		std::cout.flush();
	}

	int getOneCpuFromSet(cpu_set_t* cpuset) const
	{
		int count = CPU_COUNT(cpuset);
		if (count == 0) return -1;

		int chosenCpu = -1;
		for (int i = 0; i < MAX_CPUS; ++i) {
			if (CPU_ISSET(i, cpuset) == 0) continue;
			chosenCpu = i;
			break;
		}

		return chosenCpu;
	}

	void checkForError(int ret) const
	{
		if (ret == 0) return;
#ifdef _GNU_SOURCE
		std::cerr<<"PthreadsNg ERROR: "<<strerror(ret)<<"\n";
#endif
	}

	SizeType nthreads_;
	SizeType cores_;
	bool setAffinities_;
	size_t stackSize_;
}; // PthreadsNg class