Loading Client/include/Client.h +1 −63 Original line number Diff line number Diff line #pragma once #include <boost/asio/io_service.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/read_until.hpp> #include <boost/asio/spawn.hpp> #include <boost/regex.hpp> #include <boost/asio.hpp> #include <iostream> #include "Messenger.h" #include "ClientData.h" namespace asio = boost::asio; using asio::ip::tcp; class Client { public: explicit Client(int argc, char **argv) { parse_environment(); parse_arguments(argc, argv); // Hide the cursor and disable buffering for cleaner builder output std::cout << "\e[?25l" << std::flush; std::cout.setf(std::ios::unitbuf); // Full client connection - this will not run until the io_service is started asio::spawn(io_service, [this](asio::yield_context yield) { auto queue_messenger = connect_to_queue(yield); if (queue_messenger.error) { throw std::runtime_error("Error connecting to queue_messenger: " + queue_messenger.error.message()); } auto builder_messenger = connect_to_builder(queue_messenger, yield); if (builder_messenger.error) { throw std::runtime_error("Error connecting to builder: " + builder_messenger.error.message()); } // Send client data to builder builder_messenger.async_send(client_data()); if (builder_messenger.error) { throw std::runtime_error("Error sending client data to builder!"); } logger::write("Sending definition: " + definition_path); builder_messenger.async_send_file(definition_path, true); if (builder_messenger.error) { throw std::runtime_error("Error sending definition file to builder: " + builder_messenger.error.message()); } std::string line; do { line = builder_messenger.async_receive(); if (builder_messenger.error) { throw std::runtime_error("Error streaming build output!"); } std::cout << line; } while (!line.empty()); builder_messenger.async_receive_file(container_path, true); if (builder_messenger.error) { throw std::runtime_error("Error downloading container image: " + builder_messenger.error.message()); } logger::write("Container received: " + container_path, logger::severity_level::success); // Inform the queue we're done queue_messenger.async_send(std::string("checkout_builder_complete")); if (queue_messenger.error) { throw std::runtime_error("Error ending build!"); } }); } explicit Client(int argc, char **argv); ~Client() { // Restore the cursor Loading Client/src/Client.cpp +60 −9 Original line number Diff line number Diff line #include "Client.h" #include <iostream> #include <boost/program_options.hpp> #include <boost/process.hpp> #include "WaitingAnimation.h" Loading @@ -6,6 +7,53 @@ namespace bp = boost::process; Client::Client(int argc, char **argv) { parse_environment(); parse_arguments(argc, argv); // Hide the cursor and disable buffering for cleaner builder output std::cout << "\e[?25l" << std::flush; std::cout.setf(std::ios::unitbuf); // Full client connection - this will not run until the io_service is started asio::spawn(io_service, [this](asio::yield_context yield) { auto queue_messenger = connect_to_queue(yield); auto builder_messenger = connect_to_builder(queue_messenger, yield); boost::system::error_code error; // Send client data to builder builder_messenger.async_write_client_data(client_data(), yield, error); if (error) { throw std::runtime_error("Error sending client data to builder!"); } logger::write("Sending definition: " + definition_path); builder_messenger.async_write_file(definition_path, yield, error); if (error) { throw std::runtime_error("Error sending definition file to builder: " + error.message()); } builder_messenger.async_stream_print(yield, error); builder_messenger.async_read_file(container_path, yield, error); if (error) { throw std::runtime_error("Error downloading container image: " + error.message()); } logger::write("Container received: " + container_path, logger::severity_level::success); queue_messenger.async_write_string("checkout_builder_complete", yield, error); if (error) { throw std::runtime_error("Error ending build!"); } }); } void Client::parse_arguments(int argc, char **argv) { namespace po = boost::program_options; Loading Loading @@ -70,10 +118,11 @@ Messenger Client::connect_to_queue(asio::yield_context yield) { // Start waiting animation WaitingAnimation wait_queue("Connecting to BuilderQueue: "); Messenger queue_messenger(io_service, queue_host, queue_port, yield); if (queue_messenger.error) { boost::system::error_code error; Messenger queue_messenger(io_service, queue_host, queue_port, yield, error); if (error) { wait_queue.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Failed to connect to builder queue!"); throw std::runtime_error("The ContainerBuilder queue is currently unreachable."); } wait_queue.stop("Connected to queue: " + queue_host + ":" + queue_port, logger::severity_level::success); Loading @@ -83,23 +132,25 @@ Messenger Client::connect_to_queue(asio::yield_context yield) { Messenger Client::connect_to_builder(Messenger &queue_messenger, asio::yield_context yield) { WaitingAnimation wait_builder("Requesting BuilderData: "); boost::system::error_code error; // Request a builder from the queue queue_messenger.async_send("checkout_builder_request"); if (queue_messenger.error) { queue_messenger.async_write_string("checkout_builder_request", yield, error); if (error) { wait_builder.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Error communicating with the builder queue!"); } // Wait to receive the builder from the queue auto builder = queue_messenger.async_receive_builder(); if (queue_messenger.error) { auto builder = queue_messenger.async_read_builder(yield, error); if (error) { wait_builder.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Error obtaining VM builder from builder queue!"); } // Create a messenger to the builder Messenger builder_messenger(io_service, builder.host, builder.port, yield); if (builder_messenger.error) { Messenger builder_messenger(io_service, builder.host, builder.port, yield, error); if (error) { wait_builder.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Failed to connect to builder!"); } Loading Common/include/Messenger.h +3 −0 Original line number Diff line number Diff line Loading @@ -98,6 +98,9 @@ public: asio::yield_context yield, boost::system::error_code& error); void async_stream_print(asio::yield_context yield, boost::system::error_code& error); private: websocket::stream<tcp::socket> stream; }; No newline at end of file Common/src/Messenger.cpp +19 −3 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ void Messenger::async_read_file(boost::filesystem::path file_path, // Read file in chunks do { auto bytes_read = stream.async_read_some(buffer, chunk_size, yield[error]); auto bytes_read = stream.async_read_some(buffer, yield[error]); if (error != beast::errc::success && error != asio::error::eof) { logger::write("Error reading file: " + file_path.string() + " " + std::strerror(errno), logger::severity_level::error); Loading Loading @@ -154,7 +154,7 @@ void Messenger::async_write_file(boost::filesystem::path file_path, BuilderData Messenger::async_read_builder(asio::yield_context yield, boost::system::error_code &error) { // Read in the serialized builder as a string auto serialized_builder = this->async_read_string(yield, error); auto serialized_builder = async_read_string(yield, error); if (error) { error = boost::system::errc::make_error_code(boost::system::errc::io_error); logger::write("Received bad builder: " + error.message(), logger::severity_level::error); Loading Loading @@ -191,7 +191,7 @@ void Messenger::async_write_builder(BuilderData builder, ClientData Messenger::async_receive_client_data(asio::yield_context yield, boost::system::error_code &error) { // Read in the serialized client data as a string auto serialized_client_data = this->async_read_string(yield, error); auto serialized_client_data = async_read_string(yield, error); if (error) { error = boost::system::errc::make_error_code(boost::system::errc::io_error); logger::write("Received bad client data: " + error.message(), logger::severity_level::error); Loading Loading @@ -246,3 +246,19 @@ void Messenger::async_write_pipe(bp::async_pipe pipe, } } while (!fin); } // Print a large message as it arrives to the socket void Messenger::async_stream_print(asio::yield_context yield, boost::system::error_code& error) { const auto max_read_bytes = 4096; std::array<char, 4096> buffer; do { auto bytes_read = stream.async_read_some(buffer, yield[error]); if (error != beast::errc::success && error != asio::error::eof) { logger::write(std::string() + "Error reading process output: " + std::strerror(errno), logger::severity_level::error); return; } std::cout.write(buffer.data(), bytes_read); } while (!stream.is_message_done()); } No newline at end of file Loading
Client/include/Client.h +1 −63 Original line number Diff line number Diff line #pragma once #include <boost/asio/io_service.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/read_until.hpp> #include <boost/asio/spawn.hpp> #include <boost/regex.hpp> #include <boost/asio.hpp> #include <iostream> #include "Messenger.h" #include "ClientData.h" namespace asio = boost::asio; using asio::ip::tcp; class Client { public: explicit Client(int argc, char **argv) { parse_environment(); parse_arguments(argc, argv); // Hide the cursor and disable buffering for cleaner builder output std::cout << "\e[?25l" << std::flush; std::cout.setf(std::ios::unitbuf); // Full client connection - this will not run until the io_service is started asio::spawn(io_service, [this](asio::yield_context yield) { auto queue_messenger = connect_to_queue(yield); if (queue_messenger.error) { throw std::runtime_error("Error connecting to queue_messenger: " + queue_messenger.error.message()); } auto builder_messenger = connect_to_builder(queue_messenger, yield); if (builder_messenger.error) { throw std::runtime_error("Error connecting to builder: " + builder_messenger.error.message()); } // Send client data to builder builder_messenger.async_send(client_data()); if (builder_messenger.error) { throw std::runtime_error("Error sending client data to builder!"); } logger::write("Sending definition: " + definition_path); builder_messenger.async_send_file(definition_path, true); if (builder_messenger.error) { throw std::runtime_error("Error sending definition file to builder: " + builder_messenger.error.message()); } std::string line; do { line = builder_messenger.async_receive(); if (builder_messenger.error) { throw std::runtime_error("Error streaming build output!"); } std::cout << line; } while (!line.empty()); builder_messenger.async_receive_file(container_path, true); if (builder_messenger.error) { throw std::runtime_error("Error downloading container image: " + builder_messenger.error.message()); } logger::write("Container received: " + container_path, logger::severity_level::success); // Inform the queue we're done queue_messenger.async_send(std::string("checkout_builder_complete")); if (queue_messenger.error) { throw std::runtime_error("Error ending build!"); } }); } explicit Client(int argc, char **argv); ~Client() { // Restore the cursor Loading
Client/src/Client.cpp +60 −9 Original line number Diff line number Diff line #include "Client.h" #include <iostream> #include <boost/program_options.hpp> #include <boost/process.hpp> #include "WaitingAnimation.h" Loading @@ -6,6 +7,53 @@ namespace bp = boost::process; Client::Client(int argc, char **argv) { parse_environment(); parse_arguments(argc, argv); // Hide the cursor and disable buffering for cleaner builder output std::cout << "\e[?25l" << std::flush; std::cout.setf(std::ios::unitbuf); // Full client connection - this will not run until the io_service is started asio::spawn(io_service, [this](asio::yield_context yield) { auto queue_messenger = connect_to_queue(yield); auto builder_messenger = connect_to_builder(queue_messenger, yield); boost::system::error_code error; // Send client data to builder builder_messenger.async_write_client_data(client_data(), yield, error); if (error) { throw std::runtime_error("Error sending client data to builder!"); } logger::write("Sending definition: " + definition_path); builder_messenger.async_write_file(definition_path, yield, error); if (error) { throw std::runtime_error("Error sending definition file to builder: " + error.message()); } builder_messenger.async_stream_print(yield, error); builder_messenger.async_read_file(container_path, yield, error); if (error) { throw std::runtime_error("Error downloading container image: " + error.message()); } logger::write("Container received: " + container_path, logger::severity_level::success); queue_messenger.async_write_string("checkout_builder_complete", yield, error); if (error) { throw std::runtime_error("Error ending build!"); } }); } void Client::parse_arguments(int argc, char **argv) { namespace po = boost::program_options; Loading Loading @@ -70,10 +118,11 @@ Messenger Client::connect_to_queue(asio::yield_context yield) { // Start waiting animation WaitingAnimation wait_queue("Connecting to BuilderQueue: "); Messenger queue_messenger(io_service, queue_host, queue_port, yield); if (queue_messenger.error) { boost::system::error_code error; Messenger queue_messenger(io_service, queue_host, queue_port, yield, error); if (error) { wait_queue.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Failed to connect to builder queue!"); throw std::runtime_error("The ContainerBuilder queue is currently unreachable."); } wait_queue.stop("Connected to queue: " + queue_host + ":" + queue_port, logger::severity_level::success); Loading @@ -83,23 +132,25 @@ Messenger Client::connect_to_queue(asio::yield_context yield) { Messenger Client::connect_to_builder(Messenger &queue_messenger, asio::yield_context yield) { WaitingAnimation wait_builder("Requesting BuilderData: "); boost::system::error_code error; // Request a builder from the queue queue_messenger.async_send("checkout_builder_request"); if (queue_messenger.error) { queue_messenger.async_write_string("checkout_builder_request", yield, error); if (error) { wait_builder.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Error communicating with the builder queue!"); } // Wait to receive the builder from the queue auto builder = queue_messenger.async_receive_builder(); if (queue_messenger.error) { auto builder = queue_messenger.async_read_builder(yield, error); if (error) { wait_builder.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Error obtaining VM builder from builder queue!"); } // Create a messenger to the builder Messenger builder_messenger(io_service, builder.host, builder.port, yield); if (builder_messenger.error) { Messenger builder_messenger(io_service, builder.host, builder.port, yield, error); if (error) { wait_builder.stop("Failed\n", logger::severity_level::fatal); throw std::runtime_error("Failed to connect to builder!"); } Loading
Common/include/Messenger.h +3 −0 Original line number Diff line number Diff line Loading @@ -98,6 +98,9 @@ public: asio::yield_context yield, boost::system::error_code& error); void async_stream_print(asio::yield_context yield, boost::system::error_code& error); private: websocket::stream<tcp::socket> stream; }; No newline at end of file
Common/src/Messenger.cpp +19 −3 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ void Messenger::async_read_file(boost::filesystem::path file_path, // Read file in chunks do { auto bytes_read = stream.async_read_some(buffer, chunk_size, yield[error]); auto bytes_read = stream.async_read_some(buffer, yield[error]); if (error != beast::errc::success && error != asio::error::eof) { logger::write("Error reading file: " + file_path.string() + " " + std::strerror(errno), logger::severity_level::error); Loading Loading @@ -154,7 +154,7 @@ void Messenger::async_write_file(boost::filesystem::path file_path, BuilderData Messenger::async_read_builder(asio::yield_context yield, boost::system::error_code &error) { // Read in the serialized builder as a string auto serialized_builder = this->async_read_string(yield, error); auto serialized_builder = async_read_string(yield, error); if (error) { error = boost::system::errc::make_error_code(boost::system::errc::io_error); logger::write("Received bad builder: " + error.message(), logger::severity_level::error); Loading Loading @@ -191,7 +191,7 @@ void Messenger::async_write_builder(BuilderData builder, ClientData Messenger::async_receive_client_data(asio::yield_context yield, boost::system::error_code &error) { // Read in the serialized client data as a string auto serialized_client_data = this->async_read_string(yield, error); auto serialized_client_data = async_read_string(yield, error); if (error) { error = boost::system::errc::make_error_code(boost::system::errc::io_error); logger::write("Received bad client data: " + error.message(), logger::severity_level::error); Loading Loading @@ -246,3 +246,19 @@ void Messenger::async_write_pipe(bp::async_pipe pipe, } } while (!fin); } // Print a large message as it arrives to the socket void Messenger::async_stream_print(asio::yield_context yield, boost::system::error_code& error) { const auto max_read_bytes = 4096; std::array<char, 4096> buffer; do { auto bytes_read = stream.async_read_some(buffer, yield[error]); if (error != beast::errc::success && error != asio::error::eof) { logger::write(std::string() + "Error reading process output: " + std::strerror(errno), logger::severity_level::error); return; } std::cout.write(buffer.data(), bytes_read); } while (!stream.is_message_done()); } No newline at end of file