From 348e042248b338058e27e3695f8ed39f6605e79c Mon Sep 17 00:00:00 2001
From: Ruonan Wang <jason.ruonan.wang@gmail.com>
Date: Sun, 23 Apr 2017 00:58:16 -0400
Subject: [PATCH] MdtmMan worked with ZfpMan for automatic compression and
 decompression

---
 .../adios2/engine/dataman/DataManWriter.tcc   |  9 +-
 source/dataman/CacheMan.h                     |  4 +-
 source/dataman/DataMan.cpp                    |  5 +-
 source/dataman/DataMan.h                      |  2 +-
 source/dataman/DataManBase.cpp                | 41 ++++----
 source/dataman/DataManBase.h                  |  4 +-
 source/dataman/DumpMan.cpp                    | 20 ++--
 source/dataman/DumpMan.h                      |  2 +-
 source/dataman/MdtmMan.cpp                    | 99 +++++++------------
 source/dataman/MdtmMan.h                      |  8 +-
 source/dataman/StreamMan.cpp                  |  6 +-
 source/dataman/TemporalMan.cpp                |  2 +-
 source/dataman/TemporalMan.h                  |  2 +-
 source/dataman/ZfpMan.cpp                     | 32 +++---
 source/dataman/ZfpMan.h                       |  2 +-
 source/dataman/ZmqMan.cpp                     | 49 ++++-----
 source/dataman/ZmqMan.h                       |  2 +-
 17 files changed, 125 insertions(+), 164 deletions(-)

diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc
index e22d2e4b5..1b2db157d 100644
--- a/source/adios2/engine/dataman/DataManWriter.tcc
+++ b/source/adios2/engine/dataman/DataManWriter.tcc
@@ -30,16 +30,17 @@ void DataManWriter::WriteVariableCommon(Variable<T> &variable, const T *values)
 
     // This part will go away, this is just to monitor variables per rank
 
+    if (variable.m_GlobalDimensions.empty())
+        variable.m_GlobalDimensions = variable.m_LocalDimensions;
+    if (variable.m_Offsets.empty())
+        variable.m_Offsets.assign(variable.m_LocalDimensions.size(), 0);
+
     json jmsg;
     jmsg["doid"] = m_Name;
     jmsg["var"] = variable.m_Name;
     jmsg["dtype"] = GetType<T>();
     jmsg["putshape"] = variable.m_LocalDimensions;
-    if (variable.m_GlobalDimensions.size() == 0)
-        variable.m_GlobalDimensions = variable.m_LocalDimensions;
     jmsg["varshape"] = variable.m_GlobalDimensions;
-    if (variable.m_Offsets.size() == 0)
-        variable.m_Offsets.assign(variable.m_LocalDimensions.size(), 0);
     jmsg["offset"] = variable.m_Offsets;
     jmsg["timestep"] = 0;
     m_Man.put(values, jmsg);
diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h
index ca62cfb16..65b38f057 100644
--- a/source/dataman/CacheMan.h
+++ b/source/dataman/CacheMan.h
@@ -25,7 +25,7 @@ public:
     virtual int init(json p_jmsg);
     virtual int put(const void *p_data, json p_jmsg);
     virtual int get(void *p_data, json &p_jmsg);
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg){};
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
 
     void flush();
     std::string name() { return "CacheItem"; }
@@ -97,7 +97,7 @@ public:
     virtual int init(json p_jmsg);
     virtual int put(const void *p_data, json p_jmsg);
     virtual int get(void *p_data, json &p_jmsg);
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg){};
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
 
     void flush();
     std::string name() { return "CacheMan"; }
diff --git a/source/dataman/DataMan.cpp b/source/dataman/DataMan.cpp
index dfcea7687..ccdd8ea19 100644
--- a/source/dataman/DataMan.cpp
+++ b/source/dataman/DataMan.cpp
@@ -65,7 +65,10 @@ void DataMan::add_stream(json p_jmsg)
     }
     if (p_jmsg["compression_method"].is_string())
     {
-        add_man_to_path(p_jmsg["compression_method"], method, p_jmsg);
+        if (p_jmsg["compression_method"] != "null")
+        {
+            add_man_to_path(p_jmsg["compression_method"], method, p_jmsg);
+        }
     }
 }
 
