Skip to content
Snippets Groups Projects
Unverified Commit 98e67b62 authored by Jason Wang's avatar Jason Wang Committed by GitHub
Browse files

Merge pull request #2 from williamfgc/zmq

Transport non-blocking IRead IWrite interface
parents 7d39f264 b36cef43
No related branches found
No related tags found
1 merge request!307DataMan and ZMQ transport worked with the newly refactored architecture
...@@ -24,6 +24,17 @@ Transport::Transport(const std::string type, const std::string library, ...@@ -24,6 +24,17 @@ Transport::Transport(const std::string type, const std::string library,
MPI_Comm_size(m_MPIComm, &m_SizeMPI); MPI_Comm_size(m_MPIComm, &m_SizeMPI);
} }
void Transport::IWrite(const char *buffer, size_t size, Status &status,
size_t start)
{
throw std::invalid_argument("ERROR: this class doesn't implement IWrite\n");
}
void Transport::IRead(char *buffer, size_t size, Status &status, size_t start)
{
throw std::invalid_argument("ERROR: this class doesn't implement IRead\n");
}
void Transport::InitProfiler(const Mode openMode, const TimeUnit timeUnit) void Transport::InitProfiler(const Mode openMode, const TimeUnit timeUnit)
{ {
m_Profiler.IsActive = true; m_Profiler.IsActive = true;
......
...@@ -38,6 +38,14 @@ public: ...@@ -38,6 +38,14 @@ public:
int m_SizeMPI = 1; ///< from MPI_Comm_Size int m_SizeMPI = 1; ///< from MPI_Comm_Size
profiling::IOChrono m_Profiler; ///< profiles Open, Write/Read, Close profiling::IOChrono m_Profiler; ///< profiles Open, Write/Read, Close
struct Status
{
size_t Bytes;
bool Running;
bool Successful;
// TODO add more thing...time?
};
/** /**
* Base constructor that all derived classes pass * Base constructor that all derived classes pass
* @param type from derived class * @param type from derived class
...@@ -76,6 +84,9 @@ public: ...@@ -76,6 +84,9 @@ public:
virtual void Write(const char *buffer, size_t size, virtual void Write(const char *buffer, size_t size,
size_t start = MaxSizeT) = 0; size_t start = MaxSizeT) = 0;
virtual void IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT);
/** /**
* Reads from transport "size" bytes from a certain position. Note that size * Reads from transport "size" bytes from a certain position. Note that size
* and position and non-const due to the nature of underlying transport * and position and non-const due to the nature of underlying transport
...@@ -88,6 +99,9 @@ public: ...@@ -88,6 +99,9 @@ public:
*/ */
virtual void Read(char *buffer, size_t size, size_t start = MaxSizeT) = 0; virtual void Read(char *buffer, size_t size, size_t start = MaxSizeT) = 0;
virtual void IRead(char *buffer, size_t size, Status &status,
size_t start = MaxSizeT);
/** /**
* Returns the size of current data in transport * Returns the size of current data in transport
* @return size as size_t * @return size as size_t
......
...@@ -124,12 +124,19 @@ void WANZmq::Write(const char *buffer, size_t size, size_t start) ...@@ -124,12 +124,19 @@ void WANZmq::Write(const char *buffer, size_t size, size_t start)
} }
} }
void WANZmq::IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT)
{
}
void WANZmq::Read(char *buffer, size_t size, size_t start) void WANZmq::Read(char *buffer, size_t size, size_t start)
{ {
zmq_recv(m_Socket, buffer, size, 0); zmq_recv(m_Socket, buffer, size, 0);
int status = zmq_send(m_Socket, "OK", 4, 0); int status = zmq_send(m_Socket, "OK", 4, 0);
} }
void WANZmq::IRead(char *buffer, size_t size, Status &status, size_t start) {}
void WANZmq::Flush() {} void WANZmq::Flush() {}
void WANZmq::Close() void WANZmq::Close()
......
...@@ -40,8 +40,14 @@ public: ...@@ -40,8 +40,14 @@ public:
void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;
void IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT) final;
void Read(char *buffer, size_t size, size_t start = MaxSizeT) final; void Read(char *buffer, size_t size, size_t start = MaxSizeT) final;
void IRead(char *buffer, size_t size, Status &status,
size_t start = MaxSizeT) final;
void Flush() final; void Flush() final;
void Close() final; void Close() final;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment