YouDunIt/modules/you_done_it/ydi_server.cpp

164 lines
4.4 KiB
C++

#include "ydi_server.h"
#include "ydi_networking.h"
#include <core/os/time.h>
#include <core/templates/vector.h>
#include <atomic>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <unordered_set>
#include <zmq.hpp>
#include <zmq_addon.hpp>
namespace ydi::server {
struct Event {
MessageType type;
union {
ClueID clue;
};
};
struct Service {
std::optional<zmq::context_t> context{ std::nullopt };
std::optional<zmq::socket_t> socket{ std::nullopt };
std::unordered_set<ClueID> revealed_clues{};
std::recursive_mutex mtx{};
std::optional<std::string> client{ std::nullopt };
std::queue<Event> unhandled_events{};
double lastHeart{ 0.0 };
double lastBeat{ 0.0 };
std::atomic<bool> stop_threads{ false };
};
std::optional<std::thread> receive_thread{ std::nullopt };
std::optional<std::thread> ping_thread{ std::nullopt };
std::optional<Service> service{ std::nullopt };
void handle_reveal_clue(zmq::multipart_t const &message) {
Event evt{ .type = REVEAL, .clue = to_clue_id(message.at(2)) };
service->unhandled_events.push(evt);
}
void handle_authorised_message(std::string_view const &sender, MessageType type, zmq::multipart_t &message) {
if (type == BEAT) {
service->lastBeat = Time::get_singleton()->get_unix_time_from_system();
} else if (type == REVEAL) {
handle_reveal_clue(message);
} else {
multipart(sender, "NOK", "UNKOWN_COMMAND", message).send(*service->socket);
}
}
void handle_message(zmq::multipart_t &message) {
std::string_view const sender{ message.at(0).to_string_view() };
MessageType type{ to_message_type(message.at(1)) };
std::scoped_lock lock{ service->mtx };
if (service->client) {
if (sender == service->client) {
handle_authorised_message(sender, type, message);
} else {
multipart(sender, "NOK", "UNAUTHORIZED_REQUEST").send(*service->socket);
}
} else if (type == CONNECT) {
service->client.emplace(sender);
multipart(sender, OK, message).send(*service->socket);
} else {
multipart(sender, NOK, "UNAUTHORIZED_REQUEST", message).send(*service->socket);
}
}
void receive_thread_entry() {
using namespace std::chrono_literals;
zmq::multipart_t incoming{};
while (service->stop_threads) {
std::this_thread::sleep_for(20ms);
std::scoped_lock lock{ service->mtx };
if (incoming.recv(*service->socket)) {
handle_message(incoming);
}
}
}
void ping_thread_entry() {
using namespace std::chrono_literals;
{
std::scoped_lock lock{ service->mtx };
if (!service->client) {
return;
}
}
static zmq::multipart_t ping{ multipart(*service->client, "HEART") };
while (!service->stop_threads) {
std::this_thread::sleep_for(1s);
std::scoped_lock lock{ service->mtx };
ping.send(*service->socket);
service->lastHeart = Time::get_singleton()->get_unix_time_from_system();
}
}
void open() {
print_line("Server: Starting");
service.emplace();
try {
service->context.emplace(1);
} catch (...) {
service.reset();
print_line("Server: Failed to create context");
return;
}
print_line("Server: Created zmq context");
try {
service->socket.emplace(*service->context, zmq::socket_type::router);
} catch (...) {
service->context->close();
service->context.reset();
service.reset();
print_line("Server: Failed to create socket");
return;
}
print_line("Server: Created socket");
try {
service->socket->bind("tcp://*:6667");
} catch (...) {
service->socket->close();
service->socket.reset();
service->context->close();
service->context.reset();
service.reset();
print_line("Server: Failed to bind socket");
return;
}
print_line("Server: Bound socket");
receive_thread.emplace(receive_thread_entry);
}
void close() {
if (service) {
print_line("Server: Shutting down...");
std::scoped_lock lock{ service->mtx };
service->stop_threads = true;
print_line("Server: Stopping Threads...");
if (receive_thread && receive_thread->joinable()) {
receive_thread->join();
}
print_line("Server: Receive thread stopped");
if (ping_thread && ping_thread->joinable()) {
ping_thread->join();
}
print_line("Server: Ping thread stopped");
if (service->socket) {
service->socket->close();
service->socket.reset();
}
print_line("Server: Socket closed");
if (service->context) {
service->context->close();
service->context.reset();
}
print_line("Server: Context closed");
service.reset();
print_line("Server: Shutdown complete!");
}
}
} //namespace ydi::server