Skip to content
Snippets Groups Projects
Unverified Commit c57d4a0c authored by Jason Wang's avatar Jason Wang Committed by GitHub
Browse files

Merge pull request #292 from JasonRuonanWang/dataman_rebase

fixed a few problems of DataMan/ZeroMQ, and added a simple Transport:…
parents 95e3bd67 e175fb70
No related branches found
No related tags found
No related merge requests found
......@@ -26,7 +26,7 @@ void UserCallBack(const void *data, std::string doid, std::string var,
std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1,
std::multiplies<std::size_t>());
for (unsigned int i = 0; i < varsize; ++i)
for (size_t i = 0; i < varsize; ++i)
std::cout << ((float *)data)[i] << " ";
std::cout << std::endl;
}
......@@ -60,7 +60,7 @@ int main(int argc, char *argv[])
dataManReader->SetCallBack(UserCallBack);
for (unsigned int i = 0; i < 3; ++i)
for (unsigned int i = 0; i < 30; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
......
......@@ -23,7 +23,7 @@ DataManReader::DataManReader(IO &io, const std::string &name,
Init();
}
void DataManReader::SetCallback(
void DataManReader::SetCallBack(
std::function<void(const void *, std::string, std::string, std::string,
Dims)>
callback)
......
......@@ -45,7 +45,7 @@ public:
* @param callback function (get) provided by the user to be applied in
* DataMan
*/
void SetCallback(std::function<void(const void *, std::string, std::string,
void SetCallBack(std::function<void(const void *, std::string, std::string,
std::string, Dims)>
callback);
......
......@@ -31,6 +31,10 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values)
// This part will go away, this is just to monitor variables per rank
if (variable.m_Shape.empty())
{
variable.m_Shape = variable.m_Count;
}
if (variable.m_Count.empty())
{
variable.m_Count = variable.m_Shape;
......
......@@ -122,13 +122,10 @@ void WANZmq::Write(const char *buffer, size_t size)
m_Profiler.Timers.at("write").Resume();
}
/*
int status = zmq_send(m_Socket, buffer, size, 0);
char ret[10];
zmq_recv(m_Socket, ret, 10, 0);
if (m_Profiler.IsActive)
{
m_Profiler.Timers.at("write").Pause();
......@@ -145,12 +142,12 @@ void WANZmq::Write(const char *buffer, size_t size)
", in call to WANZmq write\n");
}
}
*/
}
void WANZmq::Read(char *buffer, size_t size)
{
// TODO: Implement read function
zmq_recv(m_Socket, buffer, size, 0);
int status = zmq_send(m_Socket, "OK", 4, 0);
}
void WANZmq::Flush() {}
......
......@@ -34,12 +34,6 @@ void DataMan::OpenWANTransports(const std::string &name,
{
std::shared_ptr<Transport> wanTransport, controlTransport;
// to be removed
for (auto &i : parameters)
{
std::cout << i.first << " " << i.second << std::endl;
}
const std::string type(
GetParameter("type", parameters, true, m_DebugMode, ""));
......@@ -67,7 +61,7 @@ void DataMan::OpenWANTransports(const std::string &name,
messageName = name;
}
if (type == "wan") // need to create directory
if (type == "wan")
{
if (trans == "zmq")
{
......@@ -95,9 +89,16 @@ void DataMan::OpenWANTransports(const std::string &name,
}
wanTransport->Open(messageName, openMode);
m_Transports.push_back(std::move(wanTransport));
m_Transports.push_back(wanTransport);
controlTransport->Open(messageName, openMode);
m_ControlTransports.push_back(std::move(controlTransport));
m_ControlTransports.push_back(controlTransport);
if (openMode == OpenMode::Read)
{
m_Listening = true;
m_ControlThreads.push_back(std::thread(
&DataMan::ReadThread, this, wanTransport, controlTransport));
}
}
}
......@@ -109,6 +110,8 @@ void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg)
jmsg["bytes"].get<size_t>());
}
void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg) {}
void DataMan::SetCallback(std::function<void(const void *, std::string,
std::string, std::string, Dims)>
callback)
......@@ -121,18 +124,28 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
{
while (m_Listening)
{
// Wait for Read API to be implemented
/*
if (ctl_trans->Read() >= 0)
{
std::string smsg;
nlohmann::json jmsg = json::parse(smsg);
}
else
char buffer[1024];
size_t bytes;
nlohmann::json jmsg;
ctl_trans->Read(buffer, 1024);
std::string smsg(buffer);
jmsg = nlohmann::json::parse(smsg);
bytes = jmsg.value("bytes", 0);
if (bytes > 0)
{
usleep(1);
std::vector<char> data(bytes);
trans->Read(data.data(), bytes);
std::string doid = jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype = jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
if (m_CallBack)
{
m_CallBack(data.data(), doid, var, dtype, putshape);
}
}
*/
}
}
......
......@@ -33,6 +33,7 @@ public:
const bool profile);
void WriteWAN(const void *buffer, nlohmann::json jmsg);
void ReadWAN(void *buffer, nlohmann::json jmsg);
void SetCallback(std::function<void(const void *, std::string, std::string,
std::string, Dims)>
......@@ -49,7 +50,7 @@ private:
std::function<void(const void *, std::string, std::string, std::string,
Dims)>
m_CallBack;
m_CallBack = nullptr;
nlohmann::json m_JMessage;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment