Unverified Commit d2039250 authored by Jason Wang's avatar Jason Wang Committed by GitHub
Browse files

Merge pull request #3138 from JasonRuonanWang/sz

added mutex in sz operator
parents 9db9e29e 1c89fe70
Loading
Loading
Loading
Loading
+21 −10
Original line number Diff line number Diff line
@@ -25,6 +25,8 @@ namespace core
namespace compress
{

std::mutex CompressSZ::m_Mutex;

CompressSZ::CompressSZ(const Params &parameters)
: Operator("sz", COMPRESS_SZ, "compress", parameters)
{
@@ -241,15 +243,6 @@ size_t CompressSZ::Operate(const char *dataIn, const Dims &blockStart,
        }
    }

    if (use_configfile)
    {
        SZ_Init(sz_configfile.c_str());
    }
    else
    {
        SZ_Init_Params(&sz);
    }

    // Get type info
    int dtype = -1;
    if (varType == helper::GetDataType<double>() ||
@@ -272,9 +265,22 @@ size_t CompressSZ::Operate(const char *dataIn, const Dims &blockStart,
    convertedDims = ConvertDims(blockCount, varType, 5, true, 0);

    size_t szBufferSize;

    m_Mutex.lock();
    if (use_configfile)
    {
        SZ_Init(sz_configfile.c_str());
    }
    else
    {
        SZ_Init_Params(&sz);
    }
    auto *szBuffer = SZ_compress(
        dtype, const_cast<char *>(dataIn), &szBufferSize, convertedDims[0],
        convertedDims[1], convertedDims[2], convertedDims[3], convertedDims[4]);
    SZ_Finalize();
    m_Mutex.unlock();

    if (bufferOutOffset + szBufferSize >
        helper::GetTotalSize(blockCount, helper::GetDataTypeSize(varType)))
    {
@@ -291,7 +297,6 @@ size_t CompressSZ::Operate(const char *dataIn, const Dims &blockStart,
    {
        free(szBuffer);
    }
    SZ_Finalize();
    return bufferOutOffset;
}

@@ -391,12 +396,15 @@ size_t CompressSZ::DecompressV1(const char *bufferIn, const size_t sizeIn,
    const size_t dataSizeBytes =
        helper::GetTotalSize(convertedDims, dataTypeSize);

    m_Mutex.lock();
    void *result =
        SZ_decompress(dtype,
                      reinterpret_cast<unsigned char *>(
                          const_cast<char *>(bufferIn + bufferInOffset)),
                      sizeIn - bufferInOffset, 0, convertedDims[0],
                      convertedDims[1], convertedDims[2], convertedDims[3]);
    SZ_Finalize();
    m_Mutex.unlock();

    if (result == nullptr)
    {
@@ -462,12 +470,15 @@ size_t CompressSZ::DecompressV2(const char *bufferIn, const size_t sizeIn,
    Dims convertedDims =
        ConvertDims(blockCount, helper::GetDataType<float>(), 5, true, 0);

    m_Mutex.lock();
    void *result = SZ_decompress(
        dtype,
        reinterpret_cast<unsigned char *>(
            const_cast<char *>(bufferIn + bufferInOffset)),
        sizeIn - bufferInOffset, convertedDims[0], convertedDims[1],
        convertedDims[2], convertedDims[3], convertedDims[4]);
    SZ_Finalize();
    m_Mutex.unlock();

    if (result == nullptr)
    {
+2 −0
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@
#define ADIOS2_OPERATOR_COMPRESS_COMPRESSSZ_H_

#include "adios2/core/Operator.h"
#include <mutex>

namespace adios2
{
@@ -80,6 +81,7 @@ private:
                        char *dataOut);

    std::string m_VersionInfo;
    static std::mutex m_Mutex;
};

} // end namespace compress
+1 −1
Original line number Diff line number Diff line
@@ -311,7 +311,7 @@ TEST_F(DataManEngineTest, 2D_Sz)
    Dims start = {0, 0};
    Dims count = {10, 10};

    size_t steps = 100;
    size_t steps = 5000;
    adios2::Params engineParams = {
        {"IPAddress", "127.0.0.1"}, {"Port", "12330"}, {"Verbose", "0"}};