Skip to content
Snippets Groups Projects
Commit 94b39f8c authored by Wang, Ruonan's avatar Wang, Ruonan
Browse files

fixed a few problems of DataMan/ZeroMQ, and added a simple Transport::Read()...

fixed a few problems of DataMan/ZeroMQ, and added a simple Transport::Read() API to make the hello world workflow working again
parent 4dc09556
No related branches found
No related tags found
1 merge request!292fixed a few problems of DataMan/ZeroMQ, and added a simple Transport:…
...@@ -26,7 +26,7 @@ void UserCallBack(const void *data, std::string doid, std::string var, ...@@ -26,7 +26,7 @@ void UserCallBack(const void *data, std::string doid, std::string var,
std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1,
std::multiplies<std::size_t>()); std::multiplies<std::size_t>());
for (unsigned int i = 0; i < varsize; ++i) for (size_t i = 0; i < varsize; ++i)
std::cout << ((float *)data)[i] << " "; std::cout << ((float *)data)[i] << " ";
std::cout << std::endl; std::cout << std::endl;
} }
...@@ -60,7 +60,7 @@ int main(int argc, char *argv[]) ...@@ -60,7 +60,7 @@ int main(int argc, char *argv[])
dataManReader->SetCallBack(UserCallBack); dataManReader->SetCallBack(UserCallBack);
for (unsigned int i = 0; i < 3; ++i) for (unsigned int i = 0; i < 30; ++i)
{ {
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} }
......
...@@ -23,7 +23,7 @@ DataManReader::DataManReader(IO &io, const std::string &name, ...@@ -23,7 +23,7 @@ DataManReader::DataManReader(IO &io, const std::string &name,
Init(); Init();
} }
void DataManReader::SetCallback( void DataManReader::SetCallBack(
std::function<void(const void *, std::string, std::string, std::string, std::function<void(const void *, std::string, std::string, std::string,
Dims)> Dims)>
callback) callback)
......
...@@ -45,7 +45,7 @@ public: ...@@ -45,7 +45,7 @@ public:
* @param callback function (get) provided by the user to be applied in * @param callback function (get) provided by the user to be applied in
* DataMan * DataMan
*/ */
void SetCallback(std::function<void(const void *, std::string, std::string, void SetCallBack(std::function<void(const void *, std::string, std::string,
std::string, Dims)> std::string, Dims)>
callback); callback);
......
...@@ -31,6 +31,10 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values) ...@@ -31,6 +31,10 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values)
// This part will go away, this is just to monitor variables per rank // This part will go away, this is just to monitor variables per rank
if (variable.m_Shape.empty())
{
variable.m_Shape = variable.m_Count;
}
if (variable.m_Count.empty()) if (variable.m_Count.empty())
{ {
variable.m_Count = variable.m_Shape; variable.m_Count = variable.m_Shape;
......
...@@ -122,13 +122,10 @@ void WANZmq::Write(const char *buffer, size_t size) ...@@ -122,13 +122,10 @@ void WANZmq::Write(const char *buffer, size_t size)
m_Profiler.Timers.at("write").Resume(); m_Profiler.Timers.at("write").Resume();
} }
/*
int status = zmq_send(m_Socket, buffer, size, 0); int status = zmq_send(m_Socket, buffer, size, 0);
char ret[10]; char ret[10];
zmq_recv(m_Socket, ret, 10, 0); zmq_recv(m_Socket, ret, 10, 0);
if (m_Profiler.IsActive) if (m_Profiler.IsActive)
{ {
m_Profiler.Timers.at("write").Pause(); m_Profiler.Timers.at("write").Pause();
...@@ -145,12 +142,12 @@ void WANZmq::Write(const char *buffer, size_t size) ...@@ -145,12 +142,12 @@ void WANZmq::Write(const char *buffer, size_t size)
", in call to WANZmq write\n"); ", in call to WANZmq write\n");
} }
} }
*/
} }
void WANZmq::Read(char *buffer, size_t size) void WANZmq::Read(char *buffer, size_t size)
{ {
// TODO: Implement read function zmq_recv(m_Socket, buffer, size, 0);
int status = zmq_send(m_Socket, "OK", 4, 0);
} }
void WANZmq::Flush() {} void WANZmq::Flush() {}
......
...@@ -34,12 +34,6 @@ void DataMan::OpenWANTransports(const std::string &name, ...@@ -34,12 +34,6 @@ void DataMan::OpenWANTransports(const std::string &name,
{ {
std::shared_ptr<Transport> wanTransport, controlTransport; std::shared_ptr<Transport> wanTransport, controlTransport;
// to be removed
for (auto &i : parameters)
{
std::cout << i.first << " " << i.second << std::endl;
}
const std::string type( const std::string type(
GetParameter("type", parameters, true, m_DebugMode, "")); GetParameter("type", parameters, true, m_DebugMode, ""));
...@@ -67,7 +61,7 @@ void DataMan::OpenWANTransports(const std::string &name, ...@@ -67,7 +61,7 @@ void DataMan::OpenWANTransports(const std::string &name,
messageName = name; messageName = name;
} }
if (type == "wan") // need to create directory if (type == "wan")
{ {
if (trans == "zmq") if (trans == "zmq")
{ {
...@@ -95,9 +89,16 @@ void DataMan::OpenWANTransports(const std::string &name, ...@@ -95,9 +89,16 @@ void DataMan::OpenWANTransports(const std::string &name,
} }
wanTransport->Open(messageName, openMode); wanTransport->Open(messageName, openMode);
m_Transports.push_back(std::move(wanTransport)); m_Transports.push_back(wanTransport);
controlTransport->Open(messageName, openMode); controlTransport->Open(messageName, openMode);
m_ControlTransports.push_back(std::move(controlTransport)); m_ControlTransports.push_back(controlTransport);
if (openMode == OpenMode::Read)
{
m_Listening = true;
m_ControlThreads.push_back(std::thread(
&DataMan::ReadThread, this, wanTransport, controlTransport));
}
} }
} }
...@@ -109,6 +110,8 @@ void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg) ...@@ -109,6 +110,8 @@ void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg)
jmsg["bytes"].get<size_t>()); jmsg["bytes"].get<size_t>());
} }
void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg) {}
void DataMan::SetCallback(std::function<void(const void *, std::string, void DataMan::SetCallback(std::function<void(const void *, std::string,
std::string, std::string, Dims)> std::string, std::string, Dims)>
callback) callback)
...@@ -121,18 +124,28 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans, ...@@ -121,18 +124,28 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
{ {
while (m_Listening) while (m_Listening)
{ {
// Wait for Read API to be implemented char buffer[1024];
/* size_t bytes;
if (ctl_trans->Read() >= 0) nlohmann::json jmsg;
{ ctl_trans->Read(buffer, 1024);
std::string smsg; std::string smsg(buffer);
nlohmann::json jmsg = json::parse(smsg); jmsg = nlohmann::json::parse(smsg);
} bytes = jmsg.value("bytes", 0);
else
if (bytes > 0)
{ {
usleep(1); char data[bytes];
trans->Read(data, bytes);
std::string doid = jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype = jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
if (m_CallBack)
{
m_CallBack(data, doid, var, dtype, putshape);
}
} }
*/
} }
} }
......
...@@ -33,6 +33,7 @@ public: ...@@ -33,6 +33,7 @@ public:
const bool profile); const bool profile);
void WriteWAN(const void *buffer, nlohmann::json jmsg); void WriteWAN(const void *buffer, nlohmann::json jmsg);
void ReadWAN(void *buffer, nlohmann::json jmsg);
void SetCallback(std::function<void(const void *, std::string, std::string, void SetCallback(std::function<void(const void *, std::string, std::string,
std::string, Dims)> std::string, Dims)>
...@@ -49,7 +50,7 @@ private: ...@@ -49,7 +50,7 @@ private:
std::function<void(const void *, std::string, std::string, std::string, std::function<void(const void *, std::string, std::string, std::string,
Dims)> Dims)>
m_CallBack; m_CallBack = nullptr;
nlohmann::json m_JMessage; nlohmann::json m_JMessage;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment