Skip to content
Snippets Groups Projects
Unverified Commit df0f3357 authored by Gigg, Martyn Anthony's avatar Gigg, Martyn Anthony Committed by GitHub
Browse files

Merge pull request #26721 from DanNixon/v20_1to1_kafka_spec_det_map

Create 1:1 spec-det map when message is missing
parents 631b5e2a 52414d55
No related branches found
No related tags found
No related merge requests found
...@@ -101,8 +101,13 @@ void IKafkaStreamDecoder::startCapture(bool startNow) { ...@@ -101,8 +101,13 @@ void IKafkaStreamDecoder::startCapture(bool startNow) {
// Get last two messages in run topic to ensure we get a runStart message // Get last two messages in run topic to ensure we get a runStart message
m_runStream = m_runStream =
m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO); m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO);
m_spDetStream = try {
m_broker->subscribe({m_spDetTopic}, SubscribeAtOption::LASTONE); m_spDetStream =
m_broker->subscribe({m_spDetTopic}, SubscribeAtOption::LASTONE);
} catch (const std::runtime_error &) {
g_log.debug()
<< "No detector-spectrum map message found, will assume a 1:1 mapping.";
}
m_thread = std::thread([this]() { this->captureImpl(); }); m_thread = std::thread([this]() { this->captureImpl(); });
m_thread.detach(); m_thread.detach();
...@@ -341,21 +346,25 @@ void IKafkaStreamDecoder::waitForRunEndObservation() { ...@@ -341,21 +346,25 @@ void IKafkaStreamDecoder::waitForRunEndObservation() {
* current run start time * current run start time
* *
* @param runStartStruct details of the current run * @param runStartStruct details of the current run
* @return received detector-spectrum map message buffer * @return received detector-spectrum map message buffer, empty string if a
* mapping was not streamed
*/ */
std::string IKafkaStreamDecoder::getDetSpecMapForRun( std::string IKafkaStreamDecoder::getDetSpecMapForRun(
const IKafkaStreamDecoder::RunStartStruct &runStartStruct) { const IKafkaStreamDecoder::RunStartStruct &runStartStruct) {
std::string rawMsgBuffer; std::string rawMsgBuffer;
int64_t offset; try {
int32_t partition; m_spDetStream = m_broker->subscribe(
std::string topicName; {m_spDetTopic}, nanosecondsToMilliseconds(runStartStruct.startTime),
m_spDetStream = m_broker->subscribe( SubscribeAtOption::TIME);
{m_spDetTopic}, nanosecondsToMilliseconds(runStartStruct.startTime), int64_t offset;
SubscribeAtOption::TIME); int32_t partition;
m_spDetStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName); std::string topicName;
m_spDetStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName);
} catch (const std::runtime_error &) {
}
if (rawMsgBuffer.empty()) { if (rawMsgBuffer.empty()) {
std::runtime_error("No detector-spectrum map message found for run " + g_log.debug() << "No detector-spectrum map message found for run "
runStartStruct.runId); << runStartStruct.runId << ", will assume a 1:1 mapping.";
} }
return rawMsgBuffer; return rawMsgBuffer;
} }
......
...@@ -206,7 +206,9 @@ void KafkaEventStreamDecoder::captureImplExcept() { ...@@ -206,7 +206,9 @@ void KafkaEventStreamDecoder::captureImplExcept() {
int64_t offset; int64_t offset;
int32_t partition; int32_t partition;
std::string topicName; std::string topicName;
m_spDetStream->consumeMessage(&buffer, offset, partition, topicName); if (m_spDetStream) {
m_spDetStream->consumeMessage(&buffer, offset, partition, topicName);
}
auto runStartStruct = getRunStartMessage(runBuffer); auto runStartStruct = getRunStartMessage(runBuffer);
initLocalCaches(buffer, runStartStruct); initLocalCaches(buffer, runStartStruct);
...@@ -530,40 +532,60 @@ void KafkaEventStreamDecoder::sampleDataFromMessage(const std::string &buffer) { ...@@ -530,40 +532,60 @@ void KafkaEventStreamDecoder::sampleDataFromMessage(const std::string &buffer) {
*/ */
void KafkaEventStreamDecoder::initLocalCaches( void KafkaEventStreamDecoder::initLocalCaches(
const std::string &rawMsgBuffer, const RunStartStruct &runStartData) { const std::string &rawMsgBuffer, const RunStartStruct &runStartData) {
if (rawMsgBuffer.empty()) {
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from spectrum-detector "
"topic. Unable to continue");
}
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra());
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec="
<< nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
m_runId = runStartData.runId; m_runId = runStartData.runId;
// Create buffer const auto jsonGeometry = runStartData.nexusStructure;
auto eventBuffer = createBufferWorkspace<DataObjects::EventWorkspace>( const auto instName = runStartData.instrumentName;
"EventWorkspace", static_cast<size_t>(spDetMsg->n_spectra()),
spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet); DataObjects::EventWorkspace_sptr eventBuffer;
if (rawMsgBuffer.empty()) {
// Load the instrument if possible but continue if we can't /* Load the instrument to get the number of spectra :c */
auto jsonGeometry = runStartData.nexusStructure; auto ws =
auto instName = runStartData.instrumentName; API::WorkspaceFactory::Instance().create("EventWorkspace", 1, 2, 1);
if (!instName.empty()) loadInstrument<API::MatrixWorkspace>(instName, ws, jsonGeometry);
loadInstrument<DataObjects::EventWorkspace>(instName, eventBuffer, const auto nspec = ws->getInstrument()->getNumberDetectors();
jsonGeometry);
else // Create buffer
g_log.warning( eventBuffer = boost::static_pointer_cast<DataObjects::EventWorkspace>(
"Empty instrument name received. Continuing without instrument"); API::WorkspaceFactory::Instance().create("EventWorkspace", nspec, 2,
1));
eventBuffer->setInstrument(ws->getInstrument());
eventBuffer->rebuildSpectraMapping();
eventBuffer->getAxis(0)->unit() =
Kernel::UnitFactory::Instance().create("TOF");
eventBuffer->setYUnit("Counts");
} else {
/* Parse mapping from stream */
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra());
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec="
<< nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
// Create buffer
eventBuffer = createBufferWorkspace<DataObjects::EventWorkspace>(
"EventWorkspace", static_cast<size_t>(spDetMsg->n_spectra()),
spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet);
// Load the instrument if possible but continue if we can't
if (!instName.empty()) {
loadInstrument<DataObjects::EventWorkspace>(instName, eventBuffer,
jsonGeometry);
if (rawMsgBuffer.empty()) {
eventBuffer->rebuildSpectraMapping();
}
} else {
g_log.warning(
"Empty instrument name received. Continuing without instrument");
}
}
auto &mutableRun = eventBuffer->mutableRun(); auto &mutableRun = eventBuffer->mutableRun();
// Run start. Cache locally for computing frame times // Run start. Cache locally for computing frame times
......
...@@ -206,40 +206,57 @@ void KafkaHistoStreamDecoder::captureImplExcept() { ...@@ -206,40 +206,57 @@ void KafkaHistoStreamDecoder::captureImplExcept() {
void KafkaHistoStreamDecoder::initLocalCaches( void KafkaHistoStreamDecoder::initLocalCaches(
const std::string &rawMsgBuffer, const RunStartStruct &runStartData) { const std::string &rawMsgBuffer, const RunStartStruct &runStartData) {
if (rawMsgBuffer.empty()) {
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from spectrum-detector "
"topic. Unable to continue");
}
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra());
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec="
<< nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
m_runId = runStartData.runId; m_runId = runStartData.runId;
// Create buffer const auto jsonGeometry = runStartData.nexusStructure;
auto histoBuffer = createBufferWorkspace<DataObjects::Workspace2D>( const auto instName = runStartData.instrumentName;
"Workspace2D", static_cast<size_t>(spDetMsg->n_spectra()),
spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet); DataObjects::Workspace2D_sptr histoBuffer;
if (rawMsgBuffer.empty()) {
// Load the instrument if possible but continue if we can't /* Load the instrument to get the number of spectra :c */
auto jsonGeometry = runStartData.nexusStructure; auto ws = API::WorkspaceFactory::Instance().create("Workspace2D", 1, 2, 1);
auto instName = runStartData.instrumentName; loadInstrument<API::MatrixWorkspace>(instName, ws, jsonGeometry);
if (!instName.empty()) const auto nspec = ws->getInstrument()->getNumberDetectors();
loadInstrument<DataObjects::Workspace2D>(instName, histoBuffer,
jsonGeometry); // Create buffer
else histoBuffer = boost::static_pointer_cast<DataObjects::Workspace2D>(
g_log.warning( API::WorkspaceFactory::Instance().create("Workspace2D", nspec, 2, 1));
"Empty instrument name received. Continuing without instrument"); histoBuffer->setInstrument(ws->getInstrument());
histoBuffer->rebuildSpectraMapping();
histoBuffer->getAxis(0)->unit() =
Kernel::UnitFactory::Instance().create("TOF");
histoBuffer->setYUnit("Counts");
} else {
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra());
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec="
<< nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
// Create buffer
histoBuffer = createBufferWorkspace<DataObjects::Workspace2D>(
"Workspace2D", static_cast<size_t>(spDetMsg->n_spectra()),
spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet);
// Load the instrument if possible but continue if we can't
if (!instName.empty()) {
loadInstrument<DataObjects::Workspace2D>(instName, histoBuffer,
jsonGeometry);
if (rawMsgBuffer.empty()) {
histoBuffer->rebuildSpectraMapping();
}
} else {
g_log.warning(
"Empty instrument name received. Continuing without instrument");
}
}
auto &mutableRun = histoBuffer->mutableRun(); auto &mutableRun = histoBuffer->mutableRun();
// Run start. Cache locally for computing frame times // Run start. Cache locally for computing frame times
......
...@@ -443,24 +443,6 @@ public: ...@@ -443,24 +443,6 @@ public:
TS_ASSERT(!decoder->isCapturing()); TS_ASSERT(!decoder->isCapturing());
} }
void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace KafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _))
.Times(Exactly(3))
.WillOnce(Return(new FakeISISEventSubscriber(1)))
.WillOnce(Return(new FakeRunInfoStreamSubscriber(1)))
.WillOnce(Return(new FakeEmptyStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
startCapturing(*decoder, 1);
TS_ASSERT_THROWS(decoder->extractData(), const std::runtime_error &);
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
}
void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() { void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing; using namespace ::testing;
using namespace KafkaTesting; using namespace KafkaTesting;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment