Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ set(POLYMARKET_CLIENT_SOURCES
src/http_client.cpp
src/json_rpc_client.cpp
src/websocket_client.cpp
src/websocket_client_resilience.cpp
src/websocket_resilience.cpp
src/market_fetcher.cpp
src/orderbook.cpp
src/clob_order_execution.cpp
Expand Down Expand Up @@ -177,6 +179,11 @@ if(POLYMARKET_CLIENT_BUILD_TESTS)
add_executable(test_sdk_error tests/test_sdk_error.cpp)
target_link_libraries(test_sdk_error PRIVATE polymarket::client)
add_test(NAME test_sdk_error COMMAND test_sdk_error)

add_executable(test_websocket_resilience tests/test_websocket_resilience.cpp)
target_include_directories(test_websocket_resilience PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(test_websocket_resilience PRIVATE polymarket::client)
add_test(NAME test_websocket_resilience COMMAND test_websocket_resilience)
endif()

# Install library, headers, and dependency targets into a single export set
Expand Down
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,35 @@ client.stop_heartbeat();

**Expected gains**: First request ~40-60ms → subsequent requests ~25-35ms.

## WebSocket Resilience

`WebSocketClient` supports additive production-safety options for market-data
consumers: automatic reconnect backoff, ping interval, bounded message queue,
subscription replay, typed message callbacks, and counters for reconnects,
dropped messages, parse errors, last message time, messages, and bytes.

```cpp
polymarket::WebSocketClient ws;
polymarket::WebSocketOptions options;
options.message_queue_limit = 4096;
options.min_backoff_ms = 250;
options.max_backoff_ms = 5000;
ws.configure(options);

ws.on_message([](const std::string& raw) {
// Raw callback remains available.
});
ws.on_typed_message([](const polymarket::TypedWebSocketMessage& msg) {
if (msg.topic == "clob_market" && msg.type == "agg_orderbook") {
// Use msg.asset_id and msg.payload.
}
});
```

Call `track_subscription(subscription_json)` after sending a subscription if
you use `WebSocketClient` directly. `OrderbookManager` tracks its subscription
message and restores it automatically after reconnect.

## Neg-Risk Markets

The client automatically detects neg_risk markets and uses the appropriate exchange address for order signing:
Expand Down
67 changes: 65 additions & 2 deletions include/websocket_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <thread>
#include <mutex>
#include <memory>
#include <vector>
#include <ixwebsocket/IXWebSocket.h>

namespace polymarket
Expand All @@ -28,6 +29,43 @@ namespace polymarket
using OnDisconnectCallback = std::function<void()>;
using OnErrorCallback = std::function<void(const std::string &)>;

struct WebSocketOptions
{
bool reconnect_enabled{true};
int max_reconnect_attempts{0};
uint32_t min_backoff_ms{250};
uint32_t max_backoff_ms{10000};
int ping_interval_ms{5000};
std::size_t message_queue_limit{1024};
};

struct WebSocketStats
{
uint64_t messages_received{0};
uint64_t bytes_received{0};
uint64_t reconnects{0};
uint64_t dropped_messages{0};
uint64_t parse_errors{0};
uint64_t last_message_time_ns{0};
};

struct TypedWebSocketMessage
{
std::string topic;
std::string type;
std::string event_type;
std::string asset_id;
std::string payload;
std::string raw;
};

using OnTypedMessageCallback = std::function<void(const TypedWebSocketMessage &)>;

namespace detail
{
class BoundedMessageQueue;
}

// High-performance WebSocket client using IXWebSocket
class WebSocketClient
{
Expand All @@ -43,9 +81,12 @@ namespace polymarket
void set_url(const std::string &url);
void set_ping_interval_ms(int interval_ms);
void set_auto_reconnect(bool enabled);
void configure(const WebSocketOptions &options);
WebSocketOptions options() const;

// Callbacks
void on_message(OnMessageCallback callback);
void on_typed_message(OnTypedMessageCallback callback);
void on_connect(OnConnectCallback callback);
void on_disconnect(OnDisconnectCallback callback);
void on_error(OnErrorCallback callback);
Expand All @@ -58,6 +99,10 @@ namespace polymarket

// Send message
bool send(const std::string &message);
void track_subscription(const std::string &message);
void untrack_subscription(const std::string &message);
void clear_subscriptions();
std::size_t tracked_subscription_count() const;

// Run event loop (blocking) - IXWebSocket runs in its own thread
void run();
Expand All @@ -68,29 +113,47 @@ namespace polymarket
// Get statistics
uint64_t messages_received() const { return messages_received_.load(); }
uint64_t bytes_received() const { return bytes_received_.load(); }
WebSocketStats stats() const;

private:
ix::WebSocket ws_;
std::unique_ptr<detail::BoundedMessageQueue> message_queue_;

// Configuration
std::string url_;
int ping_interval_ms_;
bool auto_reconnect_;
WebSocketOptions options_;

// State
std::atomic<WsState> state_;
std::atomic<bool> running_;
std::atomic<bool> should_stop_;
std::atomic<bool> worker_running_{false};
std::atomic<bool> has_connected_{false};
std::thread message_worker_;

// Callbacks
OnMessageCallback on_message_cb_;
OnTypedMessageCallback on_typed_message_cb_;
OnConnectCallback on_connect_cb_;
OnDisconnectCallback on_disconnect_cb_;
OnErrorCallback on_error_cb_;

mutable std::mutex subscriptions_mutex_;
std::vector<std::string> subscriptions_;

// Statistics
std::atomic<uint64_t> messages_received_{0};
std::atomic<uint64_t> bytes_received_{0};
std::atomic<uint64_t> reconnects_{0};
std::atomic<uint64_t> parse_errors_{0};
std::atomic<uint64_t> last_message_time_ns_{0};

void apply_options();
void start_message_worker();
void stop_message_worker();
void enqueue_message(const std::string &message);
void dispatch_message(const std::string &message);
void restore_subscriptions();
};

} // namespace polymarket
18 changes: 17 additions & 1 deletion src/orderbook.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ namespace polymarket
ws_.on_connect([this]()
{
std::cout << "[WS] Connected to orderbook stream" << std::endl;
send_subscribe_message(); });
if (ws_.stats().reconnects == 0 || ws_.tracked_subscription_count() == 0)
{
send_subscribe_message();
} });

