Skip to content
Snippets Groups Projects
Commit f3549eaf authored by earocorn's avatar earocorn
Browse files

Separate system outputs

parent 60d4325c
No related branches found
No related tags found
1 merge request!53Alarm occupancy process fix
......@@ -110,11 +110,20 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
return false;
outputData.clear();
db.getDataStreamStore().values().forEach(ds -> {
var dsTopicId = EventUtils.getDataStreamDataTopicID(ds);
outputData.add(dsTopicId, ds.getRecordStructure().copy());
db.getSystemDescStore().values().forEach(sys -> {
var systemOutputBuilder = fac.createRecord();
var systemDatastreams = db.getDataStreamStore().select(new DataStreamFilter.Builder()
.withSystems(
new SystemFilter.Builder().withUniqueIDs(sys.getUniqueIdentifier()).build()).build())
.collect(Collectors.toList());
for(var ds : systemDatastreams)
systemOutputBuilder.addField(ds.getOutputName(), ds.getRecordStructure());
outputData.add(sys.getUniqueIdentifier(), systemOutputBuilder.build());
});
return !outputData.isEmpty();
}
......@@ -124,13 +133,10 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
DataComponent gammaAlarm = occupancyInput.getComponent(gammaAlarmName);
DataComponent neutronAlarm = occupancyInput.getComponent(neutronAlarmName);
if(gammaAlarm.getData().getBooleanValue() || neutronAlarm.getData().getBooleanValue()) {
return true;
}
return false;
return gammaAlarm.getData().getBooleanValue() || neutronAlarm.getData().getBooleanValue();
}
private List<IObsData> getPastData(String outputName) {
private List<IObsData> getPastData(String systemUID, String outputName) {
DataComponent occupancyInput = inputData.getComponent(OCCUPANCY_NAME);
DataComponent startTime = occupancyInput.getComponent(startTimeName);
......@@ -145,8 +151,14 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
var db = hub.getSystemDriverRegistry().getDatabase(inputSystemID);
SystemFilter systemFilter = new SystemFilter.Builder()
.withUniqueIDs(systemUID)
.includeMembers(false)
.build();
DataStreamFilter dsFilter = new DataStreamFilter.Builder()
.withOutputNames(outputName)
.withSystems(systemFilter)
.build();
ObsFilter filter = new ObsFilter.Builder()
.withDataStreams(dsFilter)
......@@ -165,11 +177,11 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
if(isTriggered()) {
int size = outputData.size();
for(int i = 0; i < size; i++) {
DataComponent item = outputData.getComponent(i);
String itemName = item.getName();
DataComponent system = outputData.getComponent(i);
String itemName = system.getName();
for(IObsData data : getPastData(itemName)) {
item.setData(data.getResult());
for(IObsData data : getPastData(system.getName(), itemName)) {
system.setData(data.getResult());
try {
publishData();
publishData(itemName);
......@@ -177,6 +189,21 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
throw new RuntimeException(e);
}
}
for (int j = 0; j < system.getComponentCount(); j++) {
var output = system.getComponent(i);
for(IObsData data : getPastData(system.getName(), output.getName())) {
output.setData(data.getResult());
try {
publishData();
publishData(itemName);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
}
......
......@@ -15,14 +15,8 @@ public class RapiscanProcessConfig extends ProcessConfig {
@DisplayInfo.Required
@DisplayInfo.FieldType(DisplayInfo.FieldType.Type.SYSTEM_UID)
@DisplayInfo(label = "Rapiscan Datastream Source", desc = "Datasource to read occupancy data")
public String datastreamUID;
@DisplayInfo.Required
@DisplayInfo.FieldType(DisplayInfo.FieldType.Type.MODULE_ID)
@DisplayInfo.ModuleType(IDatabase.class)
@DisplayInfo(label = "Input Database", desc = "Module ID of system database to query")
public String databaseModuleID;
@DisplayInfo(label = "Parent System (Containing RPM)", desc = "Parent system to read occupancy data from subsystem RPM")
public String systemUID;
}
......@@ -5,6 +5,8 @@ import net.opengis.swe.v20.AbstractSWEIdentifiable;
import net.opengis.swe.v20.DataComponent;
import org.sensorhub.api.ISensorHub;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.api.datastore.obs.DataStreamFilter;
import org.sensorhub.api.datastore.system.SystemFilter;
import org.sensorhub.api.module.ModuleEvent;
import org.sensorhub.api.processing.ProcessingException;
import org.sensorhub.api.sensor.ISensorModule;
......@@ -21,6 +23,7 @@ import org.vast.sensorML.SMLUtils;
import java.io.*;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
// Based on SMLProcessImpl
public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcessConfig> {
......@@ -72,14 +75,33 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
ProcessHelper processHelper = new ProcessHelper();
processHelper.getAggregateProcess().setUniqueIdentifier(processUniqueID);
var sysFilter = new SystemFilter.Builder()
.withUniqueIDs(config.systemUID)
.includeMembers(true)
.build();
var db = getParentHub().getDatabaseRegistry().getFederatedDatabase();
var matchingDs = db.getDataStreamStore().select(new DataStreamFilter.Builder()
.withSystems(sysFilter)
.withCurrentVersion()
.withOutputNames(AlarmRecorder.OCCUPANCY_NAME)
.withLimit(1)
.build());
var dsList = matchingDs.collect(Collectors.toList());
if(dsList.size() != 1)
throw new SensorHubException("Unable to find RPM datastream in system");
var rpmDs = dsList.get(0);
String datastreamUID = rpmDs.getSystemID().getUniqueID();
AlarmRecorder process = new AlarmRecorder();
OshAsserts.checkValidUID(config.datastreamUID);
OshAsserts.checkValidUID(datastreamUID);
processHelper.addDataSource("source0", config.datastreamUID);
processHelper.addDataSource("source0", datastreamUID);
process.getParameterList().getComponent(AlarmRecorder.DATABASE_INPUT_PARAM).getData().setStringValue(config.databaseModuleID);
process.getParameterList().getComponent(AlarmRecorder.DATASTREAM_INPUT_PARAM).getData().setStringValue(config.datastreamUID);
process.getParameterList().getComponent(AlarmRecorder.SYSTEM_INPUT_PARAM).getData().setStringValue(config.systemUID);
process.setParentHub(getParentHub());
process.notifyParamChange();
......@@ -91,9 +113,17 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
processHelper.addConnection("components/source0/outputs/" + AlarmRecorder.OCCUPANCY_NAME
,"components/process0/inputs/" + AlarmRecorder.OCCUPANCY_NAME);
for(AbstractSWEIdentifiable output : process.getOutputList()) {
DataComponent component = (DataComponent) output;
processHelper.addConnection("components/process0/outputs/" + component.getName(), "outputs/" + component.getName());
for(AbstractSWEIdentifiable systemOutput : process.getOutputList()) {
DataComponent systemComponent = (DataComponent) systemOutput;
if(systemComponent.getComponentCount() > 0)
processHelper.addConnection("components/process0/outputs/" + systemComponent.getName(),
"outputs/" + systemComponent.getName());
// for(int i = 0; i < systemComponent.getComponentCount(); i++) {
// var output = systemComponent.getComponent(i);
// processHelper.addConnection("components/process0/outputs/" + systemComponent.getName() + "/" + output.getName(),
// "outputs/" + systemComponent.getName() + "/" + output.getName());
// }
}
try {
......@@ -106,7 +136,7 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
}
}
protected void initChain() throws SensorHubException
protected void initChain() throws SensorHubException
{
//useThreads = processDescription.getInputList().isEmpty();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment