Skip to content
Snippets Groups Projects
Commit 3aabfd40 authored by Ruonan Wang's avatar Ruonan Wang
Browse files

follow-ups for PR #87

parent 348e0422
No related branches found
No related tags found
1 merge request!87MdtmMan worked with ZfpMan for automatic compression and decompression
......@@ -31,9 +31,13 @@ void DataManWriter::WriteVariableCommon(Variable<T> &variable, const T *values)
// This part will go away, this is just to monitor variables per rank
if (variable.m_GlobalDimensions.empty())
{
variable.m_GlobalDimensions = variable.m_LocalDimensions;
}
if (variable.m_Offsets.empty())
{
variable.m_Offsets.assign(variable.m_LocalDimensions.size(), 0);
}
json jmsg;
jmsg["doid"] = m_Name;
......
......@@ -13,17 +13,15 @@
#include <algorithm>
#include <limits>
int CacheMan::init(json p_jmsg) { return 0; }
int CacheItem::init(json p_jmsg)
{
m_doid = p_jmsg["doid"];
m_var = p_jmsg["var"];
m_dtype = p_jmsg["dtype"];
m_doid = p_jmsg["doid"].get<std::string>();
m_var = p_jmsg["var"].get<std::string>();
m_dtype = p_jmsg["dtype"].get<std::string>();
m_varshape = p_jmsg["varshape"].get<std::vector<size_t>>();
m_bytes = dsize(m_dtype);
m_varsize = product(m_varshape);
m_varbytes = m_varsize * m_bytes;
m_dsize = p_jmsg["dsize"].get<size_t>();
m_varsize = p_jmsg["varsize"].get<size_t>();
m_varbytes = p_jmsg["varbytes"].get<size_t>();
if (m_buffer[m_timestep].size() != m_varbytes)
{
......@@ -34,12 +32,6 @@ int CacheItem::init(json p_jmsg)
int CacheItem::put(const void *p_data, json p_jmsg)
{
if (!check_json(p_jmsg,
{"doid", "var", "dtype", "varshape", "putshape", "offset"},
"CacheItem"))
{
return -1;
}
init(p_jmsg);
std::vector<size_t> p_putshape =
p_jmsg["putshape"].get<std::vector<size_t>>();
......@@ -47,23 +39,21 @@ int CacheItem::put(const void *p_data, json p_jmsg)
p_jmsg["varshape"].get<std::vector<size_t>>();
std::vector<size_t> p_offset = p_jmsg["offset"].get<std::vector<size_t>>();
size_t putsize = product(p_putshape);
size_t putsize = p_jmsg["putsize"].get<size_t>();
size_t chunksize = p_putshape.back();
for (size_t i = 0; i < putsize; i += chunksize)
{
std::vector<size_t> p = one2multi(p_putshape, i);
p = apply_offset(p, p_offset);
size_t ig = multi2one(p_varshape, p);
std::copy((char *)p_data + i * m_bytes,
(char *)p_data + i * m_bytes + chunksize * m_bytes,
m_buffer[m_timestep].data() + ig * m_bytes);
std::copy((char *)p_data + i * m_dsize,
(char *)p_data + i * m_dsize + chunksize * m_dsize,
m_buffer[m_timestep].data() + ig * m_dsize);
}
return 0;
}
int CacheItem::get(void *p_data, json &p_jmsg) { return 0; }
std::vector<size_t> CacheItem::get_shape() { return m_varshape; }
std::string CacheItem::get_dtype() { return m_dtype; }
......@@ -111,17 +101,11 @@ const void *CacheItem::get_buffer() { return m_buffer[m_timestep].data(); }
int CacheMan::put(const void *p_data, json p_jmsg)
{
if (check_json(p_jmsg, {"doid", "var"}, "CacheMan"))
{
std::string doid = p_jmsg["doid"];
std::string var = p_jmsg["var"];
return m_cache[doid][var].put(p_data, p_jmsg);
}
return -1;
std::string doid = p_jmsg["doid"].get<std::string>();
std::string var = p_jmsg["var"].get<std::string>();
return m_cache[doid][var].put(p_data, p_jmsg);
}
int CacheMan::get(void *p_data, json &p_jmsg) { return 0; }
void CacheMan::flush()
{
for (auto i : m_cache)
......@@ -180,5 +164,3 @@ std::string CacheMan::get_dtype(std::string doid, std::string var)
{
return m_cache[doid][var].get_dtype();
}
DataManBase *getMan() { return new CacheMan; }
......@@ -11,11 +11,14 @@
#ifndef DATAMAN_CACHEMAN_H_
#define DATAMAN_CACHEMAN_H_
#include "DataMan.h"
#include <queue>
class CacheItem : public DataManBase
#include "json.hpp"
class CacheItem
{
public:
using json = nlohmann::json;
CacheItem() = default;
virtual ~CacheItem() = default;
......@@ -24,12 +27,9 @@ public:
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual int get(void *p_data, json &p_jmsg);
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
void flush();
std::string name() { return "CacheItem"; }
std::string type() { return "Cache"; }
const void *get_buffer();
void clean(const std::string mode);
void remove(size_t timestep);
......@@ -41,7 +41,7 @@ private:
std::string m_doid;
std::string m_var;
std::string m_dtype;
size_t m_bytes;
size_t m_dsize;
size_t m_varsize;
size_t m_varbytes;
std::vector<size_t> m_varshape;
......@@ -87,21 +87,15 @@ private:
}
};
class CacheMan : public DataManBase
class CacheMan
{
public:
using json = nlohmann::json;
CacheMan() = default;
virtual ~CacheMan() = default;
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual int get(void *p_data, json &p_jmsg);
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
void flush();
std::string name() { return "CacheMan"; }
std::string type() { return "Cache"; }
const void *get_buffer(std::string doid, std::string var);
void clean(std::string doid, std::string var, std::string mode);
void clean_all(std::string mode);
......@@ -118,6 +112,4 @@ private:
CacheDoMap m_cache;
};
extern "C" DataManBase *getMan();
#endif
......@@ -10,38 +10,51 @@
#include "DataMan.h"
int DataMan::init(json p_jmsg) { return 0; }
int DataMan::init(json a_jmsg) { return 0; }
int DataMan::put(const void *p_data, std::string p_doid, std::string p_var,
int DataMan::put(const void *a_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_putshape,
std::vector<size_t> p_varshape, std::vector<size_t> p_offset,
size_t p_timestep, int p_tolerance, int p_priority)
{
return DataMan::put(p_data, p_doid, p_var, p_dtype, p_putshape, p_varshape,
return DataMan::put(a_data, p_doid, p_var, p_dtype, p_putshape, p_varshape,
p_offset, p_timestep, p_tolerance, p_priority);
}
int DataMan::put(const void *p_data, json p_jmsg)
int DataMan::put(const void *a_data, json a_jmsg)
{
put_begin(p_data, p_jmsg);
put_end(p_data, p_jmsg);
if (m_cache_size > 0)
{
// to be implemented
}
else
{
put_begin(a_data, a_jmsg);
put_end(a_data, a_jmsg);
}
return 0;
}
void DataMan::add_file(std::string p_method) {}
void DataMan::add_stream(json p_jmsg)
void DataMan::add_stream(json a_jmsg)
{
std::string method;
if (p_jmsg["method"].is_string())
if (a_jmsg["method"].is_string())
{
method = p_jmsg["method"];
method = a_jmsg["method"];
}
logging("Streaming method " + method + " added");
if (a_jmsg["cachesize"].is_number())
{
m_cache_size = a_jmsg["cachesize"].get<size_t>();
}
if (m_tolerance.size() < m_num_channels)
{
for (int i = 0; i < m_num_channels; i++)
......@@ -60,18 +73,18 @@ void DataMan::add_stream(json p_jmsg)
auto man = get_man(method);
if (man)
{
man->init(p_jmsg);
man->init(a_jmsg);
this->add_next(method, man);
}
if (p_jmsg["compression_method"].is_string())
if (a_jmsg["compression_method"].is_string())
{
if (p_jmsg["compression_method"] != "null")
if (a_jmsg["compression_method"] != "null")
{
add_man_to_path(p_jmsg["compression_method"], method, p_jmsg);
add_man_to_path(a_jmsg["compression_method"], method, a_jmsg);
}
}
}
void DataMan::flush() { flush_next(); }
int DataMan::get(void *p_data, json &p_jmsg) { return 0; }
int DataMan::get(void *a_data, json &a_jmsg) { return 0; }
......@@ -11,6 +11,7 @@
#ifndef DATAMAN_DATAMAN_H_
#define DATAMAN_DATAMAN_H_
#include "CacheMan.h"
#include "DataManBase.h"
class DataMan : public DataManBase
......@@ -33,13 +34,16 @@ public:
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
private:
std::string m_local_ip = "";
std::string m_remote_ip = "";
std::string m_local_ip;
std::string m_remote_ip;
int m_local_port = 0;
int m_remote_port = 0;
int m_num_channels = 0;
std::vector<int> m_tolerance;
std::vector<int> m_priority;
std::queue<CacheMan> m_cache_q;
size_t m_cache_size = 0;
size_t m_timestep = 0;
};
#endif /* DATAMAN_H_ */
......@@ -260,12 +260,12 @@ void DataManBase::print_next(std::ostream &out)
bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg)
{
if (a_jmsg["compression_method"].is_string() and
if (a_jmsg["compression_method"].is_string() &&
a_jmsg["compression_method"].get<std::string>() != "null")
{
auto method = a_jmsg["compression_method"].get<std::string>();
auto man = get_man(method);
if (not man)
if (!man)
{
logging("Library file for compression method " + method +
" not found!");
......@@ -548,6 +548,11 @@ void DataManBase::check_shape(json &p_jmsg)
{
p_jmsg["offset"] = std::vector<size_t>(varshape.size(), 0);
}
p_jmsg["dsize"] = dsize(p_jmsg["dtype"].get<std::string>());
p_jmsg["putsize"] = product(p_jmsg["putshape"].get<std::vector<size_t>>());
p_jmsg["varsize"] = product(varshape);
p_jmsg["putbytes"] = product(p_jmsg["putshape"].get<std::vector<size_t>>(),
dsize(p_jmsg["dtype"].get<std::string>()));
p_jmsg["varbytes"] =
......
......@@ -12,6 +12,7 @@
#define DATAMAN_STREAMMAN_H_
#include "CacheMan.h"
#include "DataManBase.h"
#include <thread>
......
......@@ -12,47 +12,42 @@
#include <zfp.h>
int ZfpMan::init(json p_jmsg)
int ZfpMan::init(json a_jmsg)
{
if (p_jmsg["compression_rate"].is_number())
if (a_jmsg["compression_rate"].is_number())
{
m_compression_rate = p_jmsg["compression_rate"].get<double>();
m_compression_rate = a_jmsg["compression_rate"].get<double>();
}
return 0;
}
int ZfpMan::put(const void *p_data, json p_jmsg)
int ZfpMan::put(const void *a_data, json a_jmsg)
{
put_begin(p_data, p_jmsg);
put_begin(a_data, a_jmsg);
void *compressed_data = nullptr;
if (check_json(p_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan"))
std::vector<char> compressed_data;
if (check_json(a_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan"))
{
if (not p_jmsg["compression_rate"].is_number())
if (not a_jmsg["compression_rate"].is_number())
{
p_jmsg["compression_rate"] = m_compression_rate;
a_jmsg["compression_rate"] = m_compression_rate;
}
compressed_data = compress(const_cast<void *>(p_data), p_jmsg);
compress(const_cast<void *>(a_data), compressed_data, a_jmsg);
}
put_end(compressed_data, p_jmsg);
if (compressed_data)
{
free(compressed_data);
}
put_end(compressed_data.data(), a_jmsg);
return 0;
}
int ZfpMan::get(void *p_data, json &p_jmsg) { return 0; }
int ZfpMan::get(void *a_data, json &a_jmsg) { return 0; }
void ZfpMan::flush() { flush_next(); }
void *ZfpMan::compress(void *p_data, json &p_jmsg)
int ZfpMan::compress(void *a_input, std::vector<char> &a_output, json &a_jmsg)
{
std::string dtype = p_jmsg["dtype"];
std::vector<size_t> shape = p_jmsg["putshape"].get<std::vector<size_t>>();
int compression_rate = p_jmsg["compression_rate"].get<int>();
std::string dtype = a_jmsg["dtype"];
std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>();
int compression_rate = a_jmsg["compression_rate"].get<int>();
int status = 0; // return value: 0 = success
uint dim = 1;
......@@ -84,18 +79,18 @@ void *ZfpMan::compress(void *p_data, json &p_jmsg)
switch (shape.size())
{
case 3:
field = zfp_field_3d(p_data, type, shape[0], shape[1], shape[2]);
field = zfp_field_3d(a_input, type, shape[0], shape[1], shape[2]);
dim = 3;
break;
case 2:
field = zfp_field_2d(p_data, type, shape[0], shape[1]);
field = zfp_field_2d(a_input, type, shape[0], shape[1]);
dim = 2;
break;
case 1:
field = zfp_field_1d(p_data, type, shape[0]);
field = zfp_field_1d(a_input, type, shape[0]);
break;
default:
field = zfp_field_1d(p_data, type, product(shape));
field = zfp_field_1d(a_input, type, product(shape));
}
// allocate meta data for a compressed stream
......@@ -108,10 +103,10 @@ void *ZfpMan::compress(void *p_data, json &p_jmsg)
// allocate buffer for compressed data
bufsize = zfp_stream_maximum_size(zfp, field);
void *buffer = malloc(bufsize);
a_output.resize(bufsize);
// associate bit stream with allocated buffer
stream = stream_open(buffer, bufsize);
stream = stream_open(a_output.data(), bufsize);
zfp_stream_set_bit_stream(zfp, stream);
zfp_stream_rewind(zfp);
......@@ -125,30 +120,30 @@ void *ZfpMan::compress(void *p_data, json &p_jmsg)
status = 1;
}
p_jmsg["compressed_size"] = bufsize;
p_jmsg["compression_method"] = "zfp";
a_jmsg["compressed_size"] = bufsize;
a_jmsg["compression_method"] = "zfp";
// clean up
zfp_field_free(field);
zfp_stream_close(zfp);
stream_close(stream);
return buffer;
return 0;
}
void *ZfpMan::decompress(void *p_data, json p_jmsg)
int ZfpMan::decompress(void *a_input, std::vector<char> &a_output, json &a_jmsg)
{
std::string dtype = p_jmsg["dtype"];
std::vector<size_t> shape = p_jmsg["putshape"].get<std::vector<size_t>>();
int compression_rate = p_jmsg["compression_rate"].get<int>();
std::string dtype = a_jmsg["dtype"];
std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>();
int compression_rate = a_jmsg["compression_rate"].get<int>();
int status = 0; // return value: 0 = success
uint dim = 1;
zfp_type type = zfp_type_none; // array scalar type
zfp_field *field; // array meta data
zfp_stream *zfp; // compressed stream
size_t bufsize = p_jmsg["compressed_size"]
size_t bufsize = a_jmsg["compressed_size"]
.get<size_t>(); // byte size of compressed buffer
bitstream *stream; // bit stream to write to or read from
size_t zfpsize; // byte size of compressed stream
......@@ -171,28 +166,29 @@ void *ZfpMan::decompress(void *p_data, json p_jmsg)
type = zfp_type_double;
}
void *data = malloc(product(shape, dsize(dtype)));
a_output.resize(product(shape, dsize(dtype)));
switch (shape.size())
{
case 3:
field = zfp_field_3d(data, type, shape[0], shape[1], shape[2]);
field =
zfp_field_3d(a_output.data(), type, shape[0], shape[1], shape[2]);
dim = 3;
break;
case 2:
field = zfp_field_2d(data, type, shape[0], shape[1]);
field = zfp_field_2d(a_output.data(), type, shape[0], shape[1]);
dim = 2;
break;
case 1:
field = zfp_field_1d(data, type, shape[0]);
field = zfp_field_1d(a_output.data(), type, shape[0]);
break;
default:
field = zfp_field_1d(data, type, product(shape));
field = zfp_field_1d(a_output.data(), type, product(shape));
}
zfp = zfp_stream_open(NULL);
zfp_stream_set_rate(zfp, compression_rate, type, dim, 0);
stream = stream_open(p_data, bufsize);
stream = stream_open(a_input, bufsize);
zfp_stream_set_bit_stream(zfp, stream);
zfp_stream_rewind(zfp);
if (!zfp_decompress(zfp, field))
......@@ -204,7 +200,7 @@ void *ZfpMan::decompress(void *p_data, json p_jmsg)
zfp_stream_close(zfp);
stream_close(stream);
return data;
return 0;
}
void ZfpMan::transform(std::vector<char> &a_data, json &a_jmsg)
......
......@@ -18,13 +18,13 @@ class ZfpMan : public CompressMan
public:
ZfpMan() = default;
virtual ~ZfpMan() = default;
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual int get(void *p_data, json &p_jmsg);
virtual void flush();
void *compress(void *p_data, json &p_jmsg);
void *decompress(void *p_data, json p_jmsg);
virtual int init(json a_jmsg);
virtual int put(const void *a_data, json a_jmsg);
virtual int get(void *a_data, json &a_jmsg);
virtual void transform(std::vector<char> &a_data, json &a_jmsg);
virtual void flush();
int compress(void *a_input, std::vector<char> &a_output, json &a_jmsg);
int decompress(void *a_input, std::vector<char> &a_output, json &a_jmsg);
std::string name() { return "ZfpMan"; }
private:
double m_compression_rate = 8;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment