Skip to content
Snippets Groups Projects
Unverified Commit b2979e8a authored by Kalyn Stricklin's avatar Kalyn Stricklin Committed by GitHub
Browse files

Merge pull request #57 from Botts-Innovative-Research/process-docs

Process Documentation :astonished::flushed:
parents c5b6a76d ee8f811a
No related branches found
No related tags found
No related merge requests found
......@@ -22,6 +22,9 @@ import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* Executable process that takes system UID as input, and outputs all driver outputs from input system, when occupancy is detected.
*/
public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubProcess {
public static final OSHProcessInfo INFO = new OSHProcessInfo("alarmrecorder", "Alarm data recording process", null, AlarmRecorder.class);
ISensorHub hub;
......@@ -41,6 +44,7 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
fac = new RADHelper();
// Get latest output names from RADHelper. We should do this for occupancy too, but it's not likely to change.
gammaAlarmName = fac.createGammaAlarm().getName();
neutronAlarmName = fac.createNeutronAlarm().getName();
startTimeName = fac.createOccupancyStartTime().getName();
......@@ -55,6 +59,9 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
outputEncodingMap = new HashMap<>();
}
/**
* Get new system UID input and ensure it has data streams and is connected to a database.
*/
@Override
public void notifyParamChange() {
super.notifyParamChange();
......@@ -62,19 +69,23 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
if(!Objects.equals(inputSystemID, "")) {
try {
Async.waitForCondition(this::checkDatastreamInput, 500, 10000);
Async.waitForCondition(this::checkDataStreamInput, 500, 10000);
Async.waitForCondition(this::checkDatabaseInput, 500, 10000);
} catch (TimeoutException e) {
if(processInfo == null)
throw new IllegalStateException("RPM datastream " + inputSystemID + " not found", e);
throw new IllegalStateException("RPM data stream " + inputSystemID + " not found", e);
else
throw new IllegalStateException("RPM datastream " + inputSystemID + " has no data", e);
throw new IllegalStateException("RPM data stream " + inputSystemID + " has no data", e);
}
}
}
// Ensure system contains a rapiscan datastream
private boolean checkDatastreamInput() {
/**
* Ensure the input system has an RPM data stream with "occupancy" output.
* Also adds process input as "occupancy" output, if exists.
* @return true if occupancy output exists
*/
private boolean checkDataStreamInput() {
// Clear old inputs
inputData.clear();
......@@ -101,6 +112,11 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
return !inputData.isEmpty();
}
/**
* Ensure input system is a member of a database, so we can get historical records.
* Also creates process outputs from input system's data stream outputs.
* @return true if system has database
*/
private boolean checkDatabaseInput() {
var db = hub.getSystemDriverRegistry().getDatabase(inputSystemID);
if(db == null)
......@@ -119,6 +135,10 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
return !outputData.isEmpty();
}
/**
* Check if occupancy had gamma or neutron alarm.
* @return true if an alarm was triggered during the latest occupancy
*/
private boolean isTriggered() {
DataComponent occupancyInput = inputData.getComponent(OCCUPANCY_NAME);
......@@ -128,6 +148,11 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
return gammaAlarm.getData().getBooleanValue() || neutronAlarm.getData().getBooleanValue();
}
/**
* Retrieve past data for the specified process output.
* @param fullOutputName process output name in the format "{systemUID}:{outputName}" (e.g. urn:osh:sensor:testsensor1:myOutput)
* @return list of observations for time specified during occupancy startTime and endTime
*/
private List<IObsData> getPastData(String fullOutputName) {
int separator = fullOutputName.lastIndexOf(':');
String systemUID = fullOutputName.substring(0, separator);
......@@ -163,12 +188,15 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
return db.getObservationStore().select(filter).collect(Collectors.toList());
}
/**
* Executes whenever we receive a new occupancy.
* If occupancy has alarm, we retrieve historical records and publish those as process outputs.
*/
@Override
public void execute() throws ProcessException {
public void execute() {
if(inputSystemID == null) {
inputSystemID = systemInputParam.getData().getStringValue();
}
// TODO: Use radhelper to get names of occupancy inputs such as start time, end time, alarms
if(isTriggered()) {
int size = outputData.size();
for(int i = 0; i < size; i++) {
......@@ -189,13 +217,17 @@ public class AlarmRecorder extends ExecutableProcessImpl implements ISensorHubPr
}
}
/**
* Get the encodings for driver outputs
* @return map of <output name, output encoding>
*/
public Map<String, DataEncoding> getOutputEncodingMap() {
return outputEncodingMap;
}
@Override
public void setParentHub(ISensorHub hub) {
this.hub = hub;
}
public Map<String, DataEncoding> getOutputEncodingMap() {
return outputEncodingMap;
}
}
......@@ -18,5 +18,4 @@ public class RapiscanProcessConfig extends ProcessConfig {
@DisplayInfo(label = "Parent System (Containing RPM)", desc = "Parent system to read occupancy data from subsystem RPM")
public String systemUID;
}
......@@ -36,11 +36,12 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
String processUniqueID;
AlarmRecorder alarmProcess;
/**
* Standalone processing module to be recognized by OSH module provider, accessible in the OSH Admin UI.
*/
public RapiscanProcessModule()
{
wrapperProcess = new AggregateProcessImpl();
wrapperProcess.setUniqueIdentifier(UUID.randomUUID().toString());
initAsync = true;
}
......@@ -73,6 +74,12 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
}
}
/**
* Build SensorML process description via ProcessHelper.
* @return process chain implementation to be executed
* @throws ProcessException if not able to read process
* @throws SensorHubException if no RPM data streams
*/
public AggregateProcessImpl buildProcess() throws ProcessException, SensorHubException {
ProcessHelper processHelper = new ProcessHelper();
processHelper.getAggregateProcess().setUniqueIdentifier(processUniqueID);
......@@ -95,13 +102,13 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
throw new SensorHubException("Unable to find RPM datastream in system");
var rpmDs = dsList.get(0);
String datastreamUID = rpmDs.getSystemID().getUniqueID();
String dataStreamUID = rpmDs.getSystemID().getUniqueID();
this.alarmProcess = new AlarmRecorder();
OshAsserts.checkValidUID(datastreamUID);
OshAsserts.checkValidUID(dataStreamUID);
processHelper.addDataSource("source0", datastreamUID);
processHelper.addDataSource("source0", dataStreamUID);
alarmProcess.getParameterList().getComponent(AlarmRecorder.SYSTEM_INPUT_PARAM).getData().setStringValue(config.systemUID);
......@@ -132,25 +139,17 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
}
}
protected void initChain() throws SensorHubException
{
//useThreads = processDescription.getInputList().isEmpty();
protected void initChain() throws SensorHubException {
// make process executable
try
{
try {
//smlUtils.makeProcessExecutable(wrapperProcess, true);
wrapperProcess = (AggregateProcessImpl)smlUtils.getExecutableInstance((AggregateProcessImpl)processDescription, useThreads);
wrapperProcess.setInstanceName("chain");
wrapperProcess.setParentLogger(getLogger());
wrapperProcess.init();
}
catch (SMLException e)
{
} catch (SMLException e) {
throw new ProcessingException("Cannot prepare process chain for execution", e);
}
catch (ProcessException e)
{
} catch (ProcessException e) {
throw new ProcessingException(e.getMessage(), e.getCause());
}
......@@ -167,8 +166,14 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
}
protected void refreshIOList(OgcPropertyList<AbstractSWEIdentifiable> ioList, Map<String, DataComponent> ioMap, Map<String, DataEncoding> encodingMap) throws ProcessingException
{
/**
* Add output interface connections
* @param ioList list of all process components
* @param ioMap map of all inputs/outputs
* @param encodingMap map of data encodings
* @throws ProcessingException if unable to get IO component
*/
protected void refreshIOList(OgcPropertyList<AbstractSWEIdentifiable> ioList, Map<String, DataComponent> ioMap, Map<String, DataEncoding> encodingMap) throws ProcessingException {
ioMap.clear();
if (ioMap == inputs)
controlInterfaces.clear();
......@@ -195,7 +200,10 @@ public class RapiscanProcessModule extends AbstractProcessModule<RapiscanProcess
}
}
/**
* Begin processing chain
* @throws SensorHubException if unable to start process or invalid process
*/
@Override
protected void doStart() throws SensorHubException
{
......
......@@ -28,7 +28,9 @@ public class RapiscanOutputInterface implements IStreamingDataInterface {
double avgSamplingPeriod = 1.0;
int avgSampleCount = 0;
/**
* Output queue used to publish process outputs as data events
*/
protected DataQueue outputQueue = new DataQueue()
{
@Override
......@@ -59,7 +61,13 @@ public class RapiscanOutputInterface implements IStreamingDataInterface {
}
};
/**
* Output interface to facilitate connection between process outputs and output queue
* @param parentProcess OSH process module
* @param outputDescriptor output to connect to data queue
* @param encoding data encoding retrieved from data stream info
* @throws ProcessingException if unable to connect output and process
*/
public RapiscanOutputInterface(RapiscanProcessModule parentProcess, AbstractSWEIdentifiable outputDescriptor, DataEncoding encoding) throws ProcessingException
{
this.parentProcess = parentProcess;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment