Unverified Commit b50a6bd2 authored by williamfgc's avatar williamfgc Committed by GitHub
Browse files

Merge branch 'release' into docs

parents 799556ef e0b797bb
<?xml version="1.0"?>
<!-- Config XML file fo the
- heatTransfer_write_adios2
- heatTransfer_read
executables in build/bin -->
<adios-config>
<!--====================================
Configuration for the Writer
====================================-->
<io name="writer">
<engine type="SST">
<!-- for vectorized memory operations, make sure your system
enables threads-->
<parameter key="Threads" value="2"/>
<!-- Microseconds (default), Milliseconds, Seconds,
Minutes, Hours -->
<parameter key="ProfileUnits" value="Microseconds"/>
</engine>
<transport type="File">
<!-- POSIX, stdio (C FILE*), fstream (C++) -->
<parameter key="Library" value="MPI"/>
<!-- For read/write, Microseconds (default), Milliseconds, Seconds,
Minutes, Hours. open/close always in Microseconds -->
<parameter key="ProfileUnits" value="Microseconds"/>
</transport>
</io>
<!--====================================
Configuration for the Reader
====================================-->
<io name="reader">
<engine type="SST">
<!-- for vectorized memory operations, make sure your system
enables threads-->
<parameter key="Threads" value="2"/>
<!-- Microseconds (default), Milliseconds, Seconds,
Minutes, Hours -->
<parameter key="ProfileUnits" value="Microseconds"/>
</engine>
<transport type="File">
<!-- POSIX, stdio (C FILE*), fstream (C++) -->
<parameter key="Library" value="POSIX"/>
<!-- For read/write, Microseconds (default), Milliseconds, Seconds,
Minutes, Hours. open/close always in Microseconds -->
<parameter key="ProfileUnits" value="Microseconds"/>
</transport>
</io>
</adios-config>
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
* Author: Greg Eisenhauer * Author: Greg Eisenhauer
*/ */
#include "adios2/helper/adiosFunctions.h"
#include <cstring> #include <cstring>
#include <string> #include <string>
...@@ -25,10 +26,80 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode, ...@@ -25,10 +26,80 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
std::strcpy(cstr, name.c_str()); std::strcpy(cstr, name.c_str());
m_Input = SstReaderOpen(cstr, NULL, mpiComm); m_Input = SstReaderOpen(cstr, NULL, mpiComm);
auto varCallback = [](void *reader, const char *variableName,
const char *type, void *data) {
std::string Type(type);
typename SstReader::SstReader *Reader =
reinterpret_cast<typename SstReader::SstReader *>(reader);
if (Type == "compound")
{
return (void *)NULL;
}
#define declare_type(T) \
else if (Type == GetType<T>()) \
{ \
Variable<T> *variable = \
&(Reader->m_IO.DefineVariable<T>(variableName)); \
variable->SetData((T *)data); \
variable->m_AvailableStepsCount = 1; \
return (void *)variable; \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
return (void *)NULL;
};
auto arrayCallback = [](void *reader, const char *variableName,
const char *type, int DimCount, size_t *Shape,
size_t *Start, size_t *Count) {
std::vector<size_t> VecShape;
std::vector<size_t> VecStart;
std::vector<size_t> VecCount;
std::string Type(type);
typename SstReader::SstReader *Reader =
reinterpret_cast<typename SstReader::SstReader *>(reader);
for (int i = 0; i < DimCount; i++)
{
VecShape.push_back(Shape[i]);
VecStart.push_back(Start[i]);
VecCount.push_back(Count[i]);
}
if (Type == "compound")
{
return (void *)NULL;
}
#define declare_type(T) \
else if (Type == GetType<T>()) \
{ \
Variable<T> *variable = &(Reader->m_IO.DefineVariable<T>( \
variableName, VecShape, VecStart, VecCount)); \
variable->m_AvailableStepsCount = 1; \
return (void *)variable; \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
return (void *)NULL;
};
SstReaderInitCallback(m_Input, this, varCallback, arrayCallback);
Init(); Init();
delete[] cstr; delete[] cstr;
} }
StepStatus SstReader::BeginStep(StepMode mode, const float timeout_sec)
{
return (StepStatus)SstAdvanceStep(m_Input, (int)mode, timeout_sec);
}
void SstReader::EndStep()
{
m_IO.RemoveAllVariables();
SstReleaseStep(m_Input);
}
void SstReader::Close(const int transportIndex) { SstReaderClose(m_Input); } void SstReader::Close(const int transportIndex) { SstReaderClose(m_Input); }
// PRIVATE // PRIVATE
...@@ -37,4 +108,29 @@ void SstReader::Init() ...@@ -37,4 +108,29 @@ void SstReader::Init()
auto itRealTime = m_IO.m_Parameters.find("real_time"); auto itRealTime = m_IO.m_Parameters.find("real_time");
} }
#define declare_type(T) \
void SstReader::DoGetSync(Variable<T> &variable, T *data) \
{ \
SstGetDeferred(m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
SstPerformGets(m_Input); \
} \
void SstReader::DoGetDeferred(Variable<T> &variable, T *data) \
{ \
SstGetDeferred(m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), data); \
} \
void SstReader::DoGetDeferred(Variable<T> &variable, T &data) \
{ \
SstGetDeferred(m_Input, (void *)&variable, variable.m_Name.c_str(), \
variable.m_Start.size(), variable.m_Start.data(), \
variable.m_Count.data(), &data); \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
void SstReader::PerformGets() { SstPerformGets(m_Input); }
} // end namespace adios } // end namespace adios
...@@ -42,11 +42,29 @@ public: ...@@ -42,11 +42,29 @@ public:
virtual ~SstReader() = default; virtual ~SstReader() = default;
StepStatus BeginStep();
StepStatus BeginStep(StepMode mode, const float timeoutSeconds = 0.f);
void EndStep();
void Close(const int transportIndex = -1); void Close(const int transportIndex = -1);
void PerformGets();
private: private:
void Init(); void Init();
SstStream m_Input; SstStream m_Input;
#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
void DoGetDeferred(Variable<T> &, T *) final; \
void DoGetDeferred(Variable<T> &, T &) final;
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
template <class T>
void GetSyncCommon(Variable<T> &variable, T *data);
template <class T>
void GetDeferredCommon(Variable<T> &variable, T *data);
}; };
} // end namespace adios } // end namespace adios
......
...@@ -32,11 +32,33 @@ SstWriter::SstWriter(IO &io, const std::string &name, const Mode mode, ...@@ -32,11 +32,33 @@ SstWriter::SstWriter(IO &io, const std::string &name, const Mode mode,
StepStatus SstWriter::BeginStep(StepMode mode, const float timeout_sec) StepStatus SstWriter::BeginStep(StepMode mode, const float timeout_sec)
{ {
if (m_FFSmarshal)
{
return (StepStatus)SstWriterBeginStep(m_Output, (int)mode, timeout_sec);
}
else
{
// When BP marshalling/unmarshaling complete, this should call
// SstProvideTimestep and clean up at this level
}
return StepStatus::OK; return StepStatus::OK;
} }
void SstWriter::EndStep() {}
void SstWriter::EndStep()
{
if (m_FFSmarshal)
{
SstWriterEndStep(m_Output);
}
else
{
// When BP marshalling/unmarshaling complete, this should call
// SstProvideTimestep and clean up at this level
}
}
void SstWriter::Close(const int transportIndex) { SstWriterClose(m_Output); } void SstWriter::Close(const int transportIndex) { SstWriterClose(m_Output); }
void SstWriter::PerformPuts() {}
// PRIVATE functions below // PRIVATE functions below
void SstWriter::Init() void SstWriter::Init()
...@@ -56,75 +78,19 @@ void SstWriter::Init() ...@@ -56,75 +78,19 @@ void SstWriter::Init()
} }
} }
}; };
lf_SetBoolParameter("FFSmarshal", m_FFSmarshal);
// lf_SetBoolParameter("real_time", m_DoRealTime);
// lf_SetBoolParameter("monitoring", m_DoMonitor);
// if (m_DoRealTime)
// {
// /**
// * Lambda function that assigns a parameter in m_Method to a
// * localVariable
// * of type std::string
// */
// auto lf_AssignString = [&](const std::string parameter,
// std::string &localVariable) {
// auto it = m_IO.m_Parameters.find(parameter);
// if (it != m_IO.m_Parameters.end())
// {
// localVariable = it->second;
// }
// };
// /**
// * Lambda function that assigns a parameter in m_Method to a
// * localVariable
// * of type int
// */
// auto lf_AssignInt = [&](const std::string parameter,
// int &localVariable) {
// auto it = m_IO.m_Parameters.find(parameter);
// if (it != m_IO.m_Parameters.end())
// {
// localVariable = std::stoi(it->second);
// }
// };
// auto lf_IsNumber = [](const std::string &s) {
// return !s.empty() && std::find_if(s.begin(), s.end(), [](char c)
// {
// return !std::isdigit(c);
// }) == s.end();
// };
// json jmsg;
// for (const auto &i : m_IO.m_Parameters)
// {
// if (lf_IsNumber(i.second))
// {
// jmsg[i.first] = std::stoi(i.second);
// }
// else
// {
// jmsg[i.first] = i.second;
// }
// }
// jmsg["stream_mode"] = "sender";
// m_Man.add_stream(jmsg);
// std::string method_type;
// lf_AssignString("method_type", method_type);
// int num_channels = 0;
// lf_AssignInt("num_channels", num_channels);
// }
} }
#define declare_type(T) \ #define declare_type(T) \
void SstWriter::DoPutSync(Variable<T> &variable, const T *values) \ void SstWriter::DoPutSync(Variable<T> &variable, const T *values) \
{ \ { \
PutSyncCommon(variable, values); \ PutSyncCommon(variable, values); \
} } \
void SstWriter::DoPutDeferred(Variable<T> &variable, const T *values) \
{ \
PutSyncCommon(variable, values); \
} \
void SstWriter::DoPutDeferred(Variable<T> &, const T &value) {}
ADIOS2_FOREACH_TYPE_1ARG(declare_type) ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type #undef declare_type
......
...@@ -34,6 +34,7 @@ public: ...@@ -34,6 +34,7 @@ public:
virtual ~SstWriter() = default; virtual ~SstWriter() = default;
StepStatus BeginStep(StepMode mode, const float timeoutSeconds = 0.f) final; StepStatus BeginStep(StepMode mode, const float timeoutSeconds = 0.f) final;
void PerformPuts() final;
void EndStep() final; void EndStep() final;
void Close(const int transportIndex = -1) final; void Close(const int transportIndex = -1) final;
...@@ -43,14 +44,20 @@ private: ...@@ -43,14 +44,20 @@ private:
/// called from constructor /// called from constructor
#define declare_type(T) \ #define declare_type(T) \
void DoPutSync(Variable<T> &variable, const T *values) final; void DoPutSync(Variable<T> &variable, const T *values) final; \
void DoPutDeferred(Variable<T> &, const T *) final; \
void DoPutDeferred(Variable<T> &, const T &) final;
ADIOS2_FOREACH_TYPE_1ARG(declare_type) ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type #undef declare_type
template <class T> template <class T>
void PutSyncCommon(Variable<T> &variable, const T *values); void PutSyncCommon(Variable<T> &variable, const T *values);
template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *values);
SstStream m_Output; SstStream m_Output;
bool m_FFSmarshal = true;
}; };
} // end namespace adios } // end namespace adios
......
...@@ -35,11 +35,55 @@ void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values) ...@@ -35,11 +35,55 @@ void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values)
variable.m_Start.assign(variable.m_Count.size(), 0); variable.m_Start.assign(variable.m_Count.size(), 0);
} }
std::cout << "I am hooked to the Sst library\n";
std::cout << "Variable " << variable.m_Name << "\n"; std::cout << "Variable " << variable.m_Name << "\n";
std::cout << "putshape " << variable.m_Count.size() << "\n"; std::cout << "Shape ID ";
std::cout << "varshape " << variable.m_Shape.size() << "\n"; switch (variable.m_ShapeID)
std::cout << "offset " << variable.m_Start.size() << "\n"; {
case ShapeID::GlobalValue:
std::cout << "GlobalValue : ";
break;
case ShapeID::GlobalArray:
std::cout << "GlobalArray : ";
break;
case ShapeID::JoinedArray:
std::cout << "JoinedArray : ";
break;
case ShapeID::LocalValue:
std::cout << "LocalValue : ";
break;
case ShapeID::LocalArray:
std::cout << "LocalArray : ";
break;
}
std::cout << "putshape ";
for (auto it = variable.m_Count.begin(); it != variable.m_Count.end(); ++it)
{
std::cout << ' ' << *it;
}
std::cout << '\n';
std::cout << "varshape ";
for (auto it = variable.m_Shape.begin(); it != variable.m_Shape.end(); ++it)
{
std::cout << ' ' << *it;
}
std::cout << '\n';
std::cout << "offsets ";
for (auto it = variable.m_Start.begin(); it != variable.m_Start.end(); ++it)
{
std::cout << ' ' << *it;
}
std::cout << '\n';
if (m_FFSmarshal)
{
SstMarshal(m_Output, (void *)&variable, variable.m_Name.c_str(),
variable.m_Type.c_str(), variable.m_ElementSize,
variable.m_Shape.size(), variable.m_Shape.data(),
variable.m_Count.data(), variable.m_Start.data(), values);
}
else
{
// Do BP marshaling
}
} }
} // end namespace adios } // end namespace adios
......
...@@ -3,6 +3,7 @@ add_library(sst ...@@ -3,6 +3,7 @@ add_library(sst
dp/dummy_dp.c dp/dummy_dp.c
cp/cp.c cp/cp.c
cp/cp_common.c cp/cp_common.c
cp/ffs_marshal.c
) )
target_compile_features(sst PRIVATE c_std_99) target_compile_features(sst PRIVATE c_std_99)
......
...@@ -252,6 +252,15 @@ static void waitForReaderResponse(WS_ReaderInfo Reader) ...@@ -252,6 +252,15 @@ static void waitForReaderResponse(WS_ReaderInfo Reader)
CP_verbose(Stream, "Reader ready on WSR %p, Stream established.\n", Reader); CP_verbose(Stream, "Reader ready on WSR %p, Stream established.\n", Reader);
} }
static char *TrimSuffix(const char *Name)
{
char *Ret = strdup(Name);
int Len = strlen(Name);
if (strcmp(Name + Len - 3, ".bp") == 0)
Ret[Len - 3] = 0;
return Ret;
}
SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm) SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
{ {
SstStream Stream; SstStream Stream;
...@@ -260,6 +269,7 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm) ...@@ -260,6 +269,7 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
Stream->Role = WriterRole; Stream->Role = WriterRole;
CP_parseParams(Stream, params); CP_parseParams(Stream, params);
char *Filename = TrimSuffix(Name);
Stream->DP_Interface = LoadDP("dummy"); Stream->DP_Interface = LoadDP("dummy");
Stream->CPInfo = CP_getCPInfo(Stream->DP_Interface); Stream->CPInfo = CP_getCPInfo(Stream->DP_Interface);
...@@ -282,10 +292,10 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm) ...@@ -282,10 +292,10 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
if (Stream->Rank == 0) if (Stream->Rank == 0)
{ {
writeContactInfo(Name, Stream); writeContactInfo(Filename, Stream);
} }
CP_verbose(Stream, "Opening Stream \"%s\"\n", Name); CP_verbose(Stream, "Opening Stream \"%s\"\n", Filename);
if (Stream->WaitForFirstReader) if (Stream->WaitForFirstReader)
{ {
...@@ -315,7 +325,7 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm) ...@@ -315,7 +325,7 @@ SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm)
MPI_Barrier(Stream->mpiComm); MPI_Barrier(Stream->mpiComm);
gettimeofday(&Stream->ValidStartTime, NULL); gettimeofday(&Stream->ValidStartTime, NULL);
} }
CP_verbose(Stream, "Finish opening Stream \"%s\"\n", Name); CP_verbose(Stream, "Finish opening Stream \"%s\"\n", Filename);
return Stream; return Stream;
} }
...@@ -333,6 +343,7 @@ void sendOneToEachReaderRank(SstStream s, CMFormat f, void *Msg, ...@@ -333,6 +343,7 @@ void sendOneToEachReaderRank(SstStream s, CMFormat f, void *Msg,
/* add the reader-rank-specific Stream identifier to each outgoing /* add the reader-rank-specific Stream identifier to each outgoing
* message */ * message */
*RS_StreamPtr = CP_WSR_Stream->Connections[peer].RemoteStreamID; *RS_StreamPtr = CP_WSR_Stream->Connections[peer].RemoteStreamID;
CP_verbose(s, "Sending a message to reader %d\n", peer);
CMwrite(conn, f, Msg); CMwrite(conn, f, Msg);
j++; j++;
} }
...@@ -366,8 +377,41 @@ void SstWriterClose(SstStream Stream) ...@@ -366,8 +377,41 @@ void SstWriterClose(SstStream Stream)
CP_verbose(Stream, "All timesteps are released in WriterClose\n"); CP_verbose(Stream, "All timesteps are released in WriterClose\n");
} }
void SstProvideTimestep(SstStream s, SstMetadata LocalMetadata, SstData Data, static FFSFormatList AddUniqueFormats(FFSFormatList List,
long Timestep) FFSFormatList Candidates)
{
FFSFormatList Tmp = List;
FFSFormatList Ret = List;
// If nothing to add, return original
if (!Candidates)
return Ret;
// Add tail of candidates list first
Ret = AddUniqueFormats(List, Candidates->Next);
while (Tmp)
{
if ((Tmp->FormatIDRepLen == Candidates->FormatIDRepLen) &&
(memcmp(Tmp->FormatIDRep, Candidates->FormatIDRep,
Tmp->FormatIDRepLen) == 0))
{
// Identical format already in List, don't add this one
return Ret;
}
Tmp = Tmp->Next;
}
// New format not in list, add him to head and return.
// This is destructive of candidates list, but that is unimportant for
// deallocation in this circumstance.
Candidates->Next = Ret;
return Candidates;
}
extern void SstInternalProvideTimestep(SstStream s, SstData LocalMetadata,