Skip to content
Snippets Groups Projects
Commit 13be5b17 authored by Lynch, Vickie's avatar Lynch, Vickie
Browse files

Refs #13350 gather mpi workspaces

parent 674c5e02
No related branches found
No related tags found
No related merge requests found
...@@ -23,7 +23,7 @@ include_directories( ${Boost_INCLUDE_DIRS} ) ...@@ -23,7 +23,7 @@ include_directories( ${Boost_INCLUDE_DIRS} )
# Add a definition that's used to guard MPI-specific parts of the main code # Add a definition that's used to guard MPI-specific parts of the main code
add_definitions ( -DMPI_BUILD ) add_definitions ( -DMPI_BUILD )
add_subdirectory( ${CMAKE_CURRENT_SOURCE_DIR}/Testing/SystemTests/scripts ) #add_subdirectory( ${CMAKE_CURRENT_SOURCE_DIR}/Testing/SystemTests/scripts )
# Add the ability to build a 'mantid-mpi' rpm # Add the ability to build a 'mantid-mpi' rpm
set ( CPACK_PACKAGE_NAME mantid-mpi ) set ( CPACK_PACKAGE_NAME mantid-mpi )
......
...@@ -67,6 +67,7 @@ protected: ...@@ -67,6 +67,7 @@ protected:
getProcessProperties(const std::string &propertyManager=std::string()) const; getProcessProperties(const std::string &propertyManager=std::string()) const;
/// MPI option. If false, we will use one job event if MPI is available /// MPI option. If false, we will use one job event if MPI is available
bool m_useMPI; bool m_useMPI;
Workspace_sptr assemble(Workspace_sptr partialWS);
Workspace_sptr assemble(const std::string &partialWSName, Workspace_sptr assemble(const std::string &partialWSName,
const std::string &outputWSName); const std::string &outputWSName);
void saveNexus(const std::string &outputWSName, void saveNexus(const std::string &outputWSName,
...@@ -91,10 +92,15 @@ protected: ...@@ -91,10 +92,15 @@ protected:
/// Add a matrix workspace to another matrix workspace /// Add a matrix workspace to another matrix workspace
MatrixWorkspace_sptr plus(const MatrixWorkspace_sptr lhs, MatrixWorkspace_sptr plus(const MatrixWorkspace_sptr lhs,
const MatrixWorkspace_sptr rhs); const MatrixWorkspace_sptr rhs);
/// Add a single value to a matrix workspace /// Add a single value to a matrix workspace
MatrixWorkspace_sptr plus(const MatrixWorkspace_sptr lhs, MatrixWorkspace_sptr plus(const MatrixWorkspace_sptr lhs,
const double &rhsValue); const double &rhsValue);
/// Add a matrix workspace to another matrix workspace
MatrixWorkspace_sptr plus2(MatrixWorkspace_sptr lhs,
MatrixWorkspace_sptr rhs);
/// Subract a matrix workspace by another matrix workspace /// Subract a matrix workspace by another matrix workspace
MatrixWorkspace_sptr minus(const MatrixWorkspace_sptr lhs, MatrixWorkspace_sptr minus(const MatrixWorkspace_sptr lhs,
const MatrixWorkspace_sptr rhs); const MatrixWorkspace_sptr rhs);
......
...@@ -196,6 +196,32 @@ MatrixWorkspace_sptr DataProcessorAlgorithm::loadChunk(const size_t rowIndex) { ...@@ -196,6 +196,32 @@ MatrixWorkspace_sptr DataProcessorAlgorithm::loadChunk(const size_t rowIndex) {
"DataProcessorAlgorithm::loadChunk is not implemented"); "DataProcessorAlgorithm::loadChunk is not implemented");
} }
/**
 * Assemble the partial workspaces from all MPI processes into a single
 * workspace on the root (main) process via the GatherWorkspaces algorithm.
 * Outside an MPI build this is a no-op and the input is returned unchanged.
 * @param partialWS :: workspace holding this process's partial result
 * @return the gathered workspace on the main thread; on every other
 *         process (and in non-MPI builds) the input workspace itself
 */
