Skip to content
Snippets Groups Projects
Commit b36cef43 authored by William F Godoy's avatar William F Godoy
Browse files

Transport non-blocking IRead IWrite interface

Also in WANZmq, need to implement in cpp
parent 7d39f264
No related branches found
No related tags found
1 merge request!307DataMan and ZMQ transport worked with the newly refactored architecture
......@@ -24,6 +24,17 @@ Transport::Transport(const std::string type, const std::string library,
MPI_Comm_size(m_MPIComm, &m_SizeMPI);
}
void Transport::IWrite(const char *buffer, size_t size, Status &status,
size_t start)
{
throw std::invalid_argument("ERROR: this class doesn't implement IWrite\n");
}
void Transport::IRead(char *buffer, size_t size, Status &status, size_t start)
{
throw std::invalid_argument("ERROR: this class doesn't implement IRead\n");
}
void Transport::InitProfiler(const Mode openMode, const TimeUnit timeUnit)
{
m_Profiler.IsActive = true;
......
......@@ -38,6 +38,14 @@ public:
int m_SizeMPI = 1; ///< from MPI_Comm_Size
profiling::IOChrono m_Profiler; ///< profiles Open, Write/Read, Close
struct Status
{
size_t Bytes;
bool Running;
bool Successful;
// TODO add more thing...time?
};
/**
* Base constructor that all derived classes pass
* @param type from derived class
......@@ -76,6 +84,9 @@ public:
virtual void Write(const char *buffer, size_t size,
size_t start = MaxSizeT) = 0;
virtual void IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT);
/**
* Reads from transport "size" bytes from a certain position. Note that size
* and position and non-const due to the nature of underlying transport
......@@ -88,6 +99,9 @@ public:
*/
virtual void Read(char *buffer, size_t size, size_t start = MaxSizeT) = 0;
virtual void IRead(char *buffer, size_t size, Status &status,
size_t start = MaxSizeT);
/**
* Returns the size of current data in transport
* @return size as size_t
......
......@@ -124,12 +124,19 @@ void WANZmq::Write(const char *buffer, size_t size, size_t start)
}
}
void WANZmq::IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT)
{
}
void WANZmq::Read(char *buffer, size_t size, size_t start)
{
zmq_recv(m_Socket, buffer, size, 0);
int status = zmq_send(m_Socket, "OK", 4, 0);
}
void WANZmq::IRead(char *buffer, size_t size, Status &status, size_t start) {}
void WANZmq::Flush() {}
void WANZmq::Close()
......
......@@ -40,8 +40,14 @@ public:
void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;
void IWrite(const char *buffer, size_t size, Status &status,
size_t start = MaxSizeT) final;
void Read(char *buffer, size_t size, size_t start = MaxSizeT) final;
void IRead(char *buffer, size_t size, Status &status,
size_t start = MaxSizeT) final;
void Flush() final;
void Close() final;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment