Loading Builder/main.cpp +19 −7 Original line number Diff line number Diff line Loading @@ -29,8 +29,14 @@ int main(int argc, char *argv[]) { // Once we're connected start the build process asio::spawn(io_service, [&](asio::yield_context yield) { boost::system::error_code error; // Receive the definition file from the client messenger.async_receive_file("container.def", yield); messenger.async_receive_file("container.def", yield[error]); if(error) { logger::write("Error receiving definition file: " + error.message()); return; } logger::write(socket, "Received container.def"); Loading @@ -49,6 +55,7 @@ int main(int argc, char *argv[]) { group, build_ec); if (build_ec) { logger::write(socket, "subprocess error: " + build_ec.message()); return; } logger::write(socket, "launched build process: " + build_command); Loading @@ -61,13 +68,18 @@ int main(int argc, char *argv[]) { asio::streambuf buffer; boost::regex line_matcher{"\\r|\\n"}; std::size_t read_size = 0; boost::system::error_code err_code; do { // Read from the pipe into a buffer read_size = asio::async_read_until(std_pipe, buffer, line_matcher, yield[err_code]); read_size = asio::async_read_until(std_pipe, buffer, line_matcher, yield[error]); if(error) { logger::write(socket, "reading process pipe failed: " + error.message()); } // Write the buffer to our socket messenger.async_send(buffer, yield); } while (read_size > 0 && err_code == boost::system::errc::success); messenger.async_send(buffer, yield[error]); if(error) { logger::write(socket, "sending process pipe failed: " + error.message()); } } while (read_size > 0 && !error); // Get the return value from the build subprocess logger::write(socket, "Waiting on build process to exit"); Loading @@ -86,8 +98,8 @@ int main(int argc, char *argv[]) { // Begin processing our connections and queue io_service.run(); } catch (std::exception &e) { logger::write(std::string() + "Build server exception: " + e.what()); catch (...) { logger::write("Unknown builder exception encountered"); } logger::write("Builder shutting down"); Loading BuilderQueue/include/BuilderQueue.h +1 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ public: // Attempt to process the queue after an event that adds/removes builders or requests void tick(asio::yield_context yield); private: asio::io_service &io_service; Loading BuilderQueue/src/BuilderQueue.cpp +22 −6 Original line number Diff line number Diff line Loading @@ -9,7 +9,13 @@ Reservation &BuilderQueue::enter() { void BuilderQueue::tick(asio::yield_context yield) { // Update list of "Active" OpenStack builders auto all_builders = OpenStackBuilder::get_builders(io_service, yield); boost::system::error_code error; auto all_builders = OpenStackBuilder::get_builders(io_service, yield[error]); if (error) { logger::write("Error calling get_builders" + error.message()); return; } std::set<Builder> unavailable_builders; // Delete reservations that are completed Loading @@ -28,8 +34,13 @@ void BuilderQueue::tick(asio::yield_context yield) { reservation.set_enter_cleanup(); asio::spawn(io_service, [this, &reservation](asio::yield_context destroy_yield) { OpenStackBuilder::destroy(reservation.builder.get(), io_service, destroy_yield); boost::system::error_code error; OpenStackBuilder::destroy(reservation.builder.get(), io_service, destroy_yield[error]); if (error) { logger::write("Error destryoing builder " + error.message()); } else { reservation.set_finalize(); } }); } } Loading Loading @@ -59,8 +70,13 @@ void BuilderQueue::tick(asio::yield_context yield) { pending_requests++; asio::spawn(io_service, [this](asio::yield_context request_yield) { OpenStackBuilder::request_create(io_service, request_yield); boost::system::error_code error; OpenStackBuilder::request_create(io_service, request_yield[error]); if (error) { logger::write("Error requesting builder create " + error.message()); } else { pending_requests--; } }); } } No newline at end of file BuilderQueue/src/Connection.cpp +42 −30 Original line number Diff line number Diff line Loading @@ -4,44 +4,56 @@ // Handle the initial request void Connection::begin() { try { auto self(shared_from_this()); asio::spawn(socket.get_io_service(), [this, self](asio::yield_context yield) { try { Messenger messenger(socket); auto request = messenger.async_receive(yield, MessageType::string); if (request == "checkout_builder_request") checkout_builder(yield); else throw std::system_error(EPERM, std::system_category(), request + " not supported"); } catch (std::exception &e) { logger::write(socket, std::string() + "Connection initial request error " + e.what()); boost::system::error_code error; auto request = messenger.async_receive(yield[error], MessageType::string); if (error) { logger::write(socket, "Request failure" + error.message()); } else if (request == "checkout_builder_request") { checkout_builder(yield[ec]); } else { logger::write(socket, "Invalid request message received: " + request); } }); } catch(...) { logger::write(socket, "Unknown connection exception caught"); } } // Handle a client builder request void Connection::checkout_builder(asio::yield_context yield) { try { logger::write(socket, "Checkout resource request"); boost::system::error_code error; Messenger messenger(socket); logger::write(socket, "Requesting resource from the queue"); // Request a builder logger::write(socket, "Requesting builder from the queue"); ReservationRequest reservation(queue); auto builder = reservation.async_wait(yield); Builder builder = reservation.async_wait(yield[error]); if (error) { logger::write("reservation builder request failed: " + error.message()); return; } messenger.async_send(builder, yield); logger::write(socket, "sent builder: " + builder.id + "(" + builder.host + ")"); // Send the fulfilled builder messenger.async_send(builder, yield[error]); if (error) { logger::write(socket, "Error sending builder: " + builder.id + "(" + builder.host + ")"); return; } auto complete = messenger.async_receive(yield, MessageType::string); if (complete == "checkout_builder_complete") logger::write(socket, "Client completed"); else logger::write(socket, "Client complete error: " + complete); } catch (std::exception &e) { logger::write(socket, "Exception: checkout_builder disconnect:" + std::string(e.what())); // Wait on connection to finish logger::write(socket, "Sent builder: " + builder.id + "(" + builder.host + ")"); std::string complete = messenger.async_receive(yield[error], MessageType::string); if (error || complete != "checkout_builder_complete") { logger::write(socket, "Failed to receive completion message from client" + error.message()); return; } logger::write(socket, "Connection completed successfully"); } No newline at end of file BuilderQueue/src/OpenStackBuilder.cpp +37 −33 Original line number Diff line number Diff line Loading @@ -15,18 +15,18 @@ namespace OpenStackBuilder { asio::streambuf buffer; // Asynchronously launch the list command std::error_code list_ec; std::error_code list_error; logger::write("Running command: " + list_command); bp::child list_child(list_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, list_ec); if (list_ec) { logger::write("subprocess error: " + list_ec.message()); bp::child list_child(list_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, list_error); if (list_error) { logger::write("subprocess error: " + list_error.message()); } // Read the list command output until we reach EOF, which is returned as an error boost::system::error_code ec; boost::asio::async_read(std_pipe, buffer, yield[ec]); if (ec != asio::error::eof) { logger::write("OpenStack destroy error: " + ec.message()); boost::system::error_code read_error; boost::asio::async_read(std_pipe, buffer, yield[read_error]); if (read_error != asio::error::eof) { logger::write("OpenStack destroy error: " + read_error.message()); } // Grab exit code from destroy command Loading Loading @@ -56,6 +56,7 @@ namespace OpenStackBuilder { // Fill a set of builders from the property tree data std::set<Builder> builders; try { for (const auto &builder_node : builder_tree) { Builder builder; auto network = builder_node.second.get<std::string>("Networks"); Loading @@ -66,6 +67,9 @@ namespace OpenStackBuilder { builder.port = "8080"; builders.insert(builder); } } catch(const pt::ptree_error &e) { logger::write(std::string() + "Error parsing builders: " + e.what()); } return builders; } Loading @@ -77,19 +81,19 @@ namespace OpenStackBuilder { asio::streambuf buffer; // Asynchronously launch the create command std::error_code create_ec; std::error_code create_error; logger::write("Running command: " + create_command); bp::child build_child(create_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, create_ec); if (create_ec) { logger::write("subprocess error: " + create_ec.message()); create_error); if (create_error) { logger::write("subprocess error: " + create_error.message()); } // Read the create_command output until we reach EOF, which is returned as an error boost::system::error_code read_ec; boost::asio::async_read(std_pipe, buffer, yield[read_ec]); if (read_ec != asio::error::eof) { logger::write("OpenStack create error: " + read_ec.message()); boost::system::error_code read_error; boost::asio::async_read(std_pipe, buffer, yield[read_error]); if (read_error != asio::error::eof) { logger::write("OpenStack create error: " + read_error.message()); } // Grab exit code from builder Loading @@ -108,19 +112,19 @@ namespace OpenStackBuilder { asio::streambuf buffer; // Asynchronously launch the destroy command std::error_code destroy_ec; std::error_code destroy_error; logger::write("Running command: " + destroy_command); bp::child destroy_child(destroy_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, destroy_ec); if (destroy_ec) { logger::write("subprocess error: " + destroy_ec.message()); destroy_error); if (destroy_error) { logger::write("subprocess error: " + destroy_error.message()); } // Read the destroy_command output until we reach EOF, which is returned as an error boost::system::error_code ec; boost::asio::async_read(std_pipe, buffer, yield[ec]); if (ec != asio::error::eof) { logger::write("OpenStack destroy error: " + ec.message()); boost::system::error_code read_ec; boost::asio::async_read(std_pipe, buffer, yield[read_ec]); if (read_ec != asio::error::eof) { logger::write("OpenStack destroy error: " + read_ec.message()); } // Grab exit code from destroy command Loading Loading
Builder/main.cpp +19 −7 Original line number Diff line number Diff line Loading @@ -29,8 +29,14 @@ int main(int argc, char *argv[]) { // Once we're connected start the build process asio::spawn(io_service, [&](asio::yield_context yield) { boost::system::error_code error; // Receive the definition file from the client messenger.async_receive_file("container.def", yield); messenger.async_receive_file("container.def", yield[error]); if(error) { logger::write("Error receiving definition file: " + error.message()); return; } logger::write(socket, "Received container.def"); Loading @@ -49,6 +55,7 @@ int main(int argc, char *argv[]) { group, build_ec); if (build_ec) { logger::write(socket, "subprocess error: " + build_ec.message()); return; } logger::write(socket, "launched build process: " + build_command); Loading @@ -61,13 +68,18 @@ int main(int argc, char *argv[]) { asio::streambuf buffer; boost::regex line_matcher{"\\r|\\n"}; std::size_t read_size = 0; boost::system::error_code err_code; do { // Read from the pipe into a buffer read_size = asio::async_read_until(std_pipe, buffer, line_matcher, yield[err_code]); read_size = asio::async_read_until(std_pipe, buffer, line_matcher, yield[error]); if(error) { logger::write(socket, "reading process pipe failed: " + error.message()); } // Write the buffer to our socket messenger.async_send(buffer, yield); } while (read_size > 0 && err_code == boost::system::errc::success); messenger.async_send(buffer, yield[error]); if(error) { logger::write(socket, "sending process pipe failed: " + error.message()); } } while (read_size > 0 && !error); // Get the return value from the build subprocess logger::write(socket, "Waiting on build process to exit"); Loading @@ -86,8 +98,8 @@ int main(int argc, char *argv[]) { // Begin processing our connections and queue io_service.run(); } catch (std::exception &e) { logger::write(std::string() + "Build server exception: " + e.what()); catch (...) { logger::write("Unknown builder exception encountered"); } logger::write("Builder shutting down"); Loading
BuilderQueue/include/BuilderQueue.h +1 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ public: // Attempt to process the queue after an event that adds/removes builders or requests void tick(asio::yield_context yield); private: asio::io_service &io_service; Loading
BuilderQueue/src/BuilderQueue.cpp +22 −6 Original line number Diff line number Diff line Loading @@ -9,7 +9,13 @@ Reservation &BuilderQueue::enter() { void BuilderQueue::tick(asio::yield_context yield) { // Update list of "Active" OpenStack builders auto all_builders = OpenStackBuilder::get_builders(io_service, yield); boost::system::error_code error; auto all_builders = OpenStackBuilder::get_builders(io_service, yield[error]); if (error) { logger::write("Error calling get_builders" + error.message()); return; } std::set<Builder> unavailable_builders; // Delete reservations that are completed Loading @@ -28,8 +34,13 @@ void BuilderQueue::tick(asio::yield_context yield) { reservation.set_enter_cleanup(); asio::spawn(io_service, [this, &reservation](asio::yield_context destroy_yield) { OpenStackBuilder::destroy(reservation.builder.get(), io_service, destroy_yield); boost::system::error_code error; OpenStackBuilder::destroy(reservation.builder.get(), io_service, destroy_yield[error]); if (error) { logger::write("Error destryoing builder " + error.message()); } else { reservation.set_finalize(); } }); } } Loading Loading @@ -59,8 +70,13 @@ void BuilderQueue::tick(asio::yield_context yield) { pending_requests++; asio::spawn(io_service, [this](asio::yield_context request_yield) { OpenStackBuilder::request_create(io_service, request_yield); boost::system::error_code error; OpenStackBuilder::request_create(io_service, request_yield[error]); if (error) { logger::write("Error requesting builder create " + error.message()); } else { pending_requests--; } }); } } No newline at end of file
BuilderQueue/src/Connection.cpp +42 −30 Original line number Diff line number Diff line Loading @@ -4,44 +4,56 @@ // Handle the initial request void Connection::begin() { try { auto self(shared_from_this()); asio::spawn(socket.get_io_service(), [this, self](asio::yield_context yield) { try { Messenger messenger(socket); auto request = messenger.async_receive(yield, MessageType::string); if (request == "checkout_builder_request") checkout_builder(yield); else throw std::system_error(EPERM, std::system_category(), request + " not supported"); } catch (std::exception &e) { logger::write(socket, std::string() + "Connection initial request error " + e.what()); boost::system::error_code error; auto request = messenger.async_receive(yield[error], MessageType::string); if (error) { logger::write(socket, "Request failure" + error.message()); } else if (request == "checkout_builder_request") { checkout_builder(yield[ec]); } else { logger::write(socket, "Invalid request message received: " + request); } }); } catch(...) { logger::write(socket, "Unknown connection exception caught"); } } // Handle a client builder request void Connection::checkout_builder(asio::yield_context yield) { try { logger::write(socket, "Checkout resource request"); boost::system::error_code error; Messenger messenger(socket); logger::write(socket, "Requesting resource from the queue"); // Request a builder logger::write(socket, "Requesting builder from the queue"); ReservationRequest reservation(queue); auto builder = reservation.async_wait(yield); Builder builder = reservation.async_wait(yield[error]); if (error) { logger::write("reservation builder request failed: " + error.message()); return; } messenger.async_send(builder, yield); logger::write(socket, "sent builder: " + builder.id + "(" + builder.host + ")"); // Send the fulfilled builder messenger.async_send(builder, yield[error]); if (error) { logger::write(socket, "Error sending builder: " + builder.id + "(" + builder.host + ")"); return; } auto complete = messenger.async_receive(yield, MessageType::string); if (complete == "checkout_builder_complete") logger::write(socket, "Client completed"); else logger::write(socket, "Client complete error: " + complete); } catch (std::exception &e) { logger::write(socket, "Exception: checkout_builder disconnect:" + std::string(e.what())); // Wait on connection to finish logger::write(socket, "Sent builder: " + builder.id + "(" + builder.host + ")"); std::string complete = messenger.async_receive(yield[error], MessageType::string); if (error || complete != "checkout_builder_complete") { logger::write(socket, "Failed to receive completion message from client" + error.message()); return; } logger::write(socket, "Connection completed successfully"); } No newline at end of file
BuilderQueue/src/OpenStackBuilder.cpp +37 −33 Original line number Diff line number Diff line Loading @@ -15,18 +15,18 @@ namespace OpenStackBuilder { asio::streambuf buffer; // Asynchronously launch the list command std::error_code list_ec; std::error_code list_error; logger::write("Running command: " + list_command); bp::child list_child(list_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, list_ec); if (list_ec) { logger::write("subprocess error: " + list_ec.message()); bp::child list_child(list_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, list_error); if (list_error) { logger::write("subprocess error: " + list_error.message()); } // Read the list command output until we reach EOF, which is returned as an error boost::system::error_code ec; boost::asio::async_read(std_pipe, buffer, yield[ec]); if (ec != asio::error::eof) { logger::write("OpenStack destroy error: " + ec.message()); boost::system::error_code read_error; boost::asio::async_read(std_pipe, buffer, yield[read_error]); if (read_error != asio::error::eof) { logger::write("OpenStack destroy error: " + read_error.message()); } // Grab exit code from destroy command Loading Loading @@ -56,6 +56,7 @@ namespace OpenStackBuilder { // Fill a set of builders from the property tree data std::set<Builder> builders; try { for (const auto &builder_node : builder_tree) { Builder builder; auto network = builder_node.second.get<std::string>("Networks"); Loading @@ -66,6 +67,9 @@ namespace OpenStackBuilder { builder.port = "8080"; builders.insert(builder); } } catch(const pt::ptree_error &e) { logger::write(std::string() + "Error parsing builders: " + e.what()); } return builders; } Loading @@ -77,19 +81,19 @@ namespace OpenStackBuilder { asio::streambuf buffer; // Asynchronously launch the create command std::error_code create_ec; std::error_code create_error; logger::write("Running command: " + create_command); bp::child build_child(create_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, create_ec); if (create_ec) { logger::write("subprocess error: " + create_ec.message()); create_error); if (create_error) { logger::write("subprocess error: " + create_error.message()); } // Read the create_command output until we reach EOF, which is returned as an error boost::system::error_code read_ec; boost::asio::async_read(std_pipe, buffer, yield[read_ec]); if (read_ec != asio::error::eof) { logger::write("OpenStack create error: " + read_ec.message()); boost::system::error_code read_error; boost::asio::async_read(std_pipe, buffer, yield[read_error]); if (read_error != asio::error::eof) { logger::write("OpenStack create error: " + read_error.message()); } // Grab exit code from builder Loading @@ -108,19 +112,19 @@ namespace OpenStackBuilder { asio::streambuf buffer; // Asynchronously launch the destroy command std::error_code destroy_ec; std::error_code destroy_error; logger::write("Running command: " + destroy_command); bp::child destroy_child(destroy_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, destroy_ec); if (destroy_ec) { logger::write("subprocess error: " + destroy_ec.message()); destroy_error); if (destroy_error) { logger::write("subprocess error: " + destroy_error.message()); } // Read the destroy_command output until we reach EOF, which is returned as an error boost::system::error_code ec; boost::asio::async_read(std_pipe, buffer, yield[ec]); if (ec != asio::error::eof) { logger::write("OpenStack destroy error: " + ec.message()); boost::system::error_code read_ec; boost::asio::async_read(std_pipe, buffer, yield[read_ec]); if (read_ec != asio::error::eof) { logger::write("OpenStack destroy error: " + read_ec.message()); } // Grab exit code from destroy command Loading