Skip to content
Snippets Groups Projects
Unverified Commit a5fd1d78 authored by WHITFIELDRE email's avatar WHITFIELDRE email Committed by GitHub
Browse files

Merge pull request #22292 from martyngigg/fake-dae-responsive-cancel

Fake dae responsive cancel
parents e6566ae4 15fed508
No related merge requests found
...@@ -6,23 +6,14 @@ ...@@ -6,23 +6,14 @@
//---------------------------------------------------------------------- //----------------------------------------------------------------------
#include "MantidAPI/Algorithm.h" #include "MantidAPI/Algorithm.h"
#include <mutex>
namespace Poco {
namespace Net {
class TCPServer;
}
}
namespace Mantid { namespace Mantid {
namespace LiveData { namespace LiveData {
/** /**
Simulates ISIS histogram DAE. It runs continuously until canceled and Simulates ISIS histogram DAE. It runs continuously until canceled and
listens to port 6789 for listens to port 6789 for ISIS DAE commands.
ISIS DAE commands.
Copyright &copy; 2008-9 ISIS Rutherford Appleton Laboratory, NScD Oak Ridge Copyright &copy; 2008-9 ISIS Rutherford Appleton Laboratory, NScD Oak Ridge
National Laboratory & European Spallation Source National Laboratory & European Spallation Source
This file is part of Mantid. This file is part of Mantid.
...@@ -44,9 +35,6 @@ namespace LiveData { ...@@ -44,9 +35,6 @@ namespace LiveData {
*/ */
class FakeISISEventDAE : public API::Algorithm { class FakeISISEventDAE : public API::Algorithm {
public: public:
FakeISISEventDAE();
~FakeISISEventDAE() override;
/// Algorithm's name for identification overriding a virtual method /// Algorithm's name for identification overriding a virtual method
const std::string name() const override { return "FakeISISEventDAE"; } const std::string name() const override { return "FakeISISEventDAE"; }
/// Algorithm's version for identification overriding a virtual method /// Algorithm's version for identification overriding a virtual method
...@@ -67,10 +55,6 @@ public: ...@@ -67,10 +55,6 @@ public:
private: private:
void init() override; void init() override;
void exec() override; void exec() override;
/// Poco TCP server
Poco::Net::TCPServer *m_server;
/// Mutex
std::mutex m_mutex;
}; };
} // namespace LiveData } // namespace LiveData
......
...@@ -5,30 +5,22 @@ ...@@ -5,30 +5,22 @@
// Includes // Includes
//---------------------------------------------------------------------- //----------------------------------------------------------------------
#include "MantidAPI/Algorithm.h" #include "MantidAPI/Algorithm.h"
#include <mutex>
namespace Poco {
namespace Net {
class TCPServer;
}
}
namespace Mantid { namespace Mantid {
namespace LiveData { namespace LiveData {
/** /**
Simulates ISIS histogram DAE. It runs continuously until canceled and Simulates ISIS histogram DAE. It runs continuously until canceled and
listens to port 6789 for listens to port 6789 for ISIS DAE commands.
ISIS DAE commands.
Data is generated starting at 10000 microseconds Time of flight, and each Data is generated starting at 10000 microseconds Time of flight, and each
bin requested covers 100 microseconds. bin requested covers 100 microseconds.
The algorithm silently defines three additional spectra with numbers The algorithm silently defines three additional spectra with numbers
NSpectra+1, NSpectra+2 and NSpectra+3 in a NSpectra+1, NSpectra+2 and NSpectra+3 in a
different time regime (they have different binning to the rest of the different time regime (they have different binning to the rest of the
spectra). spectra).
Copyright &copy; 2008-9 ISIS Rutherford Appleton Laboratory, NScD Oak Ridge Copyright &copy; 2008-9 ISIS Rutherford Appleton Laboratory, NScD Oak Ridge
National Laboratory & European Spallation Source National Laboratory & European Spallation Source
This file is part of Mantid. This file is part of Mantid.
...@@ -50,13 +42,10 @@ namespace LiveData { ...@@ -50,13 +42,10 @@ namespace LiveData {
*/ */
class DLLExport FakeISISHistoDAE : public API::Algorithm { class DLLExport FakeISISHistoDAE : public API::Algorithm {
public: public:
FakeISISHistoDAE();
~FakeISISHistoDAE() override;
/// Algorithm's name for identification overriding a virtual method /// Algorithm's name for identification overriding a virtual method
const std::string name() const override { return "FakeISISHistoDAE"; }; const std::string name() const override { return "FakeISISHistoDAE"; }
/// Algorithm's version for identification overriding a virtual method /// Algorithm's version for identification overriding a virtual method
int version() const override { return 1; }; int version() const override { return 1; }
/// Algorithm's category for identification overriding a virtual method /// Algorithm's category for identification overriding a virtual method
const std::string category() const override { const std::string category() const override {
return "DataHandling\\DataAcquisition"; return "DataHandling\\DataAcquisition";
...@@ -73,10 +62,6 @@ private: ...@@ -73,10 +62,6 @@ private:
// Implement abstract Algorithm methods // Implement abstract Algorithm methods
void init() override; void init() override;
void exec() override; void exec() override;
/// Poco TCP server
Poco::Net::TCPServer *m_server;
/// Mutex
std::mutex m_mutex;
}; };
} // namespace LiveData } // namespace LiveData
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
const long RECV_TIMEOUT = 30; const long RECV_TIMEOUT = 30;
// Sleep time in case we need to wait for the data to become available (in // Sleep time in case we need to wait for the data to become available (in
// milliseconds) // milliseconds)
const long RECV_WAIT = 1; const long RECV_WAIT = 10;
//---------------------------------------------------------------------- //----------------------------------------------------------------------
// Forward declarations // Forward declarations
...@@ -164,6 +164,8 @@ protected: ...@@ -164,6 +164,8 @@ protected:
void Receive(T &buffer, const std::string &head, const std::string &msg) { void Receive(T &buffer, const std::string &head, const std::string &msg) {
long timeout = 0; long timeout = 0;
while (m_socket.available() < static_cast<int>(sizeof(buffer))) { while (m_socket.available() < static_cast<int>(sizeof(buffer))) {
if (m_stopThread)
return;
Poco::Thread::sleep(RECV_WAIT); Poco::Thread::sleep(RECV_WAIT);
timeout += RECV_WAIT; timeout += RECV_WAIT;
if (timeout > RECV_TIMEOUT * 1000) if (timeout > RECV_TIMEOUT * 1000)
...@@ -190,7 +192,7 @@ protected: ...@@ -190,7 +192,7 @@ protected:
/// Thread that reads events from the DAE in the background /// Thread that reads events from the DAE in the background
Poco::Thread m_thread; Poco::Thread m_thread;
/// background thread checks this periodically. If true, the thread exits /// background thread checks this periodically. If true, the thread exits
bool m_stopThread; std::atomic<bool> m_stopThread;
/// Holds on to any exceptions that were thrown in the background thread so /// Holds on to any exceptions that were thrown in the background thread so
/// that we /// that we
/// can re-throw them in the forground thread /// can re-throw them in the forground thread
......
...@@ -7,15 +7,12 @@ ...@@ -7,15 +7,12 @@
#include "MantidKernel/MersenneTwister.h" #include "MantidKernel/MersenneTwister.h"
#include "MantidKernel/Timer.h" #include "MantidKernel/Timer.h"
#include <Poco/ActiveResult.h>
#include <Poco/Net/TCPServer.h> #include <Poco/Net/TCPServer.h>
#include <Poco/Net/StreamSocket.h> #include <Poco/Net/StreamSocket.h>
#include <Poco/ActiveResult.h>
#include <Poco/Thread.h>
#include <boost/random/uniform_int.hpp> #include <boost/random/uniform_int.hpp>
#include <numeric>
namespace Mantid { namespace Mantid {
namespace LiveData { namespace LiveData {
// Register the algorithm into the algorithm factory // Register the algorithm into the algorithm factory
...@@ -147,17 +144,6 @@ public: ...@@ -147,17 +144,6 @@ public:
}; };
} // end anonymous } // end anonymous
/// (Empty) Constructor
FakeISISEventDAE::FakeISISEventDAE() : m_server(nullptr) {}
/// Destructor
FakeISISEventDAE::~FakeISISEventDAE() {
if (m_server) {
m_server->stop();
delete m_server;
}
}
/** /**
* Declare the algorithm properties * Declare the algorithm properties
*/ */
...@@ -198,19 +184,18 @@ void FakeISISEventDAE::exec() { ...@@ -198,19 +184,18 @@ void FakeISISEventDAE::exec() {
histoDAE->setProperty("NPeriods", nper); histoDAE->setProperty("NPeriods", nper);
histoDAE->setProperty("NSpectra", nspec); histoDAE->setProperty("NSpectra", nspec);
histoDAE->setProperty("Port", port + 1); histoDAE->setProperty("Port", port + 1);
Poco::ActiveResult<bool> histoDAEHandle = histoDAE->executeAsync(); auto histoDAEHandle = histoDAE->executeAsync();
auto prog = boost::make_shared<Progress>(this, 0.0, 1.0, 100); auto prog = boost::make_shared<Progress>(this, 0.0, 1.0, 100);
prog->setNotifyStep(0); prog->setNotifyStep(0);
prog->report(0, "Waiting for client"); prog->report(0, "Waiting for client");
std::lock_guard<std::mutex> lock(m_mutex);
Poco::Net::ServerSocket socket(static_cast<Poco::UInt16>(port)); Poco::Net::ServerSocket socket(static_cast<Poco::UInt16>(port));
socket.listen(); socket.listen();
m_server = new Poco::Net::TCPServer( Poco::Net::TCPServer server(
TestServerConnectionFactory::Ptr( TestServerConnectionFactory::Ptr(
new TestServerConnectionFactory(nper, nspec, rate, nevents, prog)), new TestServerConnectionFactory(nper, nspec, rate, nevents, prog)),
socket); socket);
m_server->start(); server.start();
// Keep going until you get cancelled // Keep going until you get cancelled
while (true) { while (true) {
try { try {
...@@ -223,19 +208,12 @@ void FakeISISEventDAE::exec() { ...@@ -223,19 +208,12 @@ void FakeISISEventDAE::exec() {
// Sleep for 50 msec // Sleep for 50 msec
Poco::Thread::sleep(50); Poco::Thread::sleep(50);
} }
// It's most likely that we got here from a cancel request
// so calling prog->report after this point
// will generate another CancelException
histoDAE->cancel(); histoDAE->cancel();
histoDAEHandle.wait(); histoDAEHandle.wait();
if (m_server) {
m_server->stop();
m_server = nullptr;
}
socket.close(); socket.close();
prog->report(90, "Closing ISIS event DAE");
histoDAE->setLogging(false); // hide the final closedown message to the log it
// is confusing as it is a child alg.
histoDAE->cancel();
histoDAEHandle.wait();
} }
} // namespace LiveData } // namespace LiveData
......
...@@ -2,11 +2,9 @@ ...@@ -2,11 +2,9 @@
// Includes // Includes
//---------------------------------------------------------------------- //----------------------------------------------------------------------
#include "MantidLiveData/ISIS/FakeISISHistoDAE.h" #include "MantidLiveData/ISIS/FakeISISHistoDAE.h"
#include <numeric>
#include <Poco/Net/TCPServer.h> #include <Poco/Net/TCPServer.h>
#include <Poco/Net/StreamSocket.h> #include <Poco/Net/StreamSocket.h>
#include <Poco/Thread.h>
namespace Mantid { namespace Mantid {
namespace LiveData { namespace LiveData {
...@@ -314,17 +312,6 @@ public: ...@@ -314,17 +312,6 @@ public:
using namespace Kernel; using namespace Kernel;
using namespace API; using namespace API;
/// (Empty) Constructor
FakeISISHistoDAE::FakeISISHistoDAE() : m_server(nullptr) {}
/// Destructor
FakeISISHistoDAE::~FakeISISHistoDAE() {
if (m_server) {
m_server->stop();
delete m_server;
}
}
/** /**
* Declare the algorithm properties * Declare the algorithm properties
*/ */
...@@ -352,16 +339,15 @@ void FakeISISHistoDAE::exec() { ...@@ -352,16 +339,15 @@ void FakeISISHistoDAE::exec() {
int nbins = getProperty("NBins"); int nbins = getProperty("NBins");
int port = getProperty("Port"); int port = getProperty("Port");
std::lock_guard<std::mutex> lock(m_mutex);
Poco::Net::ServerSocket socket(static_cast<Poco::UInt16>(port)); Poco::Net::ServerSocket socket(static_cast<Poco::UInt16>(port));
socket.listen(); socket.listen();
m_server = new Poco::Net::TCPServer( Poco::Net::TCPServer server(
TestServerConnectionFactory::Ptr( TestServerConnectionFactory::Ptr(
new TestServerConnectionFactory(nper, nspec, nbins)), new TestServerConnectionFactory(nper, nspec, nbins)),
socket); socket);
m_server->start(); server.start();
// Keep going until you get cancelled // Keep going until you get cancelled or an error occurs
while (true) { while (true) {
try { try {
// Exit if the user presses cancel // Exit if the user presses cancel
...@@ -374,10 +360,10 @@ void FakeISISHistoDAE::exec() { ...@@ -374,10 +360,10 @@ void FakeISISHistoDAE::exec() {
// Sleep for 50 msec // Sleep for 50 msec
Poco::Thread::sleep(50); Poco::Thread::sleep(50);
} }
if (m_server) { // It's most likely that we got here from a cancel request
m_server->stop(); // so calling prog->report after this point
m_server = nullptr; // will generate another CancelException
} server.stop();
socket.close(); socket.close();
} }
......
...@@ -235,6 +235,8 @@ void ISISLiveEventDataListener::run() { ...@@ -235,6 +235,8 @@ void ISISLiveEventDataListener::run() {
// get the header with the type of the packet // get the header with the type of the packet
Receive(events.head, "Events header", Receive(events.head, "Events header",
"Corrupt stream - you should reconnect."); "Corrupt stream - you should reconnect.");
if (m_stopThread)
break;
if (!(events.head.type == TCPStreamEventHeader::Neutron)) { if (!(events.head.type == TCPStreamEventHeader::Neutron)) {
// don't know what to do with it - stop // don't know what to do with it - stop
throw std::runtime_error("Unknown packet type."); throw std::runtime_error("Unknown packet type.");
...@@ -244,6 +246,8 @@ void ISISLiveEventDataListener::run() { ...@@ -244,6 +246,8 @@ void ISISLiveEventDataListener::run() {
// get the header with the sream size // get the header with the sream size
Receive(events.head_n, "Neutrons header", Receive(events.head_n, "Neutrons header",
"Corrupt stream - you should reconnect."); "Corrupt stream - you should reconnect.");
if (m_stopThread)
break;
CollectJunk(events.head_n); CollectJunk(events.head_n);
// absolute pulse (frame) time // absolute pulse (frame) time
...@@ -282,8 +286,7 @@ void ISISLiveEventDataListener::run() { ...@@ -282,8 +286,7 @@ void ISISLiveEventDataListener::run() {
saveEvents(events.data, pulseTime, events.head_n.period); saveEvents(events.data, pulseTime, events.head_n.period);
} }
} catch (std::runtime_error & } catch (std::runtime_error &e) {
e) { // exception handler for generic runtime exceptions
g_log.error() << "Caught a runtime exception.\nException message: " g_log.error() << "Caught a runtime exception.\nException message: "
<< e.what() << '\n'; << e.what() << '\n';
...@@ -291,8 +294,8 @@ void ISISLiveEventDataListener::run() { ...@@ -291,8 +294,8 @@ void ISISLiveEventDataListener::run() {
m_backgroundException = boost::make_shared<std::runtime_error>(e); m_backgroundException = boost::make_shared<std::runtime_error>(e);
} catch (std::invalid_argument & } catch (std::invalid_argument &e) {
e) { // TimeSeriesProperty (and possibly some other things) can // TimeSeriesProperty (and possibly some other things) can
// can throw these errors // can throw these errors
g_log.error() g_log.error()
<< "Caught an invalid argument exception.\nException message: " << "Caught an invalid argument exception.\nException message: "
...@@ -303,7 +306,7 @@ void ISISLiveEventDataListener::run() { ...@@ -303,7 +306,7 @@ void ISISLiveEventDataListener::run() {
newMsg += e.what(); newMsg += e.what();
m_backgroundException = boost::make_shared<std::runtime_error>(newMsg); m_backgroundException = boost::make_shared<std::runtime_error>(newMsg);
} catch (...) { // Default exception handler } catch (...) {
g_log.error() << "Uncaught exception in ISISLiveEventDataListener network " g_log.error() << "Uncaught exception in ISISLiveEventDataListener network "
"read thread.\n"; "read thread.\n";
m_isConnected = false; m_isConnected = false;
......
...@@ -25,10 +25,7 @@ Usage ...@@ -25,10 +25,7 @@ Usage
**Example:** **Example:**
.. This test is currently hanging on macOS as the MonitorLiveData algorithm .. testcode:: exFakeISISEventDAE
is taking a long time to cancel
.. code-block:: python
from threading import Thread from threading import Thread
import time import time
...@@ -45,20 +42,22 @@ Usage ...@@ -45,20 +42,22 @@ Usage
def captureLive(): def captureLive():
ConfigService.setFacility("TEST_LIVE") ConfigService.setFacility("TEST_LIVE")
# start a Live data listener updating every second, that rebins the data try:
# and replaces the results each time with those of the last second. # start a Live data listener updating every second, that rebins the data
StartLiveData(Instrument='ISIS_Event', OutputWorkspace='wsOut', UpdateEvery=1, # and replaces the results each time with those of the last second.
ProcessingAlgorithm='Rebin', ProcessingProperties='Params=10000,1000,20000;PreserveEvents=1', StartLiveData(Instrument='ISIS_Event', OutputWorkspace='wsOut', UpdateEvery=1,
AccumulationMethod='Add', PreserveEvents=True) ProcessingAlgorithm='Rebin', ProcessingProperties='Params=10000,1000,20000;PreserveEvents=1',
AccumulationMethod='Add', PreserveEvents=True)
# give it a couple of seconds before stopping it
time.sleep(2) # give it a couple of seconds before stopping it
time.sleep(2)
# This will cancel both algorithms finally:
# you can do the same in the GUI # This will cancel both algorithms
# by clicking on the details button on the bottom right # you can do the same in the GUI
AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel() # by clicking on the details button on the bottom right
AlgorithmManager.newestInstanceOf("FakeISISEventDAE").cancel() AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel()
AlgorithmManager.newestInstanceOf("FakeISISEventDAE").cancel()
time.sleep(1)
#-------------------------------------------------------------------------------------------------- #--------------------------------------------------------------------------------------------------
oldFacility = ConfigService.getFacility().name() oldFacility = ConfigService.getFacility().name()
......
...@@ -36,20 +36,21 @@ Usage ...@@ -36,20 +36,21 @@ Usage
def captureLive(): def captureLive():
ConfigService.setFacility("TEST_LIVE") ConfigService.setFacility("TEST_LIVE")
# start a Live data listener updating every second, that rebins the data try:
# and replaces the results each time with those of the last second. # start a Live data listener updating every second, that rebins the data
StartLiveData(Instrument='ISIS_Histogram', OutputWorkspace='wsOut', UpdateEvery=1, # and replaces the results each time with those of the last second.
AccumulationMethod='Replace') StartLiveData(Instrument='ISIS_Histogram', OutputWorkspace='wsOut', UpdateEvery=1,
AccumulationMethod='Replace')
# give it a couple of seconds before stopping it
time.sleep(2) # give it a couple of seconds before stopping it
time.sleep(2)
# This will cancel both algorithms finally:
# you can do the same in the GUI # This will cancel both algorithms
# by clicking on the details button on the bottom right # you can do the same in the GUI
AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel() # by clicking on the details button on the bottom right
AlgorithmManager.newestInstanceOf("FakeISISHistoDAE").cancel() AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel()
time.sleep(1) # give them time to cancel AlgorithmManager.newestInstanceOf("FakeISISHistoDAE").cancel()
time.sleep(1)
#-------------------------------------------------------------------------------------------------- #--------------------------------------------------------------------------------------------------
oldFacility = ConfigService.getFacility().name() oldFacility = ConfigService.getFacility().name()
......
...@@ -123,20 +123,22 @@ Usage ...@@ -123,20 +123,22 @@ Usage
def captureLive(): def captureLive():
ConfigService.setFacility("TEST_LIVE") ConfigService.setFacility("TEST_LIVE")
# start a Live data listener updating every second, that rebins the data try:
# and replaces the results each time with those of the last second. # start a Live data listener updating every second, that rebins the data
StartLiveData(Instrument='ISIS_Event', OutputWorkspace='wsOut', UpdateEvery=1, # and replaces the results each time with those of the last second.
ProcessingAlgorithm='Rebin', ProcessingProperties='Params=10000,1000,20000;PreserveEvents=1', StartLiveData(Instrument='ISIS_Event', OutputWorkspace='wsOut', UpdateEvery=1,
AccumulationMethod='Add', PreserveEvents=True) ProcessingAlgorithm='Rebin', ProcessingProperties='Params=10000,1000,20000;PreserveEvents=1',
AccumulationMethod='Add', PreserveEvents=True)
# give it a couple of seconds before stopping it
time.sleep(2) # give it a couple of seconds before stopping it
time.sleep(2)
# This will cancel both algorithms finally:
# you can do the same in the GUI # This will cancel both algorithms
# by clicking on the details button on the bottom right # you can do the same in the GUI
AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel() # by clicking on the details button on the bottom right
AlgorithmManager.newestInstanceOf("FakeISISEventDAE").cancel() AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel()
AlgorithmManager.newestInstanceOf("FakeISISEventDAE").cancel()
time.sleep(1)
#-------------------------------------------------------------------------------------------------- #--------------------------------------------------------------------------------------------------
oldFacility = ConfigService.getFacility().name() oldFacility = ConfigService.getFacility().name()
...@@ -187,20 +189,22 @@ Output: ...@@ -187,20 +189,22 @@ Output:
def captureLive(): def captureLive():
ConfigService.setFacility("TEST_LIVE") ConfigService.setFacility("TEST_LIVE")
# Start a Live data listener updating every second, try:
# that replaces the results each time with those of the last second. # Start a Live data listener updating every second,
# Load only spectra 2,4, and 6 from periods 1 and 3 # that replaces the results each time with those of the last second.
StartLiveData(Instrument='ISIS_Histogram', OutputWorkspace='wsOut', UpdateEvery=1, # Load only spectra 2,4, and 6 from periods 1 and 3
AccumulationMethod='Replace', PeriodList=[1,3],SpectraList=[2,4,6]) StartLiveData(Instrument='ISIS_Histogram', OutputWorkspace='wsOut', UpdateEvery=1,
AccumulationMethod='Replace', PeriodList=[1,3],SpectraList=[2,4,6])
# give it a couple of seconds before stopping it
time.sleep(2) # give it a couple of seconds before stopping it
time.sleep(2)
# This will cancel both algorithms finally:
# you can do the same in the GUI # This will cancel both algorithms
# by clicking on the details button on the bottom right # you can do the same in the GUI
AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel() # by clicking on the details button on the bottom right
AlgorithmManager.newestInstanceOf("FakeISISHistoDAE").cancel() AlgorithmManager.newestInstanceOf("MonitorLiveData").cancel()
AlgorithmManager.newestInstanceOf("FakeISISHistoDAE").cancel()
time.sleep(1)
#-------------------------------------------------------------------------------------------------- #--------------------------------------------------------------------------------------------------
oldFacility = ConfigService.getFacility().name() oldFacility = ConfigService.getFacility().name()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment