Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include "MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h"
#include "MantidKernel/make_unique.h"
#include "MantidKernel/WarningSuppressions.h"
GCC_DIAG_OFF(conversion)
#include "private/Kafka/Schema/event_schema_generated.h"
GCC_DIAG_ON(conversion)
#include <iostream>
namespace Mantid {
namespace LiveData {
// -----------------------------------------------------------------------------
// Public members
// -----------------------------------------------------------------------------
/**
 * Construct a decoder and subscribe to the event topic on the given broker.
 *
 * Only the event stream is subscribed to at present: the initializers for the
 * run-information and spectrum-detector streams are disabled, so the
 * corresponding topic names are accepted but not used.
 *
 * @param broker A reference to a Broker object for creating topic streams
 * @param eventTopic The name of the topic streaming the event data
 * @param runInfoTopic The name of the topic streaming the run information
 * (currently unused)
 * @param spDetTopic The name of the topic streaming the spectrum-detector
 * mapping (currently unused)
 */
ISISKafkaEventStreamDecoder::ISISKafkaEventStreamDecoder(
    const IKafkaBroker &broker, std::string eventTopic,
    std::string runInfoTopic, std::string spDetTopic)
    : m_eventStream(broker.subscribe(eventTopic))
// Disabled for now - the remaining subscriptions would read:
//   , m_runInfoStream(broker.subscribe(runInfoTopic))
//   , m_spDetStream(broker.subscribe(spDetTopic))
{
}
/**
 * Start decoding data from the streams into the internal buffers. This can be
 * run in a separate thread by passing the address of this object to
 * Poco::Thread::start()
 *
 * The current implementation consumes a single message from the event stream
 * and reports on std::cerr:
 *  - a notice if the consumed buffer is empty;
 *  - the number of events and the first time-of-flight value if the message
 *    is a FramePart that carries event data;
 *  - a notice if a FramePart message lacks the expected event tables.
 * Messages of any other type are ignored.
 */
void ISISKafkaEventStreamDecoder::run() {
  std::string resizableBuffer;
  m_eventStream->consumeMessage(&resizableBuffer);
  if (resizableBuffer.empty()) {
    std::cerr << "Empty event stream - Is anything running?\n";
    return;
  }

  const uint8_t *buf =
      reinterpret_cast<const uint8_t *>(resizableBuffer.c_str());
  auto messageData = ISISDAE::GetEventMessage(buf);
  if (messageData->message_type() != ISISDAE::MessageTypes_FramePart) {
    // Only frame parts carry event data; nothing to report for other types.
    return;
  }

  // FlatBuffers accessors return a null pointer for any table/vector field
  // that is absent from the buffer, so every level is checked before use.
  auto frameData =
      static_cast<const ISISDAE::FramePart *>(messageData->message());
  if (frameData == nullptr) {
    std::cerr << "Malformed FramePart message - no frame payload\n";
    return;
  }
  auto eventData = frameData->n_events();
  if (eventData == nullptr) {
    std::cerr << "Malformed FramePart message - no event data\n";
    return;
  }
  auto evtSpecNum = eventData->spec();
  auto evtTof = eventData->tof();
  if (evtSpecNum == nullptr || evtTof == nullptr) {
    std::cerr << "Malformed FramePart message - missing spec/tof arrays\n";
    return;
  }

  std::cerr << "Consumed " << evtSpecNum->size()
            << " events from the stream\n";
  // A frame may legitimately contain zero events; indexing element 0 of an
  // empty vector would read out of bounds.
  if (evtTof->size() > 0) {
    std::cerr << "First TOF = " << (*evtTof)[0] << "\n";
  }
}
} // namespace LiveData
} // namespace Mantid