YouDunIt/modules/you_done_it/ydi_client.cpp

117 lines
3.1 KiB
C++

#include "ydi_client.h"
#include "core/string/print_string.h"
#include "you_done_it/ydi_networking.h"
#include <atomic>
#include <mutex>
#include <optional>
#include <thread>
#include <zmq.hpp>
#include <zmq_addon.hpp>
namespace ydi::client {
struct Connection {
std::optional<std::string> server;
std::optional<zmq::context_t> context{ std::nullopt };
std::optional<zmq::socket_t> socket{ std::nullopt };
std::recursive_mutex mtx;
std::atomic<NetworkData::ConnectionStatus> status;
std::atomic<bool> stop_threads;
};
std::optional<Connection> connection{ std::nullopt };
std::optional<std::thread> receive_thread{ std::nullopt };
void handle_ok(zmq::multipart_t const &message) {
NetworkData::MessageType type{ to_message_type(message[1]) };
switch (type) {
default: // no need to handle every OK, just some relevant ones
return;
case NetworkData::MSG_CONNECT:
connection->status = NetworkData::CONNECTION_AUTHENTICATED;
return;
}
}
void handle_message(zmq::multipart_t const &message) {
NetworkData::MessageType type{ to_message_type(message[0]) };
switch (type) {
case NetworkData::MSG_OK:
handle_ok(message);
case NetworkData::MSG_HEART:
multipart(NetworkData::MSG_BEAT).send(*connection->socket);
default:
print_error(vformat("Client: Received unhandled message: ", type, message[0].to_string().c_str()));
return;
}
}
void receive_thread_entry() {
zmq::multipart_t message{};
while (!connection->stop_threads) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(10ms);
std::scoped_lock lock{ connection->mtx };
if (message.recv(*connection->socket)) {
handle_message(message);
}
}
}
void connect(String const &url) {
connection.emplace();
print_line("Client: Connecting to ", url);
try {
connection->context.emplace(1);
} catch (...) {
connection.reset();
print_line("Client: Failed to create context");
return;
}
print_line("Client: Created context");
try {
connection->socket.emplace(*connection->context, zmq::socket_type::dealer);
} catch (...) {
connection->context->close();
connection->context.reset();
connection.reset();
print_line("Client: Failed to create socket");
return;
}
print_line("Client: Created socket");
try {
CharStringT<char> cstrurl{ url.ascii() };
std::string server{ cstrurl.get_data() };
server = "tcp://" + server + ":6667";
connection->socket->connect(server);
connection->server = server;
} catch (...) {
connection->socket->close();
connection->socket.reset();
connection->context->close();
connection->context.reset();
connection.reset();
print_line("Client: Failed to connect to server");
}
print_line("Client: connected to server");
receive_thread.emplace(receive_thread_entry);
}
void disconnect() {
if (connection) {
std::scoped_lock lock{ connection->mtx };
if (receive_thread && receive_thread->joinable()) {
receive_thread->join();
}
if (connection->socket) {
connection->socket->close();
connection->socket.reset();
}
if (connection->context) {
connection->context->shutdown();
connection->context.reset();
}
}
connection.reset();
}
} //namespace ydi::client