/*
 * DataManWriter.cpp
 *
 *  Created on: Jan 10, 2017
 *      Author: wfg
 */

#include <iostream> //needs to go away, this is just for demo purposes

#include "engine/dataman/DataManWriter.h"

#include "core/Support.h"
#include "functions/adiosFunctions.h" //CSVToVector

//supported transports
#include "transport/file/FD.h" // uses POSIX
#include "transport/file/FP.h" // uses C FILE*
#include "transport/file/FStream.h" // uses C++ fstream
#include "transport/wan/MdtmMan.h" //uses Mdtm library


namespace adios
{


/**
 * Constructs a DataManWriter engine.
 * Forwards all arguments to the Engine base class (tagging the engine type as "DataManWriter" and
 * supplying the trailing message used in error reports), builds m_Buffer from the access mode plus
 * m_RankMPI and m_DebugMode, and finally runs Init( ).
 * NOTE(review): m_RankMPI and m_DebugMode are presumably set by the Engine base constructor before
 * m_Buffer is built -- confirm against Engine.
 * @param adios owning ADIOS object, passed through to Engine
 * @param name engine name, passed through to Engine
 * @param accessMode access mode string, passed to Engine and to m_Buffer
 * @param mpiComm MPI communicator, passed through to Engine
 * @param method Method object, passed through to Engine
 * @param debugMode debug flag, passed through to Engine
 * @param cores number of cores, passed through to Engine
 */
DataManWriter::DataManWriter( ADIOS& adios, const std::string name, const std::string accessMode,
                              MPI_Comm mpiComm, const Method& method, const bool debugMode,
                              const unsigned int cores ):
    Engine( adios, "DataManWriter", name, accessMode, mpiComm, method, debugMode, cores,
            " DataManWriter constructor (or call to ADIOS Open).\n" ),
    m_Buffer( accessMode, m_RankMPI, m_DebugMode )
{
    Init( );
}


/** Destructor: performs no explicit work of its own. */
DataManWriter::~DataManWriter( ) = default;

/**
 * Stores a user callback in m_CallBack and registers the same callback with m_Man.
 * @param callback callable receiving a data pointer, three strings and a Dims object
 *                 (NOTE(review): the meaning of each argument is defined by the caller of the
 *                 callback, which is not visible in this file -- confirm before documenting further)
 */
void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback )
{
    m_CallBack = callback;
    m_Man.reg_callback( callback );
}

/**
 * Write overloads taking a typed Variable handle.
 * One overload exists per supported element type; each one simply forwards the variable and the
 * data pointer to WriteVariableCommon.
 * @param variable Variable object describing the data
 * @param values pointer to the data to be written
 */
void DataManWriter::Write( Variable<char>& variable, const char* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<unsigned char>& variable, const unsigned char* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<short>& variable, const short* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<unsigned short>& variable, const unsigned short* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<int>& variable, const int* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<unsigned int>& variable, const unsigned int* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<long int>& variable, const long int* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<unsigned long int>& variable, const unsigned long int* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<long long int>& variable, const long long int* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<unsigned long long int>& variable, const unsigned long long int* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<float>& variable, const float* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<double>& variable, const double* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<long double>& variable, const long double* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<std::complex<float>>& variable, const std::complex<float>* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<std::complex<double>>& variable, const std::complex<double>* values )
{
    WriteVariableCommon( variable, values );
}

void DataManWriter::Write( Variable<std::complex<long double>>& variable, const std::complex<long double>* values )
{
    WriteVariableCommon( variable, values );
}

//String version
/**
 * Write overloads taking the variable name.
 * Each overload obtains the variable of the matching element type from m_ADIOS via
 * GetVariable<T>( variableName ) and forwards it, together with the data pointer, to
 * WriteVariableCommon.
 * @param variableName name passed to m_ADIOS.GetVariable<T>
 * @param values pointer to the data to be written
 */
void DataManWriter::Write( const std::string variableName, const char* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<char>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const unsigned char* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<unsigned char>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const short* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<short>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const unsigned short* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<unsigned short>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const int* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<int>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const unsigned int* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<unsigned int>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const long int* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<long int>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const unsigned long int* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<unsigned long int>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const long long int* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<long long int>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const unsigned long long int* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<unsigned long long int>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const float* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<float>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const double* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<double>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const long double* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<long double>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const std::complex<float>* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<std::complex<float>>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const std::complex<double>* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<std::complex<double>>( variableName ), values );
}

void DataManWriter::Write( const std::string variableName, const std::complex<long double>* values )
{
    WriteVariableCommon( m_ADIOS.GetVariable<std::complex<long double>>( variableName ), values );
}


/**
 * Close entry point of the engine.
 * Currently a placeholder: the body is empty and transportIndex is not used.
 * @param transportIndex index of the transport to close (ignored by the current implementation)
 */
void DataManWriter::Close( const int transportIndex )
{
    //here close IPs and deallocate or free/close resources (if using STL no need for memory deallocation)
}

//PRIVATE functions below
void DataManWriter::Init( )
{
    if( m_DebugMode == true )
    {
        if( m_AccessMode != "w" && m_AccessMode != "write" && m_AccessMode != "a" && m_AccessMode != "append" )
            throw std::invalid_argument( "ERROR: DataManWriter doesn't support access mode " + m_AccessMode +
                                         ", in call to ADIOS Open or DataManWriter constructor\n"  );
    }

    auto itRealTime = m_Method.m_Parameters.find( "real_time" );
    if( itRealTime != m_Method.m_Parameters.end() )
    {
        if( itRealTime->second == "yes" || itRealTime->second == "true" )
            m_DoRealTime = true;
    }

    if(m_DoRealTime)
    {
        /**
         * Lambda function that assigns a parameter in m_Method to a localVariable of type std::string
         */
        auto lf_AssignString = [this]( const std::string parameter, std::string& localVariable )
        {
            auto it = m_Method.m_Parameters.find( parameter );
            if( it != m_Method.m_Parameters.end() )
            {
                localVariable = it->second;
            }
        };

        /**
         * Lambda function that assigns a parameter in m_Method to a localVariable of type int
         */
        auto lf_AssignInt = [this]( const std::string parameter, int& localVariable )
        {
            auto it = m_Method.m_Parameters.find( parameter );
            if( it != m_Method.m_Parameters.end() )
            {
                localVariable = std::stoi( it->second );
            }
        };

        std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default)
        int local_port=0, remote_port=0, num_channels=0;

        lf_AssignString( "method_type", method_type );
        if( method_type == "stream" )
        {
            lf_AssignString( "method", method );
            lf_AssignString( "local_ip", local_ip );
            lf_AssignString( "remote_ip", remote_ip );
            lf_AssignInt( "local_port", local_port );
            lf_AssignInt( "remote_port", remote_port );
            lf_AssignInt( "num_channels", num_channels );

            json jmsg;
            jmsg["method"] = method;
            jmsg["local_ip"] = local_ip;
            jmsg["remote_ip"] = remote_ip;
            jmsg["local_port"] = local_port;
            jmsg["remote_port"] = remote_port;
            jmsg["num_channels"] = num_channels;
            jmsg["stream_mode"] = "sender";
        }
    }
    else
    {
        InitCapsules( );
        InitTransports( );
    }

}

/**
 * Placeholder for the initialization of memory capsules; the body is currently empty.
 * Called from Init( ) when real-time mode is not enabled.
 */
void DataManWriter::InitCapsules( )
{
    //here init memory capsules
}


/**
 * Creates one transport per entry of m_Method.m_TransportParameters.
 * After TransportNamesUniqueness( ) has run, each parameter set is inspected for its "transport" key:
 *  - "Mdtm" or "MdtmMan": reads the Mdtm parameters through GetMdtmParameter and appends a
 *    transport::MdtmMan to m_Transports
 *  - "Zmq": accepted, but nothing is created yet
 *  - anything else: rejected in debug mode
 * An entry without a "transport" key is rejected in debug mode and skipped otherwise; the lookup
 * result used to be dereferenced unconditionally, which is undefined behavior for a missing key.
 * @throws std::invalid_argument in debug mode for a missing or unsupported transport
 */
void DataManWriter::InitTransports( ) //maybe move this?
{
    TransportNamesUniqueness( );

    for( const auto& parameters : m_Method.m_TransportParameters )
    {
        const auto itTransport = parameters.find( "transport" );

        if( itTransport == parameters.end( ) ) //never dereference end()
        {
            if( m_DebugMode == true )
                throw std::invalid_argument( std::string( "ERROR: transport not defined in transport parameters, in " ) +
                                             m_Name + m_EndMessage );
            continue; //nothing can be created for this entry
        }

        const auto& transportName = itTransport->second;

        if( transportName == "Mdtm" || transportName == "MdtmMan" )
        {
            const std::string localIP( GetMdtmParameter( "localIP", parameters ) ); //mandatory
            const std::string remoteIP( GetMdtmParameter( "remoteIP", parameters ) ); //mandatory
            const std::string prefix( GetMdtmParameter( "prefix", parameters ) );
            const int numberOfPipes = std::stoi( GetMdtmParameter( "pipes", parameters ) );
            const std::vector<int> tolerances = CSVToVectorInt( GetMdtmParameter( "tolerances", parameters ) );
            const std::vector<int> priorities = CSVToVectorInt( GetMdtmParameter( "priorities", parameters ) );

            m_Transports.push_back( std::make_shared<transport::MdtmMan>( localIP, remoteIP, m_AccessMode, prefix, numberOfPipes,
                                                                          tolerances, priorities, m_MPIComm, m_DebugMode ) );
        }
        else if( transportName == "Zmq" )
        {
            //recognized, but no transport is created for it yet
        }
        else
        {
            if( m_DebugMode == true )
                throw std::invalid_argument( "ERROR: transport + " + transportName + " not supported, in " +
                                              m_Name + m_EndMessage );
        }
    }
}


/**
 * Looks up an Mdtm transport parameter, falling back to a default when it is absent.
 * @param parameter key to look for
 * @param mdtmParameters parameter map of one transport
 * @return the stored value when the key exists; otherwise "0" for "pipes" and an empty string for
 *         every other key (including "localIP"/"remoteIP" when debug mode is off)
 * @throws std::invalid_argument in debug mode when "localIP" or "remoteIP" is missing
 * NOTE(review): the original fallback chain named "priority", while InitTransports in this file
 * asks for "priorities"; both end up with the empty-string default, so the result is the same.
 */
std::string DataManWriter::GetMdtmParameter( const std::string parameter, const std::map<std::string,std::string>& mdtmParameters )
{
    const auto found = mdtmParameters.find( parameter );
    if( found != mdtmParameters.end() )
    {
        return found->second;
    }

    //from here on the parameter was not provided
    const bool isMandatory = ( parameter == "localIP" || parameter == "remoteIP" );
    if( isMandatory && m_DebugMode == true )
    {
        throw std::invalid_argument( "ERROR: " + parameter + " parameter not found in Method, in call to DataManWriter constructor\n" );
    }

    if( parameter == "pipes" )
    {
        return "0"; // or 1?
    }

    //"prefix", "tolerances", "priority" and any other key: empty string
    return "";
}


} //end namespace adios