Loading source/adios2/engine/bp5/BP5Writer.cpp +62 −17 Original line number Diff line number Diff line Loading @@ -326,27 +326,34 @@ uint64_t BP5Writer::WriteMetadata(const std::vector<char> &ContigMetaData, MDataTotalSize += sizeof(uint64_t); } } // Assemble all output into a single WriteV call: // [header: MDataTotalSize|SizeVector|AttrSizeVector] [ContigMetaData] [AttributeBlocks...] MetaDataSize = 0; m_MetadataFile->Write((char *)&MDataTotalSize, sizeof(uint64_t)); MetaDataSize += sizeof(uint64_t); m_MetadataFile->Write((char *)SizeVector.data(), sizeof(uint64_t) * SizeVector.size()); MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); m_MetadataFile->Write((char *)AttrSizeVector.data(), sizeof(uint64_t) * AttrSizeVector.size()); MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); { profiling::ProfilerGuard g(m_Profiler, "WriteMD_Blocks"); m_MetadataFile->Write(ContigMetaData.data(), ContigMetaData.size()); } std::vector<uint64_t> headerBuf; headerBuf.reserve(1 + SizeVector.size() + AttrSizeVector.size()); headerBuf.push_back(static_cast<uint64_t>(MDataTotalSize)); for (auto s : SizeVector) headerBuf.push_back(static_cast<uint64_t>(s)); headerBuf.insert(headerBuf.end(), AttrSizeVector.begin(), AttrSizeVector.end()); std::vector<core::iovec> iovs; iovs.reserve(2 + AttributeBlocks.size()); iovs.push_back({headerBuf.data(), sizeof(uint64_t) * headerBuf.size()}); if (!ContigMetaData.empty()) iovs.push_back({ContigMetaData.data(), ContigMetaData.size()}); for (auto &b : AttributeBlocks) if (b.iov_base && b.iov_len) iovs.push_back(b); MetaDataSize += ContigMetaData.size(); m_MetadataFile->WriteV(iovs.data(), static_cast<int>(iovs.size())); MetaDataSize += sizeof(uint64_t); // MDataTotalSize field MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); // SizeVector fields MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); // AttrSizeVector fields MetaDataSize += ContigMetaData.size(); for (auto &b : AttributeBlocks) { if (!b.iov_base) continue; m_MetadataFile->Write((char *)b.iov_base, b.iov_len); if (b.iov_base && b.iov_len) MetaDataSize += b.iov_len; } m_MetadataFile->Flush(); Loading Loading @@ -850,6 +857,43 @@ void BP5Writer::SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo size_t AlignedMetadataSize = (TSInfo.MetaEncodeBuffer->m_FixedSize + 7) & ~0x7; MetaEncodeSize.push_back(AlignedMetadataSize); if (m_Comm.Size() == 1 && UniqueMetaMetaBlocks.empty()) { // Single rank with no new MetaMetaBlocks: skip the aggregation // ceremony (MPI gather/bcast, MD5 hashing, profiler string // construction) and go directly to write. We cannot take this // path when UniqueMetaMetaBlocks is non-empty because the // aggregation path aligns MetaMetaInfoLen to 8 bytes and // produces malloc'd copies; skipping that changes the .mmd // file layout. // The aggregation path aligns attribute block sizes to 8 bytes // and zeroes the padding. Match that here. if (!AttributeBlocks.empty() && AttributeBlocks[0].iov_base) { size_t origLen = AttributeBlocks[0].iov_len; size_t alignedLen = (origLen + 7) & ~0x7; if (alignedLen > origLen) { std::memset(static_cast<char *>(const_cast<void *>(AttributeBlocks[0].iov_base)) + origLen, 0, alignedLen - origLen); } AttributeBlocks[0].iov_len = alignedLen; } m_LatestMetaDataPos = m_MetaDataPos; std::vector<char> ContigMetadata(AlignedMetadataSize); std::memcpy(ContigMetadata.data(), TSInfo.MetaEncodeBuffer->Data(), TSInfo.MetaEncodeBuffer->m_FixedSize); m_LatestMetaDataSize = WriteMetadata(ContigMetadata, MetaEncodeSize, AttributeBlocks); // AttributeBlocks[0].iov_base points into TSInfo.AttributeEncodeBuffer // (a shared_ptr) — do NOT free it; the shared_ptr owns that memory. if (!m_Parameters.AsyncWrite) WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize); return; } { std::string aggInfo_str = agg_str + "_AggInfo"; m_Profiler.AddTimerWatch(aggInfo_str); Loading Loading @@ -1073,6 +1117,7 @@ void BP5Writer::EndStep() } #ifdef ADIOS2_HAVE_DERIVED_VARIABLE if (!m_IO.GetDerivedVariables().empty()) { profiling::ProfilerGuard g(m_Profiler, "ES_DeriveVars"); ComputeDerivedVariables(); Loading source/adios2/toolkit/profiling/iochrono/IOChrono.h +18 −6 Original line number Diff line number Diff line Loading @@ -64,6 +64,15 @@ public: void Start(const std::string process) { m_Profiler.Start(process); }; void Stop(const std::string process) { m_Profiler.Stop(process); }; /** Look up a timer once; returns null if inactive or not found. */ Timer *GetTimer(const std::string &process) noexcept { if (!m_Profiler.m_IsActive) return nullptr; auto it = m_Profiler.m_Timers.find(process); return (it != m_Profiler.m_Timers.end()) ? &it->second : nullptr; } void AddBytes(const std::string process, size_t bytes) { m_Profiler.m_Bytes[process] += bytes; Loading @@ -85,16 +94,19 @@ private: class ProfilerGuard { public: explicit ProfilerGuard(JSONProfiler &host, const std::string &tag) : m_HostProfiler(host), m_Tag(tag) explicit ProfilerGuard(JSONProfiler &host, const std::string &tag) : m_Timer(host.GetTimer(tag)) { if (m_Timer) m_Timer->Resume(); } ~ProfilerGuard() { m_HostProfiler.Start(tag); if (m_Timer) m_Timer->Pause(); } ~ProfilerGuard() { m_HostProfiler.Stop(m_Tag); } private: JSONProfiler &m_HostProfiler; std::string m_Tag; Timer *m_Timer; }; } // end namespace profiling } // end namespace adios Loading source/adios2/toolkit/transport/file/FilePOSIX.cpp +31 −3 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ #include <sys/types.h> // open #include <thread> #ifndef _MSC_VER #include <sys/uio.h> // writev #include <unistd.h> // write, close, ftruncate #ifndef O_BINARY #define O_BINARY 0 Loading Loading @@ -295,6 +296,11 @@ void FilePOSIX::OpenChain(const std::string &name, Mode openMode, const helper:: void FilePOSIX::Write(const char *buffer, size_t size, size_t start) { if (size == 0 && start == MaxSizeT) { return; } auto lf_Write = [&](const char *buffer, size_t size) { while (size > 0) { Loading Loading @@ -362,7 +368,7 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start) } } #ifdef REALLY_WANT_WRITEV #ifndef _MSC_VER void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) { auto lf_Write = [&](const core::iovec *iov, const int iovcnt) { Loading Loading @@ -442,6 +448,10 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) } } // writev() is limited to ~2GB total per call on some platforms // (macOS returns EINVAL above INT32_MAX). Compute total size for // each batch and fall back to sequential Write() if too large. static constexpr size_t WritevMaxBytes = INT32_MAX; int cntTotal = 0; while (cntTotal < iovcnt) { Loading @@ -450,7 +460,25 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) { cnt = 8; } // Check total size for this batch size_t batchBytes = 0; for (int i = 0; i < cnt; ++i) { batchBytes += iov[cntTotal + i].iov_len; } if (batchBytes > WritevMaxBytes) { // Fall back to individual write() calls for (int i = 0; i < cnt; ++i) { Write(static_cast<const char *>(iov[cntTotal + i].iov_base), iov[cntTotal + i].iov_len); } } else { lf_Write(iov + cntTotal, cnt); } cntTotal += cnt; } } Loading source/adios2/toolkit/transport/file/FilePOSIX.h +1 −2 Original line number Diff line number Diff line Loading @@ -38,8 +38,7 @@ public: void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; #ifdef REALLY_WANT_WRITEV /* Actual writev() function, inactive for now */ #ifndef _MSC_VER void WriteV(const core::iovec *iov, const int iovcnt, size_t start = MaxSizeT) final; #endif Loading Loading
source/adios2/engine/bp5/BP5Writer.cpp +62 −17 Original line number Diff line number Diff line Loading @@ -326,27 +326,34 @@ uint64_t BP5Writer::WriteMetadata(const std::vector<char> &ContigMetaData, MDataTotalSize += sizeof(uint64_t); } } // Assemble all output into a single WriteV call: // [header: MDataTotalSize|SizeVector|AttrSizeVector] [ContigMetaData] [AttributeBlocks...] MetaDataSize = 0; m_MetadataFile->Write((char *)&MDataTotalSize, sizeof(uint64_t)); MetaDataSize += sizeof(uint64_t); m_MetadataFile->Write((char *)SizeVector.data(), sizeof(uint64_t) * SizeVector.size()); MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); m_MetadataFile->Write((char *)AttrSizeVector.data(), sizeof(uint64_t) * AttrSizeVector.size()); MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); { profiling::ProfilerGuard g(m_Profiler, "WriteMD_Blocks"); m_MetadataFile->Write(ContigMetaData.data(), ContigMetaData.size()); } std::vector<uint64_t> headerBuf; headerBuf.reserve(1 + SizeVector.size() + AttrSizeVector.size()); headerBuf.push_back(static_cast<uint64_t>(MDataTotalSize)); for (auto s : SizeVector) headerBuf.push_back(static_cast<uint64_t>(s)); headerBuf.insert(headerBuf.end(), AttrSizeVector.begin(), AttrSizeVector.end()); std::vector<core::iovec> iovs; iovs.reserve(2 + AttributeBlocks.size()); iovs.push_back({headerBuf.data(), sizeof(uint64_t) * headerBuf.size()}); if (!ContigMetaData.empty()) iovs.push_back({ContigMetaData.data(), ContigMetaData.size()}); for (auto &b : AttributeBlocks) if (b.iov_base && b.iov_len) iovs.push_back(b); MetaDataSize += ContigMetaData.size(); m_MetadataFile->WriteV(iovs.data(), static_cast<int>(iovs.size())); MetaDataSize += sizeof(uint64_t); // MDataTotalSize field MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); // SizeVector fields MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); // AttrSizeVector fields MetaDataSize += ContigMetaData.size(); for (auto &b : AttributeBlocks) { if (!b.iov_base) continue; m_MetadataFile->Write((char *)b.iov_base, b.iov_len); if (b.iov_base && b.iov_len) MetaDataSize += b.iov_len; } m_MetadataFile->Flush(); Loading Loading @@ -850,6 +857,43 @@ void BP5Writer::SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo size_t AlignedMetadataSize = (TSInfo.MetaEncodeBuffer->m_FixedSize + 7) & ~0x7; MetaEncodeSize.push_back(AlignedMetadataSize); if (m_Comm.Size() == 1 && UniqueMetaMetaBlocks.empty()) { // Single rank with no new MetaMetaBlocks: skip the aggregation // ceremony (MPI gather/bcast, MD5 hashing, profiler string // construction) and go directly to write. We cannot take this // path when UniqueMetaMetaBlocks is non-empty because the // aggregation path aligns MetaMetaInfoLen to 8 bytes and // produces malloc'd copies; skipping that changes the .mmd // file layout. // The aggregation path aligns attribute block sizes to 8 bytes // and zeroes the padding. Match that here. if (!AttributeBlocks.empty() && AttributeBlocks[0].iov_base) { size_t origLen = AttributeBlocks[0].iov_len; size_t alignedLen = (origLen + 7) & ~0x7; if (alignedLen > origLen) { std::memset(static_cast<char *>(const_cast<void *>(AttributeBlocks[0].iov_base)) + origLen, 0, alignedLen - origLen); } AttributeBlocks[0].iov_len = alignedLen; } m_LatestMetaDataPos = m_MetaDataPos; std::vector<char> ContigMetadata(AlignedMetadataSize); std::memcpy(ContigMetadata.data(), TSInfo.MetaEncodeBuffer->Data(), TSInfo.MetaEncodeBuffer->m_FixedSize); m_LatestMetaDataSize = WriteMetadata(ContigMetadata, MetaEncodeSize, AttributeBlocks); // AttributeBlocks[0].iov_base points into TSInfo.AttributeEncodeBuffer // (a shared_ptr) — do NOT free it; the shared_ptr owns that memory. if (!m_Parameters.AsyncWrite) WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize); return; } { std::string aggInfo_str = agg_str + "_AggInfo"; m_Profiler.AddTimerWatch(aggInfo_str); Loading Loading @@ -1073,6 +1117,7 @@ void BP5Writer::EndStep() } #ifdef ADIOS2_HAVE_DERIVED_VARIABLE if (!m_IO.GetDerivedVariables().empty()) { profiling::ProfilerGuard g(m_Profiler, "ES_DeriveVars"); ComputeDerivedVariables(); Loading
source/adios2/toolkit/profiling/iochrono/IOChrono.h +18 −6 Original line number Diff line number Diff line Loading @@ -64,6 +64,15 @@ public: void Start(const std::string process) { m_Profiler.Start(process); }; void Stop(const std::string process) { m_Profiler.Stop(process); }; /** Look up a timer once; returns null if inactive or not found. */ Timer *GetTimer(const std::string &process) noexcept { if (!m_Profiler.m_IsActive) return nullptr; auto it = m_Profiler.m_Timers.find(process); return (it != m_Profiler.m_Timers.end()) ? &it->second : nullptr; } void AddBytes(const std::string process, size_t bytes) { m_Profiler.m_Bytes[process] += bytes; Loading @@ -85,16 +94,19 @@ private: class ProfilerGuard { public: explicit ProfilerGuard(JSONProfiler &host, const std::string &tag) : m_HostProfiler(host), m_Tag(tag) explicit ProfilerGuard(JSONProfiler &host, const std::string &tag) : m_Timer(host.GetTimer(tag)) { if (m_Timer) m_Timer->Resume(); } ~ProfilerGuard() { m_HostProfiler.Start(tag); if (m_Timer) m_Timer->Pause(); } ~ProfilerGuard() { m_HostProfiler.Stop(m_Tag); } private: JSONProfiler &m_HostProfiler; std::string m_Tag; Timer *m_Timer; }; } // end namespace profiling } // end namespace adios Loading
source/adios2/toolkit/transport/file/FilePOSIX.cpp +31 −3 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ #include <sys/types.h> // open #include <thread> #ifndef _MSC_VER #include <sys/uio.h> // writev #include <unistd.h> // write, close, ftruncate #ifndef O_BINARY #define O_BINARY 0 Loading Loading @@ -295,6 +296,11 @@ void FilePOSIX::OpenChain(const std::string &name, Mode openMode, const helper:: void FilePOSIX::Write(const char *buffer, size_t size, size_t start) { if (size == 0 && start == MaxSizeT) { return; } auto lf_Write = [&](const char *buffer, size_t size) { while (size > 0) { Loading Loading @@ -362,7 +368,7 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start) } } #ifdef REALLY_WANT_WRITEV #ifndef _MSC_VER void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) { auto lf_Write = [&](const core::iovec *iov, const int iovcnt) { Loading Loading @@ -442,6 +448,10 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) } } // writev() is limited to ~2GB total per call on some platforms // (macOS returns EINVAL above INT32_MAX). Compute total size for // each batch and fall back to sequential Write() if too large. static constexpr size_t WritevMaxBytes = INT32_MAX; int cntTotal = 0; while (cntTotal < iovcnt) { Loading @@ -450,7 +460,25 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) { cnt = 8; } // Check total size for this batch size_t batchBytes = 0; for (int i = 0; i < cnt; ++i) { batchBytes += iov[cntTotal + i].iov_len; } if (batchBytes > WritevMaxBytes) { // Fall back to individual write() calls for (int i = 0; i < cnt; ++i) { Write(static_cast<const char *>(iov[cntTotal + i].iov_base), iov[cntTotal + i].iov_len); } } else { lf_Write(iov + cntTotal, cnt); } cntTotal += cnt; } } Loading
source/adios2/toolkit/transport/file/FilePOSIX.h +1 −2 Original line number Diff line number Diff line Loading @@ -38,8 +38,7 @@ public: void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; #ifdef REALLY_WANT_WRITEV /* Actual writev() function, inactive for now */ #ifndef _MSC_VER void WriteV(const core::iovec *iov, const int iovcnt, size_t start = MaxSizeT) final; #endif Loading