diff --git a/CMakeLists.txt b/CMakeLists.txt index 62aa90b..8a4cb3b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,8 @@ set(POLYMARKET_CLIENT_SOURCES src/http_client.cpp src/json_rpc_client.cpp src/websocket_client.cpp + src/websocket_client_resilience.cpp + src/websocket_resilience.cpp src/market_fetcher.cpp src/orderbook.cpp src/clob_order_execution.cpp @@ -177,6 +179,11 @@ if(POLYMARKET_CLIENT_BUILD_TESTS) add_executable(test_sdk_error tests/test_sdk_error.cpp) target_link_libraries(test_sdk_error PRIVATE polymarket::client) add_test(NAME test_sdk_error COMMAND test_sdk_error) + + add_executable(test_websocket_resilience tests/test_websocket_resilience.cpp) + target_include_directories(test_websocket_resilience PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src) + target_link_libraries(test_websocket_resilience PRIVATE polymarket::client) + add_test(NAME test_websocket_resilience COMMAND test_websocket_resilience) endif() # Install library, headers, and dependency targets into a single export set diff --git a/README.md b/README.md index 4bf3a2b..6c07ad3 100644 --- a/README.md +++ b/README.md @@ -270,6 +270,35 @@ client.stop_heartbeat(); **Expected gains**: First request ~40-60ms → subsequent requests ~25-35ms. +## WebSocket Resilience + +`WebSocketClient` supports additive production-safety options for market-data +consumers: automatic reconnect backoff, ping interval, bounded message queue, +subscription replay, typed message callbacks, and counters for reconnects, +dropped messages, parse errors, last message time, messages, and bytes. + +```cpp +polymarket::WebSocketClient ws; +polymarket::WebSocketOptions options; +options.message_queue_limit = 4096; +options.min_backoff_ms = 250; +options.max_backoff_ms = 5000; +ws.configure(options); + +ws.on_message([](const std::string& raw) { + // Raw callback remains available. +}); +ws.on_typed_message([](const polymarket::TypedWebSocketMessage& msg) { + if (msg.topic == "clob_market" && msg.type == "agg_orderbook") { + // Use msg.asset_id and msg.payload. + } +}); +``` + +Call `track_subscription(subscription_json)` after sending a subscription if +you use `WebSocketClient` directly. `OrderbookManager` tracks its subscription +message and restores it automatically after reconnect. + ## Neg-Risk Markets The client automatically detects neg_risk markets and uses the appropriate exchange address for order signing: diff --git a/include/websocket_client.hpp b/include/websocket_client.hpp index c98453c..f6b9cb2 100644 --- a/include/websocket_client.hpp +++ b/include/websocket_client.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include namespace polymarket @@ -28,6 +29,43 @@ namespace polymarket using OnDisconnectCallback = std::function; using OnErrorCallback = std::function; + struct WebSocketOptions + { + bool reconnect_enabled{true}; + int max_reconnect_attempts{0}; + uint32_t min_backoff_ms{250}; + uint32_t max_backoff_ms{10000}; + int ping_interval_ms{5000}; + std::size_t message_queue_limit{1024}; + }; + + struct WebSocketStats + { + uint64_t messages_received{0}; + uint64_t bytes_received{0}; + uint64_t reconnects{0}; + uint64_t dropped_messages{0}; + uint64_t parse_errors{0}; + uint64_t last_message_time_ns{0}; + }; + + struct TypedWebSocketMessage + { + std::string topic; + std::string type; + std::string event_type; + std::string asset_id; + std::string payload; + std::string raw; + }; + + using OnTypedMessageCallback = std::function; + + namespace detail + { + class BoundedMessageQueue; + } + // High-performance WebSocket client using IXWebSocket class WebSocketClient { @@ -43,9 +81,12 @@ namespace polymarket void set_url(const std::string &url); void set_ping_interval_ms(int interval_ms); void set_auto_reconnect(bool enabled); + void configure(const WebSocketOptions &options); + WebSocketOptions options() const; // Callbacks void on_message(OnMessageCallback callback); + void on_typed_message(OnTypedMessageCallback callback); void on_connect(OnConnectCallback callback); void on_disconnect(OnDisconnectCallback callback); void on_error(OnErrorCallback callback); @@ -58,6 +99,10 @@ namespace polymarket // Send message bool send(const std::string &message); + void track_subscription(const std::string &message); + void untrack_subscription(const std::string &message); + void clear_subscriptions(); + std::size_t tracked_subscription_count() const; // Run event loop (blocking) - IXWebSocket runs in its own thread void run(); @@ -68,29 +113,47 @@ namespace polymarket // Get statistics uint64_t messages_received() const { return messages_received_.load(); } uint64_t bytes_received() const { return bytes_received_.load(); } + WebSocketStats stats() const; private: ix::WebSocket ws_; + std::unique_ptr message_queue_; // Configuration std::string url_; - int ping_interval_ms_; - bool auto_reconnect_; + WebSocketOptions options_; // State std::atomic state_; std::atomic running_; std::atomic should_stop_; + std::atomic worker_running_{false}; + std::atomic has_connected_{false}; + std::thread message_worker_; // Callbacks OnMessageCallback on_message_cb_; + OnTypedMessageCallback on_typed_message_cb_; OnConnectCallback on_connect_cb_; OnDisconnectCallback on_disconnect_cb_; OnErrorCallback on_error_cb_; + mutable std::mutex subscriptions_mutex_; + std::vector subscriptions_; + // Statistics std::atomic messages_received_{0}; std::atomic bytes_received_{0}; + std::atomic reconnects_{0}; + std::atomic parse_errors_{0}; + std::atomic last_message_time_ns_{0}; + + void apply_options(); + void start_message_worker(); + void stop_message_worker(); + void enqueue_message(const std::string &message); + void dispatch_message(const std::string &message); + void restore_subscriptions(); }; } // namespace polymarket diff --git a/src/orderbook.cpp b/src/orderbook.cpp index 32be0b5..a6ee717 100644 --- a/src/orderbook.cpp +++ b/src/orderbook.cpp @@ -23,7 +23,10 @@ namespace polymarket ws_.on_connect([this]() { std::cout << "[WS] Connected to orderbook stream" << std::endl; - send_subscribe_message(); }); + if (ws_.stats().reconnects == 0 || ws_.tracked_subscription_count() == 0) + { + send_subscribe_message(); + } }); ws_.on_disconnect([this]() { std::cout << "[WS] Disconnected from orderbook stream" << std::endl; }); @@ -62,6 +65,12 @@ namespace polymarket std::cout << "[OrderbookManager] Subscribed to market: " << market.slug << " (YES: " << market.token_yes.substr(0, 16) << "...)" << std::endl; + + ws_.clear_subscriptions(); + if (ws_.is_connected()) + { + send_subscribe_message(); + } } void OrderbookManager::unsubscribe(const std::string &token_id) @@ -70,6 +79,11 @@ namespace polymarket if (it != subscribed_tokens_.end()) { subscribed_tokens_.erase(it); + ws_.clear_subscriptions(); + if (ws_.is_connected()) + { + send_subscribe_message(); + } } std::unique_lock lock(orderbooks_mutex_); @@ -79,6 +93,7 @@ namespace polymarket void OrderbookManager::unsubscribe_all() { subscribed_tokens_.clear(); + ws_.clear_subscriptions(); { std::unique_lock lock(orderbooks_mutex_); @@ -183,6 +198,7 @@ namespace polymarket std::string msg = subscribe_msg.dump(); std::cout << "[WS] Sending subscribe: " << subscribed_tokens_.size() << " tokens" << std::endl; + ws_.track_subscription(msg); ws_.send(msg); } diff --git a/src/websocket_client.cpp b/src/websocket_client.cpp index 3b1a7e3..4474c65 100644 --- a/src/websocket_client.cpp +++ b/src/websocket_client.cpp @@ -1,13 +1,18 @@ #include "websocket_client.hpp" #include "types.hpp" +#include "websocket_resilience.hpp" #include namespace polymarket { WebSocketClient::WebSocketClient() - : ping_interval_ms_(5000), auto_reconnect_(true), state_(WsState::DISCONNECTED), running_(false), should_stop_(false) + : message_queue_(std::make_unique(options_.message_queue_limit)), + state_(WsState::DISCONNECTED), + running_(false), + should_stop_(false) { + apply_options(); } WebSocketClient::~WebSocketClient() @@ -23,18 +28,14 @@ namespace polymarket void WebSocketClient::set_ping_interval_ms(int interval_ms) { - ping_interval_ms_ = interval_ms; - ws_.setPingInterval(interval_ms); + options_.ping_interval_ms = interval_ms; + apply_options(); } void WebSocketClient::set_auto_reconnect(bool enabled) { - auto_reconnect_ = enabled; - ws_.enableAutomaticReconnection(); - if (!enabled) - { - ws_.disableAutomaticReconnection(); - } + options_.reconnect_enabled = enabled; + apply_options(); } void WebSocketClient::on_message(OnMessageCallback callback) @@ -42,6 +43,11 @@ namespace polymarket on_message_cb_ = std::move(callback); } + void WebSocketClient::on_typed_message(OnTypedMessageCallback callback) + { + on_typed_message_cb_ = std::move(callback); + } + void WebSocketClient::on_connect(OnConnectCallback callback) { on_connect_cb_ = std::move(callback); @@ -71,6 +77,11 @@ namespace polymarket { case ix::WebSocketMessageType::Open: state_.store(WsState::CONNECTED); + if (has_connected_.exchange(true)) + { + reconnects_++; + } + restore_subscriptions(); if (on_connect_cb_) { on_connect_cb_(); @@ -78,7 +89,7 @@ namespace polymarket break; case ix::WebSocketMessageType::Close: - state_.store(WsState::DISCONNECTED); + state_.store(options_.reconnect_enabled ? WsState::RECONNECTING : WsState::DISCONNECTED); if (on_disconnect_cb_) { on_disconnect_cb_(); @@ -86,7 +97,7 @@ namespace polymarket break; case ix::WebSocketMessageType::Error: - state_.store(WsState::DISCONNECTED); + state_.store(options_.reconnect_enabled ? WsState::RECONNECTING : WsState::DISCONNECTED); if (on_error_cb_) { on_error_cb_(msg->errorInfo.reason); @@ -96,10 +107,8 @@ namespace polymarket case ix::WebSocketMessageType::Message: messages_received_++; bytes_received_ += msg->str.size(); - if (on_message_cb_) - { - on_message_cb_(msg->str); - } + last_message_time_ns_.store(now_ns()); + enqueue_message(msg->str); break; case ix::WebSocketMessageType::Ping: @@ -110,6 +119,7 @@ namespace polymarket } }); state_.store(WsState::CONNECTING); + start_message_worker(); ws_.start(); return true; @@ -119,6 +129,7 @@ namespace polymarket { state_.store(WsState::CLOSING); ws_.stop(); + stop_message_worker(); state_.store(WsState::DISCONNECTED); } diff --git a/src/websocket_client_resilience.cpp b/src/websocket_client_resilience.cpp new file mode 100644 index 0000000..7b6655b --- /dev/null +++ b/src/websocket_client_resilience.cpp @@ -0,0 +1,149 @@ +#include "websocket_client.hpp" +#include "websocket_resilience.hpp" +#include + +namespace polymarket +{ + void WebSocketClient::configure(const WebSocketOptions &options) + { + options_ = options; + apply_options(); + message_queue_->reset(options_.message_queue_limit); + } + + WebSocketOptions WebSocketClient::options() const + { + return options_; + } + + void WebSocketClient::track_subscription(const std::string &message) + { + std::lock_guard lock(subscriptions_mutex_); + if (std::find(subscriptions_.begin(), subscriptions_.end(), message) == subscriptions_.end()) + { + subscriptions_.push_back(message); + } + } + + void WebSocketClient::untrack_subscription(const std::string &message) + { + std::lock_guard lock(subscriptions_mutex_); + subscriptions_.erase(std::remove(subscriptions_.begin(), subscriptions_.end(), message), subscriptions_.end()); + } + + void WebSocketClient::clear_subscriptions() + { + std::lock_guard lock(subscriptions_mutex_); + subscriptions_.clear(); + } + + std::size_t WebSocketClient::tracked_subscription_count() const + { + std::lock_guard lock(subscriptions_mutex_); + return subscriptions_.size(); + } + + WebSocketStats WebSocketClient::stats() const + { + return { + messages_received_.load(), + bytes_received_.load(), + reconnects_.load(), + message_queue_->dropped(), + parse_errors_.load(), + last_message_time_ns_.load()}; + } + + void WebSocketClient::apply_options() + { + const int ping_seconds = std::max(1, options_.ping_interval_ms / 1000); + ws_.setPingInterval(ping_seconds); + ws_.setMinWaitBetweenReconnectionRetries(options_.min_backoff_ms); + ws_.setMaxWaitBetweenReconnectionRetries(options_.max_backoff_ms); + if (options_.reconnect_enabled) + { + ws_.enableAutomaticReconnection(); + } + else + { + ws_.disableAutomaticReconnection(); + } + } + + void WebSocketClient::start_message_worker() + { + if (worker_running_.load()) + { + return; + } + + worker_running_.store(true); + message_worker_ = std::thread([this]() + { + while (worker_running_.load()) + { + auto message = message_queue_->pop(); + if (!message) + { + break; + } + dispatch_message(*message); + } }); + } + + void WebSocketClient::stop_message_worker() + { + message_queue_->close(); + worker_running_.store(false); + if (message_worker_.joinable()) + { + message_worker_.join(); + } + message_queue_->reset(options_.message_queue_limit); + } + + void WebSocketClient::enqueue_message(const std::string &message) + { + message_queue_->push(message); + } + + void WebSocketClient::dispatch_message(const std::string &message) + { + if (on_message_cb_) + { + on_message_cb_(message); + } + + if (!on_typed_message_cb_) + { + return; + } + + try + { + auto typed = detail::parse_typed_message(message); + if (typed) + { + on_typed_message_cb_(*typed); + } + } + catch (...) + { + parse_errors_++; + } + } + + void WebSocketClient::restore_subscriptions() + { + std::vector subscriptions; + { + std::lock_guard lock(subscriptions_mutex_); + subscriptions = subscriptions_; + } + + for (const auto &message : subscriptions) + { + ws_.send(message); + } + } +} diff --git a/src/websocket_resilience.cpp b/src/websocket_resilience.cpp new file mode 100644 index 0000000..b24221e --- /dev/null +++ b/src/websocket_resilience.cpp @@ -0,0 +1,102 @@ +#include "websocket_resilience.hpp" +#include + +using json = nlohmann::json; + +namespace polymarket::detail +{ + BoundedMessageQueue::BoundedMessageQueue(std::size_t limit) + : limit_(limit == 0 ? 1 : limit) + { + } + + bool BoundedMessageQueue::push(std::string message) + { + std::lock_guard lock(mutex_); + if (closed_) + { + return false; + } + if (messages_.size() >= limit_) + { + messages_.pop_front(); + ++dropped_; + } + messages_.push_back(std::move(message)); + cv_.notify_one(); + return true; + } + + std::optional BoundedMessageQueue::pop() + { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this]() + { return closed_ || !messages_.empty(); }); + if (messages_.empty()) + { + return std::nullopt; + } + auto message = std::move(messages_.front()); + messages_.pop_front(); + return message; + } + + void BoundedMessageQueue::close() + { + std::lock_guard lock(mutex_); + closed_ = true; + cv_.notify_all(); + } + + void BoundedMessageQueue::reset(std::size_t limit) + { + std::lock_guard lock(mutex_); + limit_ = limit == 0 ? 1 : limit; + messages_.clear(); + closed_ = false; + dropped_ = 0; + } + + uint64_t BoundedMessageQueue::dropped() const + { + std::lock_guard lock(mutex_); + return dropped_; + } + + std::size_t BoundedMessageQueue::size() const + { + std::lock_guard lock(mutex_); + return messages_.size(); + } + + std::optional parse_typed_message(const std::string &message) + { + auto parsed = json::parse(message); + if (!parsed.is_object()) + { + return std::nullopt; + } + + TypedWebSocketMessage typed; + typed.raw = message; + typed.topic = parsed.value("topic", ""); + typed.type = parsed.value("type", ""); + typed.event_type = parsed.value("event_type", ""); + typed.asset_id = parsed.value("asset_id", ""); + if (parsed.contains("payload")) + { + const auto &payload = parsed["payload"]; + typed.payload = payload.is_string() ? payload.get() : payload.dump(); + if (payload.is_object() && payload.contains("asset_id")) + { + typed.asset_id = payload["asset_id"].get(); + } + } + + if (typed.topic.empty() && typed.type.empty() && typed.event_type.empty()) + { + return std::nullopt; + } + return typed; + } +} diff --git a/src/websocket_resilience.hpp b/src/websocket_resilience.hpp new file mode 100644 index 0000000..e2aa4ec --- /dev/null +++ b/src/websocket_resilience.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "websocket_client.hpp" +#include +#include +#include +#include +#include + +namespace polymarket::detail +{ + class BoundedMessageQueue + { + public: + explicit BoundedMessageQueue(std::size_t limit); + + bool push(std::string message); + std::optional pop(); + void close(); + void reset(std::size_t limit); + uint64_t dropped() const; + std::size_t size() const; + + private: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::deque messages_; + std::size_t limit_; + bool closed_{false}; + uint64_t dropped_{0}; + }; + + std::optional parse_typed_message(const std::string &message); +} diff --git a/tests/test_websocket_resilience.cpp b/tests/test_websocket_resilience.cpp new file mode 100644 index 0000000..24ca7ae --- /dev/null +++ b/tests/test_websocket_resilience.cpp @@ -0,0 +1,95 @@ +#include "websocket_resilience.hpp" + +#include +#include + +using namespace polymarket; + +namespace +{ + bool expect_true(const std::string &name, bool value) + { + if (value) + { + return true; + } + std::cerr << "failed: " << name << "\n"; + return false; + } + + bool expect_equal(const std::string &name, const std::string &actual, const std::string &expected) + { + if (actual == expected) + { + return true; + } + std::cerr << name << " mismatch\n" + << " expected: " << expected << "\n" + << " actual: " << actual << "\n"; + return false; + } +} + +int main() +{ + detail::BoundedMessageQueue queue(2); + if (!expect_true("push first", queue.push("one")) || + !expect_true("push second", queue.push("two")) || + !expect_true("push overflow", queue.push("three")) || + !expect_true("drop count", queue.dropped() == 1) || + !expect_true("bounded size", queue.size() == 2)) + { + return 1; + } + + auto first = queue.pop(); + auto second = queue.pop(); + if (!expect_true("first available", first.has_value()) || + !expect_true("second available", second.has_value()) || + !expect_equal("drop oldest policy", *first, "two") || + !expect_equal("last kept", *second, "three")) + { + return 1; + } + + const auto typed = detail::parse_typed_message( + R"({"topic":"clob_market","type":"agg_orderbook","payload":{"asset_id":"token-1","bids":[],"asks":[]}})"); + if (!expect_true("typed parsed", typed.has_value()) || + !expect_equal("topic", typed->topic, "clob_market") || + !expect_equal("type", typed->type, "agg_orderbook") || + !expect_equal("payload asset id", typed->asset_id, "token-1")) + { + return 1; + } + + const auto legacy = detail::parse_typed_message( + R"({"event_type":"book","asset_id":"token-2","bids":[],"asks":[]})"); + if (!expect_true("legacy parsed", legacy.has_value()) || + !expect_equal("legacy event type", legacy->event_type, "book") || + !expect_equal("legacy asset id", legacy->asset_id, "token-2")) + { + return 1; + } + + const auto heartbeat = detail::parse_typed_message("{}"); + if (!expect_true("heartbeat ignored", !heartbeat.has_value())) + { + return 1; + } + + bool threw = false; + try + { + (void)detail::parse_typed_message("{not-json"); + } + catch (...) + { + threw = true; + } + if (!expect_true("malformed json throws for caller metrics", threw)) + { + return 1; + } + + return 0; +}