diff --git a/source/dataman/DataMan.h b/source/dataman/DataMan.h
index ec42b199e..0687abc83 100644
--- a/source/dataman/DataMan.h
+++ b/source/dataman/DataMan.h
@@ -30,7 +30,7 @@ public:
     void add_file(std::string p_method);
     std::string name() { return "DataManager"; }
     std::string type() { return "Manager"; }
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg){};
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
 
 private:
     std::string m_local_ip = "";
diff --git a/source/dataman/DataManBase.cpp b/source/dataman/DataManBase.cpp
index 9741a6385..e84461ae6 100644
--- a/source/dataman/DataManBase.cpp
+++ b/source/dataman/DataManBase.cpp
@@ -130,6 +130,14 @@ int DataManBase::put(const void *p_data, std::string p_doid, std::string p_var,
 int DataManBase::put_begin(const void *p_data, json &p_jmsg)
 {
     check_shape(p_jmsg);
+    if (p_jmsg["compressed_size"].is_number())
+    {
+        p_jmsg["sendbytes"] = p_jmsg["compressed_size"].get<size_t>();
+    }
+    else
+    {
+        p_jmsg["sendbytes"] = p_jmsg["putbytes"].get<size_t>();
+    }
     p_jmsg["profiling"] = m_profiling;
     m_step_time = std::chrono::system_clock::now();
     return 0;
@@ -152,14 +160,6 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg)
     m_profiling["manager_mbs"] =
         m_profiling["total_mb"].get<double>() /
         m_profiling["total_manager_time"].get<double>();
-    if (p_jmsg["compressed_size"].is_number())
-    {
-        p_jmsg["putbytes"] = p_jmsg["compressed_size"].get<size_t>();
-    }
-    else
-    {
-        p_jmsg.erase("compressed_size");
-    }
     put_next(p_data, p_jmsg);
     return 0;
 }
@@ -258,22 +258,23 @@ void DataManBase::print_next(std::ostream &out)
     }
 }
 
-bool DataManBase::auto_transform(const void *p_in, void *p_out, json &p_jmsg)
+bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg)
 {
-    if (p_jmsg["compression_method"] != nullptr)
+    if (a_jmsg["compression_method"].is_string() and
+        a_jmsg["compression_method"].get<std::string>() != "null")
     {
-        auto method = p_jmsg["compression_method"];
+        auto method = a_jmsg["compression_method"].get<std::string>();
         auto man = get_man(method);
-        if (man == nullptr)
+        if (not man)
         {
-            logging("Library file for compression method " +
-                    p_jmsg["compression_method"].dump() + " not found!");
+            logging("Library file for compression method " + method +
+                    " not found!");
             return false;
         }
-        man->transform(p_in, p_out, p_jmsg);
-        p_jmsg.erase("compression_method");
-        p_jmsg.erase("compression_rate");
-        p_jmsg.erase("compressed_size");
+        man->transform(a_data, a_jmsg);
+        a_jmsg.erase("compression_method");
+        a_jmsg.erase("compression_rate");
+        a_jmsg.erase("compressed_size");
         return true;
     }
     else
@@ -539,11 +540,11 @@ void DataManBase::check_shape(json &p_jmsg)
     {
         return;
     }
-    if (p_jmsg["putshape"] == nullptr)
+    if (not p_jmsg["putshape"].is_array())
     {
         p_jmsg["putshape"] = varshape;
     }
-    if (p_jmsg["offset"] == nullptr)
+    if (not p_jmsg["offset"].is_array())
     {
         p_jmsg["offset"] = std::vector<size_t>(varshape.size(), 0);
     }
diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h
index a838077ed..dd8a8ff97 100644
--- a/source/dataman/DataManBase.h
+++ b/source/dataman/DataManBase.h
@@ -69,10 +69,10 @@ public:
 
     void print_next(std::ostream &out = std::cout);
 
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg) = 0;
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) = 0;
 
 protected:
