From 590903613541b78448ca410ec579e2b943913072 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 5 May 2026 14:39:13 -0700 Subject: [PATCH 1/5] feat: add server FDv2 data system orchestrator --- libs/server-sdk/src/CMakeLists.txt | 3 + .../change_notifier/change_notifier.cpp | 89 ++- .../change_notifier/change_notifier.hpp | 11 +- .../memory_store/memory_store.hpp | 7 +- .../itransactional_destination.hpp | 36 ++ .../source/ifdv2_initializer_factory.hpp | 27 + .../source/ifdv2_synchronizer_factory.hpp | 28 + .../data_systems/fdv2/fdv2_data_system.cpp | 313 ++++++++++ .../data_systems/fdv2/fdv2_data_system.hpp | 263 ++++++++ .../tests/fdv2_data_system_test.cpp | 590 ++++++++++++++++++ 10 files changed, 1359 insertions(+), 8 deletions(-) create mode 100644 libs/server-sdk/src/data_interfaces/destination/itransactional_destination.hpp create mode 100644 libs/server-sdk/src/data_interfaces/source/ifdv2_initializer_factory.hpp create mode 100644 libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp create mode 100644 libs/server-sdk/tests/fdv2_data_system_test.cpp diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 7f27984a4..bf3177473 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -39,6 +39,7 @@ target_sources(${LIBNAME} data_components/dependency_tracker/dependency_tracker.cpp data_components/expiration_tracker/expiration_tracker.hpp data_components/expiration_tracker/expiration_tracker.cpp + data_interfaces/destination/itransactional_destination.hpp data_components/memory_store/memory_store.hpp data_components/memory_store/memory_store.cpp data_components/serialization_adapters/json_deserializer.hpp @@ -59,6 +60,8 @@ target_sources(${LIBNAME} data_systems/fdv2/polling_synchronizer.cpp data_systems/fdv2/streaming_synchronizer.hpp data_systems/fdv2/streaming_synchronizer.cpp + data_systems/fdv2/fdv2_data_system.hpp + data_systems/fdv2/fdv2_data_system.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp b/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp index 71990ae09..f42991fcf 100644 --- a/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp +++ b/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp @@ -1,10 +1,24 @@ #include "change_notifier.hpp" #include + #include +#include +#include namespace launchdarkly::server_side::data_components { +namespace { + +template +struct overloaded : Ts... { + using Ts::operator()...; +}; +template +overloaded(Ts...) -> overloaded; + +} // namespace + std::unique_ptr ChangeNotifier::OnFlagChange( ChangeHandler handler) { std::lock_guard lock{signal_mutex_}; @@ -55,6 +69,79 @@ void ChangeNotifier::Upsert(std::string const& key, std::move(segment)); } +void ChangeNotifier::Apply( + data_model::ChangeSet change_set) { + if (change_set.type == data_model::ChangeSetType::kNone) { + return; + } + + // Compute changed dependencies before passing the changeset to the sink. + std::optional change_notifications; + if (HasListeners()) { + DependencySet affected; + if (change_set.type == data_model::ChangeSetType::kFull) { + // Group items by kind so the existing per-kind diff helper can + // compare the new state to the existing store contents. + Collection new_flags; + Collection new_segments; + for (auto const& change : change_set.data) { + std::visit( + overloaded{ + [&](data_model::ItemDescriptor const& + f) { new_flags.emplace(change.key, f); }, + [&](data_model::ItemDescriptor< + data_model::Segment> const& s) { + new_segments.emplace(change.key, s); + }, + }, + change.object); + } + CalculateChanges(DataKind::kFlag, source_.AllFlags(), new_flags, + affected); + CalculateChanges(DataKind::kSegment, source_.AllSegments(), + new_segments, affected); + } else { + // Partial: every item in the changeset is treated as a change; + // no version comparison. + for (auto const& change : change_set.data) { + std::visit(overloaded{ + [&](data_model::ItemDescriptor< + data_model::Flag> const&) { + dependency_tracker_.CalculateChanges( + DataKind::kFlag, change.key, affected); + }, + [&](data_model::ItemDescriptor< + data_model::Segment> const&) { + dependency_tracker_.CalculateChanges( + DataKind::kSegment, change.key, + affected); + }, + }, + change.object); + } + } + change_notifications = std::move(affected); + } + + // Update the dependency tracker. + if (change_set.type == data_model::ChangeSetType::kFull) { + dependency_tracker_.Clear(); + } + for (auto const& change : change_set.data) { + std::visit( + [&](auto const& descriptor) { + dependency_tracker_.UpdateDependencies(change.key, descriptor); + }, + change.object); + } + + sink_.Apply(std::move(change_set)); + + if (change_notifications) { + NotifyChanges(std::move(*change_notifications)); + } +} + bool ChangeNotifier::HasListeners() const { std::lock_guard lock{signal_mutex_}; return !signals_.empty(); @@ -69,7 +156,7 @@ void ChangeNotifier::NotifyChanges(DependencySet changes) { } } -ChangeNotifier::ChangeNotifier(IDestination& sink, +ChangeNotifier::ChangeNotifier(data_interfaces::ITransactionalDestination& sink, data_interfaces::IStore const& source) : sink_(sink), source_(source) {} diff --git a/libs/server-sdk/src/data_components/change_notifier/change_notifier.hpp b/libs/server-sdk/src/data_components/change_notifier/change_notifier.hpp index 87518eabf..ed0ca1c64 100644 --- a/libs/server-sdk/src/data_components/change_notifier/change_notifier.hpp +++ b/libs/server-sdk/src/data_components/change_notifier/change_notifier.hpp @@ -1,6 +1,6 @@ #pragma once -#include "../../data_interfaces/destination/idestination.hpp" +#include "../../data_interfaces/destination/itransactional_destination.hpp" #include "../../data_interfaces/store/istore.hpp" #include "../dependency_tracker/dependency_tracker.hpp" @@ -13,7 +13,7 @@ namespace launchdarkly::server_side::data_components { -class ChangeNotifier final : public data_interfaces::IDestination, +class ChangeNotifier final : public data_interfaces::ITransactionalDestination, public IChangeNotifier { public: template @@ -26,7 +26,8 @@ class ChangeNotifier final : public data_interfaces::IDestination, using SharedCollection = std::unordered_map>; - ChangeNotifier(IDestination& sink, data_interfaces::IStore const& source); + ChangeNotifier(data_interfaces::ITransactionalDestination& sink, + data_interfaces::IStore const& source); std::unique_ptr OnFlagChange(ChangeHandler handler) override; @@ -35,6 +36,8 @@ class ChangeNotifier final : public data_interfaces::IDestination, data_model::FlagDescriptor flag) override; void Upsert(std::string const& key, data_model::SegmentDescriptor segment) override; + void Apply(data_model::ChangeSet change_set) + override; [[nodiscard]] std::string const& Identity() const override; @@ -105,7 +108,7 @@ class ChangeNotifier final : public data_interfaces::IDestination, void NotifyChanges(DependencySet changes); - IDestination& sink_; + data_interfaces::ITransactionalDestination& sink_; data_interfaces::IStore const& source_; boost::signals2::signal)> signals_; diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index 7bda17d3f..3dc73272c 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -1,6 +1,6 @@ #pragma once -#include "../../data_interfaces/destination/idestination.hpp" +#include "../../data_interfaces/destination/itransactional_destination.hpp" #include "../../data_interfaces/item_change.hpp" #include "../../data_interfaces/store/istore.hpp" @@ -14,7 +14,7 @@ namespace launchdarkly::server_side::data_components { class MemoryStore final : public data_interfaces::IStore, - public data_interfaces::IDestination { + public data_interfaces::ITransactionalDestination { public: [[nodiscard]] std::shared_ptr GetFlag( std::string const& key) const override; @@ -47,7 +47,8 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); - void Apply(data_model::ChangeSet changeSet); + void Apply(data_model::ChangeSet changeSet) + override; MemoryStore() = default; ~MemoryStore() override = default; diff --git a/libs/server-sdk/src/data_interfaces/destination/itransactional_destination.hpp b/libs/server-sdk/src/data_interfaces/destination/itransactional_destination.hpp new file mode 100644 index 000000000..fe503c693 --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/destination/itransactional_destination.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "../item_change.hpp" +#include "idestination.hpp" + +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * ITransactionalDestination extends IDestination with the ability to apply + * an FDv2 changeset atomically. + * + * A changeset is a batch of flag and segment upserts and deletions that must + * be applied as a unit; readers must never observe a partially applied + * changeset. + */ +class ITransactionalDestination : public IDestination { + public: + /** + * Applies an FDv2 changeset to the destination atomically. + */ + virtual void Apply(data_model::ChangeSet change_set) = 0; + + ITransactionalDestination(ITransactionalDestination const&) = delete; + ITransactionalDestination(ITransactionalDestination&&) = delete; + ITransactionalDestination& operator=(ITransactionalDestination const&) = + delete; + ITransactionalDestination& operator=(ITransactionalDestination&&) = delete; + ~ITransactionalDestination() override = default; + + protected: + ITransactionalDestination() = default; +}; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer_factory.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer_factory.hpp new file mode 100644 index 000000000..613135d9c --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer_factory.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "ifdv2_initializer.hpp" + +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * Builds new IFDv2Initializer instances on demand. Each call to Build() + * produces a fresh initializer that has not yet been started. + */ +class IFDv2InitializerFactory { + public: + virtual std::unique_ptr Build() = 0; + + virtual ~IFDv2InitializerFactory() = default; + IFDv2InitializerFactory(IFDv2InitializerFactory const&) = delete; + IFDv2InitializerFactory(IFDv2InitializerFactory&&) = delete; + IFDv2InitializerFactory& operator=(IFDv2InitializerFactory const&) = delete; + IFDv2InitializerFactory& operator=(IFDv2InitializerFactory&&) = delete; + + protected: + IFDv2InitializerFactory() = default; +}; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp new file mode 100644 index 000000000..fc3c6421a --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "ifdv2_synchronizer.hpp" + +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * Builds new IFDv2Synchronizer instances on demand. Each call to Build() + * produces a fresh synchronizer that has not yet been started. + */ +class IFDv2SynchronizerFactory { + public: + virtual std::unique_ptr Build() = 0; + + virtual ~IFDv2SynchronizerFactory() = default; + IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete; + IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete; + IFDv2SynchronizerFactory& operator=(IFDv2SynchronizerFactory const&) = + delete; + IFDv2SynchronizerFactory& operator=(IFDv2SynchronizerFactory&&) = delete; + + protected: + IFDv2SynchronizerFactory() = default; +}; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp new file mode 100644 index 000000000..073a4ed70 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -0,0 +1,313 @@ +#include "fdv2_data_system.hpp" + +#include + +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +namespace { + +template +struct overloaded : Ts... { + using Ts::operator()...; +}; +template +overloaded(Ts...) -> overloaded; + +// Default until fallback/recovery is implemented. +constexpr std::chrono::hours kSynchronizerNextTimeout{24}; + +} // namespace + +FDv2DataSystem::FDv2DataSystem( + std::vector> + initializer_factories, + std::vector> + synchronizer_factories, + boost::asio::any_io_executor ioc, + data_components::DataSourceStatusManager* status_manager, + Logger const& logger) + : logger_(logger), + ioc_(std::move(ioc)), + initializer_factories_(std::move(initializer_factories)), + synchronizer_factories_(std::move(synchronizer_factories)), + status_manager_(status_manager), + store_(), + change_notifier_(store_, store_), + closed_(false), + selector_(), + initializer_index_(0), + synchronizer_index_(0), + active_initializer_(nullptr), + active_synchronizer_(nullptr) {} + +FDv2DataSystem::~FDv2DataSystem() { + Close(); +} + +void FDv2DataSystem::Close() { + std::lock_guard lock(mutex_); + closed_ = true; + if (active_initializer_) { + active_initializer_->Close(); + } + if (active_synchronizer_) { + active_synchronizer_->Close(); + } + status_manager_->SetState(DataSourceStatus::DataSourceState::kOff); +} + +std::shared_ptr FDv2DataSystem::GetFlag( + std::string const& key) const { + return store_.GetFlag(key); +} + +std::shared_ptr FDv2DataSystem::GetSegment( + std::string const& key) const { + return store_.GetSegment(key); +} + +std::unordered_map> +FDv2DataSystem::AllFlags() const { + return store_.AllFlags(); +} + +std::unordered_map> +FDv2DataSystem::AllSegments() const { + return store_.AllSegments(); +} + +std::string const& FDv2DataSystem::Identity() const { + static std::string const identity = "fdv2"; + return identity; +} + +void FDv2DataSystem::Initialize() { + LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": starting"; + if (initializer_factories_.empty() && synchronizer_factories_.empty()) { + // Offline mode: empty store is the canonical state. + status_manager_->SetState(DataSourceStatus::DataSourceState::kValid); + return; + } + boost::asio::post(ioc_, [this]() { RunNextInitializer(); }); +} + +void FDv2DataSystem::RunNextInitializer() { + auto future = async::MakeFuture(data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}); + bool exhausted = false; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + if (initializer_index_ >= initializer_factories_.size()) { + exhausted = true; + } else { + auto& factory = initializer_factories_[initializer_index_++]; + active_initializer_ = factory->Build(); + future = active_initializer_->Run(); + } + } + + if (exhausted) { + StartSynchronizers(); + return; + } + + std::move(future).Then( + [this]( + data_interfaces::FDv2SourceResult const& result) -> std::monostate { + OnInitializerResult(result); + return {}; + }, + async::kInlineExecutor); +} + +void FDv2DataSystem::OnInitializerResult( + data_interfaces::FDv2SourceResult result) { + using Result = data_interfaces::FDv2SourceResult; + + bool got_basis = false; + bool got_shutdown = false; + + std::visit( + overloaded{ + [&](Result::ChangeSet& cs) { + bool const has_selector = + cs.change_set.selector.value.has_value(); + ApplyChangeSet(std::move(cs.change_set)); + if (has_selector) { + got_basis = true; + } + }, + [&](Result::Shutdown&) { got_shutdown = true; }, + [&](Result::Interrupted const& iv) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": initializer interrupted: " << iv.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + iv.error.Kind(), iv.error.Message()); + }, + [&](Result::TerminalError const& te) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": initializer terminal error: " << te.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + te.error.Kind(), te.error.Message()); + }, + [&](Result::Goodbye const&) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": initializer received unexpected goodbye"; + }, + [&](Result::Timeout const&) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() << ": initializer timed out (unexpected)"; + }, + }, + result.value); + + { + std::lock_guard lock(mutex_); + active_initializer_.reset(); + if (closed_ || got_shutdown) { + return; + } + } + + if (got_basis) { + StartSynchronizers(); + } else { + RunNextInitializer(); + } +} + +void FDv2DataSystem::StartSynchronizers() { + bool exhausted = false; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + if (synchronizer_index_ >= synchronizer_factories_.size()) { + exhausted = true; + } else { + auto& factory = synchronizer_factories_[synchronizer_index_++]; + active_synchronizer_ = factory->Build(); + } + } + + if (exhausted) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() << ": no synchronizers available"; + return; + } + + RunSynchronizerNext(); +} + +void FDv2DataSystem::RunSynchronizerNext() { + auto future = async::MakeFuture(data_interfaces::FDv2SourceResult{ + data_interfaces::FDv2SourceResult::Shutdown{}}); + { + std::lock_guard lock(mutex_); + if (closed_ || !active_synchronizer_) { + return; + } + future = + active_synchronizer_->Next(kSynchronizerNextTimeout, selector_); + } + + std::move(future).Then( + [this]( + data_interfaces::FDv2SourceResult const& result) -> std::monostate { + OnSynchronizerResult(result); + return {}; + }, + async::kInlineExecutor); +} + +void FDv2DataSystem::OnSynchronizerResult( + data_interfaces::FDv2SourceResult result) { + using Result = data_interfaces::FDv2SourceResult; + + bool got_shutdown = false; + bool advance = false; + + std::visit( + overloaded{ + [&](Result::ChangeSet& cs) { + ApplyChangeSet(std::move(cs.change_set)); + }, + [&](Result::Shutdown&) { got_shutdown = true; }, + [&](Result::Interrupted const& iv) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": synchronizer interrupted: " << iv.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + iv.error.Kind(), iv.error.Message()); + }, + [&](Result::TerminalError const& te) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": synchronizer terminal error: " << te.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + te.error.Kind(), te.error.Message()); + advance = true; + }, + [&](Result::Goodbye const& gb) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": synchronizer goodbye" + << (gb.reason ? ": " + *gb.reason : ""); + advance = true; + }, + [&](Result::Timeout const&) { + LD_LOG(logger_, LogLevel::kDebug) + << Identity() << ": synchronizer timed out; retrying"; + }, + }, + result.value); + + { + std::lock_guard lock(mutex_); + if (closed_ || got_shutdown) { + active_synchronizer_.reset(); + return; + } + if (advance) { + active_synchronizer_.reset(); + } + } + + if (advance) { + StartSynchronizers(); + } else { + RunSynchronizerNext(); + } +} + +void FDv2DataSystem::ApplyChangeSet( + data_model::ChangeSet change_set) { + if (change_set.selector.value.has_value()) { + std::lock_guard lock(mutex_); + selector_ = change_set.selector; + } + change_notifier_.Apply(std::move(change_set)); + status_manager_->SetState(DataSourceStatus::DataSourceState::kValid); +} + +bool FDv2DataSystem::Initialized() const { + return store_.Initialized(); +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp new file mode 100644 index 000000000..c2b294f07 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -0,0 +1,263 @@ +#pragma once + +#include "../../data_components/change_notifier/change_notifier.hpp" +#include "../../data_components/memory_store/memory_store.hpp" +#include "../../data_components/status_notifications/data_source_status_manager.hpp" +#include "../../data_interfaces/source/ifdv2_initializer_factory.hpp" +#include "../../data_interfaces/source/ifdv2_synchronizer_factory.hpp" +#include "../../data_interfaces/system/idata_system.hpp" + +#include +#include + +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2DataSystem is the IDataSystem implementation for the FDv2 protocol. + * It runs a sequence of initializers to populate an in-memory store, then + * hands off to a synchronizer for ongoing updates. + * + * Lifecycle / call order: + * + * 1. Construct. + * 2. Call Initialize() exactly once. It returns immediately; orchestration + * runs asynchronously on the executor. + * 3. Call IStore methods (GetFlag/GetSegment/AllFlags/AllSegments) and + * Initialized() / Identity() any number of times, from any thread. + * 4. Drain the executor and join callers (see Destruction protocol). + * 5. Destroy. + * + * Initialize() may not be called more than once. + * + * Thread safety: + * + * - GetFlag, GetSegment, AllFlags, AllSegments, Initialized, and Identity + * are safe to call concurrently from any thread. + * - Initialize() is intended to be called once from a single thread. + * - The destructor must run with no other method calls in flight; see + * Destruction protocol below. + * + * Destruction protocol: + * + * The destructor cancels in-flight orchestration (closes the active + * source, emits status kOff), but does NOT block to drain executor + * callbacks that may already be queued. Before destroying, the caller + * must ensure both of: + * + * 1. The executor that orchestration callbacks run on has been stopped + * AND any thread running it has been joined. Otherwise a previously- + * scheduled callback may run and reference a destroyed object. + * 2. No other thread is currently calling a method on this object. + * + * The standard pattern is: + * + * ioc.stop(); // no new callbacks; running work returns + * run_thread.join(); // wait for the executor thread to exit + * // Now no thread can be touching this object. + * // Destroy FDv2DataSystem. + * + * ClientImpl's destructor uses this pattern (~ClientImpl performs + * ioc_.stop() and run_thread_.join() before any member is destroyed). + * + * Sources (initializers and synchronizers): + * + * Sources are constructed lazily via the factories supplied at + * construction. Each factory is invoked at most once during a run of the + * orchestration. A source is closed and destroyed when the orchestration + * finishes processing a result that ends its turn, or when the + * FDv2DataSystem is destroyed. At any given moment at most one source is + * active. + * + * Selector tracking: + * + * FDv2DataSystem tracks the most-recent non-empty selector seen on a + * ChangeSet and passes it into each subsequent synchronizer Next() call. + * The initial selector is empty. + * + * Orchestration phases: + * + * Initialize() + * | + * v + * +-------------------+ no factories given + * | Offline? |---------> [Done, status = kValid] + * +-------------------+ + * | + * v + * +-------------------+ initializer #N returns: + * | Initializer phase| ChangeSet(no selector) -> stay, N += 1 + * | N = 0, 1, 2, ... | ChangeSet(selector) -> go to Sync + * | | Interrupted/Terminal -> stay, N += 1 + * | | Goodbye/Timeout -> stay, N += 1 + * | | Shutdown -> [Closed] + * +-------------------+ + * | + * | (N exhausted, or basis received) + * v + * +-------------------+ synchronizer #M's Next returns: + * | Synchronizer | ChangeSet -> apply, loop + * | phase | Interrupted -> loop (source self-retries) + * | M = 0, 1, 2, ... | Timeout -> loop + * | | Goodbye/Term -> M += 1 + * | | Shutdown -> [Closed] + * +-------------------+ + * | + * | (M exhausted) + * v + * [Done; final status preserved] + * + * Calling the destructor at any time -> [Closed; status kOff]. + * + * Status transitions: + * + * kInitializing (initial) -> kValid on first successful ChangeSet apply. + * kInterrupted on Interrupted / TerminalError + * (filtered to kInitializing while still in + * the initializer phase if not yet Valid). + * kValid -> kInterrupted on errors; kOff in destructor. + * kInterrupted -> kValid on next successful ChangeSet apply; + * kOff in destructor. + * kOff -> terminal. + */ +class FDv2DataSystem final : public data_interfaces::IDataSystem { + public: + /** + * Constructs the data system. + * + * @param initializer_factories Factories that build initializers, run in + * order during the initialization phase. + * @param synchronizer_factories Factories that build synchronizers, used + * in order for ongoing updates after initialization. + * @param ioc Executor on which orchestration callbacks run. + * @param status_manager Non-owning. Must outlive this object; the caller + * is responsible for ensuring this. Used to publish data-source + * status transitions. + * @param logger Used for diagnostic logging. Held by value (Logger is + * internally thread-safe and cheap to copy). + */ + FDv2DataSystem( + std::vector> + initializer_factories, + std::vector> + synchronizer_factories, + boost::asio::any_io_executor ioc, + data_components::DataSourceStatusManager* status_manager, + Logger const& logger); + + ~FDv2DataSystem() override; + + FDv2DataSystem(FDv2DataSystem const&) = delete; + FDv2DataSystem(FDv2DataSystem&&) = delete; + FDv2DataSystem& operator=(FDv2DataSystem const&) = delete; + FDv2DataSystem& operator=(FDv2DataSystem&&) = delete; + + /** + * Returns the flag descriptor for the given key, or nullptr if no flag + * with that key is currently in the store. + */ + std::shared_ptr GetFlag( + std::string const& key) const override; + + /** + * Returns the segment descriptor for the given key, or nullptr if no + * segment with that key is currently in the store. + */ + std::shared_ptr GetSegment( + std::string const& key) const override; + + /** + * Returns all flag descriptors currently in the store, keyed by flag key. + * The returned map is a snapshot; subsequent updates are not reflected. + */ + std::unordered_map> + AllFlags() const override; + + /** + * Returns all segment descriptors currently in the store, keyed by + * segment key. The returned map is a snapshot; subsequent updates are + * not reflected. + */ + std::unordered_map> + AllSegments() const override; + + /** + * Returns a display-suitable name for the data system, used in + * diagnostic logging. + */ + std::string const& Identity() const override; + + /** + * Starts the orchestration: runs initializers in order on the executor, + * then hands off to a synchronizer for ongoing updates. Returns + * immediately; orchestration runs asynchronously. Must be called exactly + * once, before any IStore method is invoked. + */ + void Initialize() override; + + /** + * Returns true once the in-memory store has been populated for the first + * time. Naming follows the IDataSystem base interface (predates the + * IsX() convention). + */ + bool Initialized() const override; + + private: + /** + * Signals the orchestration loop to stop and closes any active source. + * Idempotent. Called from the destructor. + */ + void Close(); + + // Orchestration-loop methods. Each step chains the next via Future::Then, + // so at most one is in flight at a time. mutex_ guards shared state + // against the destructor's Close() running concurrently with a callback. + + void RunNextInitializer(); + void OnInitializerResult(data_interfaces::FDv2SourceResult result); + void StartSynchronizers(); + void RunSynchronizerNext(); + void OnSynchronizerResult(data_interfaces::FDv2SourceResult result); + + // Applies a typed FDv2 changeset to the in-memory store and updates the + // tracked selector if the changeset's selector is non-empty. + void ApplyChangeSet( + data_model::ChangeSet change_set); + + // Logger is itself thread-safe and cheap to copy. + Logger logger_; + + // Immutable after construction. + boost::asio::any_io_executor const ioc_; + std::vector> const + initializer_factories_; + std::vector< + std::unique_ptr> const + synchronizer_factories_; + // Non-owning. Lifetime guaranteed by the caller (see constructor doc). + data_components::DataSourceStatusManager* const status_manager_; + + // Internally synchronized. + data_components::MemoryStore store_; + // Holds references to store_; declared after it so destruction order is + // safe. + data_components::ChangeNotifier change_notifier_; + + // Orchestration state, guarded by mutex_. + std::mutex mutex_; + bool closed_; + data_model::Selector selector_; + std::size_t initializer_index_; + std::size_t synchronizer_index_; + std::unique_ptr active_initializer_; + std::unique_ptr active_synchronizer_; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp new file mode 100644 index 000000000..1839f7a2f --- /dev/null +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -0,0 +1,590 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +using namespace launchdarkly; +using namespace launchdarkly::server_side; +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; +using namespace std::chrono_literals; + +namespace { + +Logger MakeNullLogger() { + struct NullBackend : ILogBackend { + bool Enabled(LogLevel) noexcept override { return false; } + void Write(LogLevel, std::string) noexcept override {} + }; + return Logger{std::make_shared()}; +} + +// Initializer that resolves Run() with a single pre-set result. +class MockInitializer : public IFDv2Initializer { + public: + MockInitializer(FDv2SourceResult result, bool* closed_flag = nullptr) + : result_(std::move(result)), closed_flag_(closed_flag) {} + + async::Future Run() override { + return async::MakeFuture(std::move(result_)); + } + + void Close() override { + if (closed_flag_) { + *closed_flag_ = true; + } + } + + std::string const& Identity() const override { + static std::string const id = "mock initializer"; + return id; + } + + private: + FDv2SourceResult result_; + bool* closed_flag_; +}; + +// Synchronizer that resolves successive Next() calls from a queue of results. +// Once the queue is exhausted, returns Shutdown to terminate orchestration. +class MockSynchronizer : public IFDv2Synchronizer { + public: + using NextCall = std::pair; + + MockSynchronizer(std::vector results, + bool* closed_flag = nullptr, + std::vector* next_calls = nullptr) + : results_(std::move(results)), + closed_flag_(closed_flag), + next_calls_(next_calls) {} + + async::Future Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) override { + if (next_calls_) { + next_calls_->push_back({timeout, selector}); + } + if (call_index_ < results_.size()) { + return async::MakeFuture(std::move(results_[call_index_++])); + } + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + + void Close() override { + if (closed_flag_) { + *closed_flag_ = true; + } + } + + std::string const& Identity() const override { + static std::string const id = "mock synchronizer"; + return id; + } + + private: + std::vector results_; + std::size_t call_index_ = 0; + bool* closed_flag_; + std::vector* next_calls_; +}; + +// One-shot factory: returns a pre-supplied source on its first Build() call. +// Tracks build_count_ so tests can assert whether the factory was invoked. +class OneShotInitializerFactory : public IFDv2InitializerFactory { + public: + explicit OneShotInitializerFactory(std::unique_ptr source) + : source_(std::move(source)) {} + + std::unique_ptr Build() override { + ++build_count_; + return std::move(source_); + } + + int build_count_ = 0; + std::unique_ptr source_; +}; + +class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory { + public: + explicit OneShotSynchronizerFactory( + std::unique_ptr source) + : source_(std::move(source)) {} + + std::unique_ptr Build() override { + ++build_count_; + return std::move(source_); + } + + int build_count_ = 0; + std::unique_ptr source_; +}; + +data_model::Selector MakeSelector(std::int64_t version, std::string state) { + return data_model::Selector{ + data_model::Selector::State{version, std::move(state)}}; +} + +FDv2SourceResult MakeFullChangeSetResult(std::vector items, + data_model::Selector selector) { + return FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kFull, + std::move(items), + std::move(selector), + }, + false, + }}; +} + +} // namespace + +// ============================================================================ +// Lifecycle +// ============================================================================ + +TEST(FDv2DataSystemTest, OfflineMode_NoFactories_StatusValid) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + FDv2DataSystem ds({}, {}, ioc.get_executor(), &status_manager, logger); + + // Initialize with no sources; orchestration should not be posted. + ds.Initialize(); + + // Offline mode: status reaches Valid synchronously, store stays empty. + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kValid); + EXPECT_FALSE(ds.Initialized()); +} + +TEST(FDv2DataSystemTest, Destructor_TransitionsStatusToOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + { + FDv2DataSystem ds({}, {}, ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ASSERT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kValid); + } + + // After ~FDv2DataSystem, status is Off. + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kOff); +} + +// ============================================================================ +// Initializer phase +// ============================================================================ + +TEST(FDv2DataSystemTest, InitializerWithBasis_AppliesAndStatusValid) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + data_model::Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + auto initializer = + std::make_unique(MakeFullChangeSetResult( + ChangeSetData{ + ItemChange{"flagA", data_model::FlagDescriptor(flag_a)}, + }, + MakeSelector(1, "state-1"))); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), + &status_manager, logger); + + // Run the initializer to completion. + ds.Initialize(); + ioc.run(); + + // The Full changeset's flag is now visible and status is Valid. + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kValid); + EXPECT_TRUE(ds.Initialized()); + auto fetched = ds.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(1u, fetched->version); +} + +TEST(FDv2DataSystemTest, InitializerInterrupted_AdvancesToNextInitializer) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + data_model::Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + auto first = std::make_unique( + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, + "boom", std::chrono::system_clock::now()}, + false, + }}); + auto first_factory = + std::make_unique(std::move(first)); + auto* first_factory_ptr = first_factory.get(); + + auto second = std::make_unique(MakeFullChangeSetResult( + ChangeSetData{ + ItemChange{"flagA", data_model::FlagDescriptor(flag_a)}, + }, + MakeSelector(1, "state-1"))); + auto second_factory = + std::make_unique(std::move(second)); + auto* second_factory_ptr = second_factory.get(); + + std::vector> initializers; + initializers.push_back(std::move(first_factory)); + initializers.push_back(std::move(second_factory)); + + FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), + &status_manager, logger); + + // Run; first initializer fails, orchestrator should fall through to + // the second. + ds.Initialize(); + ioc.run(); + + // Both factories were used; data from the second is in the store. + EXPECT_EQ(1, first_factory_ptr->build_count_); + EXPECT_EQ(1, second_factory_ptr->build_count_); + EXPECT_TRUE(ds.Initialized()); + EXPECT_TRUE(ds.GetFlag("flagA")); + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kValid); +} + +TEST(FDv2DataSystemTest, + InitializerChangeSet_WithoutSelector_ContinuesToNextInitializer) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + data_model::Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + data_model::Flag flag_b; + flag_b.key = "flagB"; + flag_b.version = 1; + + // First initializer applies a Full changeset but with an EMPTY selector + // (no basis). Orchestrator should still try the next initializer. + auto first = std::make_unique(MakeFullChangeSetResult( + ChangeSetData{ + ItemChange{"flagA", data_model::FlagDescriptor(flag_a)}, + }, + data_model::Selector{})); + auto first_factory = + std::make_unique(std::move(first)); + auto* first_factory_ptr = first_factory.get(); + + // Second initializer applies a Partial changeset (does not clear the + // store) with a non-empty selector that ends the initializer phase. + auto second = std::make_unique( + FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kPartial, + ChangeSetData{ + ItemChange{"flagB", data_model::FlagDescriptor(flag_b)}, + }, + MakeSelector(1, "state-1"), + }, + false, + }}); + auto second_factory = + std::make_unique(std::move(second)); + auto* second_factory_ptr = second_factory.get(); + + std::vector> initializers; + initializers.push_back(std::move(first_factory)); + initializers.push_back(std::move(second_factory)); + + FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), + &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Both initializers ran. Both flags are present: flagA was applied by the + // first initializer (Full, no selector); flagB was applied by the second + // (Partial, with selector) which doesn't clear the store. + EXPECT_EQ(1, first_factory_ptr->build_count_); + EXPECT_EQ(1, second_factory_ptr->build_count_); + EXPECT_TRUE(ds.GetFlag("flagA")); + EXPECT_TRUE(ds.GetFlag("flagB")); +} + +TEST(FDv2DataSystemTest, + InitializerWithBasis_StopsBeforeRemainingInitializers) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + auto first = std::make_unique( + MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); + auto first_factory = + std::make_unique(std::move(first)); + auto* first_factory_ptr = first_factory.get(); + + // Second initializer should never be built. + auto second = std::make_unique( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + auto second_factory = + std::make_unique(std::move(second)); + auto* second_factory_ptr = second_factory.get(); + + std::vector> initializers; + initializers.push_back(std::move(first_factory)); + initializers.push_back(std::move(second_factory)); + + FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), + &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // First was built; second was skipped because the basis was already + // received. + EXPECT_EQ(1, first_factory_ptr->build_count_); + EXPECT_EQ(0, second_factory_ptr->build_count_); +} + +// ============================================================================ +// Synchronizer phase +// ============================================================================ + +TEST(FDv2DataSystemTest, SynchronizerChangeSet_AppliesAndStatusValid) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + data_model::Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 7; + + std::vector results; + results.push_back(MakeFullChangeSetResult( + ChangeSetData{ + ItemChange{"flagA", data_model::FlagDescriptor(flag_a)}, + }, + MakeSelector(7, "v7"))); + auto sync = std::make_unique(std::move(results)); + + std::vector> synchronizers; + synchronizers.push_back( + std::make_unique(std::move(sync))); + + FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), + &status_manager, logger); + + // No initializers; orchestrator should hand directly to the synchronizer. + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kValid); + EXPECT_TRUE(ds.Initialized()); + auto fetched = ds.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(7u, fetched->version); +} + +TEST(FDv2DataSystemTest, SynchronizerGoodbye_AdvancesToNextFactory) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + auto first = + std::make_unique(std::vector{ + FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}}}); + auto first_factory = + std::make_unique(std::move(first)); + auto* first_factory_ptr = first_factory.get(); + + auto second = std::make_unique( + std::vector{}); // empty -> Shutdown + auto second_factory = + std::make_unique(std::move(second)); + auto* second_factory_ptr = second_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(first_factory)); + synchronizers.push_back(std::move(second_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), + &status_manager, logger); + + // First synchronizer says Goodbye; orchestrator should rotate to the + // next factory. + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(1, first_factory_ptr->build_count_); + EXPECT_EQ(1, second_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, SynchronizerInterrupted_RetriesSameSynchronizer) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Single synchronizer: first Next returns Interrupted, then (after the + // orchestrator loops) a ChangeSet, then exhausts to Shutdown. There is + // no second factory, so a rotation would be observable as "no second + // build". Tests that Interrupted does NOT rotate. + std::vector results; + results.push_back(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, + "transient", std::chrono::system_clock::now()}, + false, + }}); + results.push_back( + MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); + auto sync = std::make_unique(std::move(results)); + + std::vector> synchronizers; + auto factory = + std::make_unique(std::move(sync)); + auto* factory_ptr = factory.get(); + synchronizers.push_back(std::move(factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), + &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Factory was built only once; the subsequent ChangeSet recovered the + // status to Valid. + EXPECT_EQ(1, factory_ptr->build_count_); + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kValid); +} + +TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Initializer provides a basis with selector v1/state-1. + auto initializer = std::make_unique( + MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + // Synchronizer first returns a partial changeset with a NEW selector, + // then exhausts (Shutdown) on the next call. + std::vector next_calls; + std::vector results; + results.push_back(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kPartial, + ChangeSetData{}, + MakeSelector(2, "state-2"), + }, + false, + }}); + auto sync = std::make_unique(std::move(results), nullptr, + &next_calls); + + std::vector> synchronizers; + synchronizers.push_back( + std::make_unique(std::move(sync))); + + FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), + ioc.get_executor(), &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Two Next calls: first with the initializer's selector, second with the + // selector updated by the partial changeset. + ASSERT_EQ(2u, next_calls.size()); + ASSERT_TRUE(next_calls[0].second.value.has_value()); + EXPECT_EQ(1, next_calls[0].second.value->version); + EXPECT_EQ("state-1", next_calls[0].second.value->state); + ASSERT_TRUE(next_calls[1].second.value.has_value()); + EXPECT_EQ(2, next_calls[1].second.value->version); + EXPECT_EQ("state-2", next_calls[1].second.value->state); +} + +TEST(FDv2DataSystemTest, + SynchronizerTerminalError_StatusInterruptedAndAdvance) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // First synchronizer applies a changeset (-> Valid), then fails terminally. + std::vector first_results; + first_results.push_back( + MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); + first_results.push_back(FDv2SourceResult{FDv2SourceResult::TerminalError{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, + "unauthorized", std::chrono::system_clock::now()}, + false, + }}); + auto first = std::make_unique(std::move(first_results)); + auto first_factory = + std::make_unique(std::move(first)); + auto* first_factory_ptr = first_factory.get(); + + auto second = std::make_unique( + std::vector{}); // empty -> Shutdown + auto second_factory = + std::make_unique(std::move(second)); + auto* second_factory_ptr = second_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(first_factory)); + synchronizers.push_back(std::move(second_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), + &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // First synchronizer set status to Valid, then the terminal error pushed + // it to Interrupted; after rotating to the second synchronizer (which + // immediately exits via Shutdown), Interrupted is the final non-Off + // state seen. + EXPECT_EQ(1, first_factory_ptr->build_count_); + EXPECT_EQ(1, second_factory_ptr->build_count_); + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kInterrupted); +} From d154cbe14c12ea3a289220c384f0620735d0d62a Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 5 May 2026 22:07:05 -0700 Subject: [PATCH 2/5] fix: server FDv2 goodbye behavior and orchestration polish --- .../change_notifier/change_notifier.cpp | 1 + .../data_systems/fdv2/fdv2_data_system.cpp | 90 ++++---- .../data_systems/fdv2/fdv2_data_system.hpp | 21 +- .../fdv2/streaming_synchronizer.cpp | 8 + .../server-sdk/tests/change_notifier_test.cpp | 198 ++++++++++++++++++ .../tests/fdv2_data_system_test.cpp | 78 ++++++- 6 files changed, 346 insertions(+), 50 deletions(-) diff --git a/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp b/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp index f42991fcf..76daefe10 100644 --- a/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp +++ b/libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp @@ -10,6 +10,7 @@ namespace launchdarkly::server_side::data_components { namespace { +// Lets std::visit dispatch to a different lambda per variant alternative. template struct overloaded : Ts... { using Ts::operator()...; diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index 073a4ed70..edbd55e4e 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -4,6 +4,7 @@ #include +#include #include #include #include @@ -12,6 +13,7 @@ namespace launchdarkly::server_side::data_systems { namespace { +// Lets std::visit dispatch to a different lambda per variant alternative. template struct overloaded : Ts... { using Ts::operator()...; @@ -39,6 +41,7 @@ FDv2DataSystem::FDv2DataSystem( status_manager_(status_manager), store_(), change_notifier_(store_, store_), + initialize_called_(false), closed_(false), selector_(), initializer_index_(0), @@ -88,6 +91,9 @@ std::string const& FDv2DataSystem::Identity() const { } void FDv2DataSystem::Initialize() { + bool const already_called = initialize_called_.exchange(true); + assert(!already_called && "Initialize() must be called at most once"); + LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": starting"; if (initializer_factories_.empty() && synchronizer_factories_.empty()) { // Offline mode: empty store is the canonical state. @@ -98,8 +104,6 @@ void FDv2DataSystem::Initialize() { } void FDv2DataSystem::RunNextInitializer() { - auto future = async::MakeFuture(data_interfaces::FDv2SourceResult{ - data_interfaces::FDv2SourceResult::Shutdown{}}); bool exhausted = false; { std::lock_guard lock(mutex_); @@ -111,22 +115,21 @@ void FDv2DataSystem::RunNextInitializer() { } else { auto& factory = initializer_factories_[initializer_index_++]; active_initializer_ = factory->Build(); - future = active_initializer_->Run(); + active_initializer_->Run().Then( + [this](data_interfaces::FDv2SourceResult const& result) + -> std::monostate { + OnInitializerResult(result); + return {}; + }, + [ioc = ioc_](async::Continuation work) { + boost::asio::post(ioc, std::move(work)); + }); } } if (exhausted) { StartSynchronizers(); - return; } - - std::move(future).Then( - [this]( - data_interfaces::FDv2SourceResult const& result) -> std::monostate { - OnInitializerResult(result); - return {}; - }, - async::kInlineExecutor); } void FDv2DataSystem::OnInitializerResult( @@ -164,13 +167,12 @@ void FDv2DataSystem::OnInitializerResult( te.error.Kind(), te.error.Message()); }, [&](Result::Goodbye const&) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() - << ": initializer received unexpected goodbye"; + LD_LOG(logger_, LogLevel::kDebug) + << Identity() << ": ignoring goodbye from initializer"; }, [&](Result::Timeout const&) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() << ": initializer timed out (unexpected)"; + LD_LOG(logger_, LogLevel::kDebug) + << Identity() << ": ignoring timeout from initializer"; }, }, result.value); @@ -192,6 +194,7 @@ void FDv2DataSystem::OnInitializerResult( void FDv2DataSystem::StartSynchronizers() { bool exhausted = false; + bool cycled_synchronizers = false; { std::lock_guard lock(mutex_); if (closed_) { @@ -199,6 +202,7 @@ void FDv2DataSystem::StartSynchronizers() { } if (synchronizer_index_ >= synchronizer_factories_.size()) { exhausted = true; + cycled_synchronizers = synchronizer_index_ > 0; } else { auto& factory = synchronizer_factories_[synchronizer_index_++]; active_synchronizer_ = factory->Build(); @@ -206,8 +210,19 @@ void FDv2DataSystem::StartSynchronizers() { } if (exhausted) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() << ": no synchronizers available"; + // kOff when we can't continue updating; init-only with data stays + // kValid. + if (cycled_synchronizers || !store_.Initialized()) { + std::string const message = + cycled_synchronizers + ? "all data source acquisition methods have been exhausted" + : "all initializers exhausted and no synchronizers " + "configured"; + LD_LOG(logger_, LogLevel::kWarn) << Identity() << ": " << message; + status_manager_->SetState( + DataSourceStatus::DataSourceState::kOff, + DataSourceStatus::ErrorInfo::ErrorKind::kUnknown, message); + } return; } @@ -215,24 +230,20 @@ void FDv2DataSystem::StartSynchronizers() { } void FDv2DataSystem::RunSynchronizerNext() { - auto future = async::MakeFuture(data_interfaces::FDv2SourceResult{ - data_interfaces::FDv2SourceResult::Shutdown{}}); - { - std::lock_guard lock(mutex_); - if (closed_ || !active_synchronizer_) { - return; - } - future = - active_synchronizer_->Next(kSynchronizerNextTimeout, selector_); + std::lock_guard lock(mutex_); + if (closed_ || !active_synchronizer_) { + return; } - - std::move(future).Then( - [this]( - data_interfaces::FDv2SourceResult const& result) -> std::monostate { - OnSynchronizerResult(result); - return {}; - }, - async::kInlineExecutor); + active_synchronizer_->Next(kSynchronizerNextTimeout, selector_) + .Then( + [this](data_interfaces::FDv2SourceResult const& result) + -> std::monostate { + OnSynchronizerResult(result); + return {}; + }, + [ioc = ioc_](async::Continuation work) { + boost::asio::post(ioc, std::move(work)); + }); } void FDv2DataSystem::OnSynchronizerResult( @@ -266,10 +277,11 @@ void FDv2DataSystem::OnSynchronizerResult( advance = true; }, [&](Result::Goodbye const& gb) { - LD_LOG(logger_, LogLevel::kInfo) - << Identity() << ": synchronizer goodbye" + // The synchronizer handles goodbye internally (reconnects); + // the orchestrator just loops on the same source. + LD_LOG(logger_, LogLevel::kDebug) + << Identity() << ": ignoring goodbye from synchronizer" << (gb.reason ? ": " + *gb.reason : ""); - advance = true; }, [&](Result::Timeout const&) { LD_LOG(logger_, LogLevel::kDebug) diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp index c2b294f07..b359945f9 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -12,6 +12,7 @@ #include +#include #include #include #include @@ -105,7 +106,8 @@ namespace launchdarkly::server_side::data_systems { * | Synchronizer | ChangeSet -> apply, loop * | phase | Interrupted -> loop (source self-retries) * | M = 0, 1, 2, ... | Timeout -> loop - * | | Goodbye/Term -> M += 1 + * | | Goodbye -> loop (source self-restarts) + * | | TerminalError -> M += 1 * | | Shutdown -> [Closed] * +-------------------+ * | @@ -121,9 +123,13 @@ namespace launchdarkly::server_side::data_systems { * kInterrupted on Interrupted / TerminalError * (filtered to kInitializing while still in * the initializer phase if not yet Valid). - * kValid -> kInterrupted on errors; kOff in destructor. + * kOff if all initializers exhaust without data + * and no synchronizers are configured. + * kValid -> kInterrupted on errors; kOff in destructor or + * when synchronizers cycle through and exhaust. * kInterrupted -> kValid on next successful ChangeSet apply; - * kOff in destructor. + * kOff in destructor or on synchronizer + * exhaustion. * kOff -> terminal. */ class FDv2DataSystem final : public data_interfaces::IDataSystem { @@ -217,8 +223,10 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { void Close(); // Orchestration-loop methods. Each step chains the next via Future::Then, - // so at most one is in flight at a time. mutex_ guards shared state - // against the destructor's Close() running concurrently with a callback. + // so at most one step has a pending continuation at any time. mutex_ + // provides mutual exclusion for orchestration state when callbacks run on + // different executor threads, and lets Close() safely tear down active + // sources from any thread. void RunNextInitializer(); void OnInitializerResult(data_interfaces::FDv2SourceResult result); @@ -250,6 +258,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { // safe. data_components::ChangeNotifier change_notifier_; + // Set by Initialize() to detect repeat or concurrent calls. + std::atomic_bool initialize_called_; + // Orchestration state, guarded by mutex_. std::mutex mutex_; bool closed_; diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index 894d35846..cc2c861a2 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -210,6 +210,14 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { << r.reason.value_or("") << "'."; Notify(FDv2SourceResult{ FDv2SourceResult::Goodbye{r.reason, false}}); + // Drop the current connection and reconnect; the protocol + // handler is reset so the new connection starts in a clean + // state. + protocol_handler_.Reset(); + std::lock_guard lock(mutex_); + if (sse_client_) { + sse_client_->async_restart("FDv2 goodbye received"); + } } else if constexpr (std::is_same_v) { if (r.kind == FDv2ProtocolHandler::Error::Kind::kServerError) { diff --git a/libs/server-sdk/tests/change_notifier_test.cpp b/libs/server-sdk/tests/change_notifier_test.cpp index eb51f5748..2e872ce64 100644 --- a/libs/server-sdk/tests/change_notifier_test.cpp +++ b/libs/server-sdk/tests/change_notifier_test.cpp @@ -2,12 +2,14 @@ #include #include +#include #include using launchdarkly::Value; using namespace launchdarkly::data_model; using namespace launchdarkly::server_side::data_components; +using namespace launchdarkly::server_side::data_interfaces; TEST(ChangeNotifierTest, DoesNotInitializeStoreUntilInit) { MemoryStore store; @@ -422,3 +424,199 @@ TEST(ChangeNotifierTest, NoEventOnDiscardedUpsert) { EXPECT_EQ(false, got_event); } + +TEST(ChangeNotifierTest, ApplyFullReplacesStore) { + Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + Flag flag_b; + flag_b.key = "flagB"; + flag_b.version = 1; + + MemoryStore store; + ChangeNotifier updater(store, store); + + updater.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + // Apply a Full changeset containing only flagB. + updater.Apply(ChangeSet{ + ChangeSetType::kFull, + ChangeSetData{ItemChange{"flagB", FlagDescriptor(flag_b)}}, + Selector{}, + }); + + // flagA was wiped by the Full changeset; flagB is now present. + EXPECT_FALSE(store.GetFlag("flagA")); + auto fetched = store.GetFlag("flagB"); + ASSERT_TRUE(fetched); + EXPECT_EQ(1, fetched->version); +} + +TEST(ChangeNotifierTest, ApplyPartialPreservesStore) { + Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + Flag flag_b; + flag_b.key = "flagB"; + flag_b.version = 1; + + MemoryStore store; + ChangeNotifier updater(store, store); + + updater.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + // Apply a Partial changeset containing only flagB. + updater.Apply(ChangeSet{ + ChangeSetType::kPartial, + ChangeSetData{ItemChange{"flagB", FlagDescriptor(flag_b)}}, + Selector{}, + }); + + // Both flags are present: flagA was preserved, flagB was added. + EXPECT_TRUE(store.GetFlag("flagA")); + EXPECT_TRUE(store.GetFlag("flagB")); +} + +TEST(ChangeNotifierTest, ApplyNoneIsNoOp) { + Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + MemoryStore store; + ChangeNotifier updater(store, store); + + updater.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map(), + }); + + std::atomic got_event(false); + updater.OnFlagChange( + [&got_event](std::shared_ptr> changeset) { + got_event = true; + }); + + // Apply a kNone changeset. + updater.Apply(ChangeSet{ + ChangeSetType::kNone, + ChangeSetData{}, + Selector{}, + }); + + // Store is untouched and no change event was emitted. + EXPECT_TRUE(store.GetFlag("flagA")); + EXPECT_FALSE(got_event); +} + +TEST(ChangeNotifierTest, ApplyFullProducesChangeEvents) { + Flag flag_a_v1; + flag_a_v1.key = "flagA"; + flag_a_v1.version = 1; + + Flag flag_b_v1; + flag_b_v1.key = "flagB"; + flag_b_v1.version = 1; + + MemoryStore store; + ChangeNotifier updater(store, store); + + updater.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a_v1)}, + {"flagB", FlagDescriptor(flag_b_v1)}}, + std::unordered_map(), + }); + + Flag flag_a_v2; + flag_a_v2.key = "flagA"; + flag_a_v2.version = 2; + + Flag flag_c; + flag_c.key = "flagC"; + flag_c.version = 1; + + std::atomic got_event(false); + updater.OnFlagChange( + [&got_event](std::shared_ptr> changeset) { + got_event = true; + auto expected = std::set{"flagA", "flagB", "flagC"}; + std::vector diff; + std::set_difference(expected.begin(), expected.end(), + changeset->begin(), changeset->end(), + std::inserter(diff, diff.begin())); + EXPECT_EQ(0, diff.size()); + }); + + // Apply a Full changeset that bumps flagA, drops flagB, and adds flagC. + updater.Apply(ChangeSet{ + ChangeSetType::kFull, + ChangeSetData{ + ItemChange{"flagA", FlagDescriptor(flag_a_v2)}, + ItemChange{"flagC", FlagDescriptor(flag_c)}, + }, + Selector{}, + }); + + // A change event was emitted (with flagA, flagB, flagC per the listener). + EXPECT_TRUE(got_event); +} + +TEST(ChangeNotifierTest, ApplyPartialSegmentChangePropagatesToDependentFlag) { + // flagA depends on segmentA; updating segmentA should fire a change + // event including flagA. + Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + Clause clause; + clause.op = Clause::Op::kSegmentMatch; + clause.values = std::vector{"segmentA"}; + Flag::Rule rule; + rule.clauses.push_back(clause); + flag_a.rules.push_back(rule); + + Segment segment_a_v1; + segment_a_v1.key = "segmentA"; + segment_a_v1.version = 1; + + MemoryStore store; + ChangeNotifier updater(store, store); + + updater.Init(SDKDataSet{ + std::unordered_map{ + {"flagA", FlagDescriptor(flag_a)}}, + std::unordered_map{ + {"segmentA", SegmentDescriptor(segment_a_v1)}}, + }); + + Segment segment_a_v2; + segment_a_v2.key = "segmentA"; + segment_a_v2.version = 2; + + std::atomic got_event(false); + updater.OnFlagChange( + [&got_event](std::shared_ptr> changeset) { + got_event = true; + EXPECT_TRUE(changeset->count("flagA")); + }); + + // Apply a Partial changeset bumping segmentA's version. + updater.Apply(ChangeSet{ + ChangeSetType::kPartial, + ChangeSetData{ItemChange{"segmentA", SegmentDescriptor(segment_a_v2)}}, + Selector{}, + }); + + // Change event fired; flagA appears because it depends on segmentA. + EXPECT_TRUE(got_event); +} diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index 1839f7a2f..307338975 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -380,6 +380,36 @@ TEST(FDv2DataSystemTest, EXPECT_EQ(0, second_factory_ptr->build_count_); } +TEST(FDv2DataSystemTest, InitializerOnly_AllFail_TransitionsToOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + auto init = std::make_unique( + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, + "fail", std::chrono::system_clock::now()}, + false, + }}); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(init))); + + FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), + &status_manager, logger); + + // Run: initializer fails and there are no synchronizers to fall through to. + ds.Initialize(); + ioc.run(); + + // No data was ever applied; status transitions to Off. + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kOff); + EXPECT_FALSE(ds.Initialized()); +} + // ============================================================================ // Synchronizer phase // ============================================================================ @@ -420,11 +450,14 @@ TEST(FDv2DataSystemTest, SynchronizerChangeSet_AppliesAndStatusValid) { EXPECT_EQ(7u, fetched->version); } -TEST(FDv2DataSystemTest, SynchronizerGoodbye_AdvancesToNextFactory) { +TEST(FDv2DataSystemTest, SynchronizerGoodbye_StaysOnSameSynchronizer) { auto logger = MakeNullLogger(); boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; + // Synchronizer first emits Goodbye, then exhausts (returns Shutdown). + // The synchronizer is expected to handle the goodbye internally + // (reconnecting); the orchestrator must NOT rotate. auto first = std::make_unique(std::vector{ FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}}}); @@ -432,8 +465,9 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_AdvancesToNextFactory) { std::make_unique(std::move(first)); auto* first_factory_ptr = first_factory.get(); - auto second = std::make_unique( - std::vector{}); // empty -> Shutdown + // Second factory should never be built; presence detects rotation. + auto second = + std::make_unique(std::vector{}); auto second_factory = std::make_unique(std::move(second)); auto* second_factory_ptr = second_factory.get(); @@ -445,13 +479,12 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_AdvancesToNextFactory) { FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), &status_manager, logger); - // First synchronizer says Goodbye; orchestrator should rotate to the - // next factory. ds.Initialize(); ioc.run(); + // Goodbye does not advance the factory cursor. EXPECT_EQ(1, first_factory_ptr->build_count_); - EXPECT_EQ(1, second_factory_ptr->build_count_); + EXPECT_EQ(0, second_factory_ptr->build_count_); } TEST(FDv2DataSystemTest, SynchronizerInterrupted_RetriesSameSynchronizer) { @@ -588,3 +621,36 @@ TEST(FDv2DataSystemTest, EXPECT_EQ(status_manager.Status().State(), DataSourceStatus::DataSourceState::kInterrupted); } + +TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Single synchronizer that fails terminally on its first Next. The + // orchestrator advances past the only factory and finds no more, + // exhausting sources. + auto sync = + std::make_unique(std::vector{ + FDv2SourceResult{FDv2SourceResult::TerminalError{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, + "unauthorized", std::chrono::system_clock::now()}, + false, + }}}); + + std::vector> synchronizers; + synchronizers.push_back( + std::make_unique(std::move(sync))); + + FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), + &status_manager, logger); + + // Synchronizer fails terminally; no more factories to try. + ds.Initialize(); + ioc.run(); + + // We cycled through all synchronizers; status transitions to Off. + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kOff); +} From 8effe466c8294dda721334594aa3ebb0c82052bd Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 6 May 2026 14:04:51 -0700 Subject: [PATCH 3/5] test: cover FDv2DataSystem destruction with in-flight orchestration Adds two regression tests that exercise the destruction protocol contract documented on FDv2DataSystem: when the destructor runs while an initializer or synchronizer Future is unresolved, the orchestrator must close the active source, transition status to kOff, and tear down the captured-this continuation chain without firing it against the destroyed object. Existing Destructor_TransitionsStatusToOff only covers the offline-mode case (no factories, no orchestration ever started); the in-flight teardown paths were not exercised. Adds StalledInitializer / StalledSynchronizer mocks that return an unresolved Future to drive the orchestrator into the in-flight state, then destroy the data system before the future resolves. --- .../tests/fdv2_data_system_test.cpp | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index 307338975..e4fd8bd33 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -138,6 +138,61 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory { std::unique_ptr source_; }; +// Initializer whose Run() returns an unresolved Future. Used to exercise +// destruction with orchestration in flight. +class StalledInitializer : public IFDv2Initializer { + public: + explicit StalledInitializer(bool* closed_flag) : closed_flag_(closed_flag) {} + + async::Future Run() override { + return promise_.GetFuture(); + } + + void Close() override { + if (closed_flag_) { + *closed_flag_ = true; + } + } + + std::string const& Identity() const override { + static std::string const id = "stalled initializer"; + return id; + } + + private: + async::Promise promise_; + bool* closed_flag_; +}; + +// Synchronizer whose Next() returns an unresolved Future. Used to exercise +// destruction with orchestration in flight. +class StalledSynchronizer : public IFDv2Synchronizer { + public: + explicit StalledSynchronizer(bool* closed_flag) + : closed_flag_(closed_flag) {} + + async::Future Next( + std::chrono::milliseconds, + data_model::Selector) override { + return promise_.GetFuture(); + } + + void Close() override { + if (closed_flag_) { + *closed_flag_ = true; + } + } + + std::string const& Identity() const override { + static std::string const id = "stalled synchronizer"; + return id; + } + + private: + async::Promise promise_; + bool* closed_flag_; +}; + data_model::Selector MakeSelector(std::int64_t version, std::string state) { return data_model::Selector{ data_model::Selector::State{version, std::move(state)}}; @@ -654,3 +709,73 @@ TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) { EXPECT_EQ(status_manager.Status().State(), DataSourceStatus::DataSourceState::kOff); } + +// ============================================================================ +// Destruction protocol: in-flight orchestration +// ============================================================================ +// +// The destructor contract (fdv2_data_system.hpp) requires the destructor to +// cancel in-flight orchestration (close the active source, transition status +// to kOff) without firing any continuation against the destroyed object. The +// caller's responsibility is to ensure the executor is no longer running by +// the time destruction begins; the orchestrator's responsibility is to leave +// nothing dangling. These two tests pin that contract for both phases. + +TEST(FDv2DataSystemTest, + Destructor_WithInFlightInitializer_ClosesSourceAndStatusOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + bool initializer_closed = false; + + auto initializer = + std::make_unique(&initializer_closed); + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + { + FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), + &status_manager, logger); + ds.Initialize(); + // RunNextInitializer runs, builds the source, calls Run().Then(...). + // Run() returns an unresolved Future; the orchestrator's continuation + // is registered but will never fire. ioc has no more pending work. + ioc.run(); + } + // ~FDv2DataSystem ran with the initializer's Future still unresolved. + + EXPECT_TRUE(initializer_closed); + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kOff); +} + +TEST(FDv2DataSystemTest, + Destructor_WithInFlightSynchronizer_ClosesSourceAndStatusOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + bool synchronizer_closed = false; + + auto synchronizer = + std::make_unique(&synchronizer_closed); + std::vector> synchronizers; + synchronizers.push_back(std::make_unique( + std::move(synchronizer))); + + { + FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), + &status_manager, logger); + ds.Initialize(); + // No initializers -> RunNextInitializer immediately exhausts -> + // StartSynchronizers builds the synchronizer and calls Next(). + // Next() returns an unresolved Future; orchestration is mid-flight. + ioc.run(); + } + + EXPECT_TRUE(synchronizer_closed); + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kOff); +} From 3a52efc573469ddee058216613e94c14877f28c9 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 6 May 2026 14:33:35 -0700 Subject: [PATCH 4/5] test: cover FDv2StreamingSynchronizer Goodbye behavior Adds two regression tests for the Goodbye handling added in d154cbe: - GoodbyeEventTriggersAsyncRestart: verifies that on receiving a goodbye event, the synchronizer drives sse::Client::async_restart with the documented reason string. Without this, the server's "we're about to disconnect" signal would lead to a stalled connection rather than a controlled reconnect. Adds a MockSseClient that records calls and a SetSseClient test peer to inject it. - GoodbyeMidPayloadDiscardsAccumulatedAndAcceptsFreshChangeset: feeds a partial payload, then a goodbye, then a fresh full changeset, and asserts that the accumulated puts were discarded and only the fresh put is in the resulting ChangeSet. Locks in the spec-aligned property that Goodbye does not corrupt subsequent payloads. --- .../fdv2_streaming_synchronizer_test.cpp | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp index cb9bdd8f6..9ae12f0df 100644 --- a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -54,6 +55,11 @@ class FDv2StreamingSynchronizerTestPeer { std::lock_guard lock(sync.state_->mutex_); sync.state_->latest_selector_ = std::move(selector); } + static void SetSseClient(FDv2StreamingSynchronizer& sync, + std::shared_ptr client) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->sse_client_ = std::move(client); + } }; } // namespace launchdarkly::server_side::data_systems @@ -104,6 +110,29 @@ config::shared::built::HttpProperties MakeHttpProperties() { .Build(); } +// Records calls to the sse::Client interface. Used to verify that the +// synchronizer drives the underlying SSE connection correctly without +// requiring a real network client. +class MockSseClient : public sse::Client { + public: + void async_connect() override { ++connect_count_; } + void async_shutdown(std::function completion) override { + ++shutdown_count_; + if (completion) { + completion(); + } + } + void async_restart(std::string const& reason) override { + ++restart_count_; + last_restart_reason_ = reason; + } + + int connect_count_ = 0; + int shutdown_count_ = 0; + int restart_count_ = 0; + std::string last_restart_reason_; +}; + } // namespace // ============================================================================ @@ -415,6 +444,99 @@ TEST(FDv2StreamingSynchronizerTest, GoodbyeEventReturnsGoodbye) { EXPECT_EQ(*g->reason, "bye"); } +TEST(FDv2StreamingSynchronizerTest, GoodbyeEventTriggersAsyncRestart) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + auto mock_client = std::make_shared(); + FDv2StreamingSynchronizerTestPeer::SetSseClient(synchronizer, mock_client); + + sse::Event goodbye("goodbye", R"({"reason":"bye"})"); + + // Act: deliver a goodbye event and drain the resulting Goodbye result. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + ASSERT_TRUE(result.has_value()); + + // Assert: the Goodbye handler drove the SSE client to restart with the + // documented reason string. Without this, the server's "we're about to + // disconnect" signal would lead to a stalled connection rather than a + // controlled reconnect. + EXPECT_EQ(mock_client->restart_count_, 1); + EXPECT_EQ(mock_client->last_restart_reason_, "FDv2 goodbye received"); +} + +TEST(FDv2StreamingSynchronizerTest, + GoodbyeMidPayloadDiscardsAccumulatedAndAcceptsFreshChangeset) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Begin accumulating a payload that we'll abandon mid-flight via Goodbye. + sse::Event server_intent_one("server-intent", + R"({"payloads":[{"id":"p1","target":1,)" + R"("intentCode":"xfer-full"}]})"); + sse::Event abandoned_put( + "put-object", + R"({"version":1,"kind":"flag","key":"abandoned","object":)" + R"({"key":"abandoned","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1}})"); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + server_intent_one); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, abandoned_put); + + // Goodbye arrives mid-payload; expect a Goodbye result and the partial + // payload to be discarded. + sse::Event goodbye("goodbye", R"({"reason":"bye"})"); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); + auto goodbye_result = + synchronizer.Next(2s, data_model::Selector{}).WaitForResult(2s); + ASSERT_TRUE(goodbye_result.has_value()); + ASSERT_NE(std::get_if(&goodbye_result->value), + nullptr); + + // Drive a fresh full changeset through. The protocol handler must be back + // in a clean state — neither carrying the abandoned put nor stuck in the + // previous accumulating state. + sse::Event server_intent_two("server-intent", + R"({"payloads":[{"id":"p2","target":2,)" + R"("intentCode":"xfer-full"}]})"); + sse::Event fresh_put( + "put-object", + R"({"version":2,"kind":"flag","key":"fresh","object":)" + R"({"key":"fresh","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":2}})"); + sse::Event payload_transferred("payload-transferred", + R"({"state":"abc","version":2})"); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + server_intent_two); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, fresh_put); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + payload_transferred); + auto changeset_result = + synchronizer.Next(2s, data_model::Selector{}).WaitForResult(2s); + ASSERT_TRUE(changeset_result.has_value()); + auto* cs = + std::get_if(&changeset_result->value); + ASSERT_NE(cs, nullptr); + + // Only the fresh put should be present; the abandoned put was discarded. + ASSERT_EQ(cs->change_set.data.size(), 1u); + EXPECT_EQ(cs->change_set.data[0].key, "fresh"); +} + TEST(FDv2StreamingSynchronizerTest, ServerErrorEventReturnsInterrupted) { auto logger = MakeNullLogger(); IoContextRunner runner; From 7a147d20bf29fe34b6e643939225e347604217dd Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 6 May 2026 14:38:31 -0700 Subject: [PATCH 5/5] test: cover post-Goodbye selector preservation in FDv2DataSystem Adds SynchronizerGoodbye_PreservesSelectorOnNextCall: drives the orchestrator through initializer-basis@v1 -> ChangeSet@v2 -> Goodbye -> Shutdown, and asserts the captured Next() selectors are v1, v2, v2 in order. The existing SynchronizerGoodbye_StaysOnSameSynchronizer test only checks that Goodbye does not rotate the synchronizer factory; it does not verify what selector the post-Goodbye Next() call receives. Without this preservation, the SDK would reconnect with stale or empty payload state on every Goodbye, forcing the server into expensive xfer-full responses instead of efficient xfer-changes. Verified load-bearing: temporarily clearing selector_ on Goodbye in fdv2_data_system.cpp makes only this test fail (the existing Goodbye test still passes). --- .../tests/fdv2_data_system_test.cpp | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index e4fd8bd33..20db235f1 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -630,6 +630,70 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { EXPECT_EQ("state-2", next_calls[1].second.value->state); } +TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Initializer provides a basis with selector v1/state-1. + auto initializer = std::make_unique( + MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + // Synchronizer returns a partial changeset (selector advances to v2), + // then a Goodbye, then exhausts via Shutdown. The orchestrator must + // call Next a third time with the v2 selector — Goodbye is a transient + // event the synchronizer handles internally (reconnects), and the + // orchestrator must not regress the selector across it. Without this + // preservation, the SDK would reconnect with stale or empty payload + // state on every Goodbye, forcing the server into expensive xfer-full + // responses instead of efficient xfer-changes. + std::vector next_calls; + std::vector results; + results.push_back(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kPartial, + ChangeSetData{}, + MakeSelector(2, "state-2"), + }, + false, + }}); + results.push_back( + FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}}); + auto sync = std::make_unique(std::move(results), nullptr, + &next_calls); + + std::vector> synchronizers; + synchronizers.push_back( + std::make_unique(std::move(sync))); + + FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), + ioc.get_executor(), &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Three Next calls: first with v1 from the initializer, second with v2 + // after the partial changeset, third with v2 still — Goodbye does not + // regress the selector. + ASSERT_EQ(3u, next_calls.size()); + + ASSERT_TRUE(next_calls[0].second.value.has_value()); + EXPECT_EQ(1, next_calls[0].second.value->version); + EXPECT_EQ("state-1", next_calls[0].second.value->state); + + ASSERT_TRUE(next_calls[1].second.value.has_value()); + EXPECT_EQ(2, next_calls[1].second.value->version); + EXPECT_EQ("state-2", next_calls[1].second.value->state); + + ASSERT_TRUE(next_calls[2].second.value.has_value()); + EXPECT_EQ(2, next_calls[2].second.value->version); + EXPECT_EQ("state-2", next_calls[2].second.value->state); +} + TEST(FDv2DataSystemTest, SynchronizerTerminalError_StatusInterruptedAndAdvance) { auto logger = MakeNullLogger();