Unverified Commit 7c2bf0e8 authored by pnorbert's avatar pnorbert Committed by GitHub
Browse files

Merge pull request #1571 from pnorbert/bp4headers

- Add header to all BP4 data files, metadata files and index table
parents 8d759c35 e1f87177
......@@ -280,23 +280,7 @@ void BP4Writer::InitBPBuffer()
{
// Set the flag in the header of metadata index table to 0 again
// to indicate a new run begins
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(),
'\0');
metadataIndex.m_Position = 0;
const uint64_t currentRunIsOver = 0;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position,
&currentRunIsOver);
m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56,
0);
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataIndexManager.SeekToFileEnd();
UpdateActiveFlag(true);
// Get the size of existing metadata file
m_BP4Serializer.m_PreMetadataFileLength =
......@@ -307,6 +291,22 @@ void BP4Writer::InitBPBuffer()
}
}
if (m_BP4Serializer.m_PreDataFileLength == 0)
{
/* This is a new file.
* Make headers in data buffer and metadata buffer
*/
if (m_BP4Serializer.m_RankMPI == 0)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Metadata, "Metadata",
false);
}
if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Data, "Data", false);
}
}
m_BP4Serializer.PutProcessGroupIndex(
m_IO.m_Name, m_IO.m_HostLanguage,
m_FileDataManager.GetTransportsTypes());
......@@ -409,68 +409,17 @@ void BP4Writer::WriteProfilingJSONFile()
}
}
/* Serializes the header of the metadata index file into buffer, starting at
 * position and advancing position past everything written or skipped.
 *
 * Order of the fields produced here:
 *   - 24-byte readable tag "ADIOS-BP v<major>.<minor>.<patch>"
 *   - major, minor and patch version characters, then one skipped byte
 *   - endianness byte (0 = little endian, 1 = big endian)
 *   - two bytes: (0, version) when addSubfiles is set, otherwise (0, 0)
 *   - version byte
 *   - 32 skipped bytes
 * Skipped bytes are not written by this function; they keep whatever the
 * caller put into the buffer beforehand.
 */
void BP4Writer::PopulateMetadataIndexFileHeader(std::vector<char> &buffer,
                                                size_t &position,
                                                const uint8_t version,
                                                const bool addSubfiles)
{
    TAU_SCOPED_TIMER("BP4Writer::PopulateMetadataIndexFileHeader");

    const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR));
    const std::string minorVersion(std::to_string(ADIOS2_VERSION_MINOR));
    const std::string patchVersion(std::to_string(ADIOS2_VERSION_PATCH));

    // Readable tag: copy at most 24 characters, then step over whatever part
    // of the 24-byte field the tag did not fill.
    const size_t tagFieldLength = 24;
    const std::string versionLongTag("ADIOS-BP v" + majorVersion + "." +
                                     minorVersion + "." + patchVersion);
    const size_t tagBytes = (versionLongTag.size() < tagFieldLength)
                                ? versionLongTag.size()
                                : tagFieldLength;
    helper::CopyToBuffer(buffer, position, versionLongTag.c_str(), tagBytes);
    position += tagFieldLength - tagBytes;

    // Version components, each copied from the start of its decimal string
    helper::CopyToBuffer(buffer, position, majorVersion.c_str());
    helper::CopyToBuffer(buffer, position, minorVersion.c_str());
    helper::CopyToBuffer(buffer, position, patchVersion.c_str());
    ++position; // one byte left untouched after the version characters

    const uint8_t endianness = helper::IsLittleEndian() ? 0 : 1;
    helper::CopyToBuffer(buffer, position, &endianness);

    if (!addSubfiles)
    {
        // two zero bytes
        const uint16_t zeros2 = 0;
        helper::CopyToBuffer(buffer, position, &zeros2);
    }
    else
    {
        // one zero byte followed by the version byte
        const uint8_t zeros1 = 0;
        helper::CopyToBuffer(buffer, position, &zeros1);
        helper::CopyToBuffer(buffer, position, &version);
    }

    helper::CopyToBuffer(buffer, position, &version);

    // remainder of the header is skipped, not written
    position += 32;
}
/*write the content of metadata index file*/
void BP4Writer::PopulateMetadataIndexFileContent(
const uint64_t currentStep, const uint64_t mpirank,
BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp, std::vector<char> &buffer,
size_t &position)
const uint64_t currentTimeStamp)
{
TAU_SCOPED_TIMER("BP4Writer::PopulateMetadataIndexFileContent");
auto &buffer = b.m_Buffer;
auto &position = b.m_Position;
auto &absolutePosition = b.m_AbsolutePosition;
helper::CopyToBuffer(buffer, position, &currentStep);
helper::CopyToBuffer(buffer, position, &mpirank);
helper::CopyToBuffer(buffer, position, &pgIndexStart);
......@@ -481,6 +430,15 @@ void BP4Writer::PopulateMetadataIndexFileContent(
position += 8;
}
/* Overwrites the single active-flag byte in the header of the metadata index
 * file: 1 when active is true, 0 otherwise. The byte is written in place at
 * m_BP4Serializer.m_ActiveFlagPosition, the change is flushed, and the file
 * position is moved back to the end of the file afterwards.
 */
void BP4Writer::UpdateActiveFlag(const bool active)
{
    const char flagByte = static_cast<char>(active ? 1 : 0);
    const size_t flagOffset = m_BP4Serializer.m_ActiveFlagPosition;

    // NOTE(review): the trailing 0 is presumably the transport index - confirm
    m_FileMetadataIndexManager.WriteFileAt(&flagByte, 1, flagOffset, 0);
    m_FileMetadataIndexManager.FlushFiles();
    m_FileMetadataIndexManager.SeekToFileEnd();
}
void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{
......@@ -495,18 +453,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{
// But the flag in the header of metadata index table needs to be
// modified to indicate current run is over.
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
const uint64_t currentRunIsOver = 1;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position, &currentRunIsOver);
m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56, 0);
m_FileMetadataIndexManager.FlushFiles();
UpdateActiveFlag(false);
}
return;
}
......@@ -540,9 +487,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
m_BP4Serializer.m_PreMetadataFileLength;
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(64);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
metadataIndex.Resize(128, "BP4 Index Table Entry");
uint64_t currentStep;
if (isFinal && m_BP4Serializer.m_MetadataSet.DataPGCount > 0)
......@@ -562,23 +507,15 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
if (currentStep == 1) // TimeStep starts from 1
{
PopulateMetadataIndexFileHeader(metadataIndex.m_Buffer,
metadataIndex.m_Position, 4, true);
m_FileMetadataIndexManager.WriteFiles(metadataIndex.m_Buffer.data(),
metadataIndex.m_Position);
metadataIndex.m_Buffer.resize(64);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
m_BP4Serializer.MakeHeader(metadataIndex, "Index Table", true);
}
std::time_t currentTimeStamp = std::time(nullptr);
PopulateMetadataIndexFileContent(
currentStep, m_BP4Serializer.m_RankMPI, pgIndexStartMetadataFile,
varIndexStartMetadataFile, attrIndexStartMetadataFile,
currentStepEndPos, currentTimeStamp, metadataIndex.m_Buffer,
metadataIndex.m_Position);
metadataIndex, currentStep, m_BP4Serializer.m_RankMPI,
pgIndexStartMetadataFile, varIndexStartMetadataFile,
attrIndexStartMetadataFile, currentStepEndPos, currentTimeStamp);
m_FileMetadataIndexManager.WriteFiles(metadataIndex.m_Buffer.data(),
metadataIndex.m_Position);
......@@ -592,18 +529,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
// Only one step of metadata is generated at close.
// The flag in the header of metadata index table
// needs to be modified to indicate current run is over.
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
const uint64_t currentRunIsOver = 1;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position, &currentRunIsOver);
m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56, 0);
m_FileMetadataIndexManager.FlushFiles();
UpdateActiveFlag(false);
}
}
/*Clear the local indices buffer at the end of each step*/
......
......@@ -94,16 +94,13 @@ private:
* profilers*/
void WriteProfilingJSONFile();
void PopulateMetadataIndexFileHeader(std::vector<char> &buffer,
size_t &position, const uint8_t,
const bool addSubfiles);
void UpdateActiveFlag(const bool active);
void PopulateMetadataIndexFileContent(
const uint64_t currentStep, const uint64_t mpirank,
BufferSTL &buffer, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp, std::vector<char> &buffer,
size_t &position);
const uint64_t currentTimeStamp);
void WriteCollectiveMetadataFile(const bool isFinal = false);
......
......@@ -216,6 +216,13 @@ public:
size_t m_PreMetadataFileLength = 0;
size_t m_PreDataFileLength = 0;
/** Positions of flags in Index Table Header that Reader uses.
 *  All values are byte offsets from the start of the 64-byte header. */
