Skip to content
Snippets Groups Projects
Unverified Commit 3dd2ccde authored by Jason Wang's avatar Jason Wang Committed by GitHub
Browse files

Merge pull request #5 from williamfgc/bpindataman

Bpindataman
parents 2cc88425 555daca3
No related branches found
No related tags found
1 merge request!340for BP buffer over WAN
......@@ -187,8 +187,8 @@ ADIOS2_FOREACH_TYPE_1ARG(declare_type)
Operator &ADIOS::DefineCallBack(
const std::string name,
const std::function<void(void *, const std::string, const std::string,
const std::string, const Dims &)> &function,
const std::function<void(void *, const std::string &, const std::string &,
const std::string &, const Dims &)> &function,
const Params &parameters)
{
std::shared_ptr<Operator> callbackOperator =
......
......@@ -173,8 +173,9 @@ private:
/** define CallBack2 */
Operator &DefineCallBack(
const std::string name,
const std::function<void(void *, const std::string, const std::string,
const std::string, const Dims &)> &function,
const std::function<void(void *, const std::string &,
const std::string &, const std::string &,
const Dims &)> &function,
const Params &parameters);
};
......
......@@ -95,7 +95,7 @@ void BPFileReader::InitBuffer()
BroadcastVector(m_BP3Deserializer.m_Metadata.m_Buffer, m_MPIComm);
// fills IO with Variables and Attributes
m_BP3Deserializer.ParseMetadata(m_IO);
m_BP3Deserializer.ParseMetadata(m_BP3Deserializer.m_Metadata, m_IO);
}
#define declare_type(T) \
......
......@@ -32,11 +32,11 @@ BP3Deserializer::BP3Deserializer(MPI_Comm mpiComm, const bool debugMode)
{
}
void BP3Deserializer::ParseMetadata(IO &io)
void BP3Deserializer::ParseMetadata(const BufferSTL &bufferSTL, IO &io)
{
ParseMinifooter();
ParsePGIndex();
ParseVariablesIndex(io);
ParseMinifooter(bufferSTL);
ParsePGIndex(bufferSTL);
ParseVariablesIndex(bufferSTL, io);
// ParseAttributesIndex(io);
}
......@@ -66,7 +66,7 @@ void BP3Deserializer::ClipContiguousMemory(
}
// PRIVATE
void BP3Deserializer::ParseMinifooter()
void BP3Deserializer::ParseMinifooter(const BufferSTL &bufferSTL)
{
auto lf_GetEndianness = [](const uint8_t endianness, bool &isLittleEndian) {
......@@ -81,7 +81,7 @@ void BP3Deserializer::ParseMinifooter()
}
};
const auto &buffer = m_Metadata.m_Buffer;
const auto &buffer = bufferSTL.m_Buffer;
const size_t bufferSize = buffer.size();
size_t position = bufferSize - 4;
......@@ -110,9 +110,9 @@ void BP3Deserializer::ParseMinifooter()
m_Minifooter.AttributesIndexStart = ReadValue<uint64_t>(buffer, position);
}
void BP3Deserializer::ParsePGIndex()
void BP3Deserializer::ParsePGIndex(const BufferSTL &bufferSTL)
{
const auto &buffer = m_Metadata.m_Buffer;
const auto &buffer = bufferSTL.m_Buffer;
auto &position = m_Metadata.m_Position;
position = m_Minifooter.PGIndexStart;
......@@ -128,7 +128,7 @@ void BP3Deserializer::ParsePGIndex()
}
}
void BP3Deserializer::ParseVariablesIndex(IO &io)
void BP3Deserializer::ParseVariablesIndex(const BufferSTL &bufferSTL, IO &io)
{
auto lf_ReadElementIndex = [&](IO &io, const std::vector<char> &buffer,
size_t position) {
......@@ -238,7 +238,7 @@ void BP3Deserializer::ParseVariablesIndex(IO &io)
} // end switch
};
const auto &buffer = m_Metadata.m_Buffer;
const auto &buffer = bufferSTL.m_Buffer;
size_t position = m_Minifooter.VarsIndexStart;
const uint32_t count = ReadValue<uint32_t>(buffer, position);
......
......@@ -40,7 +40,7 @@ public:
~BP3Deserializer() = default;
void ParseMetadata(IO &io);
void ParseMetadata(const BufferSTL &bufferSTL, IO &io);
// Sync functions
template <class T>
......@@ -64,9 +64,9 @@ private:
static std::mutex m_Mutex;
void ParseMinifooter();
void ParsePGIndex();
void ParseVariablesIndex(IO &io);
void ParseMinifooter(const BufferSTL &bufferSTL);
void ParsePGIndex(const BufferSTL &bufferSTL);
void ParseVariablesIndex(const BufferSTL &bufferSTL, IO &io);
void ParseAttributesIndex(IO &io);
/**
......
......@@ -8,6 +8,8 @@
* Author: Jason Wang wangr1@ornl.gov
*/
#include <fstream> //TODO go away
#include "DataMan.h"
#include "adios2/helper/adiosFunctions.h"
......@@ -161,8 +163,13 @@ void DataMan::WriteWAN(const void *buffer, size_t size)
m_Transports[m_CurrentTransport]->Write(
reinterpret_cast<const char *>(buffer), size);
std::ofstream bpfile("datamanW.bp", std::ios_base::binary);
bpfile.write(reinterpret_cast<const char *>(buffer), size);
bpfile.close();
for (int i = 0; i < size / 4; i++)
{
std::cout << static_cast<const float *>(buffer)[i] << " ";
}
}
......@@ -225,25 +232,33 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
{
while (m_Listening)
{
char *buffer = new char[m_BufferSize];
// char *buffer = new char[m_BufferSize];
std::vector<char> buffer(m_BufferSize);
Transport::Status status;
trans->IRead(buffer, m_BufferSize, status);
trans->IRead(buffer.data(), m_BufferSize, status);
if (status.Bytes > 0)
{
m_BP3Deserializer->m_Data.Resize(
status.Bytes, "in DataMan Streaming Listener");
std::memcpy(m_BP3Deserializer->m_Data.m_Buffer.data(), buffer,
status.Bytes);
std::memcpy(m_BP3Deserializer->m_Data.m_Buffer.data(),
buffer.data(), status.Bytes);
/* TODO: remove this part */
m_Callback->RunCallback2(buffer, "ss", "rr", "char", {128});
m_Callback->RunCallback2(buffer.data(), "ss", "rr", "char",
adios2::Dims{128});
std::ofstream bpfile("datamanR.bp", std::ios_base::binary);
bpfile.write(m_BP3Deserializer->m_Data.m_Buffer.data(),
m_BP3Deserializer->m_Data.m_Buffer.size());
bpfile.close();
m_BP3Deserializer->ParseMetadata(*m_IO);
m_BP3Deserializer->ParseMetadata(m_BP3Deserializer->m_Data,
*m_IO);
/*
const auto variablesInfo = m_IO->GetAvailableVariables();
for (const auto &variableInfoPair : variablesInfo)
{
......@@ -256,9 +271,7 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
<< "\t Value: " << parameter.second << "\n";
}
}
*/
}
delete[] buffer;
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment