Commit d9f6ac33 authored by AdamSimpson's avatar AdamSimpson
Browse files

get rid of heartbeat cruft

parent a714041a
Loading
Loading
Loading
Loading
+76 −76
Original line number Diff line number Diff line
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <fstream>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/process.hpp>
#include <boost/regex.hpp>
#include "Messenger.h"
#include <iostream>
#include "Logger.h"
#include "Messenger.h"
#include <memory>

namespace asio = boost::asio;
namespace bp = boost::process;
using asio::ip::tcp;
using callback_type = std::function<void(const boost::system::error_code&, std::size_t size)>;
namespace bp = boost::process;

int main(int argc, char *argv[]) {

    try {
        // Accept a connection to use this builder
        // Block until the initial client connects
        asio::io_service io_service;
        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 8080));
        tcp::socket socket(io_service);
        acceptor.accept(socket);

        logger::write("Received connection from: " + boost::lexical_cast<std::string>(socket.remote_endpoint()));
        logger::write(socket, "Client connected");

        Messenger messenger(socket);

        // Read the recipe file
        Messenger client_messenger(socket);
        client_messenger.receive_file("container.def");
        // Once we're connected start the build process
        asio::spawn(io_service,
                    [&](asio::yield_context yield) {
                        // Receive the definition file from the client
                        messenger.async_receive_file("container.def", yield);

        logger::write("Recipe file received");
                        logger::write(socket, "Received container.def");

                        // Create a pipe to communicate with our build subprocess
                        bp::async_pipe std_pipe(io_service);
@@ -36,17 +40,18 @@ int main(int argc, char *argv[]) {
                        // Launch our build as a subprocess
                        // We use "unbuffer" to fake the build into thinking it has a real TTY, which the command output eventually will
                        // This causes things like wget and color ls to work nicely
        // TODO : perhaps set TERM as well to something like xterm ?
        std::string build_command("/usr/bin/sudo /usr/bin/unbuffer /usr/local/bin/singularity build ./container.img ./container.def");
                        std::string build_command(
                                "/usr/bin/sudo /usr/bin/unbuffer /usr/local/bin/singularity build ./container.img ./container.def");

                        bp::group group;
                        std::error_code build_ec;
        bp::child build_child(build_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, build_ec);
                        bp::child build_child(build_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe,
                                              group, build_ec);
                        if (build_ec) {
            logger::write("subprocess error: " + build_ec.message());
                            logger::write(socket, "subprocess error: " + build_ec.message());
                        }

        logger::write("Running build command: " + build_command);
                        logger::write(socket, "launched build process: " + build_command);

                        // Read process pipe output and write it to the client
                        // line buffer(ish) by reading from the pipe until we hit \n, \r
@@ -55,43 +60,38 @@ int main(int argc, char *argv[]) {
                        // TODO: just call read_some perhaps?
                        asio::streambuf buffer;
                        boost::regex line_matcher{"\\r|\\n"};
                        std::size_t read_size = 0;

        // Callback for handling reading from pipe and sending output to client
        callback_type read_std_pipe = [&](const boost::system::error_code& ec, std::size_t size) {
            client_messenger.send(buffer);
                        do {
                            // Read from the pipe into a buffer
                            read_size = asio::async_read_until(std_pipe, buffer, line_matcher, yield);

            if(size > 0) {
                asio::async_read_until(std_pipe, buffer, line_matcher, read_std_pipe);
            }
            else if(ec == asio::error::eof) {
                logger::write("build output EOF");
            }
            else {
                throw std::system_error(EBADMSG, std::generic_category(), "Error reading build output" + ec.message());
            }
        };
        // Start reading child stdout/err from pipe
        asio::async_read_until(std_pipe, buffer, line_matcher, read_std_pipe);

        io_service.run();
                            // Write the buffer to our socket
                            messenger.async_send(buffer, yield);
                        } while (read_size > 0);

                        // Get the return value from the build subprocess
                        logger::write(socket, "Waiting on build process to exit");
                        build_child.wait();
                        int build_code = build_child.exit_code();

        logger::write("Sending image to client");

                        // Send the container to the client
                        if (build_code == 0) {
            logger::write("Image failed to build");
            client_messenger.send_file("container.img");
                            logger::write(socket, "Build complete, sending container");
                            messenger.send_file("container.img");
                        } else {
                            logger::write(socket, "Build failed, not sending container");
                        }
                    });

        logger::write("Finished sending image to client");
        // Begin processing our connections and queue
        io_service.run();
    }
    catch (std::exception &e) {
        logger::write(std::string("Build error: ") + e.what());
        logger::write(std::string() + "Build server exception: " + e.what());
    }

    logger::write("Builder shutting down");

    return 0;
}
 No newline at end of file

Builder/main_heartbeat.cpp

deleted100644 → 0
+0 −156
Original line number Diff line number Diff line
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/process.hpp>
#include <boost/regex.hpp>
#include <iostream>
#include "Logger.h"
#include "Messenger.h"
#include <memory>

namespace asio = boost::asio;
using asio::ip::tcp;
namespace bp = boost::process;

int main(int argc, char *argv[]) {

    try {
        // Block until the initial client connects
        asio::io_service io_service;
        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 8080));
        std::shared_ptr<tcp::socket> socket = std::make_shared<tcp::socket>(io_service);
        acceptor.accept(*socket);

        logger::write(*socket, "Client connected");

        std::shared_ptr<Messenger> messenger = std::make_shared<Messenger>(*socket);

        // Once we're connected start the build process in a coroutine
        asio::spawn(io_service,
                    [&](asio::yield_context yield) {
                        // Receive the definition file from the client
                        messenger->async_receive_file("container.def", yield);

                        logger::write(*socket, "Received container.def");

                        // Create a pipe to communicate with our build subprocess
                        bp::async_pipe std_pipe(io_service);

                        // Launch our build as a subprocess
                        // We use "unbuffer" to fake the build into thinking it has a real TTY, which the command output eventually will
                        // This causes things like wget and color ls to work nicely
                        std::string build_command(
                                "/usr/bin/sudo /usr/bin/unbuffer /usr/local/bin/singularity build ./container.img ./container.def");

                        bp::group group;
                        std::error_code build_ec;
                        bp::child build_child(build_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe,
                                              group, build_ec);
                        if (build_ec) {
                            logger::write(*socket, "subprocess error: " + build_ec.message());
                        }

                        logger::write(*socket, "launched build process: " + build_command);

                        // Read process pipe output and write it to the client
                        // line buffer(ish) by reading from the pipe until we hit \n, \r
                        // NOTE: read_until will fill buffer until line_matcher is satisfied but generally will contain additional data.
                        // This is fine as all we care about is dumping everything from std_pipe to our buffer and don't require exact line buffering
                        // TODO: just call read_some perhaps?
                        asio::streambuf buffer;
                        boost::regex line_matcher{"\\r|\\n"};
                        std::size_t read_size = 0;
                        boost::system::error_code read_ec;
                        boost::system::error_code write_ec;

                        do {
                            // Read from the pipe into a buffer
                            read_size = asio::async_read_until(std_pipe, buffer, line_matcher, yield[read_ec]);
                            if (read_ec != boost::system::errc::success) {
                                logger::write(*socket, "Error reading builder output");
                            }
                            // Write the buffer to our socket
                            // If the connection is aborted we retry assuming we have a fresh socket
                            do {
                                messenger->async_send(buffer, yield[write_ec]);
                            } while(write_ec == boost::asio::error::connection_aborted);
                        } while (read_size > 0);

                        // Get the return value from the build subprocess
                        logger::write(*socket, "Waiting on build process to exit");
                        build_child.wait();
                        int build_code = build_child.exit_code();

                        // Send the container to the client
                        // TODO we send the file in a blocking manner so the heartbeat doesn't process - fix file transfer
                        // TODO git puto handle heartbeat/resume
                        if (build_code == 0) {
                            logger::write(*socket, "Build complete, sending container");
                            messenger->send_file("container.img");
                        } else {
                            logger::write(*socket, "Build failed, not sending container");
                        }
                    });

        // Start the heartbeat which will let the client know we're alive every `pulse` seconds
        // The heartbeat is guarded by a watchdog that if triggered will cause the connection to be restarted
        asio::deadline_timer heartbeat(io_service);
        asio::deadline_timer watchdog(io_service);
        const auto pulse = boost::posix_time::seconds(5);
        const auto timeout = boost::posix_time::seconds(15);

        // When a hang is detected we destroy the socket and attempt to create a new one
        // This is handled outside of the heartbeat coroutine as if it fires that coroutine will be jammed up
        timer_callback_t heartbeat_hung = [&](const boost::system::error_code &ec) {
            // Ignore the timer being canceled
            if (ec != boost::system::errc::success) {
                return;
            }

            logger::write(*socket, "killing socket due to heartbeat hang");

            // Destroy the current socket
            socket->cancel();
            socket->close();

            logger::write(*socket, "accepting a reconnect");

            // Accept a new connection and reset the socket and messenger
            socket = std::make_shared<tcp::socket>(io_service);
            acceptor.accept(*socket);
            messenger = std::make_shared<Messenger>(*socket);

            logger::write(*socket, "opened new socket");
        };

        asio::spawn(io_service,
                    [&](asio::yield_context yield) {
                        for (;;) {
                            // arm the watchdog
                            watchdog.expires_from_now(timeout);
                            watchdog.async_wait(heartbeat_hung);

                            // Try to send our heartbeat
                            logger::write("Attempt to send heartbeat");
                            messenger->async_send_heartbeat(yield);
                            logger::write("heartbeat sent");

                            // Set the next heartbeat
                            heartbeat.expires_from_now(pulse);
                            // Wait until the next heartbeat
                            heartbeat.async_wait(yield);
                        }
                    });

        // Begin processing our connections and queue
        io_service.run();
    }
    catch (std::exception &e) {
        logger::write(std::string() + "Build server exception: " + e.what());
    }

    logger::write("Builder shutting down");

    return 0;
}
 No newline at end of file
+2 −2
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@ set(SOURCE_FILES_QUEUE
        BuilderQueue/include/OpenStackBuilder.h)

set(SOURCE_FILES_BUILDER
        Builder/main_heartbeat.cpp
        Builder/main.cpp
        Common/include/Builder.h
        Common/src/Logger.cpp
        Common/include/Logger.h
@@ -34,7 +34,7 @@ set(SOURCE_FILES_BUILDER

# Files related to the API
set(SOURCE_FILES_CLIENT
        Client/main_heartbeat.cpp
        Client/main.cpp
        Common/include/Builder.h
        Common/src/Logger.cpp
        Common/include/Logger.h
+43 −27
Original line number Diff line number Diff line
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <fstream>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/process.hpp>
#include <boost/regex.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include "Messenger.h"
#include <memory>

namespace asio = boost::asio;
using asio::ip::tcp;
namespace bp = boost::process;

std::string queue_host() {
    auto env = std::getenv("QUEUE_HOST");
    if (!env) {
        throw std::system_error(ENOTSUP, std::system_category(), "QUEUE_HOST");
    }

    return std::string(env);
}

@@ -26,6 +30,7 @@ std::string queue_port() {
}

int main(int argc, char *argv[]) {

    try {

        // Check for correct number of arguments
@@ -39,8 +44,10 @@ int main(int argc, char *argv[]) {
        std::cout << "Attempting to connect to BuilderQueue: " << queue_host() << ":" << queue_port() << std::endl;

        asio::io_service io_service;

        tcp::socket queue_socket(io_service);
        tcp::resolver queue_resolver(io_service);
        boost::system::error_code ec;
        asio::connect(queue_socket, queue_resolver.resolve({queue_host(), queue_port()}));

        std::cout << "Connected to BuilderQueue: " << queue_host() << ":" << queue_port() << std::endl;
@@ -53,13 +60,11 @@ int main(int argc, char *argv[]) {
        // Receive an available builder
        auto builder = queue_messenger.receive_builder();

        std::cout << "Received build host: " << builder.host << ":" << builder.port << std::endl;
        std::cout << "Attempting to connect to build host: " << builder.host << ":" << builder.port << std::endl;

        // Connect to the builder
        // The builder isn't guaranteed to be reachable immediately and may take some time to full get stood up
        // Block until the initial connection to the builder is made
        tcp::socket builder_socket(io_service);
        tcp::resolver builder_resolver(io_service);
        boost::system::error_code ec;
        do {
            asio::connect(builder_socket, builder_resolver.resolve({builder.host, builder.port}), ec);
        } while (ec != boost::system::errc::success);
@@ -68,31 +73,42 @@ int main(int argc, char *argv[]) {

        Messenger builder_messenger(builder_socket);

        // Once we're connected to the builder start the client process
        asio::spawn(io_service,
                    [&](asio::yield_context yield) {
                        std::cout << "Sending definition: " << definition_path << std::endl;

                        // Send the definition file
        builder_messenger.send_file(definition_path);
                        builder_messenger.async_send_file(definition_path, yield);

        // Read the build output until a zero length message is sent
                        std::cout << "Start reading builder output:" << std::endl;
                        std::string line;
                        boost::system::error_code read_ec;

                        do {
            line = builder_messenger.receive(10, 0);
                            auto line = builder_messenger.async_receive(yield);
                            std::cout << line;
                        } while (!line.empty());


        // Receiving file
        std::cout << "Container built!\n";

                        // Read the container image
        builder_messenger.receive_file(container_path);
                        std::cout << "Begin receive of container: " << container_path << std::endl;

        std::cout << "Container built!\n";
                        builder_messenger.async_receive_file(container_path, yield);

                        std::cout << "End receive of container: " << container_path << std::endl;

                        // Inform the queue we're done
        queue_messenger.send("checkout_builder_complete");
                        queue_messenger.async_send(std::string("checkout_builder_complete"), yield);
                    });

        // Begin processing our connections and queue
        io_service.run();
    }
    catch (std::exception &e) {
        std::cerr << "\033[1;31m Failed to build container: " << e.what() << "\033[0m\n";
        std::cout << std::string() + "Build server exception: " + e.what() << std::endl;
    }

    std::cout << "Client shutting down\n";

    return 0;
}
 No newline at end of file

Client/main_heartbeat.cpp

deleted100644 → 0
+0 −158
Original line number Diff line number Diff line
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/process.hpp>
#include <boost/regex.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include "Messenger.h"
#include <memory>

namespace asio = boost::asio;
using asio::ip::tcp;
namespace bp = boost::process;

std::string queue_host() {
    auto env = std::getenv("QUEUE_HOST");
    if (!env) {
        throw std::system_error(ENOTSUP, std::system_category(), "QUEUE_HOST");
    }
    return std::string(env);
}

std::string queue_port() {
    auto env = std::getenv("QUEUE_PORT");
    if (!env) {
        throw std::system_error(ENOTSUP, std::system_category(), "QUEUE_PORT");
    }
    return std::string(env);
}

int main(int argc, char *argv[]) {

    try {

        // Check for correct number of arguments
        if (argc != 3) {
            std::cerr << "Usage: ContainerBuilder <definition path> <container path>\n";
            return 1;
        }
        std::string definition_path(argv[1]);
        std::string container_path(argv[2]);

        std::cout << "Attempting to connect to BuilderQueue: " << queue_host() << ":" << queue_port() << std::endl;

        asio::io_service io_service;

        tcp::socket queue_socket(io_service);
        tcp::resolver queue_resolver(io_service);
        boost::system::error_code ec;
        asio::connect(queue_socket, queue_resolver.resolve({queue_host(), queue_port()}));

        std::cout << "Connected to BuilderQueue: " << queue_host() << ":" << queue_port() << std::endl;

        Messenger queue_messenger(queue_socket);

        // Initiate a build request
        queue_messenger.send("checkout_builder_request");

        // Receive an available builder
        auto builder = queue_messenger.receive_builder();

        std::cout << "Attempting to connect to build host: " << builder.host << ":" << builder.port << std::endl;

        // Block until the initial connection to the builder is made
        std::shared_ptr<tcp::socket> builder_socket = std::make_shared<tcp::socket>(io_service);
        tcp::resolver builder_resolver(io_service);
        do {
            asio::connect(*builder_socket, builder_resolver.resolve({builder.host, builder.port}), ec);
        } while (ec != boost::system::errc::success);

        std::cout << "Connected to builder host: " << builder.host << ":" << builder.port << std::endl;

        std::shared_ptr<Messenger> builder_messenger = std::make_shared<Messenger>(*builder_socket);

        // When a hang is detected this callback will destroy the socket, wait for another connection, reset the messenger, and continue
        timer_callback_t heartbeat_hung = [&](const boost::system::error_code &ec) {
            boost::system::error_code connect_ec;

            // Ignore the timer being canceled
            if (ec != boost::system::errc::success) {
                return;
            }

            std::cout<<"Hang in there, we're resetting the socket\n";

            // Destroy the current socket
            builder_socket->cancel();
            builder_socket->close();

            // Try to resolve a new connection to the builder and reset the socket and messenger
            builder_socket = std::make_shared<tcp::socket>(io_service);
            tcp::resolver builder_resolver(io_service);
            do {
                asio::connect(*builder_socket, builder_resolver.resolve({builder.host, builder.port}), connect_ec);
            } while (connect_ec != boost::system::errc::success);
            builder_messenger = std::make_shared<Messenger>(*builder_socket);
        };

        // Once we're connected to the builder start the client process in the coroutine
        asio::spawn(io_service,
                    [&](asio::yield_context yield) {
                        std::cout<<"Sending definition: "<<definition_path<<std::endl;

                        // Send the definition file
                        builder_messenger->async_send_file(definition_path, yield);

                        // watchdog timers which will guard against socket hangs
                        asio::deadline_timer watchdog(io_service);
                        const auto timeout = boost::posix_time::seconds(15);

                        // Read the build output until a zero length message, that is not a heartbeat, is sent
                        // TODO fix this mess with handling of the heartbeat
                        std::cout<<"Start reading builder output:"<<std::endl;
                        std::string line;
                        boost::system::error_code read_ec;

                        do {
                            // Reset watchdog
                            watchdog.expires_from_now(timeout);
                            watchdog.async_wait(heartbeat_hung);

                            MessageType type;
                            do {
                                line = builder_messenger->async_receive(yield[read_ec], &type);
                                if (type == MessageType::string) {
                                    std::cout << line;
                                } else if (type == MessageType::heartbeat) {
                                    line = std::string("heartbeat");
                                }
                            } while(read_ec == boost::asio::error::connection_aborted);
                        } while (!line.empty());

                        watchdog.cancel();

                        // Read the container image
                        // TODO watchdog on file transfer
                        std::cout<<"Begin receive of container: "<<container_path<<std::endl;

                        builder_messenger->async_receive_file(container_path, yield);

                        std::cout<<"End receive of container: "<<container_path<<std::endl;

                        // Inform the queue we're done
                        queue_messenger.async_send(std::string("checkout_builder_complete"), yield);
                    });

        // Begin processing our connections and queue
        io_service.run();
    }
    catch (std::exception &e) {
        std::cout<< std::string() + "Build server exception: " + e.what() << std::endl;
    }

    std::cout<< "Client shutting down\n";

    return 0;
}
 No newline at end of file
Loading