Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions libs/server-sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ target_sources(${LIBNAME}
data_components/dependency_tracker/dependency_tracker.cpp
data_components/expiration_tracker/expiration_tracker.hpp
data_components/expiration_tracker/expiration_tracker.cpp
data_interfaces/destination/itransactional_destination.hpp
data_components/memory_store/memory_store.hpp
data_components/memory_store/memory_store.cpp
data_components/serialization_adapters/json_deserializer.hpp
Expand All @@ -59,6 +60,8 @@ target_sources(${LIBNAME}
data_systems/fdv2/polling_synchronizer.cpp
data_systems/fdv2/streaming_synchronizer.hpp
data_systems/fdv2/streaming_synchronizer.cpp
data_systems/fdv2/fdv2_data_system.hpp
data_systems/fdv2/fdv2_data_system.cpp
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
data_systems/background_sync/sources/streaming/streaming_data_source.cpp
data_systems/background_sync/sources/streaming/event_handler.hpp
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
#include "change_notifier.hpp"

#include <launchdarkly/signals/boost_signal_connection.hpp>

#include <mutex>
#include <utility>
#include <variant>

namespace launchdarkly::server_side::data_components {

namespace {

// Lets std::visit dispatch to a different lambda per variant alternative.
template <class... Ts>
struct overloaded : Ts... {
using Ts::operator()...;
};
template <class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;

} // namespace

std::unique_ptr<IConnection> ChangeNotifier::OnFlagChange(
ChangeHandler handler) {
std::lock_guard lock{signal_mutex_};
Expand Down Expand Up @@ -55,6 +70,79 @@ void ChangeNotifier::Upsert(std::string const& key,
std::move(segment));
}

void ChangeNotifier::Apply(
data_model::ChangeSet<data_interfaces::ChangeSetData> change_set) {
if (change_set.type == data_model::ChangeSetType::kNone) {
return;
}

// Compute changed dependencies before passing the changeset to the sink.
std::optional<DependencySet> change_notifications;
if (HasListeners()) {
DependencySet affected;
if (change_set.type == data_model::ChangeSetType::kFull) {
// Group items by kind so the existing per-kind diff helper can
// compare the new state to the existing store contents.
Collection<data_model::Flag> new_flags;
Collection<data_model::Segment> new_segments;
for (auto const& change : change_set.data) {
std::visit(
overloaded{
[&](data_model::ItemDescriptor<data_model::Flag> const&
f) { new_flags.emplace(change.key, f); },
[&](data_model::ItemDescriptor<
data_model::Segment> const& s) {
new_segments.emplace(change.key, s);
},
},
change.object);
}
CalculateChanges(DataKind::kFlag, source_.AllFlags(), new_flags,
affected);
CalculateChanges(DataKind::kSegment, source_.AllSegments(),
new_segments, affected);
} else {
// Partial: every item in the changeset is treated as a change;
// no version comparison.
for (auto const& change : change_set.data) {
std::visit(overloaded{
[&](data_model::ItemDescriptor<
data_model::Flag> const&) {
dependency_tracker_.CalculateChanges(
DataKind::kFlag, change.key, affected);
},
[&](data_model::ItemDescriptor<
data_model::Segment> const&) {
dependency_tracker_.CalculateChanges(
DataKind::kSegment, change.key,
affected);
},
},
change.object);
}
}
change_notifications = std::move(affected);
}

// Update the dependency tracker.
if (change_set.type == data_model::ChangeSetType::kFull) {
dependency_tracker_.Clear();
}
for (auto const& change : change_set.data) {
std::visit(
[&](auto const& descriptor) {
dependency_tracker_.UpdateDependencies(change.key, descriptor);
},
change.object);
}

sink_.Apply(std::move(change_set));

if (change_notifications) {
NotifyChanges(std::move(*change_notifications));
}
}

bool ChangeNotifier::HasListeners() const {
std::lock_guard lock{signal_mutex_};
return !signals_.empty();
Expand All @@ -69,7 +157,7 @@ void ChangeNotifier::NotifyChanges(DependencySet changes) {
}
}

ChangeNotifier::ChangeNotifier(IDestination& sink,
ChangeNotifier::ChangeNotifier(data_interfaces::ITransactionalDestination& sink,
data_interfaces::IStore const& source)
: sink_(sink), source_(source) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "../../data_interfaces/destination/idestination.hpp"
#include "../../data_interfaces/destination/itransactional_destination.hpp"
#include "../../data_interfaces/store/istore.hpp"
#include "../dependency_tracker/dependency_tracker.hpp"

Expand All @@ -13,7 +13,7 @@

namespace launchdarkly::server_side::data_components {

class ChangeNotifier final : public data_interfaces::IDestination,
class ChangeNotifier final : public data_interfaces::ITransactionalDestination,
public IChangeNotifier {
public:
template <typename Storage>
Expand All @@ -26,7 +26,8 @@ class ChangeNotifier final : public data_interfaces::IDestination,
using SharedCollection =
std::unordered_map<std::string, SharedItem<Storage>>;

ChangeNotifier(IDestination& sink, data_interfaces::IStore const& source);
ChangeNotifier(data_interfaces::ITransactionalDestination& sink,
data_interfaces::IStore const& source);

std::unique_ptr<IConnection> OnFlagChange(ChangeHandler handler) override;

Expand All @@ -35,6 +36,8 @@ class ChangeNotifier final : public data_interfaces::IDestination,
data_model::FlagDescriptor flag) override;
void Upsert(std::string const& key,
data_model::SegmentDescriptor segment) override;
void Apply(data_model::ChangeSet<data_interfaces::ChangeSetData> change_set)
override;

[[nodiscard]] std::string const& Identity() const override;

Expand Down Expand Up @@ -105,7 +108,7 @@ class ChangeNotifier final : public data_interfaces::IDestination,

void NotifyChanges(DependencySet changes);

IDestination& sink_;
data_interfaces::ITransactionalDestination& sink_;
data_interfaces::IStore const& source_;

boost::signals2::signal<void(std::shared_ptr<ChangeSet>)> signals_;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "../../data_interfaces/destination/idestination.hpp"
#include "../../data_interfaces/destination/itransactional_destination.hpp"
#include "../../data_interfaces/item_change.hpp"
#include "../../data_interfaces/store/istore.hpp"

Expand All @@ -14,7 +14,7 @@
namespace launchdarkly::server_side::data_components {

class MemoryStore final : public data_interfaces::IStore,
public data_interfaces::IDestination {
public data_interfaces::ITransactionalDestination {
public:
[[nodiscard]] std::shared_ptr<data_model::FlagDescriptor> GetFlag(
std::string const& key) const override;
Expand Down Expand Up @@ -47,7 +47,8 @@ class MemoryStore final : public data_interfaces::IStore,

bool RemoveSegment(std::string const& key);

void Apply(data_model::ChangeSet<data_interfaces::ChangeSetData> changeSet);
void Apply(data_model::ChangeSet<data_interfaces::ChangeSetData> changeSet)
override;

MemoryStore() = default;
~MemoryStore() override = default;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "../item_change.hpp"
#include "idestination.hpp"

#include <launchdarkly/data_model/change_set.hpp>

namespace launchdarkly::server_side::data_interfaces {

/**
* ITransactionalDestination extends IDestination with the ability to apply
* an FDv2 changeset atomically.
*
* A changeset is a batch of flag and segment upserts and deletions that must
* be applied as a unit; readers must never observe a partially applied
* changeset.
*/
class ITransactionalDestination : public IDestination {
public:
/**
* Applies an FDv2 changeset to the destination atomically.
*/
virtual void Apply(data_model::ChangeSet<ChangeSetData> change_set) = 0;

ITransactionalDestination(ITransactionalDestination const&) = delete;
ITransactionalDestination(ITransactionalDestination&&) = delete;
ITransactionalDestination& operator=(ITransactionalDestination const&) =
delete;
ITransactionalDestination& operator=(ITransactionalDestination&&) = delete;
~ITransactionalDestination() override = default;

protected:
ITransactionalDestination() = default;
};

} // namespace launchdarkly::server_side::data_interfaces
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "ifdv2_initializer.hpp"

#include <memory>

namespace launchdarkly::server_side::data_interfaces {

/**
* Builds new IFDv2Initializer instances on demand. Each call to Build()
* produces a fresh initializer that has not yet been started.
*/
class IFDv2InitializerFactory {
public:
virtual std::unique_ptr<IFDv2Initializer> Build() = 0;

virtual ~IFDv2InitializerFactory() = default;
IFDv2InitializerFactory(IFDv2InitializerFactory const&) = delete;
IFDv2InitializerFactory(IFDv2InitializerFactory&&) = delete;
IFDv2InitializerFactory& operator=(IFDv2InitializerFactory const&) = delete;
IFDv2InitializerFactory& operator=(IFDv2InitializerFactory&&) = delete;

protected:
IFDv2InitializerFactory() = default;
};

} // namespace launchdarkly::server_side::data_interfaces
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include "ifdv2_synchronizer.hpp"

#include <memory>

namespace launchdarkly::server_side::data_interfaces {

/**
* Builds new IFDv2Synchronizer instances on demand. Each call to Build()
* produces a fresh synchronizer that has not yet been started.
*/
class IFDv2SynchronizerFactory {
public:
virtual std::unique_ptr<IFDv2Synchronizer> Build() = 0;

virtual ~IFDv2SynchronizerFactory() = default;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete;
IFDv2SynchronizerFactory& operator=(IFDv2SynchronizerFactory const&) =
delete;
IFDv2SynchronizerFactory& operator=(IFDv2SynchronizerFactory&&) = delete;

protected:
IFDv2SynchronizerFactory() = default;
};

} // namespace launchdarkly::server_side::data_interfaces
Loading
Loading