-    bool auto_transform(const void *p_in, void *p_out, json &p_jmsg);
+    bool auto_transform(std::vector<char> &a_data, json &a_jmsg);
 
     void add_man_to_path(std::string p_new, std::string p_path, json p_jmsg);
 
diff --git a/source/dataman/DumpMan.cpp b/source/dataman/DumpMan.cpp
index af6120bf6..1a85e4bf3 100644
--- a/source/dataman/DumpMan.cpp
+++ b/source/dataman/DumpMan.cpp
@@ -47,33 +47,27 @@ int DumpMan::put(const void *p_data, json p_jmsg)
         numbers_to_print = product(putshape, 1);
     }
     size_t putbytes = product(putshape, dsize(dtype));
+    size_t sendbytes = p_jmsg["sendbytes"].get<size_t>();
 
     std::cout << p_jmsg.dump(4) << std::endl;
     std::cout << "total MBs = " << product(putshape, dsize(dtype)) / 1000000
               << std::endl;
 
-    const void *data_to_dump;
+    std::vector<char> data(static_cast<const char *>(p_data),
+                           static_cast<const char *>(p_data) + sendbytes);
 
-    std::vector<char> data(putbytes);
-
-    if (auto_transform(p_data, data.data(), p_jmsg))
-    {
-        data_to_dump = data.data();
-    }
-    else
-    {
-        data_to_dump = p_data;
-    }
+    auto_transform(data, p_jmsg);
 
+    void *data_to_print = data.data();
     for (size_t i = 0; i < numbers_to_print; i++)
     {
         if (dtype == "float")
         {
-            std::cout << static_cast<const float *>(data_to_dump)[i] << " ";
+            std::cout << static_cast<float *>(data_to_print)[i] << " ";
         }
         if (dtype == "double")
         {
-            std::cout << static_cast<const double *>(data_to_dump)[i] << " ";
+            std::cout << static_cast<double *>(data_to_print)[i] << " ";
         }
     }
 
diff --git a/source/dataman/DumpMan.h b/source/dataman/DumpMan.h
index 4b327bd6f..bc0647755 100644
--- a/source/dataman/DumpMan.h
+++ b/source/dataman/DumpMan.h
@@ -25,7 +25,7 @@ public:
     void flush();
     std::string name() { return "DumpMan"; }
     std::string type() { return "Dump"; }
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg){};
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
 
 private:
     bool m_dumping = true;
diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp
index 7fd0dd183..7955f8c2d 100644
--- a/source/dataman/MdtmMan.cpp
+++ b/source/dataman/MdtmMan.cpp
@@ -103,22 +103,16 @@ int MdtmMan::init(json p_jmsg)
     return 0;
 }
 
-int MdtmMan::put(const void *p_data, json p_jmsg)
+int MdtmMan::put(const void *a_data, json a_jmsg)
 {
-    put_begin(p_data, p_jmsg);
-
-    std::vector<size_t> putshape =
-        p_jmsg["putshape"].get<std::vector<size_t>>();
-    std::vector<size_t> varshape =
-        p_jmsg["varshape"].get<std::vector<size_t>>();
-    std::string dtype = p_jmsg["dtype"];
+    put_begin(a_data, a_jmsg);
 
+    // determine pipe to use
     int priority = 100;
-    if (p_jmsg["priority"].is_number_integer())
+    if (a_jmsg["priority"].is_number_integer())
     {
-        priority = p_jmsg["priority"].get<int>();
+        priority = a_jmsg["priority"].get<int>();
     }
-
     int index;
     if (m_parallel_mode == "round")
     {
@@ -137,38 +131,23 @@ int MdtmMan::put(const void *p_data, json p_jmsg)
     {
         index = closest(priority, pipe_desc["priority"], true);
     }
+    a_jmsg["pipe"] = pipe_desc["pipe_names"][index];
 
-    p_jmsg["pipe"] = pipe_desc["pipe_names"][index];
-    size_t putbytes = product(putshape, dsize(dtype));
-    p_jmsg["putbytes"] = putbytes;
-    size_t varbytes = product(varshape, dsize(dtype));
-    p_jmsg["varbytes"] = varbytes;
-
-    StreamMan::put(p_data, p_jmsg);
-
-    index = 0;
-    for (int i = 0; i < pipenames.size(); i++)
-    {
-        if (p_jmsg["pipe"].get<std::string>() == pipenames[i])
-        {
-            index = i;
-        }
-    }
-    std::string pipename = pipe_desc["pipe_prefix"].get<std::string>() +
-                           p_jmsg["pipe"].get<std::string>();
-    write(pipes[index], p_data, putbytes);
-    put_end(p_data, p_jmsg);
+    StreamMan::put(a_data, a_jmsg);
+    size_t sendbytes = a_jmsg["sendbytes"].get<size_t>();
+    write(pipes[index], a_data, sendbytes);
+    put_end(a_data, a_jmsg);
     return 0;
 }
 
 int MdtmMan::get(void *p_data, json &p_jmsg) { return 0; }
 
-void MdtmMan::on_recv(json jmsg)
+void MdtmMan::on_recv(json a_jmsg)
 {
 
     // push new request
-    jqueue.push(jmsg);
-    bqueue.push(nullptr);
+    jqueue.push(a_jmsg);
+    vqueue.push(std::vector<char>());
     iqueue.push(0);
 
     // for flush
@@ -176,12 +155,12 @@ void MdtmMan::on_recv(json jmsg)
     {
         callback();
         m_cache.clean_all("nan");
-        bqueue.pop();
-        iqueue.pop();
         jqueue.pop();
+        vqueue.pop();
+        iqueue.pop();
     }
 
-    if (jqueue.size() == 0)
+    if (jqueue.empty())
     {
         return;
     }
@@ -191,23 +170,21 @@ void MdtmMan::on_recv(json jmsg)
     {
         if (jqueue.front()["operation"] == "put")
         {
+            json &jmsg = jqueue.front();
+
             // allocate buffer
-            size_t putbytes = jqueue.front()["putbytes"].get<size_t>();
-            if (!bqueue.front())
-            {
-                bqueue.front() = malloc(putbytes);
-            }
+            size_t sendbytes = jmsg["sendbytes"].get<size_t>();
+            vqueue.front() = std::vector<char>(sendbytes);
 
             // determine the pipe for the head request
-            json msg = jqueue.front();
-            if (msg == nullptr)
+            if (jmsg == nullptr)
             {
                 break;
             }
             int pipeindex = 0;
             for (int i = 0; i < pipenames.size(); i++)
             {
-                if (msg["pipe"].get<std::string>() == pipenames[i])
+                if (jmsg["pipe"].get<std::string>() == pipenames[i])
                 {
                     pipeindex = i;
                 }
@@ -215,16 +192,14 @@ void MdtmMan::on_recv(json jmsg)
 
             // read the head request
             int error_times = 0;
-            int s = iqueue.front();
-            putbytes = msg["putbytes"].get<int>();
-            while (s < putbytes)
+            while (iqueue.front() < sendbytes)
             {
-                int ret =
-                    read(pipes[pipeindex],
-                         static_cast<char *>(bqueue.front()) + s, putbytes - s);
+                int ret = read(pipes[pipeindex],
+                               vqueue.front().data() + iqueue.front(),
+                               sendbytes - iqueue.front());
                 if (ret > 0)
                 {
-                    s += ret;
+                    iqueue.front() += ret;
                 }
                 else
                 {
@@ -237,22 +212,22 @@ void MdtmMan::on_recv(json jmsg)
                 }
             }
 
-            if (s == putbytes)
+            if (iqueue.front() == sendbytes)
             {
-                m_cache.put(bqueue.front(), msg);
-                if (bqueue.front())
+                if (a_jmsg["compression_method"].is_string())
                 {
-                    free(bqueue.front());
+                    if (a_jmsg["compression_method"].get<std::string>() !=
+                        "null")
+                    {
+                        auto_transform(vqueue.front(), a_jmsg);
+                    }
                 }
-                bqueue.pop();
-                iqueue.pop();
+                m_cache.put(vqueue.front().data(), jmsg);
                 jqueue.pop();
+                vqueue.pop();
+                iqueue.pop();
                 break;
             }
-            else
-            {
-                iqueue.front() = s;
-            }
         }
     }
 }
diff --git a/source/dataman/MdtmMan.h b/source/dataman/MdtmMan.h
index 018ded228..4a29da319 100644
--- a/source/dataman/MdtmMan.h
+++ b/source/dataman/MdtmMan.h
@@ -24,21 +24,21 @@ public:
     virtual int init(json p_jmsg);
     virtual int put(const void *p_data, json p_jmsg);
     virtual int get(void *p_data, json &p_jmsg);
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg){};
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
 
     void on_recv(json msg);
     std::string name() { return "MdtmMan"; }
 
 private:
-    void *zmq_ipc_req = NULL;
+    void *zmq_ipc_req = nullptr;
     int zmq_msg_size = 1024;
+    json pipe_desc;
     std::string getmode = "callback";
     std::vector<int> pipes;
     std::vector<std::string> pipenames;
     std::queue<json> jqueue;
-    std::queue<void *> bqueue;
+    std::queue<std::vector<char>> vqueue;
     std::queue<int> iqueue;
-    json pipe_desc;
 
 }; // end of class MdtmMan
 
diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp
index 4879d3588..6874b73eb 100644
--- a/source/dataman/StreamMan.cpp
+++ b/source/dataman/StreamMan.cpp
@@ -124,12 +124,12 @@ void StreamMan::zmq_meta_rep_thread_func()
     while (zmq_meta_rep_thread_active)
     {
         char msg[1024] = "";
-        int err = zmq_recv(zmq_meta, msg, 1024, ZMQ_NOBLOCK);
+        int ret = zmq_recv(zmq_meta, msg, 1024, ZMQ_NOBLOCK);
         std::string smsg = msg;
-        if (err >= 0)
+        if (ret >= 0)
         {
-            logging("StreamMan::zmq_meta_rep_thread_func: " + smsg);
             json j = json::parse(msg);
+            logging("StreamMan::zmq_meta_rep_thread_func: \n" + j.dump(4));
             on_recv(j);
         }
         usleep(10);
diff --git a/source/dataman/TemporalMan.cpp b/source/dataman/TemporalMan.cpp
index a3582e614..c87bf7b10 100644
--- a/source/dataman/TemporalMan.cpp
+++ b/source/dataman/TemporalMan.cpp
@@ -23,4 +23,4 @@ int TemporalMan::get(void *p_data, json &p_jmsg) { return 0; }
 
 void TemporalMan::flush() {}
 
-void TemporalMan::transform(const void *p_in, void *p_out, json &p_jmsg) {}
+void TemporalMan::transform(std::vector<char> &a_data, json &a_jmsg) {}
diff --git a/source/dataman/TemporalMan.h b/source/dataman/TemporalMan.h
index 8e120b8e4..5f0362d11 100644
--- a/source/dataman/TemporalMan.h
+++ b/source/dataman/TemporalMan.h
@@ -22,7 +22,7 @@ public:
     virtual int put(const void *p_data, json p_jmsg);
     virtual int get(void *p_data, json &p_jmsg);
     virtual void flush();
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg);
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg);
     std::string name() { return "TemporalMan"; }
 };
 
diff --git a/source/dataman/ZfpMan.cpp b/source/dataman/ZfpMan.cpp
index c1cf3fa9f..77a14ff78 100644
--- a/source/dataman/ZfpMan.cpp
+++ b/source/dataman/ZfpMan.cpp
@@ -25,7 +25,7 @@ int ZfpMan::put(const void *p_data, json p_jmsg)
 {
     put_begin(p_data, p_jmsg);
 
-    void *compressed_data = NULL;
+    void *compressed_data = nullptr;
     if (check_json(p_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan"))
     {
         if (not p_jmsg["compression_rate"].is_number())
@@ -171,8 +171,7 @@ void *ZfpMan::decompress(void *p_data, json p_jmsg)
         type = zfp_type_double;
     }
 
-    void *data;
-    data = malloc(product(shape, dsize(dtype)));
+    void *data = malloc(product(shape, dsize(dtype)));
 
     switch (shape.size())
     {
@@ -208,19 +207,20 @@ void *ZfpMan::decompress(void *p_data, json p_jmsg)
     return data;
 }
 
-void ZfpMan::transform(const void *p_in, void *p_out, json &p_jmsg)
+void ZfpMan::transform(std::vector<char> &a_data, json &a_jmsg)
 {
-
-    std::string dtype = p_jmsg["dtype"];
-    std::vector<size_t> shape = p_jmsg["putshape"].get<std::vector<size_t>>();
-    int compression_rate = p_jmsg["compression_rate"].get<int>();
+    std::string dtype = a_jmsg["dtype"];
+    std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>();
+    int compression_rate = a_jmsg["compression_rate"].get<int>();
+    size_t putbytes = a_jmsg["putbytes"].get<size_t>();
+    std::vector<char> output(putbytes);
 
     int status = 0; // return value: 0 = success
     uint dim = 1;
     zfp_type type = zfp_type_none; // array scalar type
     zfp_field *field;              // array meta data
     zfp_stream *zfp;               // compressed stream
-    size_t bufsize = p_jmsg["compressed_size"]
+    size_t bufsize = a_jmsg["compressed_size"]
                          .get<size_t>(); // byte size of compressed buffer
     bitstream *stream;                   // bit stream to write to or read from
     size_t zfpsize;                      // byte size of compressed stream
@@ -246,23 +246,23 @@ void ZfpMan::transform(const void *p_in, void *p_out, json &p_jmsg)
     switch (shape.size())
     {
     case 3:
-        field = zfp_field_3d(p_out, type, shape[0], shape[1], shape[2]);
+        field = zfp_field_3d(output.data(), type, shape[0], shape[1], shape[2]);
         dim = 3;
         break;
     case 2:
-        field = zfp_field_2d(p_out, type, shape[0], shape[1]);
+        field = zfp_field_2d(output.data(), type, shape[0], shape[1]);
         dim = 2;
         break;
     case 1:
-        field = zfp_field_1d(p_out, type, shape[0]);
+        field = zfp_field_1d(output.data(), type, shape[0]);
         break;
     default:
-        field = zfp_field_1d(p_out, type, product(shape));
+        field = zfp_field_1d(output.data(), type, product(shape));
     }
 
     zfp = zfp_stream_open(NULL);
     zfp_stream_set_rate(zfp, compression_rate, type, dim, 0);
-    stream = stream_open(const_cast<void *>(p_in), bufsize);
+    stream = stream_open(a_data.data(), bufsize);
     zfp_stream_set_bit_stream(zfp, stream);
     zfp_stream_rewind(zfp);
     if (!zfp_decompress(zfp, field))
@@ -274,7 +274,5 @@ void ZfpMan::transform(const void *p_in, void *p_out, json &p_jmsg)
     zfp_stream_close(zfp);
     stream_close(stream);
 
-    p_jmsg.erase("compression_rate");
-    p_jmsg.erase("compression_method");
-    p_jmsg.erase("compressed_size");
+    a_data = output;
 }
diff --git a/source/dataman/ZfpMan.h b/source/dataman/ZfpMan.h
index 94f75f92a..3af5c3353 100644
--- a/source/dataman/ZfpMan.h
+++ b/source/dataman/ZfpMan.h
@@ -24,7 +24,7 @@ public:
     virtual void flush();
     void *compress(void *p_data, json &p_jmsg);
     void *decompress(void *p_data, json p_jmsg);
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg);
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg);
     std::string name() { return "ZfpMan"; }
 private:
     double m_compression_rate = 8;
diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp
index 839a8eb81..1bed8f86b 100644
--- a/source/dataman/ZmqMan.cpp
+++ b/source/dataman/ZmqMan.cpp
@@ -21,9 +21,9 @@ ZmqMan::~ZmqMan()
         zmq_close(zmq_data);
 }
 
-int ZmqMan::init(json p_jmsg)
+int ZmqMan::init(json a_jmsg)
 {
-    StreamMan::init(p_jmsg);
+    StreamMan::init(a_jmsg);
     zmq_data = zmq_socket(zmq_context, ZMQ_PAIR);
     std::string local_address =
         make_address(m_local_ip, m_local_port + 1, "tcp");
@@ -42,44 +42,33 @@ int ZmqMan::init(json p_jmsg)
     return 0;
 }
 
-int ZmqMan::put(const void *p_data, json p_jmsg)
+int ZmqMan::put(const void *a_data, json a_jmsg)
 {
-    put_begin(p_data, p_jmsg);
-    StreamMan::put(p_data, p_jmsg);
-    zmq_send(zmq_data, p_data, p_jmsg["putbytes"], 0);
-    put_end(p_data, p_jmsg);
+    put_begin(a_data, a_jmsg);
+    StreamMan::put(a_data, a_jmsg);
+    zmq_send(zmq_data, a_data, a_jmsg["sendbytes"].get<size_t>(), 0);
+    put_end(a_data, a_jmsg);
     return 0;
 }
 
-int ZmqMan::get(void *p_data, json &p_jmsg) { return 0; }
+int ZmqMan::get(void *a_data, json &a_jmsg) { return 0; }
 
-void ZmqMan::on_recv(json msg)
+void ZmqMan::on_recv(json a_jmsg)
 {
-    if (msg["operation"] == "put")
+    if (a_jmsg["operation"].get<std::string>() == "put")
     {
-        if (msg["compression_method"] == nullptr)
-        {
-            size_t putbytes = msg["putbytes"].get<size_t>();
-            std::vector<char> data;
-            data.resize(putbytes);
-            int err = zmq_recv(zmq_data, data.data(), putbytes, 0);
-            m_cache.put(data.data(), msg);
-        }
-        else
+        size_t sendbytes = a_jmsg["sendbytes"].get<size_t>();
+        std::vector<char> data(sendbytes);
+        int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0);
+
+        if (a_jmsg["compression_method"].is_string() and
+            a_jmsg["compression_method"].get<std::string>() != "null")
         {
-            size_t putbytes = msg["putbytes"].get<size_t>();
-            size_t compressed_size = msg["compressed_size"].get<size_t>();
-            std::vector<char> compressed_data;
-            compressed_data.resize(compressed_size);
-            std::vector<char> data;
-            data.resize(putbytes);
-            int err =
-                zmq_recv(zmq_data, compressed_data.data(), compressed_size, 0);
-            auto_transform(compressed_data.data(), data.data(), msg);
-            m_cache.put(data.data(), msg);
+            auto_transform(data, a_jmsg);
         }
+        m_cache.put(data.data(), a_jmsg);
     }
-    else if (msg["operation"] == "flush")
+    else if (a_jmsg["operation"].get<std::string>() == "flush")
     {
         callback();
         m_cache.flush();
diff --git a/source/dataman/ZmqMan.h b/source/dataman/ZmqMan.h
index fc14026a7..48fa2bbe0 100644
--- a/source/dataman/ZmqMan.h
+++ b/source/dataman/ZmqMan.h
@@ -22,7 +22,7 @@ public:
     virtual int init(json p_jmsg);
     virtual int put(const void *p_data, json p_jmsg);
     virtual int get(void *p_data, json &p_jmsg);
-    virtual void transform(const void *p_in, void *p_out, json &p_jmsg){};
+    virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
 
     virtual void on_recv(json msg);
     std::string name() { return "ZmqMan"; }
-- 
GitLab