Skip to content
Snippets Groups Projects
Commit 492e6a97 authored by Wang, Ruonan's avatar Wang, Ruonan
Browse files

added asynchronous IWrite and IRead API to ZMQ transport for metadata channel

parent 98e67b62
No related branches found
No related tags found
1 merge request!307DataMan and ZMQ transport worked with the newly refactored architecture
......@@ -43,6 +43,12 @@ int main(int argc, char *argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
int timeout = 5;
if(argc == 2){
timeout = atoi(argv[1]);
}
try
{
adios2::ADIOS adios(adios2::DebugON);
......@@ -63,18 +69,7 @@ int main(int argc, char *argv[])
adios2::Engine &dataManReader =
dataManIO.Open("myDoubles.bp", adios2::Mode::Read);
for (unsigned int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
adios2::Variable<double> *ioMyDoubles =
dataManIO.InquireVariable<double>("ioMyDoubles");
if (ioMyDoubles == nullptr)
{
std::cout << "Variable ioMyDoubles not read...yet\n";
}
std::this_thread::sleep_for(std::chrono::seconds(timeout));
dataManReader.Close();
}
......
......@@ -125,7 +125,7 @@ void WANZmq::Write(const char *buffer, size_t size, size_t start)
}
void WANZmq::IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT)
size_t start)
{
}
......@@ -135,7 +135,29 @@ void WANZmq::Read(char *buffer, size_t size, size_t start)
int status = zmq_send(m_Socket, "OK", 4, 0);
}
void WANZmq::IRead(char *buffer, size_t size, Status &status, size_t start) {}
void WANZmq::IRead(char *buffer, size_t size, Status &status, size_t start)
{
int bytes = zmq_recv(m_Socket, buffer, size, ZMQ_DONTWAIT);
int ret = zmq_send(m_Socket, "OK", 4, 0);
if (bytes > 0)
{
status.Bytes = bytes;
status.Running = true;
}
else
{
status.Bytes = 0;
status.Running = true;
}
if (bytes == size)
{
status.Successful = true;
}
else
{
status.Successful = false;
}
}
void WANZmq::Flush() {}
......
......@@ -30,6 +30,7 @@ DataMan::~DataMan()
{
for (auto &controlThread : m_ControlThreads)
{
m_Listening = false;
controlThread.join();
}
}
......@@ -143,23 +144,27 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
char buffer[1024];
size_t bytes = 0;
nlohmann::json jmsg;
ctl_trans->Read(buffer, 1024, 0);
std::string smsg(buffer);
jmsg = nlohmann::json::parse(smsg);
bytes = jmsg.value("bytes", 0);
if (bytes > 0)
adios2::Transport::Status status;
ctl_trans->IRead(buffer, 1024, status, 0);
if (status.Bytes > 0)
{
std::vector<char> data(bytes);
trans->Read(data.data(), bytes);
std::string doid = jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype = jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
if (m_Callback != nullptr && m_Callback->m_Type == "Signature2")
std::string smsg(buffer);
jmsg = nlohmann::json::parse(smsg);
bytes = jmsg.value("bytes", 0);
if (bytes > 0)
{
m_Callback->RunCallback2(data.data(), doid, var, dtype,
putshape);
std::vector<char> data(bytes);
trans->Read(data.data(), bytes);
std::string doid = jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype = jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
if (m_Callback != nullptr && m_Callback->m_Type == "Signature2")
{
m_Callback->RunCallback2(data.data(), doid, var, dtype,
putshape);
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment