Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Podhorszki, Norbert
ADIOS2
Commits
6ec7bebf
Commit
6ec7bebf
authored
Mar 08, 2020
by
Ruonan Wang
Browse files
replaced MPI_WORLD_COMM with stream communicator in SSC
parent
558add13
Changes
7
Hide whitespace changes
Inline
Side-by-side
source/adios2/engine/ssc/SscReader.cpp
View file @
6ec7bebf
...
...
@@ -27,10 +27,6 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
:
Engine
(
"SscReader"
,
io
,
name
,
mode
,
std
::
move
(
comm
))
{
TAU_SCOPED_TIMER_FUNC
();
MPI_Comm_rank
(
MPI_COMM_WORLD
,
&
m_WorldRank
);
MPI_Comm_size
(
MPI_COMM_WORLD
,
&
m_WorldSize
);
m_ReaderRank
=
m_Comm
.
Rank
();
m_ReaderSize
=
m_Comm
.
Size
();
ssc
::
GetParameter
(
m_IO
.
m_Parameters
,
"MpiMode"
,
m_MpiMode
);
ssc
::
GetParameter
(
m_IO
.
m_Parameters
,
"Verbose"
,
m_Verbosity
);
...
...
@@ -44,8 +40,13 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
m_Buffer
.
resize
(
1
);
m_GlobalWritePattern
.
resize
(
m_WorldSize
);
SyncMpiPattern
();
m_ReaderRank
=
m_Comm
.
Rank
();
m_ReaderSize
=
m_Comm
.
Size
();
MPI_Comm_rank
(
m_StreamComm
,
&
m_StreamRank
);
MPI_Comm_size
(
m_StreamComm
,
&
m_StreamSize
);
m_GlobalWritePattern
.
resize
(
m_StreamSize
);
}
SscReader
::~
SscReader
()
{
TAU_SCOPED_TIMER_FUNC
();
}
...
...
@@ -96,7 +97,7 @@ void SscReader::GetTwoSided()
{
requests
.
emplace_back
();
MPI_Irecv
(
m_Buffer
.
data
()
+
i
.
second
.
first
,
i
.
second
.
second
,
MPI_CHAR
,
i
.
first
,
0
,
MPI_COMM_WORLD
,
&
requests
.
back
());
i
.
first
,
0
,
m_StreamComm
,
&
requests
.
back
());
}
MPI_Status
statuses
[
requests
.
size
()];
MPI_Waitall
(
requests
.
size
(),
requests
.
data
(),
statuses
);
...
...
@@ -111,7 +112,7 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
{
m_InitialStep
=
false
;
SyncWritePattern
();
MPI_Win_create
(
NULL
,
0
,
1
,
MPI_INFO_NULL
,
MPI_COMM_WORLD
,
&
m_MpiWin
);
MPI_Win_create
(
NULL
,
0
,
1
,
MPI_INFO_NULL
,
m_StreamComm
,
&
m_MpiWin
);
MPI_Win_start
(
m_MpiAllWritersGroup
,
0
,
m_MpiWin
);
}
else
...
...
@@ -141,7 +142,7 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscReader::BeginStep, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscReader::BeginStep, World Rank "
<<
m_
Stream
Rank
<<
", Reader Rank "
<<
m_ReaderRank
<<
", Step "
<<
m_CurrentStep
<<
std
::
endl
;
}
...
...
@@ -171,11 +172,11 @@ void SscReader::EndStep()
MPI_Win_free
(
&
m_MpiWin
);
SyncReadPattern
();
MPI_Win_create
(
m_Buffer
.
data
(),
m_Buffer
.
size
(),
1
,
MPI_INFO_NULL
,
MPI_COMM_WORLD
,
&
m_MpiWin
);
m_StreamComm
,
&
m_MpiWin
);
}
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscReader::EndStep, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscReader::EndStep, World Rank "
<<
m_
Stream
Rank
<<
", Reader Rank "
<<
m_ReaderRank
<<
std
::
endl
;
}
}
...
...
@@ -188,7 +189,7 @@ void SscReader::SyncMpiPattern()
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscReader::SyncMpiPattern, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscReader::SyncMpiPattern, World Rank "
<<
m_
Stream
Rank
<<
", Reader Rank "
<<
m_ReaderRank
<<
std
::
endl
;
}
...
...
@@ -196,11 +197,14 @@ void SscReader::SyncMpiPattern()
m_MaxFilenameLength
,
m_RendezvousAppCount
,
CommAsMPI
(
m_Comm
));
std
::
vector
<
int
>
allWorkerRanks
;
std
::
vector
<
int
>
allWriterRanks
;
for
(
const
auto
&
app
:
m_MpiHandshake
.
GetWriterMap
(
m_Name
))
{
for
(
int
rank
:
app
.
second
)
{
m_AllWriterRanks
.
push_back
(
rank
);
allWriterRanks
.
push_back
(
rank
);
allWorkerRanks
.
push_back
(
rank
);
}
}
...
...
@@ -208,14 +212,21 @@ void SscReader::SyncMpiPattern()
{
for
(
int
rank
:
app
.
second
)
{
m_AllRead
erRanks
.
push_back
(
rank
);
allWork
erRanks
.
push_back
(
rank
);
}
}
MPI_Group
worldGroup
;
MPI_Comm_group
(
MPI_COMM_WORLD
,
&
worldGroup
);
MPI_Group_incl
(
worldGroup
,
m_A
llWriterRanks
.
size
(),
m_A
llWriterRanks
.
data
(),
MPI_Group_incl
(
worldGroup
,
a
llWriterRanks
.
size
(),
a
llWriterRanks
.
data
(),
&
m_MpiAllWritersGroup
);
std
::
sort
(
allWorkerRanks
.
begin
(),
allWorkerRanks
.
end
());
MPI_Group
allWorkersGroup
;
MPI_Group_incl
(
worldGroup
,
allWorkerRanks
.
size
(),
allWorkerRanks
.
data
(),
&
allWorkersGroup
);
MPI_Comm_create_group
(
MPI_COMM_WORLD
,
allWorkersGroup
,
0
,
&
m_StreamComm
);
MPI_Barrier
(
m_StreamComm
);
}
void
SscReader
::
SyncWritePattern
()
...
...
@@ -223,7 +234,7 @@ void SscReader::SyncWritePattern()
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscReader::SyncWritePattern, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscReader::SyncWritePattern, World Rank "
<<
m_
Stream
Rank
<<
", Reader Rank "
<<
m_ReaderRank
<<
std
::
endl
;
}
...
...
@@ -231,16 +242,16 @@ void SscReader::SyncWritePattern()
size_t
localSize
=
0
;
size_t
maxLocalSize
;
MPI_Allreduce
(
&
localSize
,
&
maxLocalSize
,
1
,
MPI_UNSIGNED_LONG_LONG
,
MPI_MAX
,
MPI_COMM_WORLD
);
m_StreamComm
);
std
::
vector
<
char
>
localVec
(
maxLocalSize
,
'\0'
);
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
World
Size
);
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
Stream
Size
);
MPI_Allgather
(
localVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
globalVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
MPI_COMM_WORLD
);
maxLocalSize
,
MPI_CHAR
,
m_StreamComm
);
// deserialize global metadata Json
try
{
for
(
size_t
i
=
0
;
i
<
m_
World
Size
;
++
i
)
for
(
size_t
i
=
0
;
i
<
m_
Stream
Size
;
++
i
)
{
if
(
globalVec
[
i
*
maxLocalSize
]
==
'\0'
)
{
...
...
@@ -287,7 +298,7 @@ void SscReader::SyncWritePattern()
}
}
for
(
int
i
=
0
;
i
<
m_
World
Size
;
++
i
)
for
(
int
i
=
0
;
i
<
m_
Stream
Size
;
++
i
)
{
if
(
m_GlobalWritePatternJson
[
i
]
==
nullptr
)
{
...
...
@@ -338,7 +349,7 @@ void SscReader::SyncReadPattern()
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscReader::SyncReadPattern, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscReader::SyncReadPattern, World Rank "
<<
m_
Stream
Rank
<<
", Reader Rank "
<<
m_ReaderRank
<<
std
::
endl
;
}
...
...
@@ -348,18 +359,18 @@ void SscReader::SyncReadPattern()
size_t
localSize
=
localStr
.
size
();
size_t
maxLocalSize
;
MPI_Allreduce
(
&
localSize
,
&
maxLocalSize
,
1
,
MPI_UNSIGNED_LONG_LONG
,
MPI_MAX
,
MPI_COMM_WORLD
);
m_StreamComm
);
std
::
vector
<
char
>
localVec
(
maxLocalSize
,
'\0'
);
std
::
memcpy
(
localVec
.
data
(),
localStr
.
c_str
(),
localStr
.
size
());
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
World
Size
);
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
Stream
Size
);
MPI_Allgather
(
localVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
globalVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
MPI_COMM_WORLD
);
maxLocalSize
,
MPI_CHAR
,
m_StreamComm
);
// deserialize global metadata Json
nlohmann
::
json
globalJson
;
try
{
for
(
size_t
i
=
0
;
i
<
m_
World
Size
;
++
i
)
for
(
size_t
i
=
0
;
i
<
m_
Stream
Size
;
++
i
)
{
if
(
globalVec
[
i
*
maxLocalSize
]
==
'\0'
)
{
...
...
@@ -461,7 +472,7 @@ void SscReader::DoClose(const int transportIndex)
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscReader::DoClose, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscReader::DoClose, World Rank "
<<
m_
Stream
Rank
<<
", Reader Rank "
<<
m_ReaderRank
<<
std
::
endl
;
}
MPI_Win_free
(
&
m_MpiWin
);
...
...
source/adios2/engine/ssc/SscReader.h
View file @
6ec7bebf
...
...
@@ -51,16 +51,15 @@ private:
std
::
vector
<
char
>
m_Buffer
;
MPI_Win
m_MpiWin
;
MPI_Group
m_MpiAllWritersGroup
;
MPI_Comm
m_StreamComm
;
std
::
string
m_MpiMode
=
"TwoSided"
;
int
m_
World
Rank
;
int
m_
World
Size
;
int
m_
Stream
Rank
;
int
m_
Stream
Size
;
int
m_ReaderRank
;
int
m_ReaderSize
;
helper
::
MpiHandshake
m_MpiHandshake
;
std
::
vector
<
int
>
m_AllWriterRanks
;
std
::
vector
<
int
>
m_AllReaderRanks
;
void
SyncMpiPattern
();
void
SyncWritePattern
();
...
...
source/adios2/engine/ssc/SscWriter.cpp
View file @
6ec7bebf
...
...
@@ -26,10 +26,6 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
:
Engine
(
"SscWriter"
,
io
,
name
,
mode
,
std
::
move
(
comm
))
{
TAU_SCOPED_TIMER_FUNC
();
MPI_Comm_rank
(
MPI_COMM_WORLD
,
&
m_WorldRank
);
MPI_Comm_size
(
MPI_COMM_WORLD
,
&
m_WorldSize
);
m_WriterRank
=
m_Comm
.
Rank
();
m_WriterSize
=
m_Comm
.
Size
();
ssc
::
GetParameter
(
m_IO
.
m_Parameters
,
"MpiMode"
,
m_MpiMode
);
ssc
::
GetParameter
(
m_IO
.
m_Parameters
,
"Verbose"
,
m_Verbosity
);
...
...
@@ -41,10 +37,16 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
m_MaxStreamsPerApp
);
ssc
::
GetParameter
(
m_IO
.
m_Parameters
,
"OpenTimeoutSecs"
,
m_OpenTimeoutSecs
);
m_GlobalWritePattern
.
resize
(
m_WorldSize
);
m_GlobalReadPattern
.
resize
(
m_WorldSize
);
m_Buffer
.
resize
(
1
);
SyncMpiPattern
();
m_WriterRank
=
m_Comm
.
Rank
();
m_WriterSize
=
m_Comm
.
Size
();
MPI_Comm_rank
(
m_StreamComm
,
&
m_StreamRank
);
MPI_Comm_size
(
m_StreamComm
,
&
m_StreamSize
);
m_GlobalWritePattern
.
resize
(
m_StreamSize
);
m_GlobalReadPattern
.
resize
(
m_StreamSize
);
}
StepStatus
SscWriter
::
BeginStep
(
StepMode
mode
,
const
float
timeoutSeconds
)
...
...
@@ -62,7 +64,7 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscWriter::BeginStep, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscWriter::BeginStep, World Rank "
<<
m_
Stream
Rank
<<
", Writer Rank "
<<
m_WriterRank
<<
", Step "
<<
m_CurrentStep
<<
std
::
endl
;
}
...
...
@@ -124,7 +126,7 @@ void SscWriter::PutTwoSided()
{
requests
.
emplace_back
();
MPI_Isend
(
m_Buffer
.
data
(),
m_Buffer
.
size
(),
MPI_CHAR
,
i
.
first
,
0
,
MPI_COMM_WORLD
,
&
requests
.
back
());
m_StreamComm
,
&
requests
.
back
());
}
MPI_Status
statuses
[
requests
.
size
()];
MPI_Waitall
(
requests
.
size
(),
requests
.
data
(),
statuses
);
...
...
@@ -135,7 +137,7 @@ void SscWriter::EndStep()
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscWriter::EndStep, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscWriter::EndStep, World Rank "
<<
m_
Stream
Rank
<<
", Writer Rank "
<<
m_WriterRank
<<
", Step "
<<
m_CurrentStep
<<
std
::
endl
;
}
...
...
@@ -144,12 +146,12 @@ void SscWriter::EndStep()
{
SyncWritePattern
();
MPI_Win_create
(
m_Buffer
.
data
(),
m_Buffer
.
size
(),
1
,
MPI_INFO_NULL
,
MPI_COMM_WORLD
,
&
m_MpiWin
);
m_StreamComm
,
&
m_MpiWin
);
PutOneSidedPostPull
();
MPI_Win_free
(
&
m_MpiWin
);
SyncReadPattern
();
MPI_Win_create
(
m_Buffer
.
data
(),
m_Buffer
.
size
(),
1
,
MPI_INFO_NULL
,
MPI_COMM_WORLD
,
&
m_MpiWin
);
m_StreamComm
,
&
m_MpiWin
);
}
else
{
...
...
@@ -186,7 +188,7 @@ void SscWriter::SyncMpiPattern()
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscWriter::SyncMpiPattern, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscWriter::SyncMpiPattern, World Rank "
<<
m_
Stream
Rank
<<
", Writer Rank "
<<
m_WriterRank
<<
std
::
endl
;
}
...
...
@@ -194,11 +196,14 @@ void SscWriter::SyncMpiPattern()
m_MaxFilenameLength
,
m_RendezvousAppCount
,
CommAsMPI
(
m_Comm
));
std
::
vector
<
int
>
allWorkerRanks
;
std
::
vector
<
int
>
allReaderRanks
;
for
(
const
auto
&
app
:
m_MpiHandshake
.
GetWriterMap
(
m_Name
))
{
for
(
int
rank
:
app
.
second
)
{
m_AllWrit
erRanks
.
push_back
(
rank
);
allWork
erRanks
.
push_back
(
rank
);
}
}
...
...
@@ -206,14 +211,22 @@ void SscWriter::SyncMpiPattern()
{
for
(
int
rank
:
app
.
second
)
{
m_AllReaderRanks
.
push_back
(
rank
);
allWorkerRanks
.
push_back
(
rank
);
allReaderRanks
.
push_back
(
rank
);
}
}
MPI_Group
worldGroup
;
MPI_Comm_group
(
MPI_COMM_WORLD
,
&
worldGroup
);
MPI_Group_incl
(
worldGroup
,
m_A
llReaderRanks
.
size
(),
m_A
llReaderRanks
.
data
(),
MPI_Group_incl
(
worldGroup
,
a
llReaderRanks
.
size
(),
a
llReaderRanks
.
data
(),
&
m_MpiAllReadersGroup
);
std
::
sort
(
allWorkerRanks
.
begin
(),
allWorkerRanks
.
end
());
MPI_Group
allWorkersGroup
;
MPI_Group_incl
(
worldGroup
,
allWorkerRanks
.
size
(),
allWorkerRanks
.
data
(),
&
allWorkersGroup
);
MPI_Comm_create_group
(
MPI_COMM_WORLD
,
allWorkersGroup
,
0
,
&
m_StreamComm
);
MPI_Barrier
(
m_StreamComm
);
}
void
SscWriter
::
SyncWritePattern
()
...
...
@@ -221,13 +234,13 @@ void SscWriter::SyncWritePattern()
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscWriter::SyncWritePattern, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscWriter::SyncWritePattern, World Rank "
<<
m_
Stream
Rank
<<
", Writer Rank "
<<
m_WriterRank
<<
std
::
endl
;
}
// serialize local writer rank variables metadata
nlohmann
::
json
localRankMetaJ
;
for
(
const
auto
&
b
:
m_GlobalWritePattern
[
m_
World
Rank
])
for
(
const
auto
&
b
:
m_GlobalWritePattern
[
m_
Stream
Rank
])
{
localRankMetaJ
[
"Variables"
].
emplace_back
();
auto
&
jref
=
localRankMetaJ
[
"Variables"
].
back
();
...
...
@@ -278,18 +291,18 @@ void SscWriter::SyncWritePattern()
size_t
localSize
=
localStr
.
size
();
size_t
maxLocalSize
;
MPI_Allreduce
(
&
localSize
,
&
maxLocalSize
,
1
,
MPI_UNSIGNED_LONG_LONG
,
MPI_MAX
,
MPI_COMM_WORLD
);
m_StreamComm
);
std
::
vector
<
char
>
localVec
(
maxLocalSize
,
'\0'
);
std
::
memcpy
(
localVec
.
data
(),
localStr
.
data
(),
localStr
.
size
());
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
World
Size
,
'\0'
);
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
Stream
Size
,
'\0'
);
MPI_Allgather
(
localVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
globalVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
MPI_COMM_WORLD
);
maxLocalSize
,
MPI_CHAR
,
m_StreamComm
);
// deserialize global metadata Json
nlohmann
::
json
globalJson
;
try
{
for
(
size_t
i
=
0
;
i
<
m_
World
Size
;
++
i
)
for
(
size_t
i
=
0
;
i
<
m_
Stream
Size
;
++
i
)
{
if
(
globalVec
[
i
*
maxLocalSize
]
==
'\0'
)
{
...
...
@@ -319,24 +332,24 @@ void SscWriter::SyncReadPattern()
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscWriter::SyncReadPattern, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscWriter::SyncReadPattern, World Rank "
<<
m_
Stream
Rank
<<
", Writer Rank "
<<
m_WriterRank
<<
std
::
endl
;
}
size_t
localSize
=
0
;
size_t
maxLocalSize
;
MPI_Allreduce
(
&
localSize
,
&
maxLocalSize
,
1
,
MPI_UNSIGNED_LONG_LONG
,
MPI_MAX
,
MPI_COMM_WORLD
);
m_StreamComm
);
std
::
vector
<
char
>
localVec
(
maxLocalSize
,
'\0'
);
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
World
Size
);
std
::
vector
<
char
>
globalVec
(
maxLocalSize
*
m_
Stream
Size
);
MPI_Allgather
(
localVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
globalVec
.
data
(),
maxLocalSize
,
MPI_CHAR
,
MPI_COMM_WORLD
);
maxLocalSize
,
MPI_CHAR
,
m_StreamComm
);
// deserialize global metadata Json
nlohmann
::
json
globalJson
;
try
{
for
(
size_t
i
=
0
;
i
<
m_
World
Size
;
++
i
)
for
(
size_t
i
=
0
;
i
<
m_
Stream
Size
;
++
i
)
{
if
(
globalVec
[
i
*
maxLocalSize
]
==
'\0'
)
{
...
...
@@ -359,7 +372,7 @@ void SscWriter::SyncReadPattern()
ssc
::
JsonToBlockVecVec
(
globalJson
,
m_GlobalReadPattern
);
ssc
::
CalculateOverlap
(
m_GlobalReadPattern
,
m_GlobalWritePattern
[
m_
World
Rank
]);
m_GlobalWritePattern
[
m_
Stream
Rank
]);
m_AllSendingReaderRanks
=
ssc
::
AllOverlapRanks
(
m_GlobalReadPattern
);
CalculatePosition
(
m_GlobalWritePattern
,
m_GlobalReadPattern
,
m_WriterRank
,
m_AllSendingReaderRanks
);
...
...
@@ -428,7 +441,7 @@ void SscWriter::DoClose(const int transportIndex)
TAU_SCOPED_TIMER_FUNC
();
if
(
m_Verbosity
>=
5
)
{
std
::
cout
<<
"SscWriter::DoClose, World Rank "
<<
m_
World
Rank
std
::
cout
<<
"SscWriter::DoClose, World Rank "
<<
m_
Stream
Rank
<<
", Writer Rank "
<<
m_WriterRank
<<
std
::
endl
;
}
...
...
@@ -440,7 +453,7 @@ void SscWriter::DoClose(const int transportIndex)
for
(
const
auto
&
i
:
m_AllSendingReaderRanks
)
{
requests
.
emplace_back
();
MPI_Isend
(
m_Buffer
.
data
(),
1
,
MPI_CHAR
,
i
.
first
,
0
,
MPI_COMM_WORLD
,
MPI_Isend
(
m_Buffer
.
data
(),
1
,
MPI_CHAR
,
i
.
first
,
0
,
m_StreamComm
,
&
requests
.
back
());
}
MPI_Status
statuses
[
requests
.
size
()];
...
...
source/adios2/engine/ssc/SscWriter.h
View file @
6ec7bebf
...
...
@@ -52,16 +52,15 @@ private:
std
::
vector
<
char
>
m_Buffer
;
MPI_Win
m_MpiWin
;
MPI_Group
m_MpiAllReadersGroup
;
MPI_Comm
m_StreamComm
;
std
::
string
m_MpiMode
=
"TwoSided"
;
int
m_
World
Rank
;
int
m_
World
Size
;
int
m_
Stream
Rank
;
int
m_
Stream
Size
;
int
m_WriterRank
;
int
m_WriterSize
;
helper
::
MpiHandshake
m_MpiHandshake
;
std
::
vector
<
int
>
m_AllWriterRanks
;
std
::
vector
<
int
>
m_AllReaderRanks
;
void
SyncMpiPattern
();
void
SyncWritePattern
();
...
...
source/adios2/engine/ssc/SscWriter.tcc
View file @
6ec7bebf
...
...
@@ -24,7 +24,7 @@ namespace engine
template <class T>
bool SscWriter::HasBlock(const Variable<T> &variable)
{
for (const auto &b : m_GlobalWritePattern[m_
World
Rank])
for (const auto &b : m_GlobalWritePattern[m_
Stream
Rank])
{
if (b.name == variable.m_Name and
ssc::AreSameDims(variable.m_Start, b.start) and
...
...
@@ -59,8 +59,8 @@ void SscWriter::PutDeferredCommon(Variable<T> &variable, const T *data)
{
if (not HasBlock(variable))
{
m_GlobalWritePattern[m_
World
Rank].emplace_back();
auto &b = m_GlobalWritePattern[m_
World
Rank].back();
m_GlobalWritePattern[m_
Stream
Rank].emplace_back();
auto &b = m_GlobalWritePattern[m_
Stream
Rank].back();
b.name = variable.m_Name;
b.type = helper::GetType<T>();
b.shape = variable.m_Shape;
...
...
@@ -75,7 +75,7 @@ void SscWriter::PutDeferredCommon(Variable<T> &variable, const T *data)
variable.SetData(data);
bool found = false;
for (const auto &b : m_GlobalWritePattern[m_
World
Rank])
for (const auto &b : m_GlobalWritePattern[m_
Stream
Rank])
{
if (b.name == variable.m_Name and
ssc::AreSameDims(variable.m_Start, b.start) and
...
...
source/adios2/helper/adiosMpiHandshake.cpp
View file @
6ec7bebf
...
...
@@ -218,7 +218,8 @@ void MpiHandshake::Handshake(const std::string &filename, const char mode,
if
(
duration
.
count
()
>
timeoutSeconds
)
{
throw
(
std
::
runtime_error
(
"Mpi handshake timeout on Rank"
+
std
::
to_string
(
m_WorldRank
)));
std
::
to_string
(
m_WorldRank
)
+
" for Stream "
+
filename
));
}
}
...
...
testing/adios2/engine/ssc/TestSscXgc3Way.cpp
View file @
6ec7bebf
...
...
@@ -91,12 +91,12 @@ void coupler(const Dims &shape, const Dims &start, const Dims &count,
{
x_to_c_engine
.
BeginStep
();
auto
x_to_c_var
=
x_to_c_io
.
InquireVariable
<
float
>
(
"x_to_c"
);
x_to_c_data
.
resize
(
std
::
accumulate
(
x_to_c_var
.
Shape
()
.
begin
(),
x_to_c_var
.
Shape
()
.
end
(),
1
,
std
::
multiplies
<
size_t
>
()));
auto
readShape
=
x_to_c_var
.
Shape
()
;
x_to_c_data
.
resize
(
std
::
accumulate
(
readShape
.
begin
(),
readShape
.
end
(),
1
,
std
::
multiplies
<
size_t
>
()));
x_to_c_engine
.
Get
(
x_to_c_var
,
x_to_c_data
.
data
(),
adios2
::
Mode
::
Sync
);
VerifyData
(
x_to_c_data
.
data
(),
i
,
Dims
(
x_to_c_var
.
Shape
()
.
size
(),
0
),
x_to_c_var
.
Shape
(),
x_to_c_var
.
Shape
()
,
mpiRank
);
VerifyData
(
x_to_c_data
.
data
(),
i
,
Dims
(
read
Shape
.
size
(),
0
),
readShape
,
read
Shape
,
mpiRank
);
x_to_c_engine
.
EndStep
();
c_to_g_engine
.
BeginStep
();
...
...
@@ -106,12 +106,12 @@ void coupler(const Dims &shape, const Dims &start, const Dims &count,
g_to_c_engine
.
BeginStep
();
auto
g_to_c_var
=
g_to_c_io
.
InquireVariable
<
float
>
(
"g_to_c"
);
g_to_c_data
.
resize
(
std
::
accumulate
(
g_to_c_var
.
Shape
()
.
begin
(),
g_to_c_var
.
Shape
()
.
end
(),
1
,
std
::
multiplies
<
size_t
>
()));
readShape
=
g_to_c_var
.
Shape
()
;
g_to_c_data
.
resize
(
std
::
accumulate
(
readShape
.
begin
(),
readShape
.
end
(),
1
,
std
::
multiplies
<
size_t
>
()));
g_to_c_engine
.
Get
(
g_to_c_var
,
g_to_c_data
.
data
(),
adios2
::
Mode
::
Sync
);
VerifyData
(
g_to_c_data
.
data
(),
i
,
Dims
(
g_to_c_var
.
Shape
()
.
size
(),
0
),
g_to_c_var
.
Shape
(),
g_to_c_var
.
Shape
()
,
mpiRank
);
VerifyData
(
g_to_c_data
.
data
(),
i
,
Dims
(
read
Shape
.
size
(),
0
),
readShape
,
read
Shape
,
mpiRank
);
g_to_c_engine
.
EndStep
();
c_to_x_engine
.
BeginStep
();
...
...
@@ -244,16 +244,16 @@ TEST_F(SscEngineTest, TestSscXgc3Way)
MPI_Comm_rank
(
MPI_COMM_WORLD
,
&
worldRank
);
MPI_Comm_size
(
MPI_COMM_WORLD
,
&
worldSize
);
if
(
worldSize
<
6
)
if
(
worldSize
<
4
)
{
return
;
}
if
(
worldRank
==
0
or
worldRank
==
1
)
if
(
worldRank
==
0
)
{
mpiGroup
=
0
;
}
else
if
(
worldRank
==
2
or
worldRank
==
3
)
else
if
(
worldRank
==
1
)
{
mpiGroup
=
1
;
}
...
...
@@ -276,7 +276,8 @@ TEST_F(SscEngineTest, TestSscXgc3Way)
count
=
{
1
,
10
};
adios2
::
Params
engineParams
=
{{
"RendezvousAppCount"
,
"2"
},
{
"MaxStreamsPerApp"
,
"4"
},
{
"OpenTimeoutSecs"
,
"3"
}};
{
"OpenTimeoutSecs"
,
"3"
},
{
"Verbose"
,
"10"
}};
std
::
cout
<<
"Rank "
<<
worldRank
<<
" launched coupler"
<<
std
::
endl
;
coupler
(
shape
,
start
,
count
,
steps
,
engineParams
);
}
...
...
@@ -288,7 +289,8 @@ TEST_F(SscEngineTest, TestSscXgc3Way)
count
=
{
1
,
10
};
adios2
::
Params
engineParams
=
{{
"RendezvousAppCount"
,
"2"
},
{
"MaxStreamsPerApp"
,
"4"
},
{
"OpenTimeoutSecs"
,
"3"
}};
{
"OpenTimeoutSecs"
,
"3"
},
{
"Verbose"
,
"10"
}};
std
::
cout
<<
"Rank "
<<
worldRank
<<
" launched gene"
<<
std
::
endl
;
gene
(
shape
,
start
,
shape
,
steps
,
engineParams
);
}
...
...
@@ -300,7 +302,8 @@ TEST_F(SscEngineTest, TestSscXgc3Way)
count
=
{
1
,
10
};
adios2
::
Params
engineParams
=
{{
"RendezvousAppCount"
,
"2"
},
{
"MaxStreamsPerApp"
,
"4"
},
{
"OpenTimeoutSecs"
,
"3"
}};
{
"OpenTimeoutSecs"
,
"3"
},
{
"Verbose"
,
"10"
}};
std
::
cout
<<
"Rank "
<<
worldRank
<<
" launched xgc"
<<
std
::
endl
;
xgc
(
shape
,
start
,
count
,
steps
,
engineParams
);
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment