Unverified Commit e131887a authored by Eisenhauer, Greg's avatar Eisenhauer, Greg Committed by GitHub
Browse files

Merge pull request #2007 from eisenhauer/LowLevelAsyncOpen

Move async file open functionality to transports
parents f668f832 ab622040
......@@ -188,16 +188,14 @@ void BP3Writer::InitTransports()
{
if (m_BP3Serializer.m_Parameters.AsyncTasks)
{
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
else
{
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["asynctasks"] = "true";
}
}
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
}
......
......@@ -190,16 +190,14 @@ void BP4Writer::InitTransports()
{
if (m_BP4Serializer.m_Parameters.AsyncTasks)
{
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}
else
{
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["asynctasks"] = "true";
}
}
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}
if (m_BP4Serializer.m_RankMPI == 0)
......
......@@ -58,11 +58,14 @@ public:
void InitProfiler(const Mode openMode, const TimeUnit timeUnit);
/**
* Opens transport, required before SetBuffer, Write, Read, Flush, Close
* Opens transport, possibly asynchronously, required before SetBuffer,
* Write, Read, Flush, Close
* @param name
* @param openMode
* @param async
*/
virtual void Open(const std::string &name, const Mode openMode) = 0;
virtual void Open(const std::string &name, const Mode openMode,
const bool async = false) = 0;
/**
* If OS buffered (FILE* or fstream), sets the buffer size
......
......@@ -23,8 +23,31 @@ FileFStream::FileFStream(helper::Comm const &comm, const bool debugMode)
{
}
void FileFStream::Open(const std::string &name, const Mode openMode)
void FileFStream::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}
void FileFStream::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> void {
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
};
m_Name = name;
CheckName();
m_OpenMode = openMode;
......@@ -32,10 +55,19 @@ void FileFStream::Open(const std::string &name, const Mode openMode)
switch (m_OpenMode)
{
case (Mode::Write):
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
}
break;
case (Mode::Append):
......@@ -59,9 +91,13 @@ void FileFStream::Open(const std::string &name, const Mode openMode)
", in call to stream open");
}
CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to fstream open");
m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to fstream open");
m_IsOpen = true;
}
}
void FileFStream::SetBuffer(char *buffer, size_t size)
......@@ -81,6 +117,7 @@ void FileFStream::Write(const char *buffer, size_t size, size_t start)
", in call to fstream write");
};
WaitForOpen();
if (start != MaxSizeT)
{
m_FileStream.seekp(start);
......@@ -117,6 +154,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start)
", in call to fstream read");
};
WaitForOpen();
if (start != MaxSizeT)
{
m_FileStream.seekg(start);
......@@ -145,6 +183,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start)
size_t FileFStream::GetSize()
{
WaitForOpen();
const auto currentPosition = m_FileStream.tellg();
m_FileStream.seekg(0, std::ios_base::end);
const std::streampos size = m_FileStream.tellg();
......@@ -159,6 +198,7 @@ size_t FileFStream::GetSize()
void FileFStream::Flush()
{
WaitForOpen();
ProfilerStart("write");
m_FileStream.flush();
ProfilerStart("write");
......@@ -168,6 +208,7 @@ void FileFStream::Flush()
void FileFStream::Close()
{
WaitForOpen();
ProfilerStart("close");
m_FileStream.close();
ProfilerStop("close");
......@@ -186,6 +227,7 @@ void FileFStream::CheckFile(const std::string hint) const
void FileFStream::SeekToEnd()
{
WaitForOpen();
m_FileStream.seekp(0, std::ios_base::end);
CheckFile("couldn't move to the end of file " + m_Name +
", in call to fstream seekp");
......@@ -193,6 +235,7 @@ void FileFStream::SeekToEnd()
void FileFStream::SeekToBegin()
{
WaitForOpen();
m_FileStream.seekp(0, std::ios_base::beg);
CheckFile("couldn't move to the beginning of file " + m_Name +
", in call to fstream seekp");
......
......@@ -12,6 +12,7 @@
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILESTREAM_H_
#include <fstream>
#include <future> //std::async, std::future
#include "adios2/common/ADIOSConfig.h"
#include "adios2/helper/adiosComm.h"
......@@ -31,7 +32,8 @@ public:
~FileFStream() = default;
void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;
void SetBuffer(char *buffer, size_t size) final;
......@@ -52,12 +54,15 @@ public:
private:
/** file stream using fstream library */
std::fstream m_FileStream;
bool m_IsOpening = false;
std::future<void> m_OpenFuture;
/**
* Check if m_FileStream is false after an operation
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};
} // end namespace transport
......
......@@ -37,8 +37,32 @@ FilePOSIX::~FilePOSIX()
}
}
void FilePOSIX::Open(const std::string &name, const Mode openMode)
void FilePOSIX::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_FileDescriptor = m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}
void FilePOSIX::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> int {
ProfilerStart("open");
int FD = open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
return FD;
};
m_Name = name;
CheckName();
m_OpenMode = openMode;
......@@ -46,10 +70,19 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode)
{
case (Mode::Write):
ProfilerStart("open");
m_FileDescriptor =
open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
ProfilerStart("open");
m_FileDescriptor =
open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
}
break;
case (Mode::Append):
......@@ -71,10 +104,13 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode)
", in call to POSIX open");
}
CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}
void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
......@@ -103,6 +139,7 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
}
};
WaitForOpen();
if (start != MaxSizeT)
{
const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET);
......@@ -161,6 +198,8 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
}
};
WaitForOpen();
if (start != MaxSizeT)
{
const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET);
......@@ -197,6 +236,7 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
size_t FilePOSIX::GetSize()
{
struct stat fileStat;
WaitForOpen();
if (fstat(m_FileDescriptor, &fileStat) == -1)
{
throw std::ios_base::failure("ERROR: couldn't get size of file " +
......@@ -209,6 +249,7 @@ void FilePOSIX::Flush() {}
void FilePOSIX::Close()
{
WaitForOpen();
ProfilerStart("close");
const int status = close(m_FileDescriptor);
ProfilerStop("close");
......@@ -232,6 +273,7 @@ void FilePOSIX::CheckFile(const std::string hint) const
void FilePOSIX::SeekToEnd()
{
WaitForOpen();
const int status = lseek(m_FileDescriptor, 0, SEEK_END);
if (status == -1)
{
......@@ -243,6 +285,7 @@ void FilePOSIX::SeekToEnd()
void FilePOSIX::SeekToBegin()
{
WaitForOpen();
const int status = lseek(m_FileDescriptor, 0, SEEK_SET);
if (status == -1)
{
......
......@@ -11,6 +11,8 @@
#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
#include <future> //std::async, std::future
#include "adios2/common/ADIOSConfig.h"
#include "adios2/toolkit/transport/Transport.h"
......@@ -32,7 +34,8 @@ public:
~FilePOSIX();
void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;
void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;
......@@ -52,12 +55,15 @@ public:
private:
/** POSIX file handle returned by Open */
int m_FileDescriptor = -1;
bool m_IsOpening = false;
std::future<int> m_OpenFuture;
/**
* Check if m_FileDescriptor is -1 after an operation
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};
} // end namespace transport
......
......@@ -37,8 +37,28 @@ FileStdio::~FileStdio()
}
}
void FileStdio::Open(const std::string &name, const Mode openMode)
void FileStdio::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_File = m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}
void FileStdio::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> FILE * {
return std::fopen(name.c_str(), "wb");
};
m_Name = name;
CheckName();
m_OpenMode = openMode;
......@@ -46,7 +66,16 @@ void FileStdio::Open(const std::string &name, const Mode openMode)
switch (m_OpenMode)
{
case (Mode::Write):
m_File = std::fopen(name.c_str(), "wb");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
m_File = std::fopen(name.c_str(), "wb");
}
break;
case (Mode::Append):
m_File = std::fopen(name.c_str(), "rwb");
......@@ -61,9 +90,13 @@ void FileStdio::Open(const std::string &name, const Mode openMode)
", in call to stdio fopen");
}
CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to stdio open");
m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to stdio open");
m_IsOpen = true;
}
}
void FileStdio::SetBuffer(char *buffer, size_t size)
......@@ -98,6 +131,7 @@ void FileStdio::Write(const char *buffer, size_t size, size_t start)
}
};
WaitForOpen();
if (start != MaxSizeT)
{
const auto status =
......@@ -151,6 +185,7 @@ void FileStdio::Read(char *buffer, size_t size, size_t start)
}
};
WaitForOpen();
if (start != MaxSizeT)
{
const auto status =
......@@ -182,6 +217,7 @@ void FileStdio::Read(char *buffer, size_t size, size_t start)
size_t FileStdio::GetSize()
{
WaitForOpen();
const auto currentPosition = ftell(m_File);
if (currentPosition == -1L)
{
......@@ -204,6 +240,7 @@ size_t FileStdio::GetSize()
void FileStdio::Flush()
{
WaitForOpen();
ProfilerStart("write");
const int status = std::fflush(m_File);
ProfilerStop("write");
......@@ -217,6 +254,7 @@ void FileStdio::Flush()
void FileStdio::Close()
{
WaitForOpen();
ProfilerStart("close");
const int status = std::fclose(m_File);
ProfilerStop("close");
......@@ -240,6 +278,7 @@ void FileStdio::CheckFile(const std::string hint) const
void FileStdio::SeekToEnd()
{
WaitForOpen();
const auto status = std::fseek(m_File, 0, SEEK_END);
if (status == -1)
{
......@@ -251,6 +290,7 @@ void FileStdio::SeekToEnd()
void FileStdio::SeekToBegin()
{
WaitForOpen();
const auto status = std::fseek(m_File, 0, SEEK_SET);
if (status == -1)
{
......
......@@ -12,6 +12,7 @@
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEPOINTER_H_
#include <cstdio> // FILE*
#include <future> //std::async, std::future
#include "adios2/toolkit/transport/Transport.h"
......@@ -33,7 +34,8 @@ public:
~FileStdio();
void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;
void SetBuffer(char *buffer, size_t size) final;
......@@ -54,12 +56,15 @@ public:
private:
/** C File pointer */
FILE *m_File = nullptr;
bool m_IsOpening = false;
std::future<FILE *> m_OpenFuture;
/**
* Check for std::ferror and throw an exception if true
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};
} // end namespace transport
......
......@@ -31,7 +31,8 @@ NullTransport::NullTransport(helper::Comm const &comm, const bool debugMode)
NullTransport::~NullTransport() = default;
void NullTransport::Open(const std::string &name, const Mode openMode)
void NullTransport::Open(const std::string &name, const Mode openMode,
const bool async)
{
if (Impl->IsOpen)
{
......
......@@ -34,7 +34,8 @@ public:
virtual ~NullTransport();
void Open(const std::string &name, const Mode openMode) override;
void Open(const std::string &name, const Mode openMode,
const bool async = false) override;
void SetBuffer(char *buffer, size_t size) override;
......
......@@ -47,7 +47,8 @@ ShmSystemV::~ShmSystemV() // this might not be correct
}
}
void ShmSystemV::Open(const std::string &name, const Mode openMode)
void ShmSystemV::Open(const std::string &name, const Mode openMode,
const bool async)
{
m_Name = name;
CheckName();
......
......@@ -40,7 +40,8 @@ public:
~ShmSystemV();
void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;
void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;
......
......@@ -75,7 +75,6 @@ void TransportMan::OpenFiles(const std::vector<std::string> &fileNames,
const std::vector<Params> &parametersVector,
const bool profile)
{
WaitForAsync();
for (size_t i = 0; i < fileNames.size(); ++i)
{
const Params &parameters = parametersVector[i];
......@@ -90,40 +89,10 @@ void TransportMan::OpenFiles(const std::vector<std::string> &fileNames,
}
}
void TransportMan::OpenFilesAsync(const std::vector<std::string> &fileNames,
const Mode openMode,
const std::vector<Params> &parametersVector,
const bool profile)
{
WaitForAsync();
auto lf_OpenFiles =
[&](const std::vector<std::string> &fileNames, const Mode openMode,
const std::vector<Params> &parametersVector, const bool profile) {
for (size_t i = 0; i < fileNames.size(); ++i)
{
const Params &parameters = parametersVector[i];
const std::string type = parameters.at("transport");
if (type == "File" || type == "file")
{
std::shared_ptr<Transport> file = OpenFileTransport(