#include "ydi_server.h"

#include "core/os/time.h"
#include "core/templates/vector.h"
#include "you_done_it/ydi_networking.h"

#include "zmq.hpp"
#include "zmq_addon.hpp"

// NOTE(review): the five standard includes arrived with their <...> targets
// stripped. The first five below are restored from the std facilities this file
// uses (atomic, recursive_mutex/scoped_lock, optional, thread, unordered_set);
// confirm against version control.
#include <atomic>
#include <mutex>
#include <optional>
#include <thread>
#include <unordered_set>

#include <cassert>
#include <chrono>
#include <cstddef>
#include <exception>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>

namespace ydi::server {

namespace {

// NOTE(review): the template arguments in this file were stripped together with
// the include targets. The name of the clue-id type is not visible here, so it
// is derived from the return type of to_clue_id() instead of being guessed.
// Replace with the real type name once confirmed.
using ClueId = std::decay_t<decltype(to_clue_id(std::declval<zmq::message_t const &>()))>;

// Positions of the frames this file reads from an incoming multipart message.
constexpr std::size_t sender_frame{ 0 };
constexpr std::size_t type_frame{ 1 };
constexpr std::size_t clue_frame{ 2 };

} // namespace

// All state owned by one running server instance.
// NOTE(review): template arguments below are reconstructed from usage
// (context/socket are constructed and used as zmq context/router socket, client
// is emplaced from a string_view, stop_threads is used as a boolean flag).
struct Service {
	std::optional<zmq::context_t> context{ std::nullopt };
	std::optional<zmq::socket_t> socket{ std::nullopt };
	std::unordered_set<ClueId> revealed_clues{};
	// Locked by both worker threads and by the public accessors before they
	// touch the socket, the client or the clue list.
	std::recursive_mutex mtx{};
	// Sender identity of the peer that sent the first accepted MSG_CONNECT.
	std::optional<std::string> client{ std::nullopt };
	// Clues received since the last call to receive::new_clues().
	Vector<ClueId> new_clues{};
	// Unix timestamps of the last HEART sent and the last BEAT received.
	double lastHeart{ 0.0 };
	double lastBeat{ 0.0 };
	// Set by close() to make both worker loops exit.
	std::atomic<bool> stop_threads{ false };
};

std::optional<std::thread> receive_thread{ std::nullopt };
std::optional<std::thread> ping_thread{ std::nullopt };
std::optional<Service> service{ std::nullopt };

// Queues the clue id carried in frame 2 of `message` for receive::new_clues().
// Messages that are too short to carry a clue id are logged and ignored.
void handle_reveal_clue(zmq::multipart_t const &message) {
	if (message.size() <= clue_frame) {
		print_error("Server: Ignoring reveal message without a clue frame");
		return;
	}
	service->new_clues.push_back(to_clue_id(message.at(clue_frame)));
}

// Dispatches a message that came from the registered client.
// Unknown message types are answered with MSG_NOK / NOK_UNKNOWN_MSG.
void handle_authorised_message(std::string_view const &sender, NetworkData::MessageType type, zmq::multipart_t &message) {
	switch (type) {
		case NetworkData::MSG_BEAT:
			print_line("Server: Received beat, storing timestamp");
			service->lastBeat = Time::get_singleton()->get_unix_time_from_system();
			return;

		case NetworkData::MSG_REVEAL:
			// Frame 2 comes from the network; check it exists before reading it,
			// otherwise at() throws on this worker thread.
			if (message.size() <= clue_frame) {
				print_error("Server: Ignoring reveal message without a clue frame");
				return;
			}
			print_line("Server: Received revealed clue, ", static_cast<int>(to_clue_id(message.at(clue_frame))));
			handle_reveal_clue(message);
			return;

		default:
			print_line("Server: Encountered unknown message type, sending NOK_UNKNOWN_MSG response");
			multipart(sender, NetworkData::MSG_NOK, NetworkData::NOK_UNKNOWN_MSG, message).send(*service->socket);
			return;
	}
}

// Body of the ping thread: sends MSG_HEART to the registered client once per
// second until stop_threads is set. Returns immediately if no client is
// registered when the thread starts.
void ping_thread_entry() {
	using namespace std::chrono_literals;

	{
		std::scoped_lock lock{ service->mtx };
		if (!service->client) {
			return;
		}
	}

	while (!service->stop_threads) {
		std::this_thread::sleep_for(1s);
		// An exception escaping a thread entry point calls std::terminate, so
		// failures are logged and the loop keeps running.
		try {
			std::scoped_lock lock{ service->mtx };
			print_line("Server: Send HEART");
			multipart(*service->client, NetworkData::MSG_HEART).send(*service->socket);
			service->lastHeart = Time::get_singleton()->get_unix_time_from_system();
		} catch (std::exception const &e) {
			print_error(String("Server: Ping thread caught exception: ") + String(e.what()));
		} catch (...) {
			print_error("Server: Ping thread caught unknown exception");
		}
	}
}

// Handles one incoming multipart message. Frame 0 is read as the sender
// identity and frame 1 as the message type.
//
// - With a registered client: messages from that client are dispatched, all
//   others are answered with NOK_UNAUTHENTICATED.
// - Without a registered client: MSG_CONNECT registers the sender, answers
//   MSG_OK and starts the ping thread; anything else gets NOK_OUT_OF_CONTEXT.
//
// Messages with fewer than two frames are logged and dropped.
void handle_message(zmq::multipart_t &message) {
	print_line("Server handle_message:");
	print_message_contents(message);

	if (message.size() <= type_frame) {
		print_error("Server: Dropping malformed message without sender and type frames");
		return;
	}

	std::string_view const sender{ message.at(sender_frame).to_string_view() };
	NetworkData::MessageType type{ to_message_type(message.at(type_frame)) };

	std::scoped_lock lock{ service->mtx };

	if (service->client) {
		if (sender == service->client) {
			handle_authorised_message(sender, type, message);
		} else {
			multipart(sender, NetworkData::MSG_NOK, NetworkData::NOK_UNAUTHENTICATED, std::pair{ message.begin() + 1, message.end() }).send(*service->socket);
		}
		return;
	}

	if (type == NetworkData::MSG_CONNECT) {
		service->client.emplace(sender);
		multipart(sender, NetworkData::MSG_OK, std::pair{ message.begin() + 1, message.end() }).send(*service->socket);
		ping_thread.emplace(ping_thread_entry);
		return;
	}

	multipart(sender, NetworkData::MSG_NOK, NetworkData::NOK_OUT_OF_CONTEXT, std::pair{ message.begin() + 1, message.end() }).send(*service->socket);
}

// Body of the receive thread: every 20 ms polls the socket without blocking
// and hands any received message to handle_message(), until stop_threads is
// set.
void receive_thread_entry() {
	using namespace std::chrono_literals;

	zmq::multipart_t incoming{};
	while (!service->stop_threads) {
		std::this_thread::sleep_for(20ms);
		// An exception escaping a thread entry point calls std::terminate, so a
		// single bad message or socket error must not leave this loop.
		try {
			std::scoped_lock lock{ service->mtx };
			if (incoming.recv(*service->socket, static_cast<int>(zmq::recv_flags::dontwait))) {
				handle_message(incoming);
			}
		} catch (std::exception const &e) {
			print_error(String("Server: Receive thread caught exception: ") + String(e.what()));
		} catch (...) {
			print_error("Server: Receive thread caught unknown exception");
		}
	}
}

// Creates the service, its zmq context and router socket, binds the socket to
// tcp://*:6667 and starts the receive thread. Does nothing (apart from logging)
// if a service already exists. On any failure everything created so far is
// released again and no service is left behind.
void open() {
	if (service) {
		print_error("Server: Detected attempt to open duplicate Server, exiting without action.");
		return;
	}

	print_line("Server: Starting");
	service.emplace();

	// Releases whatever has been created so far (socket before context, the
	// reverse of creation order), drops the service and reports `reason`.
	auto const abort_startup = [](char const *reason) {
		if (service->socket) {
			service->socket->close();
			service->socket.reset();
		}
		if (service->context) {
			service->context->close();
			service->context.reset();
		}
		service.reset();
		print_error(reason);
	};

	try {
		service->context.emplace(1);
	} catch (...) {
		abort_startup("Server: Failed to create context");
		return;
	}
	print_line("Server: Created zmq context");

	try {
		service->socket.emplace(*service->context, zmq::socket_type::router);
	} catch (...) {
		abort_startup("Server: Failed to create socket");
		return;
	}
	print_line("Server: Created socket");

	try {
		service->socket->bind("tcp://*:6667");
	} catch (...) {
		abort_startup("Server: Failed to bind socket");
		return;
	}
	print_line("Server: Bound socket");

	try {
		receive_thread.emplace(receive_thread_entry);
	} catch (...) {
		abort_startup("Server: Failed to start receive thread");
		return;
	}
	assert(receive_thread->joinable());

	print_line("Server: Startup complete!");
}

// Stops both worker threads, then closes the socket and the context and drops
// the service. Does nothing if no service exists.
void close() {
	if (!service) {
		return;
	}

	print_line("Server: Shutting down...");
	service->stop_threads = true;

	print_line("Server: Stopping Threads...");
	// The receive thread is the one that creates the ping thread, so it is
	// joined first; after that ping_thread can no longer change.
	if (receive_thread && receive_thread->joinable()) {
		receive_thread->join();
	}
	receive_thread.reset();
	print_line("Server: Receive thread stopped");

	if (ping_thread && ping_thread->joinable()) {
		ping_thread->join();
	}
	ping_thread.reset();
	print_line("Server: Ping thread stopped");

	if (service->socket) {
		service->socket->close();
		service->socket.reset();
	}
	print_line("Server: Socket closed");

	if (service->context) {
		service->context->close();
		service->context.reset();
	}
	print_line("Server: Context closed");

	service.reset();
	print_line("Server: Shutdown complete!");
}

// Returns true if a service exists and a client has been registered.
bool has_client() {
	if (!service) {
		return false;
	}
	std::scoped_lock lock{ service->mtx };
	return service->client.has_value();
}

namespace receive {

// Appends all clues received since the previous call to `out` and clears the
// internal list. Returns true if at least one clue was appended; returns false
// without touching `out` if there were none or if no service exists.
bool new_clues(Vector<ClueId> &out) {
	if (!service) {
		return false;
	}

	std::scoped_lock lock{ service->mtx };
	bool const has_new{ !service->new_clues.is_empty() };
	if (has_new) {
		out.append_array(service->new_clues);
		service->new_clues.clear();
	}
	return has_new;
}

} //namespace receive

} //namespace ydi::server