Commit 0ad7de03 authored by Eisenhauer, Greg's avatar Eisenhauer, Greg
Browse files

Support for attribute marshaling with FFS

parent 4d83f1bb
Loading
Loading
Loading
Loading
+47 −2
Original line number Diff line number Diff line
@@ -73,6 +73,51 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
        return (void *)NULL;
    };

    auto attrFFSCallback = [](void *reader, const char *attrName,
                              const char *type, void *data) {
        class SstReader::SstReader *Reader =
            reinterpret_cast<class SstReader::SstReader *>(reader);
        if (attrName == NULL)
        {
            // if attrName is NULL, prepare for attr reinstallation
            Reader->m_IO.RemoveAllAttributes();
            return;
        }
        std::string Type(type);
        try
        {
            if (Type == "compound")
            {
                return;
            }
            else if (Type == "string")
            {
                Reader->m_IO.DefineAttribute<std::string>(attrName,
                                                          *(char **)data);
            }
#define declare_type(T)                                                        \
    else if (Type == helper::GetType<T>())                                     \
    {                                                                          \
        std::cout << "Loading attribute matched type " << Type << std::endl;   \
        Reader->m_IO.DefineAttribute<T>(attrName, *(T *)data);                 \
    }

            ADIOS2_FOREACH_ATTRIBUTE_PRIMITIVE_TYPE_1ARG(declare_type)
#undef declare_type
            else
            {
                std::cout << "Loading attribute matched no type " << Type
                          << std::endl;
            }
        }
        catch (...)
        {
            std::cout << "Load failed" << std::endl;
            return;
        }
        return;
    };

    auto arrayFFSCallback = [](void *reader, const char *variableName,
                               const char *type, int DimCount, size_t *Shape,
                               size_t *Start, size_t *Count) {
@@ -109,7 +154,8 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
        return (void *)NULL;
    };

    SstReaderInitFFSCallback(m_Input, this, varFFSCallback, arrayFFSCallback);
    SstReaderInitFFSCallback(m_Input, this, varFFSCallback, arrayFFSCallback,
                             attrFFSCallback);

    delete[] cstr;
}
@@ -202,7 +248,6 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec)
                    (*m_CurrentStepMetaData->WriterMetadata)->DataSize);

        m_IO.RemoveAllVariables();
        m_IO.RemoveAllAttributes();
        m_BP3Deserializer->ParseMetadata(m_BP3Deserializer->m_Metadata, m_IO);
        m_IO.ResetVariablesStepSelection(true,
                                         "in call to SST Reader BeginStep");
+58 −1
Original line number Diff line number Diff line
@@ -62,11 +62,67 @@ StepStatus SstWriter::BeginStep(StepMode mode, const float timeout_sec)
    return StepStatus::OK;
}

void SstWriter::FFSMarshalAttributes()
{
    const auto attributesDataMap = m_IO.GetAttributesDataMap();

    const uint32_t attributesCount =
        static_cast<uint32_t>(attributesDataMap.size());

    // if there are no new attributes, nothing to do
    if (attributesCount == m_FFSMarshaledAttributesCount)
        return;

    for (const auto &attributePair : attributesDataMap)
    {
        const std::string name(attributePair.first);
        const std::string type(attributePair.second.first);

        if (type == "unknown")
        {
        }
        else if (type == "string")
        {
            core::Attribute<std::string> &attribute =
                *m_IO.InquireAttribute<std::string>(name);
            int element_count = -1;
            const char *data_addr = attribute.m_DataSingleValue.c_str();
            if (!attribute.m_IsSingleValue)
            {
                //
            }

            SstFFSMarshalAttribute(m_Output, name.c_str(), type.c_str(),
                                   sizeof(char *), element_count, data_addr);
        }
#define declare_type(T)                                                        \
    else if (type == helper::GetType<T>())                                     \
    {                                                                          \
        core::Attribute<T> &attribute = *m_IO.InquireAttribute<T>(name);       \
        int element_count = -1;                                                \
        void *data_addr = &attribute.m_DataSingleValue;                        \
        if (!attribute.m_IsSingleValue)                                        \
        {                                                                      \
            element_count = attribute.m_Elements;                              \
            data_addr = attribute.m_DataArray.data();                          \
        }                                                                      \
        SstFFSMarshalAttribute(m_Output, attribute.m_Name.c_str(),             \
                               type.c_str(), sizeof(T), element_count,         \
                               data_addr);                                     \
    }

        ADIOS2_FOREACH_ATTRIBUTE_PRIMITIVE_TYPE_1ARG(declare_type)
#undef declare_type
    }
}

void SstWriter::EndStep()
{
    m_BetweenStepPairs = false;
    if (m_MarshalMethod == SstMarshalFFS)
    {
    SstWriter:
        FFSMarshalAttributes();
        SstFFSWriterEndStep(m_Output, m_WriterStep);
    }
    else if (m_MarshalMethod == SstMarshalBP)
@@ -102,7 +158,8 @@ void SstWriter::EndStep()
        newblock->data.block = m_BP3Serializer->m_Data.m_Buffer.data();
        newblock->serializer = m_BP3Serializer;
        SstProvideTimestep(m_Output, &newblock->metadata, &newblock->data,
                           m_WriterStep, lf_FreeBlocks, newblock);
                           m_WriterStep, lf_FreeBlocks, newblock, NULL, NULL,
                           NULL);
    }
    else
    {
+2 −0
Original line number Diff line number Diff line
@@ -68,12 +68,14 @@ private:
    SstStream m_Output;
    long m_WriterStep = -1;
    bool m_BetweenStepPairs = false;
    size_t m_FFSMarshaledAttributesCount = 0;
    struct _SstParams Params;
#define declare_locals(Param, Type, Typedecl, Default)                         \
    Typedecl m_##Param = Default;
    SST_FOREACH_PARAMETER_TYPE_4ARGS(declare_locals)
#undef declare_locals

    void FFSMarshalAttributes();
    void DoClose(const int transportIndex = -1) final;
};

+4 −2
Original line number Diff line number Diff line
@@ -250,10 +250,10 @@ static FMStructDescRec CP_WriterResponseStructs[] = {
    {NULL, NULL, 0, NULL}};

static FMField MetaDataPlusDPInfoList[] = {
    {"RequestGlobalOp", "integer", sizeof(int),
     FMOffset(struct _MetadataPlusDPInfo *, RequestGlobalOp)},
    {"Metadata", "*SstBlock", sizeof(struct _SstBlock),
     FMOffset(struct _MetadataPlusDPInfo *, Metadata)},
    {"AttributeData", "*SstBlock", sizeof(struct _SstBlock),
     FMOffset(struct _MetadataPlusDPInfo *, AttributeData)},
    {"Formats", "*FFSFormatBlock", sizeof(struct FFSFormatBlock),
     FMOffset(struct _MetadataPlusDPInfo *, Formats)},
    {"DP_TimestepInfo", "*DP_STRUCT", 0,
@@ -297,6 +297,8 @@ static FMField TimestepMetadataList[] = {
     FMOffset(struct _TimestepMetadataMsg *, Formats)},
    {"metadata", "(*SstBlock)[cohort_size]", sizeof(struct _SstBlock),
     FMOffset(struct _TimestepMetadataMsg *, Metadata)},
    {"attribute_data", "(*SstBlock)[cohort_size]", sizeof(struct _SstBlock),
     FMOffset(struct _TimestepMetadataMsg *, AttributeData)},
    {"TP_TimestepInfo", "(*DP_STRUCT)[cohort_size]", 0,
     FMOffset(struct _TimestepMetadataMsg *, DP_TimestepInfo)},
    {NULL, NULL, 0, 0}};
+8 −6
Original line number Diff line number Diff line
@@ -170,6 +170,7 @@ struct _SstStream
    FFSContext ReaderFFSContext;
    VarSetupUpcallFunc VarSetupUpcall;
    ArraySetupUpcallFunc ArraySetupUpcall;
    AttrSetupUpcallFunc AttrSetupUpcall;
    void *SetupUpcallReader;
    void *ReaderMarshalData;

@@ -220,8 +221,8 @@ struct FFSFormatBlock
 */
struct _MetadataPlusDPInfo
{
    int RequestGlobalOp;
    SstData Metadata;
    SstData AttributeData;
    FFSFormatList Formats;
    void *DP_TimestepInfo;
};
@@ -296,6 +297,7 @@ typedef struct _TimestepMetadataMsg
    int CohortSize;
    FFSFormatList Formats;
    SstData *Metadata;
    SstData *AttributeData;
    void **DP_TimestepInfo;
} * TSMetadataMsg;

@@ -353,11 +355,11 @@ void CP_validateParams(SstStream stream, SstParams Params, int Writer);
extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo);
extern char *CP_GetContactString(SstStream s);
extern SstStream CP_newStream();
extern void SstInternalProvideTimestep(SstStream s, SstData LocalMetadata,
                                       SstData Data, long Timestep,
                                       FFSFormatList Formats,
                                       DataFreeFunc FreeTimestep,
                                       void *FreeClientData);
extern void SstInternalProvideTimestep(
    SstStream s, SstData LocalMetadata, SstData Data, long Timestep,
    FFSFormatList Formats, DataFreeFunc FreeTimestep, void *FreeClientData,
    SstData AttributeData, DataFreeFunc FreeAttributeData,
    void *FreeAttributeClientData);

void **CP_consolidateDataToRankZero(SstStream stream, void *local_info,
                                    FFSTypeHandle type, void **ret_data_block);
Loading