diff --git a/build-windows.ps1 b/build-windows.ps1 new file mode 100644 index 00000000..e41ac4c1 --- /dev/null +++ b/build-windows.ps1 @@ -0,0 +1,23 @@ +param( + [string]$Generator = "Ninja", + [string]$BuildDir = "out\\build\\windows", + [string]$Config = "RelWithDebInfo" +) + +function Abort($msg) { + Write-Error $msg + exit 1 +} + +if (-not (Get-Command cmake -ErrorAction SilentlyContinue)) { Abort "cmake not found on PATH. Install CMake and retry." } +if ($Generator -eq "Ninja" -and -not (Get-Command ninja -ErrorAction SilentlyContinue)) { Abort "ninja not found on PATH. Install Ninja and retry." } + +Write-Host "Configuring (Generator=$Generator, BuildDir=$BuildDir, Config=$Config)" +cmake -S . -B $BuildDir -G $Generator -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=$Config +if ($LASTEXITCODE -ne 0) { Abort "CMake configure failed." } + +Write-Host "Building" +cmake --build $BuildDir --config $Config -- -v +if ($LASTEXITCODE -ne 0) { Abort "Build failed." } + +Write-Host "Build finished. Artifacts are in: $BuildDir" \ No newline at end of file diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml index 2fb1c119..5c221faf 100644 --- a/configs/fne-config.example.yml +++ b/configs/fne-config.example.yml @@ -120,6 +120,17 @@ master: # spanning tree updates.) spanningTreeFastReconnect: true + # Console patch status registry configuration. + patchStatus: + # Flag indicating whether or not console patch status publishing is enabled. + enabled: true + # Default TTL, in seconds, for console patch status updates that do not specify one. + defaultTtlSeconds: 15 + # Minimum accepted TTL, in seconds, for console patch status updates. + minTtlSeconds: 5 + # Maximum accepted TTL, in seconds, for console patch status updates. + maxTtlSeconds: 300 + # Flag indicating whether or not peer pinging will be reported. reportPeerPing: true diff --git a/docs/WINDOWS_BUILD.md b/docs/WINDOWS_BUILD.md new file mode 100644 index 00000000..5fc00413 --- /dev/null +++ b/docs/WINDOWS_BUILD.md @@ -0,0 +1,30 @@ +Windows build prerequisites and steps for dvmhost + +Prerequisites +- Visual Studio 2019/2022 (Desktop development with C++ workload) or at least the MSVC build tools. +- CMake 3.16 or newer +- Ninja (recommended generator) +- Git +- (Optional) OpenSSL for Windows if you need SSL; not required by default when building with `-DCOMPILE_WIN32=ON`. + +Quick steps +1. Open a Developer command prompt (e.g. "x64 Native Tools Command Prompt for VS 2022") or run the MSVC environment so compilers are on PATH. +2. Ensure `cmake` and `ninja` are on PATH. +3. From the repository root: + +```powershell +# create an out/build directory and configure +mkdir -p out\build\windows +cmake -S . -B out\build\windows -G Ninja -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo + +# build +cmake --build out\build\windows --config RelWithDebInfo +``` + +Notes +- The project provides `CMakeSettings.json` presets for Visual Studio Code/CMake Tools which enable `COMPILE_WIN32` and use Ninja. +- TUI support is disabled on Windows by design; some utilities that require ncurses will not be available when `COMPILE_WIN32=ON`. +- If you prefer Visual Studio IDE: open the CMake project in Visual Studio, select the provided configuration (see `CMakeSettings.json`) and build. +- If you need OpenSSL for Windows, install a binary distribution and set `-DOPENSSL_ROOT_DIR="C:/path/to/openssl"` when invoking `cmake`. + +If you want, run `.uild-windows.ps1` from a Developer PowerShell prompt — it will run the configure+build steps automatically. \ No newline at end of file diff --git a/src/common/network/BaseNetwork.h b/src/common/network/BaseNetwork.h index e6538f00..a4023011 100644 --- a/src/common/network/BaseNetwork.h +++ b/src/common/network/BaseNetwork.h @@ -69,6 +69,7 @@ #define TAG_TRANSFER_ACT_LOG "TRNSLOG" #define TAG_TRANSFER_DIAG_LOG "TRNSDIAG" #define TAG_TRANSFER_STATUS "TRNSSTS" +#define TAG_TRANSFER_PATCH_STATUS "TRNSPTCH" #define TAG_ANNOUNCE "ANNC" #define TAG_PEER_REPLICA "REPL" diff --git a/src/common/network/RTPFNEHeader.h b/src/common/network/RTPFNEHeader.h index 3b842276..67563259 100644 --- a/src/common/network/RTPFNEHeader.h +++ b/src/common/network/RTPFNEHeader.h @@ -100,6 +100,7 @@ namespace network TRANSFER_SUBFUNC_ACTIVITY = 0x01U, //!< Activity Log Transfer TRANSFER_SUBFUNC_DIAG = 0x02U, //!< Diagnostic Log Transfer TRANSFER_SUBFUNC_STATUS = 0x03U, //!< Status Transfer + TRANSFER_SUBFUNC_PATCH_STATUS = 0x04U, //!< Console Patch Status Transfer ANNC_SUBFUNC_GRP_AFFIL = 0x00U, //!< Announce Group Affiliation ANNC_SUBFUNC_UNIT_REG = 0x01U, //!< Announce Unit Registration @@ -114,6 +115,7 @@ namespace network REPL_ACT_PEER_LIST = 0xA2U, //!< FNE Replication Active Peer List Transfer REPL_HA_PARAMS = 0xA3U, //!< FNE Replication HA Parameters + REPL_PATCH_STATUS = 0xA4U, //!< FNE Replication Patch Status Transfer NET_TREE_LIST = 0x00U, //!< FNE Network Tree List NET_TREE_DISC = 0x01U //!< FNE Network Tree Disconnect @@ -215,4 +217,4 @@ namespace network } // namespace frame } // namespace network -#endif // __RTP_FNE_HEADER_H__ \ No newline at end of file +#endif // __RTP_FNE_HEADER_H__ diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp index cb85d22f..52f969d5 100644 --- a/src/fne/HostFNE.cpp +++ b/src/fne/HostFNE.cpp @@ -899,6 +899,7 @@ bool HostFNE::createPeerNetworks() network->setNetTreeDiscCallback(std::bind(&HostFNE::processNetworkTreeDisconnect, this, std::placeholders::_1, std::placeholders::_2)); network->setNotifyPeerReplicaCallback(std::bind(&HostFNE::processPeerReplicaNotify, this, std::placeholders::_1)); + network->setPatchStatusCallback(std::bind(&HostFNE::processPeerPatchStatus, this, std::placeholders::_1, std::placeholders::_2)); network->enable(enabled); if (enabled) { @@ -1194,3 +1195,12 @@ void HostFNE::processPeerReplicaNotify(network::PeerNetwork* peerNetwork) m_network->setPeerReplica(true); } } + +/* Processes peer patch status replication. */ + +void HostFNE::processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj) +{ + if (m_network != nullptr && peerNetwork != nullptr) { + m_network->processReplicatedPatchStatus(peerNetwork->getPeerId(), obj); + } +} diff --git a/src/fne/HostFNE.h b/src/fne/HostFNE.h index 83d38a4e..ef9d706b 100644 --- a/src/fne/HostFNE.h +++ b/src/fne/HostFNE.h @@ -17,6 +17,7 @@ #define __HOST_FNE_H__ #include "Defines.h" +#include "common/json/json.h" #include "common/lookups/RadioIdLookup.h" #include "common/lookups/TalkgroupRulesLookup.h" #include "common/lookups/PeerListLookup.h" @@ -265,6 +266,12 @@ class HOST_SW_API HostFNE { * @param peerNetwork Peer network instance. */ void processPeerReplicaNotify(network::PeerNetwork* peerNetwork); + /** + * @brief Processes peer patch status replication. + * @param peerNetwork Peer network instance. + * @param obj Patch status JSON payload. + */ + void processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj); }; #endif // __HOST_FNE_H__ diff --git a/src/fne/PatchStatusRegistry.cpp b/src/fne/PatchStatusRegistry.cpp new file mode 100644 index 00000000..eb5870dc --- /dev/null +++ b/src/fne/PatchStatusRegistry.cpp @@ -0,0 +1,463 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Digital Voice Modem - Converged FNE Software + * GPLv2 Open Source. Use is subject to license terms. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright (C) 2026 DVMProject Authors + * + */ +#include "fne/PatchStatusRegistry.h" + +#include "common/Log.h" + +#include +#include +#include +#include +#include + +constexpr uint32_t PatchStatusRegistry::DEFAULT_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS; + +/* Initializes a new instance of the PatchStatusRegistry class. */ + +PatchStatusRegistry::PatchStatusRegistry() : + m_mutex(), + m_revisionChanged(), + m_peerPatches(), + m_revision(0U), + m_defaultTtlSeconds(DEFAULT_TTL_SECONDS), + m_minTtlSeconds(MIN_TTL_SECONDS), + m_maxTtlSeconds(MAX_TTL_SECONDS) +{ + /* stub */ +} + +/* Configures the accepted TTL range for patch status records. */ + +void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds) +{ + if (minTtlSeconds == 0U) + minTtlSeconds = MIN_TTL_SECONDS; + if (maxTtlSeconds < minTtlSeconds) + maxTtlSeconds = minTtlSeconds; + + std::lock_guard guard(m_mutex); + m_minTtlSeconds = minTtlSeconds; + m_maxTtlSeconds = maxTtlSeconds; + m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds)); +} + +/* Publishes a complete patch snapshot for one console peer. */ + +bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) +{ + if (!request["peerId"].is()) { + errorMessage = "peerId was not a valid integer"; + return false; + } + + if (!request["patches"].is()) { + errorMessage = "patches was not a valid array"; + return false; + } + + PeerPatchSnapshot incoming; + incoming.peerId = request["peerId"].get(); + if (incoming.peerId == 0U) { + errorMessage = "peerId cannot be zero"; + return false; + } + + if (request["peerName"].is()) + incoming.peerName = request["peerName"].get(); + + if (request["originFnePeerId"].is()) + incoming.originFnePeerId = request["originFnePeerId"].get(); + + if (request["sequence"].is()) + incoming.sequence = request["sequence"].get(); + + uint32_t ttlSeconds = defaultTtlSeconds(); + if (request["ttlSeconds"].is()) + ttlSeconds = request["ttlSeconds"].get(); + ttlSeconds = clampTtl(ttlSeconds); + + incoming.updatedAt = nowMs(); + incoming.expiresAt = incoming.updatedAt + (static_cast(ttlSeconds) * 1000U); + + json::array patches = request["patches"].get(); + for (json::value& value : patches) { + if (!value.is()) { + errorMessage = "patches contained a non-object entry"; + return false; + } + + json::object patchObj = value.get(); + PatchRecord patch; + if (!parsePatch(patchObj, patch, errorMessage)) + return false; + + incoming.patches.push_back(patch); + } + + cleanupExpired(); + + { + std::lock_guard guard(m_mutex); + auto existing = m_peerPatches.find(incoming.peerId); + if (existing != m_peerPatches.end() && incoming.sequence > 0U && existing->second.sequence > incoming.sequence) { + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + return true; + } + + if (incoming.patches.empty()) + m_peerPatches.erase(incoming.peerId); + else + m_peerPatches[incoming.peerId] = incoming; + bumpRevisionLocked(); + + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + } + + m_revisionChanged.notify_all(); + return true; +} + +/* Removes all patch records associated with a console peer. */ + +bool PatchStatusRegistry::removePeer(uint32_t peerId) +{ + if (peerId == 0U) + return false; + + bool removed = false; + { + std::lock_guard guard(m_mutex); + removed = m_peerPatches.erase(peerId) > 0U; + if (removed) + bumpRevisionLocked(); + } + + if (removed) + m_revisionChanged.notify_all(); + + return removed; +} + +/* Removes expired patch status records. */ + +uint32_t PatchStatusRegistry::cleanupExpired() +{ + uint32_t removed = 0U; + bool changed = false; + uint64_t now = nowMs(); + + { + std::lock_guard guard(m_mutex); + for (auto it = m_peerPatches.begin(); it != m_peerPatches.end();) { + if (it->second.expiresAt <= now) { + it = m_peerPatches.erase(it); + removed++; + changed = true; + } + else { + ++it; + } + } + + if (changed) + bumpRevisionLocked(); + } + + if (changed) + m_revisionChanged.notify_all(); + + return removed; +} + +/* Creates a complete JSON snapshot of the registry. */ + +json::object PatchStatusRegistry::snapshot() +{ + cleanupExpired(); + + std::lock_guard guard(m_mutex); + return snapshotLocked(); +} + +/* Waits for registry changes after the supplied revision. */ + +json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs) +{ + waitMs = std::min(waitMs, MAX_WAIT_MS); + cleanupExpired(); + + std::unique_lock lock(m_mutex); + if (waitMs > 0U && sinceRevision >= m_revision) { + m_revisionChanged.wait_for(lock, std::chrono::milliseconds(waitMs), [&]() { + return m_revision > sinceRevision; + }); + } + + return snapshotLocked(); +} + +/* Gets the current registry revision. */ + +uint64_t PatchStatusRegistry::revision() const +{ + std::lock_guard guard(m_mutex); + return m_revision; +} + +/* Gets the configured default patch status TTL. */ + +uint32_t PatchStatusRegistry::defaultTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_defaultTtlSeconds; +} + +/* Gets the configured minimum patch status TTL. */ + +uint32_t PatchStatusRegistry::minTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_minTtlSeconds; +} + +/* Gets the configured maximum patch status TTL. */ + +uint32_t PatchStatusRegistry::maxTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_maxTtlSeconds; +} + +/* Gets the current system time in milliseconds. */ + +uint64_t PatchStatusRegistry::nowMs() +{ + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); +} + +/* Normalizes a mode string for key generation. */ + +std::string PatchStatusRegistry::normalizeMode(const std::string& mode) +{ + std::string normalized = mode; + std::transform(normalized.begin(), normalized.end(), normalized.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return normalized; +} + +/* Builds a stable talkgroup lookup key for a patch member. */ + +std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) +{ + std::ostringstream ss; + ss << normalizeMode(member.mode) << ':' << member.tgid << ':' << static_cast(member.slot); + return ss.str(); +} + +/* Serializes a patch member to JSON. */ + +json::object PatchStatusRegistry::memberToJson(const PatchMember& member) +{ + json::object obj = json::object(); + obj["system"].set(member.system); + obj["mode"].set(member.mode); + obj["tgid"].set(member.tgid); + obj["slot"].set(member.slot); + obj["key"].set(buildTalkgroupKey(member)); + return obj; +} + +/* Serializes a patch record to JSON. */ + +json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) +{ + json::object obj = json::object(); + obj["patchId"].set(patch.patchId); + obj["active"].set(patch.active); + obj["oneWay"].set(patch.oneWay); + + json::array members = json::array(); + for (const PatchMember& member : patch.members) + members.push_back(json::value(memberToJson(member))); + obj["members"].set(members); + + return obj; +} + +/* Serializes a peer patch snapshot to JSON. */ + +json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer) +{ + json::object obj = json::object(); + obj["peerId"].set(peer.peerId); + obj["originFnePeerId"].set(peer.originFnePeerId); + obj["peerName"].set(peer.peerName); + obj["sequence"].set(peer.sequence); + obj["updatedAt"].set(peer.updatedAt); + obj["expiresAt"].set(peer.expiresAt); + + json::array patches = json::array(); + for (const PatchRecord& patch : peer.patches) + patches.push_back(json::value(patchToJson(patch))); + obj["patches"].set(patches); + + return obj; +} + +/* Parses one patch record from JSON. */ + +bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const +{ + if (obj["patchId"].is()) + patch.patchId = obj["patchId"].get(); + + if (obj["active"].is()) + patch.active = obj["active"].get(); + + if (obj["oneWay"].is()) + patch.oneWay = obj["oneWay"].get(); + + if (!obj["members"].is()) { + errorMessage = "patch members was not a valid array"; + return false; + } + + json::array members = obj["members"].get(); + for (json::value& value : members) { + if (!value.is()) { + errorMessage = "patch members contained a non-object entry"; + return false; + } + + json::object memberObj = value.get(); + PatchMember member; + if (!parseMember(memberObj, member, errorMessage)) + return false; + + patch.members.push_back(member); + } + + return true; +} + +/* Parses one patch member from JSON. */ + +bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const +{ + if (obj["system"].is()) + member.system = obj["system"].get(); + + if (obj["mode"].is()) + member.mode = normalizeMode(obj["mode"].get()); + else + member.mode = "unknown"; + + if (!obj["tgid"].is()) { + errorMessage = "patch member tgid was not a valid integer"; + return false; + } + + member.tgid = obj["tgid"].get(); + if (member.tgid == 0U) { + errorMessage = "patch member tgid cannot be zero"; + return false; + } + + if (obj["slot"].is()) + member.slot = obj["slot"].get(); + else if (obj["slot"].is()) { + uint32_t slot = obj["slot"].get(); + if (slot > std::numeric_limits::max()) { + errorMessage = "patch member slot was out of range"; + return false; + } + member.slot = static_cast(slot); + } + + return true; +} + +/* Clamps a TTL value into the configured TTL range. */ + +uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const +{ + std::lock_guard guard(m_mutex); + return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds)); +} + +/* Creates a registry snapshot while the registry mutex is held. */ + +json::object PatchStatusRegistry::snapshotLocked() const +{ + json::object response = json::object(); + response["revision"].set(m_revision); + + json::array peers = json::array(); + json::array patches = json::array(); + json::object byTalkgroup = json::object(); + + for (const auto& entry : m_peerPatches) { + const PeerPatchSnapshot& peer = entry.second; + peers.push_back(json::value(peerSnapshotToJson(peer))); + + for (const PatchRecord& patch : peer.patches) { + json::object patchObj = patchToJson(patch); + patchObj["peerId"].set(peer.peerId); + patchObj["originFnePeerId"].set(peer.originFnePeerId); + patchObj["peerName"].set(peer.peerName); + patchObj["updatedAt"].set(peer.updatedAt); + patchObj["expiresAt"].set(peer.expiresAt); + patches.push_back(json::value(patchObj)); + + for (const PatchMember& member : patch.members) { + std::string key = buildTalkgroupKey(member); + json::array entries = json::array(); + if (byTalkgroup[key].is()) + entries = byTalkgroup[key].get(); + + json::object tgPatch = json::object(); + tgPatch["peerId"].set(peer.peerId); + tgPatch["originFnePeerId"].set(peer.originFnePeerId); + tgPatch["peerName"].set(peer.peerName); + tgPatch["patchId"].set(patch.patchId); + tgPatch["active"].set(patch.active); + tgPatch["oneWay"].set(patch.oneWay); + tgPatch["updatedAt"].set(peer.updatedAt); + tgPatch["expiresAt"].set(peer.expiresAt); + tgPatch["member"].set(memberToJson(member)); + entries.push_back(json::value(tgPatch)); + byTalkgroup[key].set(entries); + } + } + } + + response["peers"].set(peers); + response["patches"].set(patches); + response["byTalkgroup"].set(byTalkgroup); + return response; +} + +/* Advances the registry revision while the registry mutex is held. */ + +void PatchStatusRegistry::bumpRevisionLocked() +{ + m_revision++; + if (m_revision == 0U) + m_revision = 1U; +} diff --git a/src/fne/PatchStatusRegistry.h b/src/fne/PatchStatusRegistry.h new file mode 100644 index 00000000..3c8ee02c --- /dev/null +++ b/src/fne/PatchStatusRegistry.h @@ -0,0 +1,238 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Digital Voice Modem - Converged FNE Software + * GPLv2 Open Source. Use is subject to license terms. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright (C) 2026 DVMProject Authors + * + */ +/** + * @file PatchStatusRegistry.h + * @ingroup fne + */ +#if !defined(__FNE_PATCH_STATUS_REGISTRY_H__) +#define __FNE_PATCH_STATUS_REGISTRY_H__ + +#include "fne/Defines.h" +#include "common/json/json.h" + +#include +#include +#include +#include +#include +#include + +/** + * @brief In-memory registry for console-advertised patch status. + * + * The registry owns all patch status records published by console peers and + * replicated from neighboring FNEs. All access to the backing storage is + * serialized through the registry mutex; callers receive JSON snapshots and + * never receive direct references or iterators into the registry storage. + * + * @ingroup fne + */ +class HOST_SW_API PatchStatusRegistry { +public: + /** + * @brief Default number of seconds before a patch status record expires. + */ + static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U; + /** + * @brief Minimum accepted patch status TTL in seconds. + */ + static constexpr uint32_t MIN_TTL_SECONDS = 5U; + /** + * @brief Maximum accepted patch status TTL in seconds. + */ + static constexpr uint32_t MAX_TTL_SECONDS = 300U; + /** + * @brief Maximum wait time for long-poll style change requests. + */ + static constexpr uint32_t MAX_WAIT_MS = 30000U; + + /** + * @brief Initializes a new instance of the PatchStatusRegistry class. + */ + PatchStatusRegistry(); + /** + * @brief Finalizes an instance of the PatchStatusRegistry class. + */ + ~PatchStatusRegistry() = default; + + /** + * @brief Configures the accepted TTL range for patch status records. + * @param defaultTtlSeconds Default TTL used when a publish request omits a TTL. + * @param minTtlSeconds Minimum accepted TTL. + * @param maxTtlSeconds Maximum accepted TTL. + */ + void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds); + + /** + * @brief Publishes a complete patch snapshot for one console peer. + * @param request JSON request containing peerId, peerName, optional sequence, optional ttlSeconds, and patches. + * @param response JSON registry snapshot returned after the publish is applied. + * @param errorMessage Validation error text populated when the request is invalid. + * @returns bool True, if the publish request was valid and applied, otherwise false. + */ + bool publish(json::object& request, json::object& response, std::string& errorMessage); + /** + * @brief Removes all patch records associated with a console peer. + * @param peerId Console peer ID whose records should be removed. + * @returns bool True, if records were removed, otherwise false. + */ + bool removePeer(uint32_t peerId); + /** + * @brief Removes expired patch status records. + * @returns uint32_t Number of peer records removed. + */ + uint32_t cleanupExpired(); + + /** + * @brief Creates a complete JSON snapshot of the registry. + * @returns json::object Registry snapshot. + */ + json::object snapshot(); + /** + * @brief Waits for registry changes after the supplied revision. + * @param sinceRevision Revision already known to the caller. + * @param waitMs Maximum wait time in milliseconds. + * @returns json::object Registry snapshot after change or timeout. + */ + json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs); + + /** + * @brief Gets the current registry revision. + * @returns uint64_t Registry revision number. + */ + uint64_t revision() const; + /** + * @brief Gets the configured default patch status TTL. + * @returns uint32_t Default TTL in seconds. + */ + uint32_t defaultTtlSeconds() const; + /** + * @brief Gets the configured minimum patch status TTL. + * @returns uint32_t Minimum TTL in seconds. + */ + uint32_t minTtlSeconds() const; + /** + * @brief Gets the configured maximum patch status TTL. + * @returns uint32_t Maximum TTL in seconds. + */ + uint32_t maxTtlSeconds() const; + +private: + /** + * @brief Represents a talkgroup member participating in a patch. + */ + struct PatchMember { + std::string system; //!< System name for the member. + std::string mode; //!< Digital mode for the member. + uint32_t tgid = 0U; //!< Talkgroup ID for the member. + uint8_t slot = 0U; //!< Timeslot for the member, if applicable. + }; + + /** + * @brief Represents one active console patch. + */ + struct PatchRecord { + std::string patchId; //!< Console-defined patch ID. + bool active = true; //!< Flag indicating whether the patch is active. + bool oneWay = false; //!< Flag indicating whether the patch is one-way. + std::vector members; //!< Talkgroup members in the patch. + }; + + /** + * @brief Represents a console peer's complete patch status snapshot. + */ + struct PeerPatchSnapshot { + uint32_t peerId = 0U; //!< Console peer ID. + uint32_t originFnePeerId = 0U; //!< FNE peer ID where this status originated. + std::string peerName; //!< Console peer display name. + uint32_t sequence = 0U; //!< Console-defined sequence number. + uint64_t updatedAt = 0U; //!< Time this snapshot was accepted. + uint64_t expiresAt = 0U; //!< Time this snapshot expires. + std::vector patches; //!< Complete patch list for this console peer. + }; + + /** + * @brief Gets the current system time in milliseconds. + * @returns uint64_t Current time in milliseconds. + */ + static uint64_t nowMs(); + /** + * @brief Normalizes a mode string for key generation. + * @param mode Mode string. + * @returns std::string Normalized mode string. + */ + static std::string normalizeMode(const std::string& mode); + /** + * @brief Builds a stable talkgroup lookup key for a patch member. + * @param member Patch member. + * @returns std::string Talkgroup key. + */ + static std::string buildTalkgroupKey(const PatchMember& member); + /** + * @brief Serializes a patch member to JSON. + * @param member Patch member. + * @returns json::object JSON patch member. + */ + static json::object memberToJson(const PatchMember& member); + /** + * @brief Serializes a patch record to JSON. + * @param patch Patch record. + * @returns json::object JSON patch record. + */ + static json::object patchToJson(const PatchRecord& patch); + /** + * @brief Serializes a peer patch snapshot to JSON. + * @param peer Peer patch snapshot. + * @returns json::object JSON peer patch snapshot. + */ + static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); + + /** + * @brief Parses one patch record from JSON. + * @param obj JSON patch object. + * @param patch Parsed patch record. + * @param errorMessage Validation error text. + * @returns bool True, if parsed successfully, otherwise false. + */ + bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const; + /** + * @brief Parses one patch member from JSON. + * @param obj JSON patch member object. + * @param member Parsed patch member. + * @param errorMessage Validation error text. + * @returns bool True, if parsed successfully, otherwise false. + */ + bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const; + /** + * @brief Clamps a TTL value into the configured TTL range. + * @param ttlSeconds TTL in seconds. + * @returns uint32_t Clamped TTL in seconds. + */ + uint32_t clampTtl(uint32_t ttlSeconds) const; + /** + * @brief Creates a registry snapshot while the registry mutex is held. + * @returns json::object Registry snapshot. + */ + json::object snapshotLocked() const; + /** + * @brief Advances the registry revision while the registry mutex is held. + */ + void bumpRevisionLocked(); + + mutable std::mutex m_mutex; //!< Mutex guarding registry state. + std::condition_variable m_revisionChanged; //!< Condition variable signaled when revision changes. + std::map m_peerPatches; //!< Peer patch snapshots keyed by console peer ID. + uint64_t m_revision; //!< Monotonic registry revision. + uint32_t m_defaultTtlSeconds; //!< Default TTL in seconds. + uint32_t m_minTtlSeconds; //!< Minimum TTL in seconds. + uint32_t m_maxTtlSeconds; //!< Maximum TTL in seconds. +}; + +#endif // __FNE_PATCH_STATUS_REGISTRY_H__ diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index 560faaeb..c5702412 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -12,6 +12,7 @@ #include "common/Log.h" #include "common/Utils.h" #include "network/MetadataNetwork.h" +#include "common/json/json.h" #include "fne/ActivityLog.h" #include "HostFNE.h" @@ -375,6 +376,71 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } break; + case NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS: // Console Patch Status Transfer + { + if (pktPeerId > 0 && validPeerId) { + FNEPeerConnection* connection = network->m_peers[pktPeerId]; + if (connection != nullptr) { + if (!network->patchStatusEnabled()) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + break; + } + + std::string ip = udp::Socket::address(req->address); + + // Only authenticated console peers may publish or request patch registry state. + if (req->length <= 11U) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) { + DECLARE_UINT8_ARRAY(rawPayload, req->length - 11U); + ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U); + std::string payload(rawPayload, rawPayload + (req->length - 11U)); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + json::object reqObj = v.get(); + std::string type = "snapshot"; + if (reqObj["type"].is()) + type = reqObj["type"].get(); + + if (type == "request") { + json::object snapshot = network->patchStatusRegistry().snapshot(); + network->writePatchStatusToPeer(pktPeerId, snapshot); + break; + } + + // The authenticated peer identity is authoritative; do not allow spoofed peer IDs. + reqObj["peerId"].set(pktPeerId); + reqObj["originFnePeerId"].set(network->m_peerId); + if (!reqObj["peerName"].is() || reqObj["peerName"].get().empty()) + reqObj["peerName"].set(connection->identity()); + + json::object response = json::object(); + std::string errorMessage; + if (!network->patchStatusRegistry().publish(reqObj, response, errorMessage)) { + LogWarning(LOG_MASTER, "PEER %u (%s) invalid patch status payload, %s", pktPeerId, connection->identWithQualifier().c_str(), errorMessage.c_str()); + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + network->writePatchStatusToConsoles(response); + network->replicatePatchStatus(reqObj); + } + else { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } + break; default: network->writePeerNAK(peerId, network->createStreamId(), TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("Unknown transfer opcode from the peer", req->buffer, req->length); @@ -569,6 +635,89 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } } } + else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::REPL_PATCH_STATUS) { // Peer Replication Patch Status + if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { + FNEPeerConnection* connection = network->m_peers[peerId]; + if (connection != nullptr) { + std::string ip = udp::Socket::address(req->address); + + // validate peer (simple validation really) + if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_NEIGHBOR && + connection->isReplica()) { + DECLARE_UINT8_ARRAY(rawPayload, req->length); + ::memcpy(rawPayload, req->buffer, req->length); + + if (mdNetwork->m_peerPatchStatusPkt.find(peerId) == mdNetwork->m_peerPatchStatusPkt.end()) { + mdNetwork->m_peerPatchStatusPkt.insert(peerId, MetadataNetwork::PacketBufferEntry()); + + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + pkt.buffer = new PacketBuffer(true, "Peer Replication, Patch Status"); + pkt.streamId = streamId; + + pkt.locked = false; + } else { + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + if (!pkt.locked && pkt.streamId != streamId) { + LogError(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, stream ID mismatch, expected %u, got %u", peerId, + connection->identWithQualifier().c_str(), pkt.streamId, streamId); + pkt.buffer->clear(); + pkt.streamId = streamId; + } + + if (pkt.streamId != streamId) { + // otherwise drop the packet + break; + } + } + + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + if (pkt.locked) { + while (pkt.locked) + Thread::sleep(1U); + } + + pkt.locked = true; + + uint32_t decompressedLen = 0U; + uint8_t* decompressed = nullptr; + + if (pkt.buffer->decode(rawPayload, &decompressed, &decompressedLen)) { + mdNetwork->m_peerPatchStatusPkt.lock(); + std::string payload(decompressed + 8U, decompressed + decompressedLen); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + LogError(LOG_REPL, "PEER %u (%s) error parsing patch status replication, %s", peerId, connection->identWithQualifier().c_str(), err.c_str()); + pkt.buffer->clear(); + pkt.streamId = 0U; + if (decompressed != nullptr) { + delete[] decompressed; + } + mdNetwork->m_peerPatchStatusPkt.unlock(); + mdNetwork->m_peerPatchStatusPkt.erase(peerId); + break; + } + + network->processReplicatedPatchStatus(peerId, v.get()); + + pkt.buffer->clear(); + delete pkt.buffer; + pkt.streamId = 0U; + if (decompressed != nullptr) { + delete[] decompressed; + } + mdNetwork->m_peerPatchStatusPkt.unlock(); + mdNetwork->m_peerPatchStatusPkt.erase(peerId); + } else { + pkt.locked = false; + } + } else { + network->writePeerNAK(peerId, 0U, TAG_PEER_REPLICA, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } break; case NET_FUNC::NET_TREE: diff --git a/src/fne/network/MetadataNetwork.h b/src/fne/network/MetadataNetwork.h index 62629a2e..dcc6dda4 100644 --- a/src/fne/network/MetadataNetwork.h +++ b/src/fne/network/MetadataNetwork.h @@ -117,6 +117,7 @@ namespace network bool locked; }; concurrent::unordered_map m_peerReplicaActPkt; + concurrent::unordered_map m_peerPatchStatusPkt; concurrent::unordered_map m_peerTreeListPkt; ThreadPool m_threadPool; diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index b2d517cb..477ad985 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -48,6 +48,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc m_analogCallback(nullptr), m_netTreeDiscCallback(nullptr), m_peerReplicaCallback(nullptr), + m_patchStatusCallback(nullptr), m_masterPeerId(0U), m_pidLookup(nullptr), m_peerReplica(false), @@ -55,6 +56,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc m_tgidPkt(true, "Peer Replication, TGID List"), m_ridPkt(true, "Peer Replication, RID List"), m_pidPkt(true, "Peer Replication, PID List"), + m_patchStatusPkt(true, "Peer Replication, Patch Status"), m_threadPool(WORKER_CNT, "peer"), m_prevSpanningTreeChildren(0U), m_nakFallOver(false), @@ -233,6 +235,40 @@ bool PeerNetwork::writeHAParams(std::vector& haParams) return false; } +/* Writes a complete console patch status update upstream. */ + +bool PeerNetwork::writePatchStatus(json::object obj) +{ + if (!m_peerReplica) + return false; + + obj["type"].set("publish"); + json::value v = json::value(obj); + std::string json = std::string(v.serialize()); + + size_t len = json.length() + 9U; + DECLARE_CHAR_ARRAY(buffer, len); + + ::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + PacketBuffer pkt(true, "Peer Replication, Patch Status"); + pkt.encode((uint8_t*)buffer, len); + + uint32_t streamId = createStreamId(); + LogInfoEx(LOG_REPL, "PEER %u Peer Replication, Patch Status, blocks %u, streamId = %u", m_peerId, pkt.fragments.size(), streamId); + if (pkt.fragments.size() > 0U) { + for (auto frag : pkt.fragments) { + writeMaster({ NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId, true); + Thread::sleep(60U); // pace block transmission + } + } + + pkt.clear(); + return true; +} + // --------------------------------------------------------------------------- // Protected Class Members // --------------------------------------------------------------------------- @@ -462,6 +498,32 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco } break; + case NET_SUBFUNC::REPL_PATCH_STATUS: // Patch Status + { + uint32_t decompressedLen = 0U; + uint8_t* decompressed = nullptr; + + if (m_patchStatusPkt.decode(data, &decompressed, &decompressedLen)) { + std::string payload(decompressed + 8U, decompressed + decompressedLen); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + LogError(LOG_PEER, "PEER %u error parsing patch status replication, %s", m_peerId, err.c_str()); + m_patchStatusPkt.clear(); + delete[] decompressed; + break; + } + + if (m_patchStatusCallback != nullptr) + m_patchStatusCallback(this, v.get()); + + m_patchStatusPkt.clear(); + delete[] decompressed; + } + } + break; + default: break; } diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h index 48fbf40b..74f3c8a4 100644 --- a/src/fne/network/PeerNetwork.h +++ b/src/fne/network/PeerNetwork.h @@ -17,6 +17,7 @@ #define __PEER_NETWORK_H__ #include "Defines.h" +#include "common/json/json.h" #include "common/lookups/PeerListLookup.h" #include "common/network/Network.h" #include "common/network/PacketBuffer.h" @@ -138,9 +139,14 @@ namespace network void setNetTreeDiscCallback(std::function&& callback) { m_netTreeDiscCallback = callback; } /** * @brief Helper to set the peer replica notification callback. - * @param callback + * @param callback */ void setNotifyPeerReplicaCallback(std::function&& callback) { m_peerReplicaCallback = callback; } + /** + * @brief Helper to set the peer patch status callback. + * @param callback + */ + void setPatchStatusCallback(std::function&& callback) { m_patchStatusCallback = callback; } /** * @brief Writes a complete update of this CFNE's active peer list to the network. @@ -235,6 +241,26 @@ namespace network * @returns bool True, if list was sent, otherwise false. */ bool writeHAParams(std::vector& haParams); + /** + * @brief Writes a complete console patch status update upstream. + * \code{.unparsed} + * The patch status replication message is a JSON body, and is a packet + * buffer compressed message. + * + * { + * "type": "publish", + * "peerId": , + * "originFnePeerId": , + * "peerName": "", + * "sequence": , + * "ttlSeconds": , + * "patches": [] + * } + * \endcode + * @param obj JSON patch status publish payload. + * @returns bool True, if patch status was sent, otherwise false. + */ + bool writePatchStatus(json::object obj); /** * @brief Returns flag indicating whether or not this peer connection is peer replication enabled. @@ -298,6 +324,10 @@ namespace network * @brief Peer Replica Notification Callback. */ std::function m_peerReplicaCallback; + /** + * @brief Peer patch status callback. + */ + std::function m_patchStatusCallback; /** * @brief User overrideable handler that allows user code to process network packets not handled by this class. @@ -336,6 +366,7 @@ namespace network PacketBuffer m_tgidPkt; PacketBuffer m_ridPkt; PacketBuffer m_pidPkt; + PacketBuffer m_patchStatusPkt; ThreadPool m_threadPool; diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index 176a80ae..16b23cf3 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -109,6 +109,8 @@ TrafficNetwork::TrafficNetwork(HostFNE* host, const std::string& address, uint16 m_maintainenceTimer(1000U, pingTime), m_updateLookupTimer(1000U, (updateLookupTime * 60U)), m_haUpdateTimer(1000U, FIXED_HA_UPDATE_INTERVAL), + m_patchStatusRegistry(), + m_patchStatusEnabled(true), m_softConnLimit(0U), m_enableSpanningTree(true), m_logSpanningTreeChanges(false), @@ -229,6 +231,13 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions) m_logSpanningTreeChanges = conf["logSpanningTreeChanges"].as(false); m_spanningTreeFastReconnect = conf["spanningTreeFastReconnect"].as(true); + yaml::Node patchStatusConf = conf["patchStatus"]; + m_patchStatusEnabled = patchStatusConf["enabled"].as(true); + uint32_t patchStatusDefaultTtl = patchStatusConf["defaultTtlSeconds"].as(PatchStatusRegistry::DEFAULT_TTL_SECONDS); + uint32_t patchStatusMinTtl = patchStatusConf["minTtlSeconds"].as(PatchStatusRegistry::MIN_TTL_SECONDS); + uint32_t patchStatusMaxTtl = patchStatusConf["maxTtlSeconds"].as(PatchStatusRegistry::MAX_TTL_SECONDS); + m_patchStatusRegistry.configure(patchStatusDefaultTtl, patchStatusMinTtl, patchStatusMaxTtl); + // always force disable ADJ_STS_BCAST to neighbor FNE peers if the all option // is enabled if (m_disallowAdjStsBcast) { @@ -350,6 +359,12 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions) LogInfo(" Enable Peer Spanning Tree: %s", m_enableSpanningTree ? "yes" : "no"); LogInfo(" Log Spanning Tree Changes: %s", m_logSpanningTreeChanges ? "yes" : "no"); LogInfo(" Spanning Tree Allow Fast Reconnect: %s", m_spanningTreeFastReconnect ? "yes" : "no"); + LogInfo(" Console Patch Status Enabled: %s", m_patchStatusEnabled ? "yes" : "no"); + if (m_patchStatusEnabled) { + LogInfo(" Console Patch Status Default TTL: %us", m_patchStatusRegistry.defaultTtlSeconds()); + LogInfo(" Console Patch Status Minimum TTL: %us", m_patchStatusRegistry.minTtlSeconds()); + LogInfo(" Console Patch Status Maximum TTL: %us", m_patchStatusRegistry.maxTtlSeconds()); + } LogInfo(" Disable adjacent site broadcasts to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no"); if (m_disallowAdjStsBcast) { LogWarning(LOG_MASTER, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); @@ -517,6 +532,27 @@ void TrafficNetwork::processNetworkTreeDisconnect(uint32_t peerId, uint32_t offe } } +/* Processes a replicated console patch status update. */ + +void TrafficNetwork::processReplicatedPatchStatus(uint32_t peerId, json::object obj) +{ + if (!m_patchStatusEnabled) + return; + + if (!obj["peerId"].is() || !obj["patches"].is()) + return; + + json::object response = json::object(); + std::string errorMessage; + if (!m_patchStatusRegistry.publish(obj, response, errorMessage)) { + LogWarning(LOG_MASTER, "PEER %u invalid replicated patch status payload, %s", peerId, errorMessage.c_str()); + return; + } + + writePatchStatusToConsoles(response); + replicatePatchStatus(obj, peerId); +} + /* Helper to process an downstream peer In-Call Control message. */ void TrafficNetwork::processDownstreamInCallCtrl(network::NET_ICC::ENUM command, network::NET_SUBFUNC::ENUM subFunc, uint32_t dstId, @@ -636,6 +672,8 @@ void TrafficNetwork::clock(uint32_t ms) } // cleanup possibly stale data calls + if (m_patchStatusEnabled && m_patchStatusRegistry.cleanupExpired() > 0U) + writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); m_tagDMR->packetData()->cleanupStale(); m_tagP25->packetData()->cleanupStale(); @@ -1571,6 +1609,8 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req) // spin up a thread and send metadata over to peer network->peerMetadataUpdate(peerId); + if (network->m_patchStatusEnabled && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) + network->writePatchStatusToPeer(peerId, network->patchStatusRegistry().snapshot()); } } } @@ -2304,6 +2344,20 @@ void TrafficNetwork::erasePeer(uint32_t peerId) } } + // erase any console patch status records for this peer + if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) { + json::object clearPatchStatus = json::object(); + uint32_t originFnePeerId = m_peerId; + uint32_t ttlSeconds = m_patchStatusRegistry.defaultTtlSeconds(); + clearPatchStatus["type"].set("publish"); + clearPatchStatus["peerId"].set(peerId); + clearPatchStatus["originFnePeerId"].set(originFnePeerId); + clearPatchStatus["ttlSeconds"].set(ttlSeconds); + clearPatchStatus["patches"].set(json::array()); + writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); + replicatePatchStatus(clearPatchStatus); + } + // erase any HA parameters for this peer { auto it = std::find_if(m_peerReplicaHAParams.begin(), m_peerReplicaHAParams.end(), [&](auto& x) { return x.peerId == peerId; }); @@ -2408,6 +2462,161 @@ json::object TrafficNetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *c return peerObj; } +/* Helper to send patch status state to one console peer. */ + +bool TrafficNetwork::writePatchStatusToPeer(uint32_t peerId, json::object obj) +{ + if (peerId == 0U) + return false; + if (!m_patchStatusEnabled) + return false; + + bool ret = false; + m_peers.shared_lock(); + auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); + if (it != m_peers.end() && it->second != nullptr) + ret = writePatchStatusPayload(it->second, obj); + m_peers.shared_unlock(); + + return ret; +} + +/* Helper to broadcast patch status state to connected console peers. */ + +void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId) +{ + if (!m_patchStatusEnabled) + return; + + m_peers.shared_lock(); + if (m_peers.size() == 0U) { + m_peers.shared_unlock(); + return; + } + + for (auto peer : m_peers) { + if (peer.first == exceptPeerId) + continue; + if (peer.second == nullptr) + continue; + if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_CONSOLE) + continue; + + writePatchStatusPayload(peer.second, obj); + } + m_peers.shared_unlock(); +} + +/* Helper to replicate patch status state to neighboring FNE peers. */ + +void TrafficNetwork::replicatePatchStatus(json::object obj, uint32_t exceptPeerId) +{ + if (!m_patchStatusEnabled) + return; + + obj["type"].set("publish"); + + if (m_host->m_peerNetworks.size() > 0U) { + for (auto peer : m_host->m_peerNetworks) { + if (peer.first == exceptPeerId) + continue; + if (peer.second != nullptr && peer.second->isEnabled() && peer.second->isReplica()) + peer.second->writePatchStatus(obj); + } + } + + m_peers.shared_lock(); + for (auto peer : m_peers) { + if (peer.first == exceptPeerId) + continue; + if (peer.second == nullptr) + continue; + if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !peer.second->isReplica()) + continue; + + writePatchStatusReplicationPayload(peer.second, obj); + } + m_peers.shared_unlock(); +} + +/* Helper to serialize and queue a patch status transfer payload. */ + +bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj) +{ + if (connection == nullptr) + return false; + if (!m_patchStatusEnabled) + return false; + if (!connection->connected()) + return false; + if (connection->peerClass() != PEER_CONN_CLASS_CONSOLE) + return false; + + obj["type"].set("registry"); + json::value v = json::value(obj); + std::string payload = std::string(v.serialize()); + uint32_t len = static_cast(payload.length()); + if ((len + 11U) > DATA_PACKET_LENGTH) { + LogError(LOG_MASTER, "PEER %u (%s) patch status registry payload too large, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); + return false; + } + + uint8_t buffer[DATA_PACKET_LENGTH]; + ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); + ::memcpy(buffer + 11U, payload.c_str(), len); + + sockaddr_storage addr = connection->socketStorage(); + uint32_t addrLen = connection->sockStorageLen(); + + if (m_debug) { + LogDebug(LOG_MASTER, "PEER %u (%s) sending patch status registry, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); + } + + return m_frameQueue->write(buffer, len + 11U, createStreamId(), connection->id(), m_peerId, + { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); +} + +/* Helper to serialize and queue a patch status replication payload. */ + +bool TrafficNetwork::writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj) +{ + if (connection == nullptr) + return false; + if (!m_patchStatusEnabled) + return false; + if (!connection->connected()) + return false; + if (connection->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !connection->isReplica()) + return false; + + obj["type"].set("publish"); + json::value v = json::value(obj); + std::string json = std::string(v.serialize()); + + size_t len = json.length() + 9U; + DECLARE_CHAR_ARRAY(buffer, len); + + ::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + PacketBuffer pkt(true, "Peer Replication, Patch Status"); + pkt.encode((uint8_t*)buffer, len); + + uint32_t streamId = createStreamId(); + LogInfoEx(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, blocks %u, streamId = %u", connection->id(), + connection->identWithQualifier().c_str(), pkt.fragments.size(), streamId); + if (pkt.fragments.size() > 0U) { + for (auto frag : pkt.fragments) { + writePeer(connection->id(), m_peerId, { NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId); + Thread::sleep(60U); // pace block transmission + } + } + + pkt.clear(); + return true; +} + /* Helper to reset a peer connection. */ bool TrafficNetwork::resetPeer(uint32_t peerId) diff --git a/src/fne/network/TrafficNetwork.h b/src/fne/network/TrafficNetwork.h index 27d7f4d3..1a1a9ccc 100644 --- a/src/fne/network/TrafficNetwork.h +++ b/src/fne/network/TrafficNetwork.h @@ -41,6 +41,7 @@ #include "fne/network/FNEPeerConnection.h" #include "fne/network/SpanningTree.h" #include "fne/network/HAParameters.h" +#include "fne/PatchStatusRegistry.h" #include "fne/CryptoContainer.h" #include @@ -236,6 +237,12 @@ namespace network * @param offendingPeerId Offending Peer ID. */ void processNetworkTreeDisconnect(uint32_t peerId, uint32_t offendingPeerId); + /** + * @brief Processes a replicated console patch status update. + * @param peerId Peer ID that delivered the replication update. + * @param obj Patch status JSON payload. + */ + void processReplicatedPatchStatus(uint32_t peerId, json::object obj); /** * @brief Helper to process an downstream peer In-Call Control message. @@ -274,6 +281,35 @@ namespace network * @return json::object */ json::object fneConnObject(uint32_t peerId, FNEPeerConnection* conn); + /** + * @brief Gets the console patch status registry. + * @return PatchStatusRegistry& Patch status registry. + */ + PatchStatusRegistry& patchStatusRegistry() { return m_patchStatusRegistry; } + /** + * @brief Flag indicating whether console patch status handling is enabled. + * @returns bool True, if enabled. + */ + bool patchStatusEnabled() const { return m_patchStatusEnabled; } + /** + * @brief Sends patch status registry state to one console peer. + * @param peerId Destination peer ID. + * @param obj Patch status JSON payload. + * @returns bool True, if the message was queued, otherwise false. + */ + bool writePatchStatusToPeer(uint32_t peerId, json::object obj); + /** + * @brief Broadcasts patch status registry state to connected console peers. + * @param obj Patch status JSON payload. + * @param exceptPeerId Optional peer ID to skip. + */ + void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U); + /** + * @brief Replicates patch status state to neighboring FNE peers. + * @param obj Patch status JSON payload. + * @param exceptPeerId Optional peer ID to skip. + */ + void replicatePatchStatus(json::object obj, uint32_t exceptPeerId = 0U); /** * @brief Helper to reset a peer connection. @@ -359,6 +395,9 @@ namespace network Timer m_updateLookupTimer; Timer m_haUpdateTimer; + PatchStatusRegistry m_patchStatusRegistry; + bool m_patchStatusEnabled; + uint32_t m_softConnLimit; bool m_enableSpanningTree; @@ -793,6 +832,21 @@ namespace network */ bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason, sockaddr_storage& addr, uint32_t addrLen); + /** + * @brief Serializes and queues a patch status transfer payload. + * @param connection Destination connection. + * @param obj Patch status JSON payload. + * @returns bool True, if message was queued, otherwise false. + */ + bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj); + /** + * @brief Serializes and queues a patch status replication payload. + * @param connection Destination neighbor connection. + * @param obj Patch status JSON payload. + * @returns bool True, if message was queued, otherwise false. + */ + bool writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj); + /* ** Internal KMM Callback. */