Commit 832e7a1a authored by Simon Spannagel's avatar Simon Spannagel
Browse files

DatabaseWriter: use proper transaction isolation

parent 4b08af08
Loading
Loading
Loading
Loading
+164 −147
Original line number Diff line number Diff line
@@ -27,7 +27,6 @@
using namespace allpix;

thread_local std::shared_ptr<pqxx::connection> DatabaseWriterModule::conn_ = nullptr;
thread_local std::shared_ptr<pqxx::nontransaction> DatabaseWriterModule::W_ = nullptr;

DatabaseWriterModule::DatabaseWriterModule(Configuration& config, Messenger* messenger, GeometryManager*)
    : SequentialModule(config), messenger_(messenger) {
@@ -109,11 +108,19 @@ void DatabaseWriterModule::initializeThread() {
    }

    prepare_statements(conn_);
    W_ = std::make_shared<pqxx::nontransaction>(*conn_);

    // inserting run entry in the database
    pqxx::result runR = W_->exec_prepared("add_run", run_id_);
    // Inserting run entry in the database
    try {
        // Open new transaction
        pqxx::work transaction(*conn_);
        pqxx::result runR = transaction.exec_prepared("add_run", run_id_);
        run_nr_ = atoi(runR[0][0].c_str());
        // Commit transaction to database:
        transaction.commit();

    } catch(const std::exception& e) {
        throw ModuleError("SQL error: " + std::string(e.what()));
    }

    // Read include and exclude list
    if(config_.has("include") && config_.has("exclude")) {
@@ -194,11 +201,13 @@ void DatabaseWriterModule::run(Event* event) {
    std::optional<int> pixelcharge_nr;

    LOG(TRACE) << "Writing new objects to database";

    std::stringstream insertionLine;
    try {
        // Open new transaction
        pqxx::work transaction(*conn_);
        LOG(TRACE) << "Started new database transaction";

        // Writing entry to event table
    auto insertionResult = W_->exec_prepared("add_event", run_nr_, event->number);
        auto insertionResult = transaction.exec_prepared("add_event", run_nr_, event->number);
        int event_nr = atoi(insertionResult[0][0].c_str());

        // Looping through messages
@@ -226,7 +235,7 @@ void DatabaseWriterModule::run(Event* event) {
                if(class_name == "PixelHit") {
                    LOG(TRACE) << "inserting PixelHit" << std::endl;
                    auto hit = static_cast<PixelHit&>(current_object);
                insertionResult = W_->exec_prepared("add_pixelhit",
                    insertionResult = transaction.exec_prepared("add_pixelhit",
                                                                run_nr_,
                                                                event_nr,
                                                                mcparticle_nr,
@@ -239,7 +248,7 @@ void DatabaseWriterModule::run(Event* event) {
                } else if(class_name == "PixelCharge") {
                    LOG(TRACE) << "inserting PixelCharge" << std::endl;
                    auto charge = static_cast<PixelCharge&>(current_object);
                insertionResult = W_->exec_prepared("add_pixelcharge",
                    insertionResult = transaction.exec_prepared("add_pixelcharge",
                                                                run_nr_,
                                                                event_nr,
                                                                propagatedcharge_nr,
@@ -252,10 +261,11 @@ void DatabaseWriterModule::run(Event* event) {
                                                                charge.getPixel().getGlobalCenter().X(),
                                                                charge.getPixel().getGlobalCenter().Y());
                    pixelcharge_nr = atoi(insertionResult[0][0].c_str());
            } else if(class_name == "PropagatedCharge") { // not recommended, this will slow down the simulation considerably
                } else if(class_name ==
                          "PropagatedCharge") { // not recommended, this will slow down the simulation considerably
                    LOG(TRACE) << "inserting PropagatedCharge" << std::endl;
                    PropagatedCharge charge = static_cast<PropagatedCharge&>(current_object);
                insertionResult = W_->exec_prepared("add_propagatedcharge",
                    insertionResult = transaction.exec_prepared("add_propagatedcharge",
                                                                run_nr_,
                                                                event_nr,
                                                                depositedcharge_nr,
@@ -272,7 +282,7 @@ void DatabaseWriterModule::run(Event* event) {
                } else if(class_name == "MCTrack") {
                    LOG(TRACE) << "inserting MCTrack" << std::endl;
                    auto track = static_cast<MCTrack&>(current_object);
                insertionResult = W_->exec_prepared("add_mctrack",
                    insertionResult = transaction.exec_prepared("add_mctrack",
                                                                run_nr_,
                                                                event_nr,
                                                                detectorName,
@@ -293,7 +303,7 @@ void DatabaseWriterModule::run(Event* event) {
                } else if(class_name == "DepositedCharge") {
                    LOG(TRACE) << "inserting DepositedCharge" << std::endl;
                    auto charge = static_cast<DepositedCharge&>(current_object);
                insertionResult = W_->exec_prepared("add_depositedcharge",
                    insertionResult = transaction.exec_prepared("add_depositedcharge",
                                                                run_nr_,
                                                                event_nr,
                                                                mcparticle_nr,
@@ -310,7 +320,7 @@ void DatabaseWriterModule::run(Event* event) {
                } else if(class_name == "MCParticle") {
                    LOG(TRACE) << "inserting MCParticle" << std::endl;
                    auto particle = static_cast<MCParticle&>(current_object);
                insertionResult = W_->exec_prepared("add_mcparticle",
                    insertionResult = transaction.exec_prepared("add_mcparticle",
                                                                run_nr_,
                                                                event_nr,
                                                                mctrack_nr,
@@ -340,6 +350,13 @@ void DatabaseWriterModule::run(Event* event) {
            }
            msg_cnt_++;
        }

        // Commit transaction to database:
        transaction.commit();
        LOG(TRACE) << "Database transaction completed";
    } catch(const std::exception& e) {
        throw ModuleError("SQL error: " + std::string(e.what()));
    }
}

void DatabaseWriterModule::finalizeThread() {
+0 −1
Original line number Diff line number Diff line
@@ -85,7 +85,6 @@ namespace allpix {

        // postgreSQL objects
        static thread_local std::shared_ptr<pqxx::connection> conn_;
        static thread_local std::shared_ptr<pqxx::nontransaction> W_;
        std::string host_;
        std::string port_;
        std::string database_name_;