ws_.on_disconnect([this]()
{ std::cout << "[WS] Disconnected from orderbook stream" << std::endl; });
Expand Down Expand Up @@ -62,6 +65,12 @@ namespace polymarket

std::cout << "[OrderbookManager] Subscribed to market: " << market.slug
<< " (YES: " << market.token_yes.substr(0, 16) << "...)" << std::endl;

ws_.clear_subscriptions();
if (ws_.is_connected())
{
send_subscribe_message();
}
}

void OrderbookManager::unsubscribe(const std::string &token_id)
Expand All @@ -70,6 +79,11 @@ namespace polymarket
if (it != subscribed_tokens_.end())
{
subscribed_tokens_.erase(it);
ws_.clear_subscriptions();
if (ws_.is_connected())
{
send_subscribe_message();
}
}

std::unique_lock<std::shared_mutex> lock(orderbooks_mutex_);
Expand All @@ -79,6 +93,7 @@ namespace polymarket
void OrderbookManager::unsubscribe_all()
{
subscribed_tokens_.clear();
ws_.clear_subscriptions();

{
std::unique_lock<std::shared_mutex> lock(orderbooks_mutex_);
Expand Down Expand Up @@ -183,6 +198,7 @@ namespace polymarket
std::string msg = subscribe_msg.dump();
std::cout << "[WS] Sending subscribe: " << subscribed_tokens_.size() << " tokens" << std::endl;

ws_.track_subscription(msg);
ws_.send(msg);
}

Expand Down
41 changes: 26 additions & 15 deletions src/websocket_client.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#include "websocket_client.hpp"
#include "types.hpp"
#include "websocket_resilience.hpp"
#include <iostream>

namespace polymarket
{

WebSocketClient::WebSocketClient()
: ping_interval_ms_(5000), auto_reconnect_(true), state_(WsState::DISCONNECTED), running_(false), should_stop_(false)
: message_queue_(std::make_unique<detail::BoundedMessageQueue>(options_.message_queue_limit)),
state_(WsState::DISCONNECTED),
running_(false),
should_stop_(false)
{
apply_options();
}

WebSocketClient::~WebSocketClient()
Expand All @@ -23,25 +28,26 @@ namespace polymarket

void WebSocketClient::set_ping_interval_ms(int interval_ms)
{
ping_interval_ms_ = interval_ms;
ws_.setPingInterval(interval_ms);
options_.ping_interval_ms = interval_ms;
apply_options();
}

void WebSocketClient::set_auto_reconnect(bool enabled)
{
auto_reconnect_ = enabled;
ws_.enableAutomaticReconnection();
if (!enabled)
{
ws_.disableAutomaticReconnection();
}
options_.reconnect_enabled = enabled;
apply_options();
}

void WebSocketClient::on_message(OnMessageCallback callback)
{
on_message_cb_ = std::move(callback);
}

void WebSocketClient::on_typed_message(OnTypedMessageCallback callback)
{
on_typed_message_cb_ = std::move(callback);
}

void WebSocketClient::on_connect(OnConnectCallback callback)
{
on_connect_cb_ = std::move(callback);
Expand Down Expand Up @@ -71,22 +77,27 @@ namespace polymarket
{
case ix::WebSocketMessageType::Open:
state_.store(WsState::CONNECTED);
if (has_connected_.exchange(true))
{
reconnects_++;
}
restore_subscriptions();
if (on_connect_cb_)
{
on_connect_cb_();
}
break;

case ix::WebSocketMessageType::Close:
state_.store(WsState::DISCONNECTED);
state_.store(options_.reconnect_enabled ? WsState::RECONNECTING : WsState::DISCONNECTED);
if (on_disconnect_cb_)
{
on_disconnect_cb_();
}
break;

case ix::WebSocketMessageType::Error:
state_.store(WsState::DISCONNECTED);
state_.store(options_.reconnect_enabled ? WsState::RECONNECTING : WsState::DISCONNECTED);
if (on_error_cb_)
{
on_error_cb_(msg->errorInfo.reason);
Expand All @@ -96,10 +107,8 @@ namespace polymarket
case ix::WebSocketMessageType::Message:
messages_received_++;
bytes_received_ += msg->str.size();
if (on_message_cb_)
{
on_message_cb_(msg->str);
}
last_message_time_ns_.store(now_ns());
enqueue_message(msg->str);
break;

case ix::WebSocketMessageType::Ping:
Expand All @@ -110,6 +119,7 @@ namespace polymarket
} });

state_.store(WsState::CONNECTING);
start_message_worker();
ws_.start();

return true;
Expand All @@ -119,6 +129,7 @@ namespace polymarket
{
state_.store(WsState::CLOSING);
ws_.stop();
stop_message_worker();
state_.store(WsState::DISCONNECTED);
}

Expand Down
Loading
Loading