Commit 7514526c authored by Alvarez, Gonzalo's avatar Alvarez, Gonzalo
Browse files

Parallelizer2 with C++11 lambda and test added

parent f520ccdf
Loading
Loading
Loading
Loading
+28 −0
Original line number Diff line number Diff line
#include "Parallelizer2.h"
#include "Vector.h"

int main(int argc, char** argv)
{
	if (argc < 3) return 1;

	const SizeType n = atoi(argv[1]);
	const SizeType threads = atoi(argv[2]);

	PsimagLite::Vector<double>::Type v(n);

	PsimagLite::Parallizer2<> parallelizer(threads);

	parallelizer.parallelFor([&v](SizeType i, SizeType)
	{
		v[i] = i + 42;        // <<<--- body of the loop
	},
	n);                       // <<<---- total of the loop

	/*
	for (SizeType i = 0; i < n; ++i) {
		v[i] = i + 42;
	}
	*/

	std::cout<<v[0]<<" "<<v[n - 1]<<"\n";
}

src/Parallelizer2.h

0 → 100644
+11 −0
Original line number Diff line number Diff line
#ifndef PARALLELIZER2_H
#define PARALLELIZER2_H
#include "Vector.h"

#ifdef USE_PTHREADS
#include "Parallelizer2Pthread.h"
#else
#include "Parallelizer2Serial.h"
#endif

#endif // PARALLELIZER2_H
+153 −0
Original line number Diff line number Diff line
#ifndef PARALLELIZER2PTHREAD_H
#define PARALLELIZER2PTHREAD_H
#include <pthread.h>
#include <iostream>
#include <algorithm>
#include "Vector.h"
#include <sched.h>
#include <unistd.h>
#include "TypeToString.h"
#include "CodeSectionParams.h"
#include "LoadBalancerDefault.h"

#ifdef __linux__
#include <sys/types.h>
#include <sys/syscall.h>
#endif

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#ifdef _GNU_SOURCE
#include <errno.h>
#include <string.h>
#endif

template<typename SomeLambdaType,
         typename LoadBalancerType=PsimagLite::LoadBalancerDefault
         >
struct PthreadFunctionStruct {
	PthreadFunctionStruct()
	    : pfh(0),loadBalancer(0),threadNum(0),nthreads(0),total(0),cpu(0)
	{}

	const SomeLambdaType* pfh;
	const LoadBalancerType* loadBalancer;
	int threadNum;
	SizeType nthreads;
	SizeType total;
	SizeType cpu;
};

template<typename SomeLambdaType>
void *thread_function_wrapper(void *dummyPtr)
{
	PthreadFunctionStruct<SomeLambdaType> *pfs =
	        static_cast<PthreadFunctionStruct<SomeLambdaType> *>(dummyPtr);

	const SomeLambdaType* pfh = pfs->pfh;

	int s = 0;
#ifdef __linux__
	s = sched_getcpu();
#endif
	if (s >= 0) pfs->cpu = s;

	SizeType blockSize = pfs->loadBalancer->blockSize(pfs->threadNum);

	for (SizeType p = 0; p < blockSize; ++p) {
		SizeType taskNumber = pfs->loadBalancer->taskNumber(pfs->threadNum, p);
		if (taskNumber > pfs->total) break;
		(*pfh)(taskNumber, pfs->threadNum);
	}

	return 0;
}

namespace PsimagLite {

template<typename LoadBalancerType=LoadBalancerDefault>
class Parallizer2 {

public:

	typedef LoadBalancerDefault::VectorSizeType VectorSizeType;

	Parallizer2(SizeType nthreads)
	    : nthreads_(nthreads) {}

	// no weights, no balancer ==> create weights, set all weigths to 1, delegate
	template<typename SomeLambdaType>
	void parallelFor(const SomeLambdaType& lambda,
	                 SizeType ntasks)
	{
		VectorSizeType weights(ntasks,1);
		parallelFor(lambda, weights, ntasks);
	}

	// weights, no balancer ==> create balancer with weights ==> delegate
	template<typename SomeLambdaType>
	void parallelFor(const SomeLambdaType& lambda,
	                 const VectorSizeType& weights,
	                 SizeType ntasks)
	{
		LoadBalancerType* loadBalancer = new LoadBalancerType(weights, nthreads_);
		parallelFor(lambda, *loadBalancer, ntasks);
		delete loadBalancer;
		loadBalancer = 0;
	}

	template<typename SomeLambdaType>
	void parallelFor(const SomeLambdaType& lambda,
	                 const LoadBalancerType& loadBalancer,
	                 SizeType ntasks)
	{
		PthreadFunctionStruct<SomeLambdaType>* pfs =
		new PthreadFunctionStruct<SomeLambdaType>[nthreads_];
		pthread_t* thread_id = new pthread_t[nthreads_];
		pthread_attr_t** attr = new pthread_attr_t*[nthreads_];

		for (SizeType j = 0; j < nthreads_; ++j) {
			pfs[j].pfh = &lambda;
			pfs[j].loadBalancer = &loadBalancer;
			pfs[j].threadNum = j;
			pfs[j].total = ntasks;
			pfs[j].nthreads = nthreads_;

			attr[j] = new pthread_attr_t;

			int ret = pthread_attr_init(attr[j]);
			checkForError(ret);

			ret = pthread_create(&thread_id[j],
			                     attr[j],
			                     thread_function_wrapper<SomeLambdaType>,
			                     &pfs[j]);
			checkForError(ret);
		}

		for (SizeType j=0; j <nthreads_; ++j) pthread_join(thread_id[j], 0);
		for (SizeType j=0; j <nthreads_; ++j) {
			int ret = pthread_attr_destroy(attr[j]);
			checkForError(ret);
			delete attr[j];
			attr[j] = 0;
		}

		delete [] attr;
		delete [] thread_id;
		delete [] pfs;
	}

private:

	void checkForError(int ret) const
	{
		if (ret == 0) return;
		std::cerr<<"PthreadsNg ERROR: "<<strerror(ret)<<"\n";
	}

	SizeType nthreads_;
};
}
#endif // PARALLELIZER2PTHREAD_H
+26 −0
Original line number Diff line number Diff line
#ifndef PARALLELIZER2SERIAL_H
#define PARALLELIZER2SERIAL_H
#include "Vector.h"

namespace PsimagLite {

template<typename = int>
class Parallizer2 {

public:

	Parallizer2(SizeType threads)
	{
		if (threads != 1)
			throw RuntimeError("Please compile with -DUSE_PTHREADS\n");
	}

	template<typename SomeLambdaType>
	void parallelFor(const SomeLambdaType& lambda, SizeType n)
	{
		for (SizeType i = 0; i < n; ++i)
			lambda(i, 0);
	}
};
}
#endif // PARALLELIZER2SERIAL_H