Workspace_sptr
DataProcessorAlgorithm::assemble(Workspace_sptr partialWS) {
  Workspace_sptr outputWS = partialWS;
#ifdef MPI_BUILD
  IAlgorithm_sptr gatherAlg = createChildAlgorithm("GatherWorkspaces");
  gatherAlg->setLogging(true);
  // Store in the ADS so the gathered result can be retrieved by name below
  gatherAlg->setAlwaysStoreInADS(true);
  gatherAlg->setProperty("InputWorkspace", partialWS);
  gatherAlg->setProperty("PreserveEvents", true);
  // NOTE(review): the fixed ADS name "_total" could collide if two
  // algorithms gather concurrently in the same ADS -- confirm single use
  gatherAlg->setPropertyValue("OutputWorkspace", "_total");
  gatherAlg->execute();
  // Only the root (main) process receives the gathered workspace
  if (isMainThread()) {
    outputWS = AnalysisDataService::Instance().retrieve("_total");
  }
#endif
  return outputWS;
}
/** /**
* Assemble the partial workspaces from all MPI processes * Assemble the partial workspaces from all MPI processes
* @param partialWSName :: Name of the workspace to assemble * @param partialWSName :: Name of the workspace to assemble
...@@ -451,6 +477,20 @@ DataProcessorAlgorithm::plus(const MatrixWorkspace_sptr lhs, ...@@ -451,6 +477,20 @@ DataProcessorAlgorithm::plus(const MatrixWorkspace_sptr lhs,
"Plus", lhs, rhs); "Plus", lhs, rhs);
} }
/**
 * Add a matrix workspace to another matrix workspace by running the
 * "Plus" algorithm through executeBinaryAlgorithm.
 * NOTE(review): near-duplicate of plus(), differing only in that the
 * shared pointers are taken by non-const value -- presumably so callers
 * holding non-const handles (e.g. the chunk-accumulation loop) can pass
 * them directly; confirm whether this can be merged with plus().
 * @param lhs :: the workspace on the left hand side of the addition symbol
 * @param rhs :: the workspace on the right hand side of the addition symbol
 * @return matrix workspace resulting from the operation
 */
