Loading BuilderQueue/include/BuilderQueue.h +2 −13 Original line number Diff line number Diff line Loading @@ -10,9 +10,7 @@ class BuilderQueue { public: explicit BuilderQueue(asio::io_service &io_service) : io_service(io_service), max_builders(1), max_cached_builders(max_builders), outstanding_builder_requests(0), active_builders(0) max_available_builders(max_builders) {} // Create a new queue reservation and return it to the requester Loading @@ -26,18 +24,9 @@ private: // Hold reservations that are to be fulfilled std::list<Reservation> reservations; // Queue of cached builders std::queue<Builder> cached_builders; // Maximum number of active and cached builders const std::size_t max_builders; // Maximum number of builders to spin and keep up in reserve const std::size_t max_cached_builders; // Builders that have ben requested but not yet failed/suceeded std::size_t outstanding_builder_requests; // Number of active, and in the process of shutting down after being active, builders std::size_t active_builders; const std::size_t max_available_builders; }; No newline at end of file BuilderQueue/include/OpenStackBuilder.h +3 −2 Original line number Diff line number Diff line #pragma once #include "boost/asio/spawn.hpp" #include "boost/optional.hpp" #include "Builder.h" #include <set> namespace asio = boost::asio; namespace OpenStackBuilder { boost::optional<Builder> request_create(asio::io_service& io_service, asio::yield_context yield); std::set<Builder> get_builders(asio::io_service &io_service, asio::yield_context yield); void request_create(asio::io_service& io_service, asio::yield_context yield); void destroy(Builder builder, asio::io_service& io_service, asio::yield_context yield ); }; No newline at end of file BuilderQueue/src/BuilderQueue.cpp +23 −36 Original line number Diff line number Diff line #include "BuilderQueue.h" #include "Logger.h" #include "OpenStackBuilder.h" #include <set> Reservation &BuilderQueue::enter() { reservations.emplace_back(io_service); return reservations.back(); } // TODO handle error state builders void BuilderQueue::tick(asio::yield_context yield) { // Go through reservations and check for any that are completed // Spin down any completed VM's and remove them from the list of active builders // Update OpenStack builder list auto all_builders = OpenStackBuilder::get_builders(io_service, yield); std::set<Builder> unavailable_builders; // Update reservation information for (auto &reservation : reservations) { if (reservation.complete() && reservation.builder) { if (reservation.builder) { unavailable_builders.insert(reservation.builder.get()); } if (reservation.complete()) { asio::spawn(io_service, [&](asio::yield_context yield) { OpenStackBuilder::destroy(reservation.builder.get(), io_service, yield); //TODO check if the builder was actually destroyed active_builders--; }); } } Loading @@ -27,38 +32,20 @@ void BuilderQueue::tick(asio::yield_context yield) { return res.complete(); }); // Assign any unused builders to outstanding reservations for (auto &reservation : reservations) { if (cached_builders.empty()) { break; } if (reservation.pending()) { auto builder = cached_builders.front(); reservation.ready(builder); cached_builders.pop(); active_builders++; } } // Caclulate the total allowable builders auto all_builder_count = active_builders + cached_builders.size() + outstanding_builder_requests; auto open_slots = max_builders - all_builder_count; // Calculate if any cached slots are available auto open_cache_slots = max_cached_builders - cached_builders.size(); // If slots are available attempt to fill them auto request_count = std::min(open_slots, open_cache_slots); // Available_builders = all_builders - unavailable_builders std::set<Builder> available_builders; std::set_difference(all_builders.begin(), all_builders.end(), unavailable_builders.begin(), unavailable_builders.end(), std::inserter(available_builders, available_builders.begin())); // Spin up builders so we have the requested available auto open_slots = max_builders - all_builders.size(); auto open_available_slots = max_available_builders - available_builders.size(); auto request_count = std::min(open_slots, open_available_slots); for (int i = 0; i < request_count; i++) { outstanding_builder_requests++; asio::spawn(io_service, [&](asio::yield_context yield) { auto opt_builder = OpenStackBuilder::request_create(io_service, yield); if (opt_builder) { cached_builders.push(opt_builder.get()); } outstanding_builder_requests--; OpenStackBuilder::request_create(io_service, yield); }); } } No newline at end of file BuilderQueue/src/OpenStackBuilder.cpp +78 −33 Original line number Diff line number Diff line #include "OpenStackBuilder.h" #include "boost/process.hpp" #include "Logger.h" #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/json_parser.hpp> namespace bp = boost::process; namespace pt = boost::property_tree; namespace OpenStackBuilder { boost::optional<Builder> request_create(asio::io_service& io_service, asio::yield_context yield) { std::set<Builder> get_builders(asio::io_service &io_service, asio::yield_context yield) { std::string list_command("/home/queue/GetBuilders"); bp::group group; bp::async_pipe std_pipe(io_service); asio::streambuf buffer; // Asynchronously launch the list command std::error_code list_ec; logger::write("Running command: " + list_command); bp::child list_child(list_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, list_ec); if (list_ec) { logger::write("subprocess error: " + list_ec.message()); } // Read the list command output until we reach EOF, which is returned as an error boost::system::error_code ec; boost::asio::async_read(std_pipe, buffer, yield[ec]); if (ec != asio::error::eof) { logger::write("OpenStack destroy error: " + ec.message()); } // Grab exit code from destroy command list_child.wait(); int exit_code = list_child.exit_code(); if (exit_code != 0) { logger::write("Failed to fetch server list"); } // Read the json output into a property tree /* [ { "Status": "ACTIVE", "Name": "Builder", "Image": "BuilderImage", "ID": "adeda126-18d4-423f-a499-84651937cdc0", "Flavor": "m1.medium", "Networks": "or_provider_general_extnetwork1=128.219.185.100" } ] */ std::istream is_buffer(&buffer); pt::ptree builder_tree; pt::read_json(is_buffer, builder_tree); // Fill a set of builders from the property tree data std::set<Builder> builders; for (const auto &builder_node : builder_tree) { Builder builder; auto network = builder_node.second.get<std::string>("Networks"); size_t eq_pos = network.find("="); builder.host = network.substr(eq_pos); builder.id = builder_node.second.get<std::string>("ID"); builder.port = "8080"; builders.insert(builder); } return builders; } void request_create(asio::io_service &io_service, asio::yield_context yield) { std::string create_command("/home/queue/RequestCreateBuilder"); bp::group group; bp::async_pipe std_pipe(io_service); Loading @@ -14,7 +79,8 @@ namespace OpenStackBuilder { // Asynchronously launch the create command std::error_code create_ec; logger::write("Running command: " + create_command); bp::child build_child(create_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, create_ec); bp::child build_child(create_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, create_ec); if (create_ec) { logger::write("subprocess error: " + create_ec.message()); } Loading @@ -30,31 +96,9 @@ namespace OpenStackBuilder { build_child.wait(); int exit_code = build_child.exit_code(); boost::optional<Builder> opt_builder; // If successful parse the output for the newly created builders ID, IP address, and port if(exit_code == 0) { Builder builder; std::istream stream(&buffer); std::string line; std::getline(stream, builder.id); if(stream.fail() || stream.bad() || stream.eof()) { return opt_builder; } std::getline(stream, builder.host); if(stream.fail() || stream.bad() || stream.eof()) { return opt_builder; } std::getline(stream, builder.port); if(stream.fail() || stream.bad()) { return opt_builder; } opt_builder = builder; if (exit_code != 0) { logger::write("Error in making call to RequestBuilder"); } return opt_builder; } void destroy(Builder builder, asio::io_service &io_service, asio::yield_context yield) { Loading @@ -66,7 +110,8 @@ namespace OpenStackBuilder { // Asynchronously launch the destroy command std::error_code destroy_ec; logger::write("Running command: " + destroy_command); bp::child destroy_child(destroy_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, destroy_ec); bp::child destroy_child(destroy_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, destroy_ec); if (destroy_ec) { logger::write("subprocess error: " + destroy_ec.message()); } Loading CMakeLists.txt +2 −2 Original line number Diff line number Diff line Loading @@ -22,7 +22,7 @@ set(SOURCE_FILES_QUEUE Common/src/Messenger.cpp Common/include/Messenger.h BuilderQueue/src/OpenStackBuilder.cpp BuilderQueue/include/OpenStackBuilder.h) BuilderQueue/include/OpenStackBuilder.h Common/src/Builder.cpp) set(SOURCE_FILES_BUILDER Builder/main.cpp Loading @@ -39,7 +39,7 @@ set(SOURCE_FILES_CLIENT Common/src/Logger.cpp Common/include/Logger.h Common/src/Messenger.cpp Common/include/Messenger.h) Common/include/Messenger.h Common/src/Builder.cpp) # Create executables add_executable(BuilderQueue ${SOURCE_FILES_QUEUE}) Loading Loading
BuilderQueue/include/BuilderQueue.h +2 −13 Original line number Diff line number Diff line Loading @@ -10,9 +10,7 @@ class BuilderQueue { public: explicit BuilderQueue(asio::io_service &io_service) : io_service(io_service), max_builders(1), max_cached_builders(max_builders), outstanding_builder_requests(0), active_builders(0) max_available_builders(max_builders) {} // Create a new queue reservation and return it to the requester Loading @@ -26,18 +24,9 @@ private: // Hold reservations that are to be fulfilled std::list<Reservation> reservations; // Queue of cached builders std::queue<Builder> cached_builders; // Maximum number of active and cached builders const std::size_t max_builders; // Maximum number of builders to spin and keep up in reserve const std::size_t max_cached_builders; // Builders that have ben requested but not yet failed/suceeded std::size_t outstanding_builder_requests; // Number of active, and in the process of shutting down after being active, builders std::size_t active_builders; const std::size_t max_available_builders; }; No newline at end of file
BuilderQueue/include/OpenStackBuilder.h +3 −2 Original line number Diff line number Diff line #pragma once #include "boost/asio/spawn.hpp" #include "boost/optional.hpp" #include "Builder.h" #include <set> namespace asio = boost::asio; namespace OpenStackBuilder { boost::optional<Builder> request_create(asio::io_service& io_service, asio::yield_context yield); std::set<Builder> get_builders(asio::io_service &io_service, asio::yield_context yield); void request_create(asio::io_service& io_service, asio::yield_context yield); void destroy(Builder builder, asio::io_service& io_service, asio::yield_context yield ); }; No newline at end of file
BuilderQueue/src/BuilderQueue.cpp +23 −36 Original line number Diff line number Diff line #include "BuilderQueue.h" #include "Logger.h" #include "OpenStackBuilder.h" #include <set> Reservation &BuilderQueue::enter() { reservations.emplace_back(io_service); return reservations.back(); } // TODO handle error state builders void BuilderQueue::tick(asio::yield_context yield) { // Go through reservations and check for any that are completed // Spin down any completed VM's and remove them from the list of active builders // Update OpenStack builder list auto all_builders = OpenStackBuilder::get_builders(io_service, yield); std::set<Builder> unavailable_builders; // Update reservation information for (auto &reservation : reservations) { if (reservation.complete() && reservation.builder) { if (reservation.builder) { unavailable_builders.insert(reservation.builder.get()); } if (reservation.complete()) { asio::spawn(io_service, [&](asio::yield_context yield) { OpenStackBuilder::destroy(reservation.builder.get(), io_service, yield); //TODO check if the builder was actually destroyed active_builders--; }); } } Loading @@ -27,38 +32,20 @@ void BuilderQueue::tick(asio::yield_context yield) { return res.complete(); }); // Assign any unused builders to outstanding reservations for (auto &reservation : reservations) { if (cached_builders.empty()) { break; } if (reservation.pending()) { auto builder = cached_builders.front(); reservation.ready(builder); cached_builders.pop(); active_builders++; } } // Caclulate the total allowable builders auto all_builder_count = active_builders + cached_builders.size() + outstanding_builder_requests; auto open_slots = max_builders - all_builder_count; // Calculate if any cached slots are available auto open_cache_slots = max_cached_builders - cached_builders.size(); // If slots are available attempt to fill them auto request_count = std::min(open_slots, open_cache_slots); // Available_builders = all_builders - unavailable_builders std::set<Builder> available_builders; std::set_difference(all_builders.begin(), all_builders.end(), unavailable_builders.begin(), unavailable_builders.end(), std::inserter(available_builders, available_builders.begin())); // Spin up builders so we have the requested available auto open_slots = max_builders - all_builders.size(); auto open_available_slots = max_available_builders - available_builders.size(); auto request_count = std::min(open_slots, open_available_slots); for (int i = 0; i < request_count; i++) { outstanding_builder_requests++; asio::spawn(io_service, [&](asio::yield_context yield) { auto opt_builder = OpenStackBuilder::request_create(io_service, yield); if (opt_builder) { cached_builders.push(opt_builder.get()); } outstanding_builder_requests--; OpenStackBuilder::request_create(io_service, yield); }); } } No newline at end of file
BuilderQueue/src/OpenStackBuilder.cpp +78 −33 Original line number Diff line number Diff line #include "OpenStackBuilder.h" #include "boost/process.hpp" #include "Logger.h" #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/json_parser.hpp> namespace bp = boost::process; namespace pt = boost::property_tree; namespace OpenStackBuilder { boost::optional<Builder> request_create(asio::io_service& io_service, asio::yield_context yield) { std::set<Builder> get_builders(asio::io_service &io_service, asio::yield_context yield) { std::string list_command("/home/queue/GetBuilders"); bp::group group; bp::async_pipe std_pipe(io_service); asio::streambuf buffer; // Asynchronously launch the list command std::error_code list_ec; logger::write("Running command: " + list_command); bp::child list_child(list_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, list_ec); if (list_ec) { logger::write("subprocess error: " + list_ec.message()); } // Read the list command output until we reach EOF, which is returned as an error boost::system::error_code ec; boost::asio::async_read(std_pipe, buffer, yield[ec]); if (ec != asio::error::eof) { logger::write("OpenStack destroy error: " + ec.message()); } // Grab exit code from destroy command list_child.wait(); int exit_code = list_child.exit_code(); if (exit_code != 0) { logger::write("Failed to fetch server list"); } // Read the json output into a property tree /* [ { "Status": "ACTIVE", "Name": "Builder", "Image": "BuilderImage", "ID": "adeda126-18d4-423f-a499-84651937cdc0", "Flavor": "m1.medium", "Networks": "or_provider_general_extnetwork1=128.219.185.100" } ] */ std::istream is_buffer(&buffer); pt::ptree builder_tree; pt::read_json(is_buffer, builder_tree); // Fill a set of builders from the property tree data std::set<Builder> builders; for (const auto &builder_node : builder_tree) { Builder builder; auto network = builder_node.second.get<std::string>("Networks"); size_t eq_pos = network.find("="); builder.host = network.substr(eq_pos); builder.id = builder_node.second.get<std::string>("ID"); builder.port = "8080"; builders.insert(builder); } return builders; } void request_create(asio::io_service &io_service, asio::yield_context yield) { std::string create_command("/home/queue/RequestCreateBuilder"); bp::group group; bp::async_pipe std_pipe(io_service); Loading @@ -14,7 +79,8 @@ namespace OpenStackBuilder { // Asynchronously launch the create command std::error_code create_ec; logger::write("Running command: " + create_command); bp::child build_child(create_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, create_ec); bp::child build_child(create_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, create_ec); if (create_ec) { logger::write("subprocess error: " + create_ec.message()); } Loading @@ -30,31 +96,9 @@ namespace OpenStackBuilder { build_child.wait(); int exit_code = build_child.exit_code(); boost::optional<Builder> opt_builder; // If successful parse the output for the newly created builders ID, IP address, and port if(exit_code == 0) { Builder builder; std::istream stream(&buffer); std::string line; std::getline(stream, builder.id); if(stream.fail() || stream.bad() || stream.eof()) { return opt_builder; } std::getline(stream, builder.host); if(stream.fail() || stream.bad() || stream.eof()) { return opt_builder; } std::getline(stream, builder.port); if(stream.fail() || stream.bad()) { return opt_builder; } opt_builder = builder; if (exit_code != 0) { logger::write("Error in making call to RequestBuilder"); } return opt_builder; } void destroy(Builder builder, asio::io_service &io_service, asio::yield_context yield) { Loading @@ -66,7 +110,8 @@ namespace OpenStackBuilder { // Asynchronously launch the destroy command std::error_code destroy_ec; logger::write("Running command: " + destroy_command); bp::child destroy_child(destroy_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, destroy_ec); bp::child destroy_child(destroy_command, bp::std_in.close(), (bp::std_out & bp::std_err) > std_pipe, group, destroy_ec); if (destroy_ec) { logger::write("subprocess error: " + destroy_ec.message()); } Loading
CMakeLists.txt +2 −2 Original line number Diff line number Diff line Loading @@ -22,7 +22,7 @@ set(SOURCE_FILES_QUEUE Common/src/Messenger.cpp Common/include/Messenger.h BuilderQueue/src/OpenStackBuilder.cpp BuilderQueue/include/OpenStackBuilder.h) BuilderQueue/include/OpenStackBuilder.h Common/src/Builder.cpp) set(SOURCE_FILES_BUILDER Builder/main.cpp Loading @@ -39,7 +39,7 @@ set(SOURCE_FILES_CLIENT Common/src/Logger.cpp Common/include/Logger.h Common/src/Messenger.cpp Common/include/Messenger.h) Common/include/Messenger.h Common/src/Builder.cpp) # Create executables add_executable(BuilderQueue ${SOURCE_FILES_QUEUE}) Loading