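/* ADARA stream parser: buffers incoming bytes and dispatches each complete
 * packet to the matching rxPacket() handler.
 */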
#include <string.h>
#include "MantidDataHandling/ADARAParser.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/NetException.h"
#include "Poco/Exception.h"
using namespace ADARA;
/* ------------------------------------------------------------------------ */
/* Create a parser with an initial receive buffer of buffer_size bytes.
 *
 * buffer_size  - number of bytes allocated for the receive buffer.
 * max_pkt_size - packets whose packet_length() exceeds this are routed to
 *                rxOversizePkt() by parseBuffer() instead of being buffered
 *                whole; it is also the cap applied when the buffer grows.
 *
 * The buffer is owned by this object and released in the destructor.
 */
Parser::Parser(unsigned int buffer_size, unsigned int max_pkt_size) :
  m_size(buffer_size), m_max_size(max_pkt_size), m_len(0),
  m_oversize_len(0)
{
  /* Give the oversize offset a defined value; it is only meaningful while
   * m_oversize_len is non-zero, but it should never hold garbage. It is
   * assigned here rather than in the initializer list so the list keeps
   * its existing order.
   */
  m_oversize_offset = 0;
  m_buffer = new uint8_t[buffer_size];
}
/* Release the receive buffer allocated by the constructor (or by a later
 * resize in parseBuffer()).
 */
Parser::~Parser()
{
  delete[] m_buffer;
}
/* Forget any buffered bytes and any oversize packet that was in progress.
 * The buffer itself and its current size are kept.
 */
void Parser::reset(void)
{
  m_oversize_len = 0;
  m_len = 0;
}
#if 0
// Mantid doesn't use this, and trying to make it build on
// Windows would be a nightmare...
#include <unistd.h>
#include <errno.h>
/* Read from a file descriptor into the buffer and parse what arrives.
 *
 * fd       - descriptor passed to ::read().
 * max_read - stop once at least this many bytes have been read; 0 means
 *            no limit.
 *
 * Returns true when the max_read limit ended the loop. Returns false when
 * ::read() reported end-of-file, when one of the tolerated errno values
 * below was seen, or when parseBuffer() returned true.
 * Throws std::runtime_error for any other errno value.
 */
bool Parser::read(int fd, unsigned int max_read)
{
  unsigned long bytes_read = 0;
  ssize_t rc;
  while (!max_read || bytes_read < max_read) {
    rc = ::read(fd, m_buffer + m_len, m_size - m_len);
    if (rc < 0) {
      switch (errno) {
      case EINTR:
      case EAGAIN:
        /* We didn't get any data, but we're OK */
        /* NOTE(review): these cases fall through to the
         * "host went away" return below; the comment above
         * suggests they may have been meant to return a
         * different value - confirm before re-enabling.
         */
      case EPIPE:
      case ECONNRESET:
      case ETIMEDOUT:
      case EHOSTUNREACH:
      case ENETUNREACH:
        /* The host went away, but this shouldn't
         * be fatal.
         */
        return false;
      default: {
        /* TODO consider if we should throw an
         * exception at all.
         */
        int err = errno;
        std::string msg("Parser::read(): ");
        msg += strerror(err);
        throw std::runtime_error(msg);
      }
      }
    }
    if (rc == 0)
      return false;
    /* m_len cannot overflow when adding in rc, as we'll never
     * ask for more data in the read() call than will fit.
     */
    m_len += (unsigned int) rc;
    bytes_read += rc;
    if (parseBuffer())
      return false;
  }
  return true;
}
#endif
/* Added by RGM, 17 May 2012 */
/* Receive bytes from a Poco stream socket into the buffer and parse them.
 *
 * stream   - socket to receive from.
 * max_read - stop once at least this many bytes have been received; 0 means
 *            no limit.
 *
 * Returns true when the max_read limit ended the loop. Returns false when
 * receiveBytes() delivered no data (0 is what it returns when the peer has
 * closed the connection) or when parseBuffer() returned true.
 * Throws std::runtime_error if receiveBytes() raises a
 * Poco::Net::NetException; other exception types propagate unchanged.
 */
bool Parser::read(Poco::Net::StreamSocket &stream, unsigned int max_read)
{
  unsigned long bytes_read = 0;

  while (!max_read || bytes_read < max_read) {
    int rc;
    try {
      rc = stream.receiveBytes(m_buffer + m_len,
                               static_cast<int>(m_size - m_len));
    } catch (const Poco::Net::NetException &e) {
      std::string msg("Parser::read(): ");
      msg += e.name();
      throw std::runtime_error(msg);
    }

    /* A negative count must never reach the unsigned arithmetic below:
     * it would wrap and push m_len far past the end of the buffer.
     * Treat it like "nothing received".
     */
    if (rc <= 0)
      return false;

    /* m_len cannot overflow when adding in rc, as we'll never
     * ask for more data in the read() call than will fit.
     */
    m_len += static_cast<unsigned int>(rc);
    bytes_read += static_cast<unsigned long>(rc);

    if (parseBuffer())
      return false;
  }

  return true;
}
/* Walk the buffered bytes and hand every complete packet to rxPacket().
 *
 * Bytes belonging to an oversize packet already in progress are forwarded
 * to rxOversizePkt() first. Any incomplete packet left at the end is moved
 * to the front of the buffer so the next read appends to it.
 *
 * Returns true if a handler (rxPacket() or rxOversizePkt()) returned true,
 * false otherwise - including when the buffer was grown to make room for a
 * packet that does not fit yet.
 * Throws invalid_packet if a header reports a payload length that is not a
 * multiple of 4.
 */
bool Parser::parseBuffer(void)
{
  uint8_t *p = m_buffer;
  bool stopped = false;

  /* If we're processing an oversize packet, then we will find its
   * data at the front of the buffer. We'll either consume our
   * entire buffer, or find the end of the oversize packet and
   * process the rest of the buffer as normal.
   */
  if (m_oversize_len) {
    unsigned int chunk_len;

    chunk_len = m_len < m_oversize_len ? m_len : m_oversize_len;
    stopped = rxOversizePkt(NULL, p, m_oversize_offset, chunk_len);
    m_oversize_offset += chunk_len;
    m_oversize_len -= chunk_len;
    m_len -= chunk_len;
    p += chunk_len;
  }

  while (!stopped && m_len >= PacketHeader::header_length()) {
    PacketHeader hdr(p);

    if (hdr.payload_length() % 4)
      throw invalid_packet("Payload length not "
                           "multiple of 4");

    if (m_max_size < hdr.packet_length()) {
      /* This packet is over the maximum limit; we'll
       * call the oversize handler with this first
       * chunk, consuming our entire buffer.
       */
      stopped = rxOversizePkt(&hdr, p, 0, m_len);
      /* NOTE(review): the m_len bytes just handed over start at
       * the packet header, yet the remainder is computed from
       * payload_length(); if packet_length() includes the header
       * this looks short by the header size - confirm against
       * PacketHeader before changing.
       */
      m_oversize_len = hdr.payload_length() - m_len;
      m_oversize_offset = m_len;
      m_len = 0;
      return stopped;
    }

    if (m_size < hdr.packet_length()) {
      /* This packet is too big to possibly fit in our
       * current buffer, so we need to grow. Once we've
       * resized, return to our caller as we obviously
       * don't have the full packet yet.
       */
      unsigned int new_size = m_size ? m_size : 1;

      /* Double until the packet fits. A zero start or a wrap of
       * the unsigned counter would otherwise spin forever, so
       * fall back to the maximum, which is known to be large
       * enough because the oversize check above did not fire.
       */
      while (new_size < hdr.packet_length()) {
        const unsigned int doubled = new_size * 2;
        if (doubled <= new_size) {
          new_size = m_max_size;
          break;
        }
        new_size = doubled;
      }

      if (new_size > m_max_size)
        new_size = m_max_size;

      uint8_t *new_buffer = new uint8_t[new_size];
      memcpy(new_buffer, p, m_len);

      /* The buffer was allocated with new[], so it must be
       * released with delete[].
       */
      delete [] m_buffer;
      m_buffer = new_buffer;
      m_size = new_size;
      return false;
    }

    if (m_len < hdr.packet_length())
      break;

    Packet pkt(p, hdr.packet_length());
    p += hdr.packet_length();
    m_len -= hdr.packet_length();

    if (rxPacket(pkt)) {
      stopped = true;
      break;
    }
  }

  /* If we have anything left over, shove it to the front.
   */
  if (m_len && p != m_buffer)
    memmove(m_buffer, p, m_len);

  return stopped;
}
bool Parser::rxPacket(const Packet &pkt)
{
#define MAP_TYPE(pkt_type, obj_type) \
case pkt_type: { \
obj_type raw(pkt.packet(), pkt.packet_length()); \
return rxPacket(raw); \
}
switch (pkt.type()) {
MAP_TYPE(PacketType::RAW_EVENT_V0, RawDataPkt);
MAP_TYPE(PacketType::RTDL_V0, RTDLPkt);
MAP_TYPE(PacketType::SOURCE_LIST_V0, SourceListPkt);
MAP_TYPE(PacketType::BANKED_EVENT_V0, BankedEventPkt);
MAP_TYPE(PacketType::BEAM_MONITOR_EVENT_V0, BeamMonitorPkt);
MAP_TYPE(PacketType::PIXEL_MAPPING_V0, PixelMappingPkt);
MAP_TYPE(PacketType::RUN_STATUS_V0, RunStatusPkt);
MAP_TYPE(PacketType::RUN_INFO_V0, RunInfoPkt);
MAP_TYPE(PacketType::TRANS_COMPLETE_V0, TransCompletePkt);
MAP_TYPE(PacketType::CLIENT_HELLO_V0, ClientHelloPkt);
MAP_TYPE(PacketType::STREAM_ANNOTATION_V0, AnnotationPkt);
MAP_TYPE(PacketType::SYNC_V0, SyncPkt);
MAP_TYPE(PacketType::HEARTBEAT_V0, HeartbeatPkt);
MAP_TYPE(PacketType::GEOMETRY_V0, GeometryPkt);
MAP_TYPE(PacketType::BEAMLINE_INFO_V0, BeamlineInfoPkt);
MAP_TYPE(PacketType::DEVICE_DESC_V0, DeviceDescriptorPkt);
MAP_TYPE(PacketType::VAR_VALUE_U32_V0, VariableU32Pkt);
MAP_TYPE(PacketType::VAR_VALUE_DOUBLE_V0, VariableDoublePkt);
MAP_TYPE(PacketType::VAR_VALUE_STRING_V0, VariableStringPkt);
/* No default handler; we want the compiler to warn about
* the unhandled PacketType values when we add new packets.
*/
}
return rxUnknownPkt(pkt);
#undef MAP_TYPE
}
/* Default handler for packets whose type rxPacket(const Packet &) does not
 * recognise: the packet is ignored and false is returned.
 */
bool Parser::rxUnknownPkt(const Packet & /* pkt */)
{
  return false;
}
/* Default handler for a chunk of an oversize packet: the data is ignored
 * and false is returned. parseBuffer() passes the header (non-NULL only for
 * the first chunk), a pointer to the chunk, the chunk's offset and its
 * length.
 */
bool Parser::rxOversizePkt(const PacketHeader * /* hdr */,
                           const uint8_t * /* chunk */,
                           unsigned int /* chunk_offset */,
                           unsigned int /* chunk_len */)
{
  return false;
}
/* Default typed handlers. EXPAND_HANDLER(type) defines
 * Parser::rxPacket(const type &) as a stub that ignores the packet and
 * returns false.
 *
 * NOTE(review): rxPacket(const Packet &) also dispatches SourceListPkt and
 * AnnotationPkt, but no default handler for either is generated in the
 * lines visible here - confirm they are defined elsewhere.
 */
#define EXPAND_HANDLER(type) \
bool Parser::rxPacket(const type &) { return false; }
EXPAND_HANDLER(RawDataPkt)
EXPAND_HANDLER(RTDLPkt)
EXPAND_HANDLER(BankedEventPkt)
EXPAND_HANDLER(BeamMonitorPkt)
EXPAND_HANDLER(PixelMappingPkt)
EXPAND_HANDLER(RunStatusPkt)
EXPAND_HANDLER(RunInfoPkt)
EXPAND_HANDLER(TransCompletePkt)
EXPAND_HANDLER(ClientHelloPkt)
EXPAND_HANDLER(SyncPkt)
EXPAND_HANDLER(HeartbeatPkt)
EXPAND_HANDLER(GeometryPkt)
EXPAND_HANDLER(BeamlineInfoPkt)
EXPAND_HANDLER(DeviceDescriptorPkt)
EXPAND_HANDLER(VariableU32Pkt)
EXPAND_HANDLER(VariableDoublePkt)
EXPAND_HANDLER(VariableStringPkt)