MatrixWorkspace_sptr
DataProcessorAlgorithm::plus2(MatrixWorkspace_sptr lhs,
MatrixWorkspace_sptr rhs) {
return this->executeBinaryAlgorithm<
MatrixWorkspace_sptr, MatrixWorkspace_sptr, MatrixWorkspace_sptr>(
"Plus", lhs, rhs);
}
/** /**
* Add a single value to another matrix workspace * Add a single value to another matrix workspace
* @param lhs :: the workspace on the left hand side of the addition symbol * @param lhs :: the workspace on the left hand side of the addition symbol
......
...@@ -70,7 +70,7 @@ void GatherWorkspaces::init() { ...@@ -70,7 +70,7 @@ void GatherWorkspaces::init() {
// Output is optional - only the root process will output a workspace // Output is optional - only the root process will output a workspace
declareProperty(new WorkspaceProperty<>( declareProperty(new WorkspaceProperty<>(
"OutputWorkspace", "", Direction::Output, PropertyMode::Optional)); "OutputWorkspace", "", Direction::Output, PropertyMode::Optional));
declareProperty("PreserveEvents", false, declareProperty("PreserveEvents", true,
"Keep the output workspace as an EventWorkspace, if the " "Keep the output workspace as an EventWorkspace, if the "
"input has events (default).\n" "input has events (default).\n"
"If false, then the workspace gets converted to a " "If false, then the workspace gets converted to a "
...@@ -182,7 +182,7 @@ void GatherWorkspaces::exec() { ...@@ -182,7 +182,7 @@ void GatherWorkspaces::exec() {
outputWorkspace->dataE(wi) = inputWorkspace->readE(wi); outputWorkspace->dataE(wi) = inputWorkspace->readE(wi);
const int numReqs(3 * (included.size() - 1)); const int numReqs(3 * (included.size() - 1));
mpi::request reqs[numReqs]; std::vector<boost::mpi::request> reqs(numReqs);
int j(0); int j(0);
// Receive data from all the other processes // Receive data from all the other processes
...@@ -201,7 +201,7 @@ void GatherWorkspaces::exec() { ...@@ -201,7 +201,7 @@ void GatherWorkspaces::exec() {
} }
// Make sure everything's been received before exiting the algorithm // Make sure everything's been received before exiting the algorithm
mpi::wait_all(reqs, reqs + numReqs); mpi::wait_all(reqs.begin(), reqs.end());
} }
ISpectrum *outSpec = outputWorkspace->getSpectrum(wi); ISpectrum *outSpec = outputWorkspace->getSpectrum(wi);
outSpec->clearDetectorIDs(); outSpec->clearDetectorIDs();
...@@ -211,7 +211,7 @@ void GatherWorkspaces::exec() { ...@@ -211,7 +211,7 @@ void GatherWorkspaces::exec() {
reduce(included, inputWorkspace->readY(wi), vplus(), 0); reduce(included, inputWorkspace->readY(wi), vplus(), 0);
reduce(included, inputWorkspace->readE(wi), eplus(), 0); reduce(included, inputWorkspace->readE(wi), eplus(), 0);
} else if (accum == "Append") { } else if (accum == "Append") {
mpi::request reqs[3]; std::vector<boost::mpi::request> reqs(3);
// Send the spectrum to the root process // Send the spectrum to the root process
reqs[0] = included.isend(0, 0, inputWorkspace->readX(0)); reqs[0] = included.isend(0, 0, inputWorkspace->readX(0));
...@@ -219,7 +219,7 @@ void GatherWorkspaces::exec() { ...@@ -219,7 +219,7 @@ void GatherWorkspaces::exec() {
reqs[2] = included.isend(0, 2, inputWorkspace->readE(0)); reqs[2] = included.isend(0, 2, inputWorkspace->readE(0));
// Make sure the sends have completed before exiting the algorithm // Make sure the sends have completed before exiting the algorithm
mpi::wait_all(reqs, reqs + 3); mpi::wait_all(reqs.begin(), reqs.end());
} }
} }
} }
......
...@@ -200,8 +200,7 @@ void LoadEventAndCompress::exec() { ...@@ -200,8 +200,7 @@ void LoadEventAndCompress::exec() {
m_chunkingTable = determineChunk(m_filename); m_chunkingTable = determineChunk(m_filename);
// first run is free // first run is free
EventWorkspace_sptr resultWS = MatrixWorkspace_sptr resultWS = loadChunk(0);
boost::dynamic_pointer_cast<EventWorkspace>(loadChunk(0));
processChunk(resultWS); processChunk(resultWS);
// load the other chunks // load the other chunks
...@@ -209,18 +208,14 @@ void LoadEventAndCompress::exec() { ...@@ -209,18 +208,14 @@ void LoadEventAndCompress::exec() {
for (size_t i = 1; i < numRows; ++i) { for (size_t i = 1; i < numRows; ++i) {
MatrixWorkspace_sptr temp = loadChunk(i); MatrixWorkspace_sptr temp = loadChunk(i);
processChunk(temp); processChunk(temp);
auto alg = createChildAlgorithm("Plus"); resultWS = plus2(resultWS,temp);
alg->setProperty("LHSWorkspace", resultWS);
alg->setProperty("RHSWorkspace", temp);
alg->setProperty("OutputWorkspace", resultWS);
alg->setProperty("ClearRHSWorkspace", true);
alg->executeAsChildAlg();
} }
Workspace_sptr total = assemble(resultWS);
// Don't bother compressing combined workspace. DetermineChunking is designed // Don't bother compressing combined workspace. DetermineChunking is designed
// to prefer loading full banks so no further savings should be available. // to prefer loading full banks so no further savings should be available.
setProperty("OutputWorkspace", resultWS); setProperty("OutputWorkspace", total);
} }
} // namespace WorkflowAlgorithms } // namespace WorkflowAlgorithms
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment