Unverified Commit e0b797bb authored by Atkins, Charles Vernon's avatar Atkins, Charles Vernon Committed by GitHub
Browse files

Merge pull request #362 from eisenhauer/sst_marshalling

Preliminary FFS-based MxN data transfer for global arrays
parents fda62168 d98477e6
<?xml version="1.0"?>
<!-- Config XML file fo the
- heatTransfer_write_adios2
- heatTransfer_read
executables in build/bin -->
<adios-config>
<!--====================================
Configuration for the Writer
====================================-->
<io name="writer">
<engine type="SST">
<!-- for vectorized memory operations, make sure your system
enables threads-->
<parameter key="Threads" value="2"/>
<!-- Microseconds (default), Milliseconds, Seconds,
Minutes, Hours -->
<parameter key="ProfileUnits" value="Microseconds"/>
</engine>
<transport type="File">
<!-- POSIX, stdio (C FILE*), fstream (C++) -->
<parameter key="Library" value="MPI"/>
<!-- For read/write, Microseconds (default), Milliseconds, Seconds,
Minutes, Hours. open/close always in Microseconds -->
<parameter key="ProfileUnits" value="Microseconds"/>
</transport>
</io>
<!--====================================
Configuration for the Reader
====================================-->
<io name="reader">
<engine type="SST">
<!-- for vectorized memory operations, make sure your system
enables threads-->
<parameter key="Threads" value="2"/>
<!-- Microseconds (default), Milliseconds, Seconds,
Minutes, Hours -->
<parameter key="ProfileUnits" value="Microseconds"/>
</engine>
<transport type="File">
<!-- POSIX, stdio (C FILE*), fstream (C++) -->
<parameter key="Library" value="POSIX"/>
<!-- For read/write, Microseconds (default), Milliseconds, Seconds,
Minutes, Hours. open/close always in Microseconds -->
<parameter key="ProfileUnits" value="Microseconds"/>
</transport>
</io>
</adios-config>
......@@ -8,6 +8,7 @@
* Author: Greg Eisenhauer
*/
#include "adios2/helper/adiosFunctions.h"
#include <cstring>
#include <string>
......@@ -25,10 +26,80 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
std::strcpy(cstr, name.c_str());
m_Input = SstReaderOpen(cstr, NULL, mpiComm);
auto varCallback = [](void *reader, const char *variableName,
const char *type, void *data) {
std::string Type(type);
typename SstReader::SstReader *Reader =
reinterpret_cast<typename SstReader::SstReader *>(reader);
if (Type == "compound")
{
return (void *)NULL;
}
#define declare_type(T) \
else if (Type == GetType<T>()) \
{ \
Variable<T> *variable = \
&(Reader->m_IO.DefineVariable<T>(variableName)); \
variable->SetData((T *)data); \
variable->m_AvailableStepsCount = 1; \
return (void *)variable; \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
return (void *)NULL;
};
auto arrayCallback = [](void *reader, const char *variableName,
const char *type, int DimCount, size_t *Shape,
size_t *Start, size_t *Count) {
std::vector<size_t> VecShape;
std::vector<size_t> VecStart;
std::vector<size_t> VecCount;
std::string Type(type);
typename SstReader::SstReader *Reader =
reinterpret_cast<typename SstReader::SstReader *>(reader);
for (int i = 0; i < DimCount; i++)
{
VecShape.push_back(Shape[i]);
VecStart.push_back(Start[i]);
VecCount.push_back(Count[i]);
}
if (Type == "compound")
{
return (void *)NULL;
}
#define declare_type(T) \
else if (Type == GetType<T>()) \
{ \
Variable<T> *variable = &(Reader->m_IO.DefineVariable<T>( \
variableName, VecShape, VecStart, VecCount)); \
variable->m_AvailableStepsCount = 1; \
return (void *)variable; \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
return (void *)NULL;
};
SstReaderInitCallback(m_Input, this, varCallback, arrayCallback);
Init();
delete[] cstr;
}
StepStatus SstReader::BeginStep(StepMode mode, const float timeout_sec)
{
return (StepStatus)SstAdvanceStep(m_Input, (int)mode, timeout_sec);
}
void SstReader::EndStep()
{
m_IO.RemoveAllVariables();
SstReleaseStep(m_Input);
}
void SstReader::Close(const int transportIndex) { SstReaderClose(m_Input); }
// PRIVATE
......@@ -37,4 +108,29 @@ void SstReader::Init()
auto itRealTime = m_IO.m_Parameters.find("real_time");
}
#define declare_type(T) \
void SstReader::DoGetSync(Variable<T> &variable, T *data) \
{ \
SstGetDeferred(m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
SstPerformGets(m_Input); \
} \
void SstReader::DoGetDeferred(Variable<T> &variable, T *data) \
{ \
SstGetDeferred(m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
} \
void SstReader::DoGetDeferred(Variable<T> &variable, T &data) \
{ \
SstGetDeferred(m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), &data); \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
void SstReader::PerformGets() { SstPerformGets(m_Input); }
} // end namespace adios
......@@ -42,11 +42,29 @@ public:
virtual ~SstReader() = default;
StepStatus BeginStep();
StepStatus BeginStep(StepMode mode, const float timeoutSeconds = 0.f);
void EndStep();
void Close(const int transportIndex = -1);
void PerformGets();
private:
void Init();
SstStream m_Input;
#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
void DoGetDeferred(Variable<T> &, T *) final; \
void DoGetDeferred(Variable<T> &, T &) final;
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
template <class T>
void GetSyncCommon(Variable<T> &variable, T *data);
template <class T>
void GetDeferredCommon(Variable<T> &variable, T *data);
};
} // end namespace adios
......
......@@ -32,11 +32,33 @@ SstWriter::SstWriter(IO &io, const std::string &name, const Mode mode,
StepStatus SstWriter::BeginStep(StepMode mode, const float timeout_sec)
{
if (m_FFSmarshal)
{
return (StepStatus)SstWriterBeginStep(m_Output, (int)mode, timeout_sec);
}
else
{
// When BP marshalling/unmarshaling complete, this should call
// SstProvideTimestep and clean up at this level
}
return StepStatus::OK;
}
void SstWriter::EndStep() {}
void SstWriter::EndStep()
{
if (m_FFSmarshal)
{
SstWriterEndStep(m_Output);
}
else
{
// When BP marshalling/unmarshaling complete, this should call
// SstProvideTimestep and clean up at this level
}
}
void SstWriter::Close(const int transportIndex) { SstWriterClose(m_Output); }
void SstWriter::PerformPuts() {}
// PRIVATE functions below
void SstWriter::Init()
......@@ -56,75 +78,19 @@ void SstWriter::Init()
}
}
};
// lf_SetBoolParameter("real_time", m_DoRealTime);
// lf_SetBoolParameter("monitoring", m_DoMonitor);
// if (m_DoRealTime)
// {
// /**
// * Lambda function that assigns a parameter in m_Method to a
// * localVariable
// * of type std::string
// */
// auto lf_AssignString = [&](const std::string parameter,
// std::string &localVariable) {
// auto it = m_IO.m_Parameters.find(parameter);
// if (it != m_IO.m_Parameters.end())
// {
// localVariable = it->second;
// }
// };
// /**
// * Lambda function that assigns a parameter in m_Method to a
// * localVariable
// * of type int
// */
// auto lf_AssignInt = [&](const std::string parameter,
// int &localVariable) {
// auto it = m_IO.m_Parameters.find(parameter);
// if (it != m_IO.m_Parameters.end())
// {
// localVariable = std::stoi(it->second);
// }
// };
// auto lf_IsNumber = [](const std::string &s) {
// return !s.empty() && std::find_if(s.begin(), s.end(), [](char c)
// {
// return !std::isdigit(c);
// }) == s.end();
// };
// json jmsg;
// for (const auto &i : m_IO.m_Parameters)
// {
// if (lf_IsNumber(i.second))
// {
// jmsg[i.first] = std::stoi(i.second);
// }
// else
// {
// jmsg[i.first] = i.second;
// }
// }
// jmsg["stream_mode"] = "sender";
// m_Man.add_stream(jmsg);
// std::string method_type;
// lf_AssignString("method_type", method_type);
// int num_channels = 0;
// lf_AssignInt("num_channels", num_channels);
// }
lf_SetBoolParameter("FFSmarshal", m_FFSmarshal);
}
#define declare_type(T) \
void SstWriter::DoPutSync(Variable<T> &variable, const T *values) \
{ \
PutSyncCommon(variable, values); \
}
} \
void SstWriter::DoPutDeferred(Variable<T> &variable, const T *values) \
{ \
PutSyncCommon(variable, values); \
} \
void SstWriter::DoPutDeferred(Variable<T> &, const T &value) {}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
......
......@@ -34,6 +34,7 @@ public:
virtual ~SstWriter() = default;
StepStatus BeginStep(StepMode mode, const float timeoutSeconds = 0.f) final;
void PerformPuts() final;
void EndStep() final;
void Close(const int transportIndex = -1) final;
......@@ -43,14 +44,20 @@ private:
/// called from constructor
#define declare_type(T) \
void DoPutSync(Variable<T> &variable, const T *values) final;
void DoPutSync(Variable<T> &variable, const T *values) final; \
void DoPutDeferred(Variable<T> &, const T *) final; \
void DoPutDeferred(Variable<T> &, const T &) final;
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
template <class T>
void PutSyncCommon(Variable<T> &variable, const T *values);
template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *values);
SstStream m_Output;
bool m_FFSmarshal = true;
};
} // end namespace adios
......
......@@ -35,11 +35,55 @@ void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values)
variable.m_Start.assign(variable.m_Count.size(), 0);
}
std::cout << "I am hooked to the Sst library\n";
std::cout << "Variable " << variable.m_Name << "\n";
std::cout << "putshape " << variable.m_Count.size() << "\n";
std::cout << "varshape " << variable.m_Shape.size() << "\n";
std::cout << "offset " << variable.m_Start.size() << "\n";
std::cout << "Shape ID ";
switch (variable.m_ShapeID)
{
case ShapeID::GlobalValue:
std::cout << "GlobalValue : ";
break;
case ShapeID::GlobalArray:
std::cout << "GlobalArray : ";
break;
case ShapeID::JoinedArray:
std::cout << "JoinedArray : ";
break;
case ShapeID::LocalValue:
std::cout << "LocalValue : ";
break;
case ShapeID::LocalArray:
std::cout << "LocalArray : ";
break;
}
std::cout << "putshape ";
for (auto it = variable.m_Count.begin(); it != variable.m_Count.end(); ++it)
{
std::cout << ' ' << *it;
}
std::cout << '\n';
std::cout << "varshape ";
for (auto it = variable.m_Shape.begin(); it != variable.m_Shape.end(); ++it)
{
std::cout << ' ' << *it;
}
std::cout << '\n';
std::cout << "offsets ";
for (auto it = variable.m_Start.begin(); it != variable.m_Start.end(); ++it)
{
std::cout << ' ' << *it;
}
std::cout << '\n';
if (m_FFSmarshal)
{
SstMarshal(m_Output, (void *)&variable, variable.m_Name.c_str(),
variable.m_Type.c_str(), variable.m_ElementSize,
variable.m_Shape.size(), variable.m_Shape.data(),
variable.m_Count.data(), variable.m_Start.data(), values);
}
else
{
// Do BP marshaling
}
}
} // end namespace adios
......
......@@ -3,6 +3,7 @@ add_library(sst
dp/dummy_dp.c
cp/cp.c
cp/cp_common.c
cp/ffs_marshal.c
)
target_compile_features(sst PRIVATE c_std_99)
......
......@@ -252,6 +252,15 @@ static void waitForReaderResponse(WS_ReaderInfo Reader)
CP_verbose(Stream, "Reader ready on WSR %p, Stream established.\n", Reader);
}
static char *TrimSuffix(const char *Name)
{
char *Ret = strdup(Name);
int Len = strlen(Name);
if (strcmp(Name + Len - 3, ".bp") == 0)
Ret[Len - 3] = 0;
return Ret;
}
SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
{
SstStream Stream;
......@@ -260,6 +269,7 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
Stream->Role = WriterRole;
CP_parseParams(Stream, params);
char *Filename = TrimSuffix(Name);
Stream->DP_Interface = LoadDP("dummy");
Stream->CPInfo = CP_getCPInfo(Stream->DP_Interface);
......@@ -282,10 +292,10 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
if (Stream->Rank == 0)
{
writeContactInfo(Name, Stream);
writeContactInfo(Filename, Stream);
}
CP_verbose(Stream, "Opening Stream \"%s\"\n", Name);
CP_verbose(Stream, "Opening Stream \"%s\"\n", Filename);
if (Stream->WaitForFirstReader)
{
......@@ -315,7 +325,7 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
MPI_Barrier(Stream->mpiComm);
gettimeofday(&Stream->ValidStartTime, NULL);
}
CP_verbose(Stream, "Finish opening Stream \"%s\"\n", Name);
CP_verbose(Stream, "Finish opening Stream \"%s\"\n", Filename);
return Stream;
}
......@@ -333,6 +343,7 @@ void sendOneToEachReaderRank(SstStream s, CMFormat f, void *Msg,
/* add the reader-rank-specific Stream identifier to each outgoing
* message */
*RS_StreamPtr = CP_WSR_Stream->Connections[peer].RemoteStreamID;
CP_verbose(s, "Sending a message to reader %d\n", peer);
CMwrite(conn, f, Msg);
j++;
}
......@@ -366,8 +377,41 @@ void SstWriterClose(SstStream Stream)
CP_verbose(Stream, "All timesteps are released in WriterClose\n");
}
void SstProvideTimestep(SstStream s, SstMetadata LocalMetadata, SstData Data,
long Timestep)
static FFSFormatList AddUniqueFormats(FFSFormatList List,
FFSFormatList Candidates)
{
FFSFormatList Tmp = List;
FFSFormatList Ret = List;
// If nothing to add, return original
if (!Candidates)
return Ret;
// Add tail of candidates list first
Ret = AddUniqueFormats(List, Candidates->Next);
while (Tmp)
{
if ((Tmp->FormatIDRepLen == Candidates->FormatIDRepLen) &&
(memcmp(Tmp->FormatIDRep, Candidates->FormatIDRep,
Tmp->FormatIDRepLen) == 0))
{
// Identical format already in List, don't add this one
return Ret;
}
Tmp = Tmp->Next;
}
// New format not in list, add him to head and return.
// This is destructive of candidates list, but that is unimportant for
// deallocation in this circumstance.
Candidates->Next = Ret;
return Candidates;
}
extern void SstInternalProvideTimestep(SstStream s, SstData LocalMetadata,
SstData Data, long Timestep,
FFSFormatList Formats,
void *DataFreeFunc, void *FreeClientData)
{
void *data_block;
MetadataPlusDPInfo *pointers;
......@@ -375,18 +419,20 @@ void SstProvideTimestep(SstStream s, SstMetadata LocalMetadata, SstData Data,
void *DP_TimestepInfo = NULL;
struct _MetadataPlusDPInfo Md;
CPTimestepList Entry = malloc(sizeof(struct _CPTimestepEntry));
FFSFormatList XmitFormats = NULL;
s->DP_Interface->provideTimestep(&Svcs, s->DP_Stream, Data, LocalMetadata,
Timestep, &DP_TimestepInfo);
Md.Metadata = LocalMetadata;
Md.Formats = Formats;
Md.Metadata = (SstBlock)LocalMetadata;
Md.DP_TimestepInfo = DP_TimestepInfo;
pointers = (MetadataPlusDPInfo *)CP_consolidateDataToAll(
s, &Md, s->CPInfo->PerRankMetadataFormat, &data_block);
Msg.CohortSize = s->CohortSize;
Msg.Timestep = s->WriterTimestep++;
Msg.Timestep = s->WriterTimestep;
/* separate metadata and DP_info to separate arrays */
Msg.Metadata = malloc(s->CohortSize * sizeof(void *));
......@@ -398,12 +444,14 @@ void SstProvideTimestep(SstStream s, SstMetadata LocalMetadata, SstData Data,
Msg.DP_TimestepInfo[i] = pointers[i]->DP_TimestepInfo;
if (pointers[i]->DP_TimestepInfo == NULL)
NullCount++;
XmitFormats = AddUniqueFormats(XmitFormats, pointers[i]->Formats);
}
if (NullCount == s->CohortSize)
{
free(Msg.DP_TimestepInfo);
Msg.DP_TimestepInfo = NULL;
}
Msg.Formats = XmitFormats;
CP_verbose(s,
"Sending TimestepMetadata for timestep %d, one to each reader\n",
......@@ -418,6 +466,8 @@ void SstProvideTimestep(SstStream s, SstMetadata LocalMetadata, SstData Data,
Entry->Timestep = Timestep;
Entry->MetadataArray = Msg.Metadata;
Entry->DP_TimestepInfo = Msg.DP_TimestepInfo;
Entry->DataFreeFunc = DataFreeFunc;
Entry->FreeClientData = FreeClientData;
Entry->Next = s->QueuedTimesteps;
s->QueuedTimesteps = Entry;
s->QueuedTimestepCount++;
......@@ -463,6 +513,7 @@ SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm)
struct _ReaderActivateMsg Msg;
struct timeval Start, Stop, Diff;
int i;
char *Filename = TrimSuffix(Name);
Stream = CP_newStream();
Stream->Role = ReaderRole;
......@@ -477,7 +528,6 @@ SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm)
MPI_Comm_rank(Stream->mpiComm, &Stream->Rank);
MPI_Comm_size(Stream->mpiComm, &Stream->CohortSize);
printf("READER COHORT SIZE %d\n", Stream->CohortSize);
Stream->DP_Stream =
Stream->DP_Interface->initReader(&Svcs, Stream, &dpInfo);
......@@ -490,7 +540,7 @@ SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm)
if (Stream->Rank == 0)
{
char *writer_0_contact = readContactInfo(Name, Stream);
char *writer_0_contact = readContactInfo(Filename, Stream);
void *writer_file_ID;
char *cm_contact_string =
malloc(strlen(writer_0_contact)); /* at least long enough */
......@@ -527,7 +577,7 @@ SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm)
CP_verbose(
Stream,
"Waiting for writer response message in SstReadOpen(\"%s\")\n",
Name, reader_register.WriterResponseCondition);
Filename, reader_register.WriterResponseCondition);
CMCondition_wait(Stream->CPInfo->cm,
reader_register.WriterResponseCondition);
CP_verbose(Stream,
......@@ -553,6 +603,7 @@ SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm)
// ReturnData, 1024000);
// printf("\n");
Stream->WriterCohortSize = ReturnData->WriterCohortSize;
Stream->ConnectionsToWriter =
calloc(sizeof(CP_PeerConnection), ReturnData->WriterCohortSize);
for (i = 0; i < ReturnData->WriterCohortSize; i++)
......@@ -584,10 +635,11 @@ SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm)