Commit ac2404c7 authored by atj's avatar atj
Browse files

Better error handling in queue

parent befc391a
Loading
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ public:

    // Attempt to process the queue after an event that adds/removes builders or requests
    void tick(asio::yield_context yield);

private:
    asio::io_service &io_service;

+22 −6
Original line number Diff line number Diff line
@@ -9,7 +9,13 @@ Reservation &BuilderQueue::enter() {

void BuilderQueue::tick(asio::yield_context yield) {
    // Update list of "Active" OpenStack builders
    auto all_builders = OpenStackBuilder::get_builders(io_service, yield);
    boost::system::error_code ec;
    auto all_builders = OpenStackBuilder::get_builders(io_service, yield[ec]);
    if (ec) {
        logger::write("Error calling get_builders" + ec.message());
        return;
    }

    std::set<Builder> unavailable_builders;

    // Delete reservations that are completed
@@ -28,8 +34,13 @@ void BuilderQueue::tick(asio::yield_context yield) {
                reservation.set_enter_cleanup();
                asio::spawn(io_service,
                            [this, &reservation](asio::yield_context destroy_yield) {
                                OpenStackBuilder::destroy(reservation.builder.get(), io_service, destroy_yield);
                                boost::system::error_code ec;
                                OpenStackBuilder::destroy(reservation.builder.get(), io_service, destroy_yield[ec]);
                                if (ec) {
                                    logger::write("Error destryoing builder " + ec.message());
                                } else {
                                    reservation.set_finalize();
                                }
                            });
            }
        }
@@ -59,8 +70,13 @@ void BuilderQueue::tick(asio::yield_context yield) {
        pending_requests++;
        asio::spawn(io_service,
                    [this](asio::yield_context request_yield) {
                        OpenStackBuilder::request_create(io_service, request_yield);
                        boost::system::error_code ec;
                        OpenStackBuilder::request_create(io_service, request_yield[ec]);
                        if (ec) {
                            logger::write("Error requesting builder create " + ec.message());
                        } else {
                            pending_requests--;
                        }
                    });
    }
}
 No newline at end of file
+42 −30
Original line number Diff line number Diff line
@@ -4,44 +4,56 @@

// Handle the initial request
void Connection::begin() {
    try {
        auto self(shared_from_this());
        asio::spawn(socket.get_io_service(),
                    [this, self](asio::yield_context yield) {
                    try {
                        Messenger messenger(socket);
                        auto request = messenger.async_receive(yield, MessageType::string);

                        if (request == "checkout_builder_request")
                            checkout_builder(yield);
                        else
                            throw std::system_error(EPERM, std::system_category(), request + " not supported");
                    }
                    catch (std::exception &e) {
                        logger::write(socket, std::string() + "Connection initial request error " + e.what());
                        boost::system::error_code ec;
                        auto request = messenger.async_receive(yield[ec], MessageType::string);
                        if (ec) {
                            logger::write(socket, "Request failure" + ec.message());
                        } else if (request == "checkout_builder_request") {
                            checkout_builder(yield[ec]);
                        } else {
                            logger::write(socket, "Invalid request message received: " + request);
                        }
                    });
    } catch(...) {
        logger::write(socket, "Unknown connection exception caught");
    }
}

// Handle a client builder request
void Connection::checkout_builder(asio::yield_context yield) {
    try {
    logger::write(socket, "Checkout resource request");
    boost::system::error_code ec;

    Messenger messenger(socket);

        logger::write(socket, "Requesting resource from the queue");
    // Request a builder
    logger::write(socket, "Requesting builder from the queue");
    ReservationRequest reservation(queue);
        auto builder = reservation.async_wait(yield);
    Builder builder = reservation.async_wait(yield[ec]);
    if (ec) {
        logger::write("reservation builder request failed: " + ec.message());
        return;
    }

        messenger.async_send(builder, yield);
        logger::write(socket, "sent builder: " + builder.id + "(" + builder.host + ")");
    // Send the fulfilled builder
    messenger.async_send(builder, yield[ec]);
    if (ec) {
        logger::write(socket, "Error sending builder: " + builder.id + "(" + builder.host + ")");
        return;
    }

        auto complete = messenger.async_receive(yield, MessageType::string);
        if (complete == "checkout_builder_complete")
            logger::write(socket, "Client completed");
        else
            logger::write(socket, "Client complete error: " + complete);
    } catch (std::exception &e) {
        logger::write(socket, "Exception: checkout_builder disconnect:" + std::string(e.what()));
    // Wait on connection to finish
    logger::write(socket, "Sent builder: " + builder.id + "(" + builder.host + ")");
    std::string complete = messenger.async_receive(yield[ec], MessageType::string);
    if (ec || complete != "checkout_builder_complete") {
        logger::write(socket, "Failed to receive completion message from client" + ec.message());
        return;
    }

    logger::write(socket, "Connection completed successfully");
}
 No newline at end of file
+21 −17
Original line number Diff line number Diff line
@@ -23,10 +23,10 @@ namespace OpenStackBuilder {
        }

        // Read the list command output until we reach EOF, which is returned as an error
        boost::system::error_code ec;
        boost::asio::async_read(std_pipe, buffer, yield[ec]);
        if (ec != asio::error::eof) {
            logger::write("OpenStack destroy error: " + ec.message());
        boost::system::error_code read_ec;
        boost::asio::async_read(std_pipe, buffer, yield[read_ec]);
        if (read_ec != asio::error::eof) {
            logger::write("OpenStack destroy error: " + read_ec.message());
        }

        // Grab exit code from destroy command
@@ -56,6 +56,7 @@ namespace OpenStackBuilder {

        // Fill a set of builders from the property tree data
        std::set<Builder> builders;
        try {
            for (const auto &builder_node : builder_tree) {
                Builder builder;
                auto network = builder_node.second.get<std::string>("Networks");
@@ -66,6 +67,9 @@ namespace OpenStackBuilder {
                builder.port = "8080";
                builders.insert(builder);
            }
        } catch(const pt::ptree_error &e) {
            logger::write(std::string() + "Error parsing builders: " + e.what());
        }

        return builders;
    }
@@ -117,10 +121,10 @@ namespace OpenStackBuilder {
        }

        // Read the destroy_command output until we reach EOF, which is returned as an error
        boost::system::error_code ec;
        boost::asio::async_read(std_pipe, buffer, yield[ec]);
        if (ec != asio::error::eof) {
            logger::write("OpenStack destroy error: " + ec.message());
        boost::system::error_code read_ec;
        boost::asio::async_read(std_pipe, buffer, yield[read_ec]);
        if (read_ec != asio::error::eof) {
            logger::write("OpenStack destroy error: " + read_ec.message());
        }

        // Grab exit code from destroy command
+2 −1
Original line number Diff line number Diff line
@@ -11,7 +11,8 @@ void Reservation::async_wait(asio::yield_context yield) {
        boost::system::error_code ec;
        ready_timer.async_wait(yield[ec]);
        if (ec != asio::error::operation_aborted) {
            throw std::system_error(EBADMSG, std::system_category());
            logger::write("Error in reservation async_wait" + ec.message());
            return;
        }
    }
}
Loading