/** offset of the endianness byte (0 = little endian, 1 = big endian) */
const size_t m_EndianFlagPosition = 36;
/** offset of the BP format version byte */
const size_t m_BPVersionPosition = 37;
/** offset of the active flag byte */
const size_t m_ActiveFlagPosition = 38;
/** offset of the readable version tag string */
const size_t m_VersionTagPosition = 0;
/** length in bytes of the readable version tag string */
const size_t m_VersionTagLength = 32;
/**
* Unique constructor
* @param mpiComm for m_BP1Aggregator
......@@ -294,7 +301,7 @@ public:
};
/**
* Resizes the data buffer to hold new dataIn size
* Resizes the data buffer to hold additional new dataIn size
* @param dataIn input size for new data
* @param hint for exception handling
* @return
......
......@@ -60,8 +60,14 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL)
{
const auto &buffer = bufferSTL.m_Buffer;
const size_t bufferSize = buffer.size();
size_t position = 0;
position += 28;
// Read header (64 bytes)
// long version string
size_t position = m_VersionTagPosition;
m_Minifooter.VersionTag.assign(&buffer[position], m_VersionTagLength);
position = m_EndianFlagPosition;
const uint8_t endianness = helper::ReadValue<uint8_t>(buffer, position);
m_Minifooter.IsLittleEndian = (endianness == 0) ? true : false;
#ifndef ADIOS2_HAVE_ENDIAN_REVERSE
......@@ -78,33 +84,28 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL)
}
#endif
position += 1;
const int8_t fileType = helper::ReadValue<int8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (fileType >= 3)
{
m_Minifooter.HasSubFiles = true;
}
else if (fileType == 0 || fileType == 2)
{
m_Minifooter.HasSubFiles = false;
}
// This has no flag in BP4 header. Always true
m_Minifooter.HasSubFiles = true;
// BP version
position = m_BPVersionPosition;
m_Minifooter.Version = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (m_Minifooter.Version < 3)
if (m_Minifooter.Version != 4)
{
throw std::runtime_error("ERROR: ADIOS2 only supports bp format "
"version 3 and above, found " +
std::to_string(m_Minifooter.Version) +
" version \n");
throw std::runtime_error(
"ERROR: ADIOS2 BP4 Engine only supports bp format "
"version 4, found " +
std::to_string(m_Minifooter.Version) + " version \n");
}
position = 0;
m_Minifooter.VersionTag.assign(&buffer[position], 28);
// active flag, not used yet in the reader
position = m_ActiveFlagPosition;
const uint8_t activeFlag = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
position += 64;
// Read each record now
position = 64;
while (position < bufferSize)
{
std::vector<uint64_t> ptrs;
......
......@@ -35,6 +35,115 @@ BP4Serializer::BP4Serializer(MPI_Comm mpiComm, const bool debugMode)
{
}
/**
 * Writes the 64-byte BP4 header at the start of an empty buffer. The same
 * header is used for data files, metadata files and the index table.
 *
 * Layout produced:
 *   bytes  0-31  readable tag "ADIOS-BP v<major>.<minor>.<patch> <fileType>"
 *   bytes 32-34  major, minor and patch version characters
 *   byte  35     unused
 *   byte  36     endianness (0 = little endian, 1 = big endian)
 *   byte  37     BP version (4)
 *   byte  38     active flag (1 = active, 0 = not active)
 *   bytes 39-63  unused
 * Unused bytes and the unfilled part of the tag are skipped, not written.
 *
 * On return both b.m_Position and b.m_AbsolutePosition point just past the
 * header.
 *
 * @param b buffer to write into; its m_Position must be 0
 * @param fileType short label appended to the readable tag, truncated so
 * that the whole tag fits into m_VersionTagLength bytes
 * @param isActive value of the active flag byte
 * @throws std::invalid_argument if the buffer already has content
 * @throws std::runtime_error if a field would not land on its expected
 * offset (coding error)
 */
