diff --git a/.gitmodules b/.gitmodules index 651e1a03df..88f8e871fd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,7 +1,7 @@ [submodule "thirdparty/fc"] path = thirdparty/fc - url = https://github.com/VIZ-Blockchain/fc.git - branch = update + url = https://github.com/m0ssa99/fc.git + branch = windows_suport_fc [submodule "thirdparty/chainbase"] path = thirdparty/chainbase url = https://github.com/VIZ-Blockchain/chainbase.git diff --git a/libraries/network/dlt_p2p_node.cpp b/libraries/network/dlt_p2p_node.cpp index e18a58f394..4814055ca5 100644 --- a/libraries/network/dlt_p2p_node.cpp +++ b/libraries/network/dlt_p2p_node.cpp @@ -82,6 +82,26 @@ void dlt_p2p_node::set_witness_diag_provider(std::function fn) { } void dlt_p2p_node::block_incoming_ip(uint32_t ip, const std::string& reason) { + // NAT safety: if multiple active peers share this IP (nodes behind the same NAT), + // blocking the IP would kill all of them. Only block when a single peer is using + // this IP — that's the typical single-machine attacker scenario. + uint32_t peers_with_ip = 0; + for (const auto& item : _peer_states) { + const auto& s = item.second; + if ((uint32_t)s.endpoint.get_address() == ip && + (s.lifecycle_state == DLT_PEER_LIFECYCLE_CONNECTING || + s.lifecycle_state == DLT_PEER_LIFECYCLE_HANDSHAKING || + s.lifecycle_state == DLT_PEER_LIFECYCLE_SYNCING || + s.lifecycle_state == DLT_PEER_LIFECYCLE_ACTIVE)) { + ++peers_with_ip; + } + } + if (peers_with_ip > 1) { + wlog(DLT_LOG_ORANGE "NAT: NOT blocking IP ${ip} (${n} active peers share this IP) — reason was: ${r}" DLT_LOG_RESET, + ("ip", std::string(fc::ip::address(ip)))("n", peers_with_ip)("r", reason)); + return; // Don't punish NAT peers for one misbehaving connection + } + fc::time_point unblock_at = fc::time_point::now() + fc::seconds(BLOCKED_IP_DURATION_SEC); _blocked_ips[ip] = unblock_at; wlog(DLT_LOG_ORANGE "Blocking IP ${ip} for ${d}s: ${r}" DLT_LOG_RESET, @@ -193,12 +213,53 @@ void dlt_p2p_node::close() { } // ── Connection management ──────────────────────────────────────────── +// ── Backward-compatible hello deserializer ─────────────────────────────────── +// FC_REFLECT-based deserialization (msg.as()) expects ALL +// fields to be present in the byte stream. Old nodes (protocol_version=1, +// 62-byte payload) do NOT include the node_id field (added in v2, 33 bytes). +// Calling msg.as<>() on such a message throws out_of_range_exception ("over by 1" +// — the first byte of the missing node_id cannot be read). +// +// This helper deserializes field-by-field and treats node_id as OPTIONAL: +// it is read only when the stream still has >= sizeof(node_id_t) bytes left. +// Unknown trailing bytes (future fields) are silently ignored. +static dlt_hello_message unpack_hello_compat(const message& msg) { + FC_ASSERT(msg.msg_type == dlt_hello_message_type); + dlt_hello_message hello; + if (msg.data.empty()) return hello; + fc::datastream ds(msg.data.data(), msg.data.size()); + fc::raw::unpack(ds, hello.protocol_version); + fc::raw::unpack(ds, hello.head_block_id); + fc::raw::unpack(ds, hello.head_block_num); + fc::raw::unpack(ds, hello.lib_block_id); + fc::raw::unpack(ds, hello.lib_block_num); + fc::raw::unpack(ds, hello.dlt_earliest_block); + fc::raw::unpack(ds, hello.dlt_latest_block); + fc::raw::unpack(ds, hello.emergency_active); + fc::raw::unpack(ds, hello.has_emergency_key); + fc::raw::unpack(ds, hello.fork_status); + fc::raw::unpack(ds, hello.node_status); + // node_id (sizeof = 33 bytes for compressed secp256k1 key) is optional: + // - absent → old protocol (v1 peer, 62-byte hello) — treat as zero_id + // - present → new protocol (v2+ peer, 95-byte hello) — use for NAT dedup + if (ds.remaining() >= sizeof(node_id_t)) { + fc::raw::unpack(ds, hello.node_id); + } + // Any remaining bytes are future protocol fields — ignored for forward compat. + return hello; +} -// ── Per-IP dedup: find any existing active connection from the same IP ─ -dlt_p2p_node::peer_id dlt_p2p_node::find_active_peer_by_ip(const fc::ip::address& addr) const { +// ── Per-node-id dedup: find any existing active connection to the same node ─ +// We identify nodes by the node_id they advertise in their hello message. +// This correctly handles multiple nodes behind the same NAT (same IP, different +// ports) — each node has a unique keypair, so only true duplicates are rejected. +// Returns INVALID_PEER_ID for zero node_id (old peer that didn't send one). +dlt_p2p_node::peer_id dlt_p2p_node::find_active_peer_by_node_id(const node_id_t& nid) const { + static const node_id_t zero_id; + if (nid == zero_id) return INVALID_PEER_ID; for (const auto& item : _peer_states) { const auto& state = item.second; - if (state.endpoint.get_address() == addr && + if (state.node_id == nid && (state.lifecycle_state == DLT_PEER_LIFECYCLE_CONNECTING || state.lifecycle_state == DLT_PEER_LIFECYCLE_HANDSHAKING || state.lifecycle_state == DLT_PEER_LIFECYCLE_SYNCING || @@ -234,7 +295,7 @@ void dlt_p2p_node::connect_to_peer(const fc::ip::endpoint& ep) { // which causes broadcast amplification. // EXCEPTION: Allow reconnect if the target peer itself is DISCONNECTED, // even if another connection to the same IP exists (different port). - if (!found_existing) { + /* if (!found_existing) { fc::ip::address target_ip = ep.get_address(); peer_id existing_ip_conn = find_active_peer_by_ip(target_ip); if (existing_ip_conn != INVALID_PEER_ID) { @@ -242,7 +303,11 @@ void dlt_p2p_node::connect_to_peer(const fc::ip::endpoint& ep) { ("ep", ep)("pid", existing_ip_conn)); return; } - } + }*/ + // NOTE: We no longer skip outbound connections based on IP address alone. + // Multiple nodes behind the same NAT share the same public IP but have + // different P2P ports and unique node_ids. Deduplication happens post-hello + // in on_dlt_hello() where we compare node_id values. if (!found_existing) { pid = _next_peer_id++; @@ -298,11 +363,17 @@ void dlt_p2p_node::connect_to_peer(const fc::ip::endpoint& ep) { std::string detail = e.to_detail_string(); bool is_expected = (detail.find("Connection refused") != std::string::npos) || (detail.find("connection refused") != std::string::npos) + || (detail.find("actively refused") != std::string::npos) // Windows WSA 10061 || (detail.find("Connection timed out") != std::string::npos) + || (detail.find("timed out") != std::string::npos) || (detail.find("Host unreachable") != std::string::npos) + || (detail.find("host unreachable") != std::string::npos) // Windows WSA 10065 || (detail.find("No route to host") != std::string::npos) + || (detail.find("network is unreachable") != std::string::npos) // Windows WSA 10051 || (detail.find("End of file") != std::string::npos) - || (detail.find("Operation aborted") != std::string::npos); + || (detail.find("end of file") != std::string::npos) + || (detail.find("Operation aborted") != std::string::npos) + || (detail.find("operation aborted") != std::string::npos); // Windows WSA 10004 if (is_expected) dlog(DLT_LOG_DGRAY "Connect to ${ep} failed: ${w}" DLT_LOG_RESET, ("ep", ep)("w", e.what())); else @@ -348,7 +419,12 @@ void dlt_p2p_node::handle_disconnect(peer_id peer, const std::string& reason, bo // _peer_states, so state/it remain valid when we resume here. auto fiber_it = _read_fibers.find(peer); if (fiber_it != _read_fibers.end()) { - try { if (fiber_it->second.valid()) fiber_it->second.cancel_and_wait(__FUNCTION__); } catch (...) {} + if (std::current_exception() != std::exception_ptr()) { + // Suntem în catch block — amânăm cancel_and_wait pentru periodic_task + _dead_fibers.push_back(std::move(fiber_it->second)); + } else { + try { if (fiber_it->second.valid()) fiber_it->second.cancel_and_wait(__FUNCTION__); } catch (...) {} + } _read_fibers.erase(fiber_it); } @@ -586,7 +662,14 @@ void dlt_p2p_node::drain_send_queue(peer_id peer, std::vector buf) { void dlt_p2p_node::send_to_all_our_fork_peers(const message& msg, peer_id exclude, const block_id_type& block_id) { // Per-IP dedup: send to each unique IP only once, even if multiple // peer entries exist for the same IP (belt-and-suspenders safety net). - std::set sent_to_ips; + // Dedup by node_id: send to each unique node only once, even if multiple + // peer entries exist for the same node (e.g. duplicate connections still + // being cleaned up). We do NOT dedup by IP address — multiple distinct + // nodes can share the same NAT IP and each deserves its own copy. + // Falls back to endpoint dedup for peers without a node_id (old protocol). + std::set sent_to_node_ids; + std::set sent_to_endpoints; + static const node_id_t zero_id; // Diagnostic: count eligible vs skipped peers uint32_t eligible = 0, skipped_not_exchange = 0, skipped_not_active = 0, skipped_echo = 0, skipped_peer_syncing = 0; @@ -626,9 +709,16 @@ void dlt_p2p_node::send_to_all_our_fork_peers(const message& msg, peer_id exclud skipped_echo++; continue; } - fc::ip::address ip = state.endpoint.get_address(); - if (sent_to_ips.count(ip)) continue; // already sent to this IP - sent_to_ips.insert(ip); + // Dedup: skip if we already queued a send to the same node. + // Use node_id when available (correctly handles NAT), fall back to + // full endpoint (IP:port) for old peers without a node_id. + if (state.node_id != zero_id) { + if (sent_to_node_ids.count(state.node_id)) continue; + sent_to_node_ids.insert(state.node_id); + } else { + if (sent_to_endpoints.count(state.endpoint)) continue; + sent_to_endpoints.insert(state.endpoint); + } targets.push_back(id); eligible++; } @@ -716,7 +806,9 @@ bool dlt_p2p_node::on_message(peer_id peer, const message& msg) { try { switch (msg.msg_type) { case dlt_hello_message_type: - on_dlt_hello(peer, msg.as()); + // unpack_hello_compat handles both v1 (no node_id, 62 bytes) and + // v2+ (with node_id, 95 bytes) — avoids out_of_range_exception. + on_dlt_hello(peer, unpack_hello_compat(msg)); break; case dlt_hello_reply_message_type: on_dlt_hello_reply(peer, msg.as()); @@ -829,6 +921,7 @@ dlt_hello_message dlt_p2p_node::build_hello_message() const { hello.has_emergency_key = _delegate->has_emergency_private_key(); hello.fork_status = _fork_status; hello.node_status = _node_status; + hello.node_id = _node_id; // identify ourselves so NAT peers can dedup by node, not IP return hello; } @@ -904,6 +997,29 @@ void dlt_p2p_node::on_dlt_hello(peer_id peer, const dlt_hello_message& hello) { wlog("Peer ${ep} has different protocol version (${theirs} vs ${ours}), disabling exchange", ("ep", state.endpoint)("theirs", their_major)("ours", our_major)); } + // Persist node_id — used for dedup and peer-exchange identity. + state.node_id = hello.node_id; + + // ── Post-hello node_id dedup ──────────────────────────────────────────── + // Now that we know the remote node's identity, check if we already have an + // active connection to the exact same node. This correctly handles: + // • A node reconnecting before the old connection was cleaned up + // • Simultaneous inbound + outbound to the same node + // It does NOT fire for two different nodes sharing the same NAT IP, because + // each node generates a unique keypair (node_id). + static const node_id_t zero_id; + if (hello.node_id != zero_id) { + peer_id dup = find_active_peer_by_node_id(hello.node_id); + if (dup != INVALID_PEER_ID && dup != peer) { + auto dup_it = _peer_states.find(dup); + auto dup_ep = (dup_it != _peer_states.end()) ? dup_it->second.endpoint : fc::ip::endpoint(); + dlog(DLT_LOG_DGRAY "Closing duplicate connection from ${ep} " + "(same node_id already active as peer ${dup} at ${dep})" DLT_LOG_RESET, + ("ep", state.endpoint)("dup", dup)("dep", dup_ep)); + handle_disconnect(peer, "duplicate node_id"); + return; + } + } // Store peer's chain state state.peer_head_id = hello.head_block_id; @@ -3286,7 +3402,13 @@ void dlt_p2p_node::periodic_peer_exchange() { if (_isolated_peers) return; if (_node_status != DLT_NODE_STATUS_FORWARD) return; - // Pick a random active peer to request exchange from + + // Pick a random active peer to request exchange from. + // When only one exchange-enabled peer exists (common for nodes behind NAT + // or freshly started nodes), all requests go to that single peer and hit + // the 3/300s rate-limit quickly. Back off to one request per 90s in that + // case so we never exceed the limit (3 requests / 300s = 1 per 100s max). + std::vector candidates; for (auto& _peer_item : _peer_states) { auto& id = _peer_item.first; @@ -3299,7 +3421,33 @@ void dlt_p2p_node::periodic_peer_exchange() { } if (candidates.empty()) return; - +// Dynamic throttle: ensure no single peer is asked more than 3 times per 300s. + // + // With N exchange-enabled peers and a random pick each loop (5s interval): + // requests per peer per 300s ≈ 300s / 5s / N = 60 / N + // rate-limit threshold = 3 requests / 300s + // safe minimum loop interval = 300s / (3 × N) = 100s / N + // + // Examples: + // N=1 → min interval 100s (was hardcoded 90s, now exact) + // N=2 → min interval 50s + // N=5 → min interval 20s + // N=20 → min interval 5s (≥ loop tick, no extra throttle needed) + // + // We track _last_peer_exchange globally; the random peer pick spreads + // load evenly across candidates so this global gate is sufficient. + { + size_t n = candidates.size(); + int64_t min_interval_us = (n >= 20) + ? 0LL + : static_cast(100'000'000LL / static_cast(n)); // 100s / N in microseconds + if (min_interval_us > 0) { + auto now = fc::time_point::now(); + if ((now - _last_peer_exchange_time).count() < min_interval_us) return; + _last_peer_exchange_time = now; + } + } + thread_local std::mt19937 peer_rng(std::hash{}(std::this_thread::get_id()) ^ uint32_t(fc::time_point::now().sec_since_epoch())); size_t idx = peer_rng() % candidates.size(); send_message(candidates[idx], message(dlt_peer_exchange_request())); @@ -3344,6 +3492,19 @@ void dlt_p2p_node::block_validation_timeout() { // ── Periodic task ──────────────────────────────────────────────────── void dlt_p2p_node::periodic_task() { + if (!_dead_fibers.empty()) { + std::vector> to_clean; + to_clean.swap(_dead_fibers); + for (auto& f : to_clean) { + try { + // Nu apela ready() — poate crapa dacă promise e distrus + // cancel_and_wait are acum garda valid() după fix-ul din future.hpp + f.cancel_and_wait(__FUNCTION__); + } catch (...) {} + // Eliberează explicit promise-ul imediat după + f = fc::future(); + } +} // Non-DB-access housekeeping always runs. periodic_reconnect_check(); periodic_lifecycle_timeout_check(); @@ -3486,7 +3647,7 @@ void dlt_p2p_node::accept_loop() { continue; } - peer_id existing = find_active_peer_by_ip(incoming_ip); + /* peer_id existing = find_active_peer_by_ip(incoming_ip); if (existing != INVALID_PEER_ID) { auto ex_it = _peer_states.find(existing); auto ex_ep = (ex_it != _peer_states.end()) ? ex_it->second.endpoint : fc::ip::endpoint(); @@ -3497,8 +3658,13 @@ void dlt_p2p_node::accept_loop() { _connections.erase(pid); sock->close(); continue; - } - + }*/ + // NOTE: We do NOT reject here based on IP address alone. + // Multiple nodes behind the same NAT share the same public IP but + // have different P2P ports and unique node_ids. Deduplication of + // truly-duplicate connections (same node reconnecting) is done + // post-hello in on_dlt_hello() by comparing node_id values. + // Isolated-peers: only accept inbound from configured seed IPs. if (_isolated_peers) { bool is_seed = false; @@ -3537,7 +3703,18 @@ void dlt_p2p_node::accept_loop() { return; } catch (const fc::exception& e) { elog("Error in accept loop: ${e}", ("e", e.to_detail_string())); - if (_running) fc::usleep(fc::seconds(1)); + + // NOTE: do NOT call fc::usleep here — fc::usleep yields the fiber and + // FC_ASSERT(current_exception == nullptr) fires if called while an + // exception is still active (Windows x64 SEH / ucrtbase abort). + // The sleep happens AFTER this catch block, below. + } + // Sleep OUTSIDE the catch block so no exception is active when we yield. + if (_running) + fc::usleep(fc::seconds(1)); + + } +} } } } @@ -3634,16 +3811,26 @@ void dlt_p2p_node::start_read_loop(peer_id peer) { const auto& detail = e.to_detail_string(); bool is_transient = detail.find("Connection reset by peer") != std::string::npos || + detail.find("forcibly closed") != std::string::npos || // Windows WSA 10054 detail.find("Connection refused") != std::string::npos || + detail.find("actively refused") != std::string::npos || // Windows WSA 10061 detail.find("Broken pipe") != std::string::npos || + detail.find("connection was aborted") != std::string::npos || // Windows WSA 10053 detail.find("end of stream") != std::string::npos || + detail.find("End of file") != std::string::npos || detail.find("Operation aborted") != std::string::npos || + detail.find("operation aborted") != std::string::npos || // Windows WSA 10004 detail.find("Network is unreachable") != std::string::npos || + detail.find("network is unreachable") != std::string::npos || // Windows WSA 10051 detail.find("No route to host") != std::string::npos || detail.find("Connection timed out") != std::string::npos || - detail.find("Host is unreachable") != std::string::npos; + detail.find("timed out") != std::string::npos || + detail.find("Host is unreachable") != std::string::npos || + detail.find("host unreachable") != std::string::npos; // Windows WSA 10065 bool is_benign_close = - detail.find("Bad file descriptor") != std::string::npos; + detail.find("Bad file descriptor") != std::string::npos || + detail.find("bad file descriptor") != std::string::npos || + detail.find("invalid argument") != std::string::npos; // Windows: closed socket reuse if (is_benign_close) { dlog(DLT_LOG_DGRAY "Peer ${ep} read canceled (socket already closed)" DLT_LOG_RESET, diff --git a/libraries/network/include/graphene/network/dlt_p2p_messages.hpp b/libraries/network/include/graphene/network/dlt_p2p_messages.hpp index ba8e96d47d..4a5ddc2964 100644 --- a/libraries/network/include/graphene/network/dlt_p2p_messages.hpp +++ b/libraries/network/include/graphene/network/dlt_p2p_messages.hpp @@ -81,6 +81,10 @@ struct dlt_hello_message { bool has_emergency_key = false; uint8_t fork_status = DLT_FORK_STATUS_NORMAL; uint8_t node_status = DLT_NODE_STATUS_SYNC; + // Persistent node identity key — used to deduplicate connections from nodes + // sharing the same NAT IP (different ports). Each node generates a random + // keypair at startup. Zero-value means "unknown" (old protocol peer). + node_id_t node_id; }; // ── DLT Hello Reply ───────────────────────────────────────────────── @@ -269,7 +273,7 @@ FC_REFLECT_ENUM(graphene::network::dlt_peer_lifecycle_state, FC_REFLECT((graphene::network::dlt_hello_message), (protocol_version)(head_block_id)(head_block_num)(lib_block_id)(lib_block_num) (dlt_earliest_block)(dlt_latest_block)(emergency_active)(has_emergency_key) - (fork_status)(node_status)) + (fork_status)(node_status)(node_id)) FC_REFLECT((graphene::network::dlt_hello_reply_message), (exchange_enabled)(fork_alignment)(initiator_head_seen)(initiator_lib_seen) diff --git a/libraries/network/include/graphene/network/dlt_p2p_node.hpp b/libraries/network/include/graphene/network/dlt_p2p_node.hpp index c3f451f253..1bc4db9993 100644 --- a/libraries/network/include/graphene/network/dlt_p2p_node.hpp +++ b/libraries/network/include/graphene/network/dlt_p2p_node.hpp @@ -286,7 +286,7 @@ class dlt_p2p_node { bool is_same_subnet(const fc::ip::address& a, const fc::ip::address& b) const; // ── Per-IP dedup ───────────────────────────────────────────── - peer_id find_active_peer_by_ip(const fc::ip::address& addr) const; + peer_id find_active_peer_by_node_id(const node_id_t& nid) const; private: dlt_p2p_delegate* _delegate = nullptr; @@ -344,6 +344,7 @@ class dlt_p2p_node { fc::thread* _thread = nullptr; bool _running = false; std::map> _read_fibers; + std::vector> _dead_fibers; fc::future _accept_fiber; fc::future _periodic_fiber; @@ -443,6 +444,9 @@ class dlt_p2p_node { std::map _blocked_ips; static constexpr uint32_t BLOCKED_IP_DURATION_SEC = 3600; // 1 hour void block_incoming_ip(uint32_t ip, const std::string& reason); + // Last time a peer exchange request was sent — used by periodic_peer_exchange() + // to dynamically throttle based on the number of active peers (see impl). + fc::time_point _last_peer_exchange_time; bool is_ip_blocked(uint32_t ip); // ── Diagnostics ─────────────────────────────────────────────── diff --git a/plugins/validator/validator.cpp b/plugins/validator/validator.cpp index d37dac3045..eaac6361ca 100644 --- a/plugins/validator/validator.cpp +++ b/plugins/validator/validator.cpp @@ -1685,7 +1685,12 @@ namespace graphene { { int64_t ntp_us = 0; try { ntp_us = graphene::time::ntp_error().count(); } catch (...) {} - if (ntp_us > 250000) { // local clock >250ms behind NTP + #if defined(_WIN32) + constexpr int64_t NTP_WARN_THRESHOLD_US = 2000000; // 2s pe Windows + #else + constexpr int64_t NTP_WARN_THRESHOLD_US = 250000; // 250ms pe Linux/macOS + #endif + if (ntp_us > NTP_WARN_THRESHOLD_US) { // local clock >250ms behind NTP static fc::time_point _last_ntp_drift_log; auto _now_nd = fc::time_point::now(); if ((_now_nd - _last_ntp_drift_log).count() > 10000000) { diff --git a/thirdparty/fc b/thirdparty/fc index cf033fee1b..c4f38357d4 160000 --- a/thirdparty/fc +++ b/thirdparty/fc @@ -1 +1 @@ -Subproject commit cf033fee1b802a39fe11de4cc1e311faae29f168 +Subproject commit c4f38357d4f2f45228b5549e96b572ffb8ee9142