Newer
Older
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* DataMan.cpp
*
* Created on: Apr 12, 2017
* Author: Jason Wang
*/
#include "DataMan.h"
// Overload of put() that receives the message metadata as individual
// arguments (object id, variable name, data type, shapes, offset, timestep,
// tolerance and priority) and forwards all of them, unchanged, in a single
// call.
// NOTE(review): no opening brace is visible between the parameter list and the
// return statement in this view -- confirm against the real file.
// NOTE(review): the forwarded call names DataMan::put with exactly this
// function's own parameter list, so it appears to resolve to this same
// overload and would recurse without terminating -- confirm whether a
// base-class put() was the intended target.
int DataMan::put(const void *a_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_putshape,
std::vector<size_t> p_varshape, std::vector<size_t> p_offset,
size_t p_timestep, int p_tolerance, int p_priority)
return DataMan::put(a_data, p_doid, p_var, p_dtype, p_putshape, p_varshape,
p_offset, p_timestep, p_tolerance, p_priority);
}
check_shape(a_jmsg);
m_cache.put(a_data, a_jmsg);
}
else
{
put_begin(a_data, a_jmsg);
put_end(a_data, a_jmsg);
}
/*
 * Stub: accepts a method name and does nothing with it.
 */
void DataMan::add_file(std::string p_method)
{
    // Intentionally empty.
}
std::string method;
logging("Streaming method " + method + " added");
if (a_jmsg["cachesize"].is_number())
{
m_cache_size = a_jmsg["cachesize"].get<size_t>();
}
if (m_tolerance.size() < m_num_channels)
{
for (int i = 0; i < m_num_channels; i++)
{
m_tolerance.push_back(0);
}
}
if (m_priority.size() < m_num_channels)
{
for (int i = 0; i < m_num_channels; i++)
{
m_priority.push_back(100 / (i + 1));
}
}
auto man = get_man(method);
if (man)
{
this->add_next(method, man);
}
add_man_to_path(a_jmsg["compression_method"], method, a_jmsg);
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
 * Advance to the next timestep and forward pending data.
 *
 * m_timestep is incremented on every call. What follows depends on
 * m_cache_size:
 *   - not positive: flush_next() is called directly.
 *   - positive, and m_cache.get_timesteps_cached() differs from it:
 *     m_cache.push() is called and nothing is forwarded.
 *   - positive, and m_cache.get_timesteps_cached() equals it: the cache is
 *     drained. For each of m_cache_size rounds, every (object, variable) pair
 *     listed by the cache is passed through put_begin()/put_end() together
 *     with its cached message, then flush_next() and m_cache.pop() are called.
 */
void DataMan::flush()
{
    m_timestep++;

    if (!(m_cache_size > 0))
    {
        flush_next();
        return;
    }

    if (m_cache_size != m_cache.get_timesteps_cached())
    {
        m_cache.push();
        return;
    }

    // The counter uses the same type as m_cache_size so the loop condition
    // never compares a signed value against an unsigned one.
    for (decltype(m_cache_size) step = 0; step < m_cache_size; ++step)
    {
        // The lists are re-read every round because m_cache.pop() at the end
        // of the round changes what the cache exposes next.
        const std::vector<std::string> do_list = m_cache.get_do_list();
        for (const auto &doid : do_list)
        {
            const std::vector<std::string> var_list =
                m_cache.get_var_list(doid);
            for (const auto &var : var_list)
            {
                // Mutable copy: put_begin()/put_end() receive the message as
                // an lvalue and may modify it.
                json jmsg = m_cache.get_jmsg(doid, var);
                put_begin(m_cache.get(doid, var), jmsg);
                put_end(m_cache.get(doid, var), jmsg);
            }
        }
        flush_next();
        m_cache.pop();
    }
}
/*
 * Stub: neither argument is read or written; the result is always 0.
 */
int DataMan::get(void *a_data, json &a_jmsg)
{
    return 0;
}