From 6bd33c49c3de69a3fbd1959af2eeac61d2f5c43a Mon Sep 17 00:00:00 2001
From: Ruonan Wang <jason.ruonan.wang@gmail.com>
Date: Wed, 19 Apr 2017 23:33:55 -0400
Subject: [PATCH] added all dataman plugins

---
 source/dataman/CMakeLists.txt  |  34 ++++-
 source/dataman/CompressMan.h   |  18 +++
 source/dataman/DumpMan.cpp     |  62 ++++++++
 source/dataman/DumpMan.h       |  32 +++++
 source/dataman/MdtmMan.cpp     | 206 +++++++++++++++++++++++++++
 source/dataman/MdtmMan.h       |  63 ++++++++
 source/dataman/StreamMan.cpp   | 102 +++++++++++++
 source/dataman/StreamMan.h     |  57 ++++++++
 source/dataman/TemporalMan.cpp |  34 +++++
 source/dataman/TemporalMan.h   |  26 ++++
 source/dataman/ZfpMan.cpp      | 253 +++++++++++++++++++++++++++++++++
 source/dataman/ZfpMan.h        |  26 ++++
 source/dataman/ZmqMan.cpp      |  64 +++++++++
 source/dataman/ZmqMan.h        |  35 +++++
 14 files changed, 1005 insertions(+), 7 deletions(-)
 create mode 100644 source/dataman/CompressMan.h
 create mode 100644 source/dataman/DumpMan.cpp
 create mode 100644 source/dataman/DumpMan.h
 create mode 100644 source/dataman/MdtmMan.cpp
 create mode 100644 source/dataman/MdtmMan.h
 create mode 100644 source/dataman/StreamMan.cpp
 create mode 100644 source/dataman/StreamMan.h
 create mode 100644 source/dataman/TemporalMan.cpp
 create mode 100644 source/dataman/TemporalMan.h
 create mode 100644 source/dataman/ZfpMan.cpp
 create mode 100644 source/dataman/ZfpMan.h
 create mode 100644 source/dataman/ZmqMan.cpp
 create mode 100644 source/dataman/ZmqMan.h

diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt
index a35ae5b88..d4bdcc86d 100644
--- a/source/dataman/CMakeLists.txt
+++ b/source/dataman/CMakeLists.txt
@@ -3,12 +3,7 @@
 # accompanying file Copyright.txt for details.
 #------------------------------------------------------------------------------#
 
-set(dataman_targets)
-
-add_library(dataman
-  DataManBase.h DataManBase.cpp
-  DataMan.h DataMan.cpp
-)
+add_library(dataman DataMan.cpp DataManBase.cpp)
 target_include_directories(dataman PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
 target_link_libraries(dataman PRIVATE adios2sys)
 list(APPEND dataman_targets dataman)
@@ -17,7 +12,7 @@ list(APPEND dataman_targets dataman)
 # MODULE libraries are designed to be plugins, i.e. shared libs that nobody
 # else links to.
 
-add_library(cacheman MODULE CacheMan.h CacheMan.cpp)
+add_library(cacheman SHARED CacheMan.cpp)
 target_link_libraries(cacheman PRIVATE dataman)
 list(APPEND dataman_targets cacheman)
 
@@ -46,9 +41,34 @@ if(ADIOS_USE_DataMan_ZFP)
 #  list(APPEND dataman_targets zfpman)
 endif()
 
+add_library(dumpman SHARED DumpMan.cpp)
+target_link_libraries(dumpman PRIVATE dataman)
+
+add_library(streamman SHARED StreamMan.cpp)
+target_link_libraries(streamman PRIVATE dataman cacheman zmq)
+
+add_library(mdtmman SHARED MdtmMan.cpp)
+target_link_libraries(mdtmman PRIVATE dataman cacheman streamman zmq)
+
+add_library(zmqman SHARED ZmqMan.cpp)
+target_link_libraries(zmqman PRIVATE dataman cacheman streamman zmq)
+
+add_library(zfpman SHARED ZfpMan.cpp)
+target_link_libraries(zfpman PRIVATE dataman zfp)
+
+add_library(temporalman SHARED TemporalMan.cpp)
+target_link_libraries(temporalman PRIVATE dataman)
+
 install(
+<<<<<<< 3cc571b50c67adec8a481ab201f5437f1a8d923b
   TARGETS ${dataman_targets} EXPORT adios2
+=======
+  TARGETS dataman cacheman dumpman mdtmman zmqman zfpman temporalman EXPORT adios2
+>>>>>>> added all dataman plugins
   RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
   LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
   ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
 )
+
+
+
diff --git a/source/dataman/CompressMan.h b/source/dataman/CompressMan.h
new file mode 100644
index 000000000..5a0b91a44
--- /dev/null
+++ b/source/dataman/CompressMan.h
@@ -0,0 +1,18 @@
+#ifndef COMPRESSMAN_H_
+#define COMPRESSMAN_H_
+
+
+#include "DataMan.h"
+
+using namespace std;
+
+
+class CompressMan : public DataManBase{
+    public:
+        CompressMan() = default;
+        ~CompressMan() = default;
+        virtual string type(){return "Compress";}
+};
+
+
+#endif
diff --git a/source/dataman/DumpMan.cpp b/source/dataman/DumpMan.cpp
new file mode 100644
index 000000000..4c712afb0
--- /dev/null
+++ b/source/dataman/DumpMan.cpp
@@ -0,0 +1,62 @@
+#include "DumpMan.h"
+
+
+int DumpMan::init(json p_jmsg){
+    if(p_jmsg["dumping"]!=nullptr){
+        m_dumping = p_jmsg["dumping"].get<bool>();
+    }
+    return 0;
+}
+int DumpMan::get(void *p_data, json &p_jmsg){
+    return 0;
+}
+
+int DumpMan::put(const void *p_data, json p_jmsg){
+    put_begin(p_data, p_jmsg);
+
+    if(!m_dumping){
+        return 0;
+    }
+
+    string doid = p_jmsg["doid"];
+    string var = p_jmsg["var"];
+    string dtype = p_jmsg["dtype"];
+    vector<size_t> putshape = p_jmsg["putshape"].get<vector<size_t>>();
+    vector<size_t> varshape = p_jmsg["varshape"].get<vector<size_t>>();
+    vector<size_t> offset = p_jmsg["offset"].get<vector<size_t>>();
+    int numbers_to_print = 100;
+    if(numbers_to_print > product(putshape,1)){
+        numbers_to_print = product(putshape,1);
+    }
+    size_t putbytes = product(putshape, dsize(dtype));
+
+    cout << p_jmsg.dump(4) << endl;
+    cout << "total MBs = " << product(putshape,dsize(dtype)) / 1000000 << endl;
+
+    const void *data_to_dump;
+
+    vector<char> data;
+    data.resize(putbytes);
+
+    if(auto_transform(p_data, data.data(), p_jmsg)){
+        data_to_dump = data.data();
+    }
+    else{
+        data_to_dump = p_data;
+    }
+
+    if(dtype == "float")
+        for (size_t i=0; i<numbers_to_print; i++) cout << ((float*)data_to_dump)[i] << " ";
+    if(dtype == "double")
+        for (size_t i=0; i<numbers_to_print; i++) cout << ((double*)data_to_dump)[i] << " ";
+
+    cout << endl;
+    put_end(p_data, p_jmsg);
+    return 0;
+}
+
+void DumpMan::flush(){
+}
+
+
+
diff --git a/source/dataman/DumpMan.h b/source/dataman/DumpMan.h
new file mode 100644
index 000000000..5f34bac95
--- /dev/null
+++ b/source/dataman/DumpMan.h
@@ -0,0 +1,32 @@
+#ifndef DUMPMAN_H_
+#define DUMPMAN_H_
+
+
+#include "DataMan.h"
+
+using namespace std;
+
+class DumpMan : public DataManBase{
+    public:
+        DumpMan() = default;
+        virtual ~DumpMan() = default;
+
+        virtual int init(json p_jmsg);
+        virtual int put(const void *p_data, json p_jmsg);
+        virtual int get(void *p_data, json &p_jmsg);
+        void flush();
+        string name(){return "DumpMan";}
+        string type(){return "Dump";}
+        virtual void transform(const void* p_in, void* p_out, json &p_jmsg){};
+
+    private:
+        bool m_dumping = true;
+};
+
+extern "C" DataManBase* getMan(){
+    return new DumpMan;
+}
+
+
+
+#endif
diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp
new file mode 100644
index 000000000..6f7eb0f89
--- /dev/null
+++ b/source/dataman/MdtmMan.cpp
@@ -0,0 +1,206 @@
+#include <sys/stat.h>
+#include <unistd.h>
+#include "zmq.h"
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include "MdtmMan.h"
+
+
+MdtmMan::MdtmMan()
+    :StreamMan()
+{}
+
+int MdtmMan::init(json p_jmsg){
+
+    StreamMan::init(p_jmsg);
+
+    if(p_jmsg["pipe_prefix"] == nullptr){
+        pipe_desc["pipe_prefix"] = "/tmp/MdtmManPipes/";
+    }
+    else{
+        pipe_desc["pipe_prefix"] = p_jmsg["pipe_prefix"];
+    }
+
+    pipe_desc["operation"] = "init";
+    pipe_desc["mode"] = m_stream_mode;
+
+    string pipename_prefix = "MdtmManPipe";
+    for(int i=0; i<m_num_channels; i++){
+        stringstream pipename;
+        pipename << pipename_prefix << i;
+        if(i==0){
+            pipe_desc["pipe_names"] = {pipename.str()};
+            pipe_desc["loss_tolerance"] = {m_tolerance[i]};
+            pipe_desc["priority"] = {m_priority[i]};
+        }
+        else{
+            pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(), pipename.str());
+            pipe_desc["loss_tolerance"].insert(pipe_desc["loss_tolerance"].end(), m_tolerance[i]);
+            pipe_desc["priority"].insert(pipe_desc["priority"].end(), m_priority[i]);
+        }
+    }
+
+    // ZMQ_DataMan_MDTM
+    if(m_stream_mode=="sender"){
+        zmq_ipc_req = zmq_socket (zmq_context, ZMQ_REQ);
+        zmq_connect (zmq_ipc_req, "ipc:///tmp/ADIOS_MDTM_pipe");
+        char buffer_return[10];
+        zmq_send (zmq_ipc_req, pipe_desc.dump().c_str(), pipe_desc.dump().length(), 0);
+        zmq_recv (zmq_ipc_req, buffer_return, 10, 0);
+    }
+
+    // Pipes
+    mkdir(pipe_desc["pipe_prefix"].get<string>().c_str(), 0755);
+    for (int i=0; i<pipe_desc["pipe_names"].size(); i++){
+        string filename = pipe_desc["pipe_prefix"].get<string>() + pipe_desc["pipe_names"][i].get<string>();
+        mkfifo(filename.c_str(), 0666);
+    }
+
+    for(int i=0; i<m_num_channels; i++){
+        stringstream pipename;
+        pipename << pipename_prefix << i;
+        string fullpipename = pipe_desc["pipe_prefix"].get<string>() + pipename.str();
+        if (m_stream_mode == "sender"){
+            int fp = open(fullpipename.c_str(), O_WRONLY);
+            pipes.push_back(fp);
+        }
+        if (m_stream_mode == "receiver"){
+            int fp = open(fullpipename.c_str(), O_RDONLY | O_NONBLOCK);
+            pipes.push_back(fp);
+        }
+        pipenames.push_back(pipename.str());
+    }
+    return 0;
+}
+
+MdtmMan::~MdtmMan(){
+    run_cmd("rm -rf " + pipe_desc["pipe_prefix"].get<string>());
+    if(zmq_ipc_req) zmq_close(zmq_ipc_req);
+}
+
+int MdtmMan::put(const void *p_data, json p_jmsg){
+    put_begin(p_data, p_jmsg);
+
+    vector<size_t> putshape = p_jmsg["putshape"].get<vector<size_t>>();
+    vector<size_t> varshape = p_jmsg["varshape"].get<vector<size_t>>();
+    string dtype = p_jmsg["dtype"];
+
+    int priority = 100;
+    if(p_jmsg["priority"] != nullptr){
+        priority = p_jmsg["priority"].get<int>();
+    }
+
+    int index;
+    if(m_parallel_mode == "round"){
+        if(m_current_channel == m_num_channels - 1){
+            index = 0;
+            m_current_channel = 0;
+        }
+        else{
+            m_current_channel ++;
+            index = m_current_channel;
+        }
+    }
+    else if(m_parallel_mode == "priority"){
+        index = closest(priority, pipe_desc["priority"], true);
+    }
+
+    p_jmsg["pipe"] = pipe_desc["pipe_names"][index];
+    size_t putbytes = product(putshape, dsize(dtype));
+    p_jmsg["putbytes"] = putbytes;
+    size_t varbytes = product(varshape, dsize(dtype));
+    p_jmsg["varbytes"] = varbytes;
+
+    StreamMan::put(p_data, p_jmsg);
+
+    index=0;
+    for(int i=0; i<pipenames.size(); i++){
+        if(p_jmsg["pipe"].get<string>() == pipenames[i]){
+            index=i;
+        }
+    }
+    string pipename = pipe_desc["pipe_prefix"].get<string>() + p_jmsg["pipe"].get<string>();
+    write(pipes[index], p_data, putbytes);
+    put_end(p_data, p_jmsg);
+    return 0;
+}
+
+int MdtmMan::get(void *p_data, json &p_jmsg){
+    return 0;
+}
+
+void MdtmMan::on_recv(json jmsg){
+    cout << "MdtmMan::on_recv " << endl;
+
+    // push new request
+    jqueue.push(jmsg);
+    bqueue.push(NULL);
+    iqueue.push(0);
+
+    // for flush
+    if(jqueue.front()["operation"] == "flush"){
+        callback();
+        m_cache.clean_all("nan");
+        bqueue.pop();
+        iqueue.pop();
+        jqueue.pop();
+    }
+
+    if(jqueue.size() == 0){
+        return;
+    }
+
+    // for put
+    for(int outloop=0; outloop<jqueue.size()*2; outloop++){
+        if(jqueue.front()["operation"] == "put"){
+            // allocate buffer
+            size_t putbytes = jqueue.front()["putbytes"].get<size_t>();
+            if(bqueue.front() == NULL) bqueue.front() = malloc(putbytes);
+
+            // determine the pipe for the head request
+            json msg = jqueue.front();
+            if(msg == nullptr) break;
+            int pipeindex=0;
+            for(int i=0; i<pipenames.size(); i++){
+                if(msg["pipe"].get<string>() == pipenames[i]){
+                    pipeindex=i;
+                }
+            }
+
+            // read the head request
+            int error_times=0;
+            int s = iqueue.front();
+            putbytes = msg["putbytes"].get<int>();
+            while(s<putbytes){
+                int ret = read(pipes[pipeindex], ((char*)bqueue.front()) + s, putbytes - s);
+                if(ret > 0){
+                    s += ret;
+                }
+                else{
+                    error_times++;
+                    continue;
+                }
+                if(error_times > 1000000){
+                    break;
+                }
+            }
+
+            if(s == putbytes){
+                m_cache.put(bqueue.front(),msg);
+                if(bqueue.front()) free(bqueue.front());
+                bqueue.pop();
+                iqueue.pop();
+                jqueue.pop();
+                break;
+            }
+            else{
+                iqueue.front()=s;
+            }
+        }
+    }
+}
+
+
+
diff --git a/source/dataman/MdtmMan.h b/source/dataman/MdtmMan.h
new file mode 100644
index 000000000..b760bdfbc
--- /dev/null
+++ b/source/dataman/MdtmMan.h
@@ -0,0 +1,63 @@
+#ifndef MDTMMAN_H_
+#define MDTMMAN_H_
+
+#include"StreamMan.h"
+#include <queue>
+
+using namespace std;
+
+class MdtmMan : public StreamMan{
+    public:
+        MdtmMan();
+        ~MdtmMan();
+
+        virtual int init(json p_jmsg);
+        virtual int put(const void *p_data, json p_jmsg);
+        virtual int get(void *p_data, json &p_jmsg);
+        virtual void transform(const void* p_in, void* p_out, json &p_jmsg){};
+
+        void on_recv(json msg);
+        string name(){return "MdtmMan";}
+
+    private:
+        void *zmq_ipc_req = NULL;
+        int zmq_msg_size = 1024;
+        string getmode = "callback";
+        json pipe_desc;
+        vector<int> pipes;
+        vector<string> pipenames;
+        queue<json> jqueue;
+        queue<void*> bqueue;
+        queue<int> iqueue;
+
+        string run_cmd(string cmd)
+        {
+            FILE *pipe = NULL;
+            char buffer[2048];
+            string result;
+            pipe = popen(cmd.c_str(), "r");
+            if (NULL == pipe)
+            {
+                perror("pipe");
+                return "";
+            }
+            while (!feof(pipe))
+            {
+                if (fgets(buffer, sizeof(buffer), pipe) != NULL)
+                    result = buffer;
+            }
+            pclose(pipe);
+            return result;
+        }
+
+}; // end of class MdtmMan
+
+extern "C" DataManBase* getMan(){
+    return new MdtmMan;
+}
+
+
+#endif
+
+
+
diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp
new file mode 100644
index 000000000..027f85ac1
--- /dev/null
+++ b/source/dataman/StreamMan.cpp
@@ -0,0 +1,102 @@
+#include <iostream>
+#include <unistd.h>
+#include <sstream>
+#include "StreamMan.h"
+#include "zmq.h"
+
+
+StreamMan::~StreamMan(){
+    if(zmq_meta) zmq_close(zmq_meta);
+    if(zmq_context) zmq_ctx_destroy(zmq_context);
+    zmq_meta_rep_thread_active = false;
+    if(zmq_meta_rep_thread){
+        if(zmq_meta_rep_thread->joinable())
+            zmq_meta_rep_thread->join();
+        delete zmq_meta_rep_thread;
+    }
+}
+
+int StreamMan::init(json p_jmsg){
+    if(check_json(p_jmsg, {"stream_mode", "remote_ip", "local_ip", "remote_port", "local_port" }, "StreamMan")){
+        m_stream_mode = p_jmsg["stream_mode"];
+        m_local_ip = p_jmsg["local_ip"];
+        m_remote_ip = p_jmsg["remote_ip"];
+        m_local_port = p_jmsg["local_port"];
+        m_remote_port = p_jmsg["remote_port"];
+        string remote_address = make_address(m_remote_ip, m_remote_port, "tcp");
+        string local_address = make_address(m_local_ip, m_local_port, "tcp");
+
+        m_tolerance.assign(m_num_channels, 0);
+        m_priority.assign(m_num_channels, 100);
+        if(p_jmsg["num_channels"] != nullptr) m_num_channels = p_jmsg["num_channels"];
+        if(p_jmsg["tolerance"] != nullptr) m_tolerance = p_jmsg["tolerance"].get<vector<int>>();
+        if(p_jmsg["priority"] != nullptr) m_priority = p_jmsg["priority"].get<vector<int>>();
+
+        if(!zmq_context){
+            zmq_context = zmq_ctx_new ();
+            zmq_meta = zmq_socket (zmq_context, ZMQ_PAIR);
+            if(m_stream_mode == "sender"){
+                zmq_connect (zmq_meta, remote_address.c_str());
+                cout << "StreamMan::init " << remote_address << " connected" << endl;
+            }
+            else if(m_stream_mode == "receiver"){
+                zmq_bind (zmq_meta, local_address.c_str());
+                cout << "StreamMan::init " << local_address << " bound" << endl;
+            }
+            zmq_meta_rep_thread_active = true;
+            zmq_meta_rep_thread = new thread(&StreamMan::zmq_meta_rep_thread_func, this);
+        }
+        return 0;
+    }
+    else{
+        return -1;
+    }
+}
+
+void StreamMan::callback(){
+    if(m_callback){
+        vector<string> do_list = m_cache.get_do_list();
+        for(string i : do_list){
+            vector<string> var_list = m_cache.get_var_list(i);
+            for(string j : var_list){
+                m_callback(m_cache.get_buffer(i,j),
+                        i,
+                        j,
+                        m_cache.get_dtype(i, j),
+                        m_cache.get_shape(i, j)
+                        );
+            }
+        }
+    }
+    else{
+        logging("callback called but callback function not registered!");
+    }
+}
+
+void StreamMan::flush(){
+    json msg;
+    msg["operation"] = "flush";
+    zmq_send(zmq_meta, msg.dump().c_str(), msg.dump().length(), 0);
+}
+
+void StreamMan::zmq_meta_rep_thread_func(){
+    while (zmq_meta_rep_thread_active){
+        char msg[1024]="";
+        int err = zmq_recv (zmq_meta, msg, 1024, ZMQ_NOBLOCK);
+        if (err>=0){
+            cout << "StreamMan::zmq_meta_rep_thread_func: " << msg << endl;
+            json j = json::parse(msg);
+            on_recv(j);
+        }
+        usleep(10);
+    }
+}
+
+int StreamMan::put(const void *p_data, json p_jmsg){
+    p_jmsg["operation"] = "put";
+    zmq_send(zmq_meta, p_jmsg.dump().c_str(), p_jmsg.dump().length(), 0);
+    return 0;
+}
+
+
+
diff --git a/source/dataman/StreamMan.h b/source/dataman/StreamMan.h
new file mode 100644
index 000000000..86fc359dc
--- /dev/null
+++ b/source/dataman/StreamMan.h
@@ -0,0 +1,57 @@
+#ifndef STREAMMAN_H_
+#define STREAMMAN_H_
+
+
+
+#include "DataMan.h"
+#include "CacheMan.h"
+#include <thread>
+
+using namespace std;
+
+
+class StreamMan : public DataManBase{
+    public:
+        StreamMan() = default;
+        virtual ~StreamMan();
+
+        virtual int init(json p_jmsg);
+        virtual int put(const void *p_data, json p_jmsg);
+        virtual void on_recv(json msg) = 0;
+        void flush();
+        virtual string type(){return "Stream";}
+
+    protected:
+        void *zmq_context = NULL;
+        CacheMan m_cache;
+        void callback();
+
+        string m_get_mode = "callback";
+        string m_stream_mode;
+        string m_local_ip;
+        string m_remote_ip;
+        int m_local_port;
+        int m_remote_port;
+        int m_num_channels = 1;
+        vector<int> m_tolerance;
+        vector<int> m_priority;
+
+        // parallel
+        string m_parallel_mode = "round"; // round, priority
+        int m_current_channel = 0;
+
+        inline string make_address(string ip, int port, string protocol){
+            stringstream address;
+            address << protocol << "://" << ip << ":" << port;
+            return address.str();
+        }
+
+    private:
+        void *zmq_meta = NULL;
+        void zmq_meta_rep_thread_func();
+        bool zmq_meta_rep_thread_active;
+        thread *zmq_meta_rep_thread=NULL;
+};
+
+
+#endif
diff --git a/source/dataman/TemporalMan.cpp b/source/dataman/TemporalMan.cpp
new file mode 100644
index 000000000..f26ee96c5
--- /dev/null
+++ b/source/dataman/TemporalMan.cpp
@@ -0,0 +1,34 @@
+#include "TemporalMan.h"
+
+
+TemporalMan::TemporalMan()
+    :CompressMan()
+{
+}
+
+TemporalMan::~TemporalMan()
+{
+}
+
+int TemporalMan::init(json p_jmsg){
+    return 0;
+}
+
+int TemporalMan::put(const void *p_data, json p_jmsg){
+    put_begin(p_data, p_jmsg);
+    put_end(p_data, p_jmsg);
+    return 0;
+}
+
+int TemporalMan::get(void *p_data, json &p_jmsg){
+    return 0;
+}
+
+void TemporalMan::flush(){
+
+}
+
+void TemporalMan::transform(const void* p_in, void* p_out, json &p_jmsg){
+
+}
+
diff --git a/source/dataman/TemporalMan.h b/source/dataman/TemporalMan.h
new file mode 100644
index 000000000..5327f4088
--- /dev/null
+++ b/source/dataman/TemporalMan.h
@@ -0,0 +1,26 @@
+#ifndef TEMPORALMAN_H_
+#define TEMPORALMAN_H_
+
+#include "CompressMan.h"
+
+
+class TemporalMan : public CompressMan{
+    public:
+        TemporalMan();
+        ~TemporalMan();
+        virtual int init(json p_jmsg);
+        virtual int put(const void *p_data, json p_jmsg);
+        virtual int get(void *p_data, json &p_jmsg);
+        virtual void flush();
+        virtual void transform(const void* p_in, void* p_out, json &p_jmsg);
+        string name(){return "TemporalMan";}
+};
+
+extern "C" DataManBase* getMan(){
+    return new TemporalMan;
+}
+
+
+#endif
+
+
diff --git a/source/dataman/ZfpMan.cpp b/source/dataman/ZfpMan.cpp
new file mode 100644
index 000000000..23a22ecd1
--- /dev/null
+++ b/source/dataman/ZfpMan.cpp
@@ -0,0 +1,253 @@
+#include "ZfpMan.h"
+#include <zfp.h>
+
+
+ZfpMan::ZfpMan()
+    :CompressMan()
+{
+}
+
+ZfpMan::~ZfpMan(){
+}
+
+int ZfpMan::init(json p_jmsg){
+    return 0;
+}
+
+int ZfpMan::put(const void *p_data, json p_jmsg){
+    put_begin(p_data, p_jmsg);
+
+    void *compressed_data = NULL;
+    if (check_json(p_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan")){
+        if (p_jmsg["compression_rate"] == nullptr){
+            p_jmsg["compression_rate"] = 4;
+        }
+        compressed_data = compress(const_cast<void*>(p_data), p_jmsg);
+    }
+
+    put_end(compressed_data, p_jmsg);
+    if(compressed_data) free(compressed_data);
+    return 0;
+}
+
+int ZfpMan::get(void *p_data, json &p_jmsg){
+    return 0;
+}
+
+void ZfpMan::flush(){
+    flush_next();
+}
+
+void* ZfpMan::compress(void* p_data, json &p_jmsg){
+
+    string dtype = p_jmsg["dtype"];
+    vector<size_t> shape = p_jmsg["putshape"].get<vector<size_t>>();
+    int compression_rate = p_jmsg["compression_rate"].get<int>();
+
+    int status = 0;    // return value: 0 = success
+    uint dim = 1;
+    zfp_type type = zfp_type_none;     // array scalar type
+    zfp_field* field;  // array meta data
+    zfp_stream* zfp;   // compressed stream
+    size_t bufsize;    // byte size of compressed buffer
+    bitstream* stream; // bit stream to write to or read from
+    size_t zfpsize;    // byte size of compressed stream
+
+    // allocate meta data for the 3D array a[nz][ny][nx]
+    if(dtype == "int"){
+        type = zfp_type_int32;
+    }
+    else if(dtype == "long"){
+        type = zfp_type_int64;
+    }
+    else if(dtype == "float"){
+        type = zfp_type_float;
+    }
+    else if(dtype == "double"){
+        type = zfp_type_double;
+    }
+
+    switch (shape.size()){
+        case 3:
+            field = zfp_field_3d(p_data, type, shape[0], shape[1], shape[2]);
+            dim = 3;
+            break;
+        case 2:
+            field = zfp_field_2d(p_data, type, shape[0], shape[1]);
+            dim = 2;
+            break;
+        case 1:
+            field = zfp_field_1d(p_data, type, shape[0]);
+            break;
+        default:
+            field = zfp_field_1d(p_data, type, product(shape));
+    }
+
+    // allocate meta data for a compressed stream
+    zfp = zfp_stream_open(NULL);
+
+    // set compression mode and parameters via one of three functions
+    zfp_stream_set_rate(zfp, compression_rate, type, dim, 0);
+    // zfp_stream_set_precision(zfp, m_precision, type);
+    // zfp_stream_set_accuracy(zfp, m_accuracy, type);
+
+    // allocate buffer for compressed data
+    bufsize = zfp_stream_maximum_size(zfp, field);
+    void *buffer = malloc(bufsize);
+
+    // associate bit stream with allocated buffer
+    stream = stream_open(buffer, bufsize);
+    zfp_stream_set_bit_stream(zfp, stream);
+    zfp_stream_rewind(zfp);
+
+    // compress or decompress entire array
+
+    zfpsize = zfp_compress(zfp, field);
+
+    if (!zfpsize) {
+        cout << "compression failed\n";
+        status = 1;
+    }
+
+    p_jmsg["compressed_size"] = bufsize;
+    p_jmsg["compression_method"] = "zfp";
+
+    // clean up
+    zfp_field_free(field);
+    zfp_stream_close(zfp);
+    stream_close(stream);
+
+    return buffer;
+}
+
+void* ZfpMan::decompress(void* p_data, json p_jmsg){
+
+    string dtype = p_jmsg["dtype"];
+    vector<size_t> shape = p_jmsg["putshape"].get<vector<size_t>>();
+    int compression_rate = p_jmsg["compression_rate"].get<int>();
+
+    int status = 0;    // return value: 0 = success
+    uint dim = 1;
+    zfp_type type = zfp_type_none;     // array scalar type
+    zfp_field* field;  // array meta data
+    zfp_stream* zfp;   // compressed stream
+    size_t bufsize = p_jmsg["compressed_size"].get<size_t>();    // byte size of compressed buffer
+    bitstream* stream; // bit stream to write to or read from
+    size_t zfpsize;    // byte size of compressed stream
+
+    // allocate meta data for the 3D array a[nz][ny][nx]
+    if(dtype == "int"){
+        type = zfp_type_int32;
+    }
+    else if(dtype == "long"){
+        type = zfp_type_int64;
+    }
+    else if(dtype == "float"){
+        type = zfp_type_float;
+    }
+    else if(dtype == "double"){
+        type = zfp_type_double;
+    }
+
+    void *data;
+    data=malloc(product(shape,dsize(dtype)));
+
+    switch (shape.size()){
+        case 3:
+            field = zfp_field_3d(data, type, shape[0], shape[1], shape[2]);
+            dim = 3;
+            break;
+        case 2:
+            field = zfp_field_2d(data, type, shape[0], shape[1]);
+            dim = 2;
+            break;
+        case 1:
+            field = zfp_field_1d(data, type, shape[0]);
+            break;
+        default:
+            field = zfp_field_1d(data, type, product(shape));
+    }
+
+    zfp = zfp_stream_open(NULL);
+    zfp_stream_set_rate(zfp, compression_rate, type, dim, 0);
+    stream = stream_open(p_data, bufsize);
+    zfp_stream_set_bit_stream(zfp, stream);
+    zfp_stream_rewind(zfp);
+    if (!zfp_decompress(zfp, field)) {
+      fprintf(stderr, "decompression failed\n");
+      status = 1;
+    }
+    zfp_field_free(field);
+    zfp_stream_close(zfp);
+    stream_close(stream);
+
+    return data;
+
+}
+
+void ZfpMan::transform(const void* p_in, void* p_out, json &p_jmsg){
+
+    string dtype = p_jmsg["dtype"];
+    vector<size_t> shape = p_jmsg["putshape"].get<vector<size_t>>();
+    int compression_rate = p_jmsg["compression_rate"].get<int>();
+
+    int status = 0;    // return value: 0 = success
+    uint dim = 1;
+    zfp_type type = zfp_type_none;     // array scalar type
+    zfp_field* field;  // array meta data
+    zfp_stream* zfp;   // compressed stream
+    size_t bufsize = p_jmsg["compressed_size"].get<size_t>();    // byte size of compressed buffer
+    bitstream* stream; // bit stream to write to or read from
+    size_t zfpsize;    // byte size of compressed stream
+
+    // allocate meta data for the 3D array a[nz][ny][nx]
+    if(dtype == "int"){
+        type = zfp_type_int32;
+    }
+    else if(dtype == "long"){
+        type = zfp_type_int64;
+    }
+    else if(dtype == "float"){
+        type = zfp_type_float;
+    }
+    else if(dtype == "double"){
+        type = zfp_type_double;
+    }
+
+    switch (shape.size()){
+        case 3:
+            field = zfp_field_3d(p_out, type, shape[0], shape[1], shape[2]);
+            dim = 3;
+            break;
+        case 2:
+            field = zfp_field_2d(p_out, type, shape[0], shape[1]);
+            dim = 2;
+            break;
+        case 1:
+            field = zfp_field_1d(p_out, type, shape[0]);
+            break;
+        default:
+            field = zfp_field_1d(p_out, type, product(shape));
+    }
+
+    zfp = zfp_stream_open(NULL);
+    zfp_stream_set_rate(zfp, compression_rate, type, dim, 0);
+    stream = stream_open(const_cast<void*>(p_in), bufsize);
+    zfp_stream_set_bit_stream(zfp, stream);
+    zfp_stream_rewind(zfp);
+    if (!zfp_decompress(zfp, field)) {
+      fprintf(stderr, "decompression failed\n");
+      status = 1;
+    }
+    zfp_field_free(field);
+    zfp_stream_close(zfp);
+    stream_close(stream);
+
+    p_jmsg.erase("compression_rate");
+    p_jmsg.erase("compression_method");
+    p_jmsg.erase("compressed_size");
+
+}
+
+
+
diff --git a/source/dataman/ZfpMan.h b/source/dataman/ZfpMan.h
new file mode 100644
index 000000000..9b887dc86
--- /dev/null
+++ b/source/dataman/ZfpMan.h
@@ -0,0 +1,26 @@
+#ifndef ZFPMAN_H_
+#define ZFPMAN_H_
+
+#include "CompressMan.h"
+
+
+class ZfpMan : public CompressMan{
+    public:
+        ZfpMan();
+        ~ZfpMan();
+        virtual int init(json p_jmsg);
+        virtual int put(const void *p_data, json p_jmsg);
+        virtual int get(void *p_data, json &p_jmsg);
+        virtual void flush();
+        void* compress(void* p_data, json &p_jmsg);
+        void* decompress(void* p_data, json p_jmsg);
+        virtual void transform(const void* p_in, void* p_out, json &p_jmsg);
+        string name(){return "ZfpMan";}
+};
+
+extern "C" DataManBase* getMan(){
+    return new ZfpMan;
+}
+
+
+#endif
diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp
new file mode 100644
index 000000000..f06a4d20a
--- /dev/null
+++ b/source/dataman/ZmqMan.cpp
@@ -0,0 +1,64 @@
+#include <sys/stat.h>
+#include <unistd.h>
+#include "ZmqMan.h"
+#include "zmq.h"
+
+
+ZmqMan::~ZmqMan(){
+    if(zmq_data) zmq_close(zmq_data);
+}
+
+int ZmqMan::init(json p_jmsg){
+    StreamMan::init(p_jmsg);
+    zmq_data = zmq_socket (zmq_context, ZMQ_PAIR);
+    string local_address = make_address(m_local_ip, m_local_port+1, "tcp");
+    string remote_address = make_address(m_remote_ip, m_remote_port+1, "tcp");
+    if(m_stream_mode=="sender"){
+        zmq_connect (zmq_data, remote_address.c_str());
+        cout << "ZmqMan::init " << remote_address << " connected" << endl;
+    }
+    else if(m_stream_mode=="receiver"){
+        zmq_bind (zmq_data, local_address.c_str());
+        cout << "ZmqMan::init " << local_address << " bound" << endl;
+    }
+    return 0;
+}
+
+int ZmqMan::put(const void *p_data, json p_jmsg){
+    put_begin(p_data, p_jmsg);
+    StreamMan::put(p_data, p_jmsg);
+    zmq_send(zmq_data, p_data, p_jmsg["putbytes"], 0);
+    put_end(p_data, p_jmsg);
+    return 0;
+}
+
+int ZmqMan::get(void *p_data, json &p_jmsg){
+    return 0;
+}
+
+void ZmqMan::on_recv(json msg){
+    if (msg["operation"] == "put"){
+        if(msg["compression_method"] == nullptr){
+            size_t putbytes = msg["putbytes"].get<size_t>();
+            vector<char> data;
+            data.resize(putbytes);
+            int err = zmq_recv (zmq_data, data.data(), putbytes, 0);
+            m_cache.put(data.data(), msg);
+        }
+        else{
+            size_t putbytes = msg["putbytes"].get<size_t>();
+            size_t compressed_size = msg["compressed_size"].get<size_t>();
+            vector<char> compressed_data; compressed_data.resize(compressed_size);
+            vector<char> data; data.resize(putbytes);
+            int err = zmq_recv (zmq_data, compressed_data.data(), compressed_size, 0);
+            auto_transform(compressed_data.data(), data.data(), msg);
+            m_cache.put(data.data(), msg);
+        }
+    }
+    else if (msg["operation"] == "flush"){
+        callback();
+        m_cache.flush();
+        m_cache.clean_all("nan");
+    }
+}
+
diff --git a/source/dataman/ZmqMan.h b/source/dataman/ZmqMan.h
new file mode 100644
index 000000000..99ca88e7e
--- /dev/null
+++ b/source/dataman/ZmqMan.h
@@ -0,0 +1,35 @@
+#ifndef ZMQMAN_H_
+#define ZMQMAN_H_
+
+#include"StreamMan.h"
+
+
+class ZmqMan : public StreamMan{
+    public:
+        ZmqMan() = default;
+        ~ZmqMan();
+
+        virtual int init(json p_jmsg);
+        virtual int put(const void *p_data, json p_jmsg);
+        virtual int get(void *p_data, json &p_jmsg);
+        virtual void transform(const void* p_in, void* p_out, json &p_jmsg){};
+
+
+        virtual void on_recv(json msg);
+        string name(){return "ZmqMan";}
+
+
+    private:
+        void *zmq_data = NULL;
+
+
+};
+
+
+extern "C" DataManBase* getMan(){
+    return new ZmqMan;
+}
+
+
+
+#endif
-- 
GitLab