void BP4Serializer::MakeHeader(BufferSTL &b, const std::string fileType,
                               const bool isActive)
{
    // Copies from the start of a version component string (by reference,
    // no temporary string copy per call)
    auto lf_CopyVersionChar = [](const std::string &version,
                                 std::vector<char> &buffer, size_t &position) {
        helper::CopyToBuffer(buffer, position, version.c_str());
    };

    auto &buffer = b.m_Buffer;
    auto &position = b.m_Position;
    auto &absolutePosition = b.m_AbsolutePosition;

    if (position > 0)
    {
        throw std::invalid_argument(
            "ERROR: BP4Serializer::MakeHeader can only be called for an empty "
            "buffer. This one for " +
            fileType + " already has content of " + std::to_string(position) +
            " bytes.");
    }

    if (b.GetAvailableSize() < 64)
    {
        b.Resize(position + 64, "BP4Serializer::MakeHeader " + fileType);
    }

    const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR));
    const std::string minorVersion(std::to_string(ADIOS2_VERSION_MINOR));
    const std::string patchVersion(std::to_string(ADIOS2_VERSION_PATCH));

    // byte 0-31: Readable tag
    if (position != m_VersionTagPosition)
    {
        throw std::runtime_error(
            "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Version Tag "
            "position mismatch");
    }
    std::string versionLongTag("ADIOS-BP v" + majorVersion + "." +
                               minorVersion + "." + patchVersion + " ");
    // Room left for the file type label; guarded so that an over-long
    // version prefix cannot wrap the unsigned subtraction around.
    const size_t maxTypeLen =
        (versionLongTag.size() < m_VersionTagLength)
            ? m_VersionTagLength - versionLongTag.size()
            : 0;
    versionLongTag += fileType.substr(0, maxTypeLen);

    // Copy at most m_VersionTagLength characters, then step over whatever
    // part of the tag field was not filled.
    const size_t tagBytes = (versionLongTag.size() < m_VersionTagLength)
                                ? versionLongTag.size()
                                : m_VersionTagLength;
    helper::CopyToBuffer(buffer, position, versionLongTag.c_str(), tagBytes);
    position += m_VersionTagLength - tagBytes;

    // byte 32-35: MAJOR MINOR PATCH Unused
    lf_CopyVersionChar(majorVersion, buffer, position);
    lf_CopyVersionChar(minorVersion, buffer, position);
    lf_CopyVersionChar(patchVersion, buffer, position);
    ++position;

    // Note: Reader does process and use bytes 36-38 in
    // BP4Deserializer::ParseMetadataIndex().
    // Order and position must match there.

    // byte 36: endianness
    if (position != m_EndianFlagPosition)
    {
        throw std::runtime_error(
            "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Endian Flag "
            "position mismatch");
    }
    const uint8_t endianness = helper::IsLittleEndian() ? 0 : 1;
    helper::CopyToBuffer(buffer, position, &endianness);

    // byte 37: BP Version 4
    if (position != m_BPVersionPosition)
    {
        throw std::runtime_error(
            "ADIOS Coding ERROR in BP4Serializer::MakeHeader. BP Version "
            "position mismatch");
    }
    const uint8_t version = 4;
    helper::CopyToBuffer(buffer, position, &version);

    // byte 38: Active flag (used in Index Table only)
    if (position != m_ActiveFlagPosition)
    {
        throw std::runtime_error(
            "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Active Flag "
            "position mismatch");
    }
    const uint8_t activeFlag = (isActive ? 1 : 0);
    helper::CopyToBuffer(buffer, position, &activeFlag);

    // byte 39: unused
    position += 1;

    // byte 40-63: unused
    position += 24;
    absolutePosition = position;
}
void BP4Serializer::PutProcessGroupIndex(
const std::string &ioName, const std::string hostLanguage,
const std::vector<std::string> &transportsTypes) noexcept
......
......@@ -35,6 +35,15 @@ public:
~BP4Serializer() = default;
/** Writes a 64 byte header into the data, metadata or index table buffer.
* Must be called only when the buffer is empty.
* @param b the buffer that receives the header
* @param fileType a short label that is concatenated to the version string;
* it is truncated so that the whole version tag fits into its fixed-size
* field
* @param isActive value stored in the active flag byte of the header
* @throws std::invalid_argument if the buffer already has content
*/
void MakeHeader(BufferSTL &b, const std::string fileType,
const bool isActive);
/**
* Writes a process group index PGIndex and list of methods (from
* transports). Done at Open or Advance.
......
......@@ -169,7 +169,9 @@ def ReadVarData(f, nElements, typeID, ldims, expectedSize,
print("Expected size = {0} calculated size from dimensions = {1}".
format(expectedSize, nBytes[0]))
print(" Variable Data : {0} bytes".format(nBytes[0]))
# seek instead of reading for now
f.read(nBytes[0])
# f.seek(nBytes[0], 1)
# data = readDataToNumpyArray(f, bp4dbg_utils.GetTypeName(typeID),
# nElements)
return True
......@@ -211,14 +213,14 @@ def ReadVMD(f, varidx, varsStartPosition, varsTotalLength):
# VAR NAME, 2 bytes length + string without \0
sizeLimit = expectedVarBlockLength - (f.tell() - startPosition)
status, varname = ReadEncodedString(f, "Var Name", sizeLimit)
if (not status):
if not status:
return False
print(" Var Name : " + varname)
# VAR PATH, 2 bytes length + string without \0
sizeLimit = expectedVarBlockLength - (f.tell() - startPosition)
status, varpath = ReadEncodedString(f, "Var Path", sizeLimit)
if (not status):
if not status:
return False
print(" Var Path : " + varpath)
......@@ -294,7 +296,7 @@ def ReadVMD(f, varidx, varsStartPosition, varsTotalLength):
sizeLimit = expectedVarBlockLength - (f.tell() - startPosition)
status = ReadCharacteristicsFromData(f, sizeLimit, typeID)
if (not status):
if not status:
return False
# Padded end TAG
......@@ -311,7 +313,7 @@ def ReadVMD(f, varidx, varsStartPosition, varsTotalLength):
expectedVarDataSize = expectedVarBlockLength - (f.tell() - startPosition)
status = ReadVarData(f, nElements, typeID, ldims, expectedVarDataSize,
varsStartPosition, varsTotalLength)
if (not status):
if not status:
return False
return True
......@@ -353,14 +355,14 @@ def ReadAMD(f, attridx, attrsStartPosition, attrsTotalLength):
# ATTR NAME, 2 bytes length + string without \0
sizeLimit = expectedAttrBlockLength - (f.tell() - startPosition)
status, attrname = ReadEncodedString(f, "Attr Name", sizeLimit)
if (not status):
if not status:
return False
print(" Attr Name : " + attrname)
# ATTR PATH, 2 bytes length + string without \0
sizeLimit = expectedAttrBlockLength - (f.tell() - startPosition)
status, attrpath = ReadEncodedString(f, "Attr Path", sizeLimit)
if (not status):
if not status:
return False
print(" Attr Path : " + attrpath)
......@@ -428,7 +430,8 @@ def ReadAMD(f, attridx, attrsStartPosition, attrsTotalLength):
def ReadPG(f, fileSize, pgidx):
pgStartPosition = f.tell()
print("========================================================")
if pgidx > 0:
print("========================================================")
print("Process Group {0}: ".format(pgidx))
print(" Starting offset : {0}".format(pgStartPosition))
tag = f.read(4)
......@@ -460,7 +463,7 @@ def ReadPG(f, fileSize, pgidx):
# PG Name, 2 bytes length + string without \0
sizeLimit = expectedPGLength - (f.tell() - pgStartPosition)
status, pgname = ReadEncodedString(f, "PG Name", sizeLimit)
if (not status):
if not status:
return False
print(" PG Name : " + pgname)
......@@ -471,7 +474,7 @@ def ReadPG(f, fileSize, pgidx):
# Timestep name
sizeLimit = expectedPGLength - (f.tell() - pgStartPosition)
status, tsname = ReadEncodedString(f, "Timestep Name", sizeLimit)
if (not status):
if not status:
return False
print(" Step Name : " + tsname)
......@@ -495,7 +498,7 @@ def ReadPG(f, fileSize, pgidx):
sizeLimit = expectedPGLength - (f.tell() - pgStartPosition)
status, methodParams = ReadEncodedString(
f, "Method Parameters", sizeLimit)
if (not status):
if not status:
return False
print(' M. params : "' + methodParams + '"')
......@@ -518,7 +521,7 @@ def ReadPG(f, fileSize, pgidx):
for i in range(nVars):
# VMD block
status = ReadVMD(f, i, varsStartPosition, expectedVarsLength)
if (not status):
if not status:
return False
# ATTRIBUTES
......@@ -543,7 +546,7 @@ def ReadPG(f, fileSize, pgidx):
for i in range(nAttrs):
# AMD block
status = ReadAMD(f, i, attrsStartPosition, expectedAttrsLength)
if (not status):
if not status:
return False
# End TAG PGI]
......@@ -560,13 +563,17 @@ def ReadPG(f, fileSize, pgidx):
def DumpData(fileName):
    """Print the content of one data file to stdout.

    Reads the file header first, then process groups one after another
    until a read fails or 12 bytes or fewer remain in the file.

    :param fileName: path of the data file to open in binary mode
    :return: the status of the last read; the header status if the header
        could not be read
    """
    print("========================================================")
    print(" Data File: " + fileName)
    print("========================================================")
    with open(fileName, "rb") as f:
        fileSize = fstat(f.fileno()).st_size
        status = bp4dbg_utils.ReadHeader(f, fileSize, "Data")
        if not status:
            return status
        pgidx = 0
        # NOTE(review): the 12-byte margin at the end of the file is taken
        # from the original loop condition; its meaning is not visible here
        while f.tell() < fileSize - 12 and status:
            status = ReadPG(f, fileSize, pgidx)
            pgidx += 1
    return status
if __name__ == "__main__":
......
......@@ -3,10 +3,9 @@ from __future__ import (absolute_import, division, print_function,
import numpy as np
from os import fstat
import bp4dbg_utils
# Read one PG process group (variables and attributes from one process in
# one step)
def ReadIndexHeader(f, fileSize):
status = True