Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 58 additions & 25 deletions src/VecSim/algorithms/svs/svs_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,33 +474,56 @@ class VecSimSVSThreadPoolImpl {
return instance()->getAllocationSize();
}

// Physically resize the pool. Creates new OS threads on grow, shuts down idle threads
// on shrink. new_size is total parallelism including the calling thread (minimum 1).
// Occupied threads (held by renters) survive shrink via the deferred-resize protocol —
// the pool defers shrink while jobs are in flight, so slots cannot be destroyed while rented.
//
// If jobs are in flight (pending_jobs_ > 0), shrink is deferred — the target size is
// stored and applied when the last job completes (see endScheduledJob()). Grow is
// always applied immediately so new jobs can use the extra threads right away.
// Resize the shared pool. in all cases the requested size is stored in
// deferred_size_ and applied later if not applied immediately:
// * no SVS index attached yet → recorded; applied on first
// onIndexAttached() (so no OS threads are spawned in deployments that
// never create an SVS index).
// * shrink while jobs are in flight (pending_jobs_ > 0) → recorded;
// applied when endScheduledJob() drops pending_jobs_ back to 0 (avoids
// destroying slots that already-reserved RediSearch workers will rent).
// * otherwise (grows; shrinks with no jobs in flight) → applied
// immediately via resize_locked().
// The two deferral cases never overlap — no jobs can be in flight before
// the first index attaches. Clamps to a minimum of 1.
void resize(size_t new_size) {
new_size = std::max(new_size, size_t{1});
std::lock_guard lock{pool_mutex_};
resize_locked(new_size);
if (has_attached_index_) {
resize_locked(new_size);
} else {
deferred_size_ = new_size;
}
}

// Deferred-resize protocol
// ========================
// When a job is created via createScheduledJobs(), the pool size is snapshotted
// to determine how many reserve jobs to submit to the RediSearch worker pool.
// If resize() shrinks the SVS pool between that snapshot and when the job
// actually executes, the RediSearch workers would have checked in (reserved
// threads exist) but the SVS pool slots they need to rent from would have been
// destroyed — causing a failure.
//
// To prevent this, beginScheduledJob() increments pending_jobs_, and any shrink
// while pending_jobs_ > 0 is deferred (stored in deferred_size_) until the last
// in-flight job completes and its destructor calls endScheduledJob(). Grows are
// always applied immediately since extra threads don't break anything.
// Called from the per-index VecSimSVSThreadPool ctor. The first call flips
// has_attached_index_ and applies any size requested earlier via resize()
// — this is where OS threads are actually spawned in the lazy path.
void onIndexAttached() {
std::lock_guard lock{pool_mutex_};
if (has_attached_index_)
return;
has_attached_index_ = true;
if (deferred_size_)
resize_locked(deferred_size_.value());
}

#ifdef BUILD_TESTS
// Restore the singleton to its as-constructed state: drop all worker slots
// (releasing vector capacity so getAllocationSize() returns 0), clear
// has_attached_index_, deferred_size_, and pending_jobs_. Intended for unit
// tests that need a clean baseline (the singleton itself is process-wide
// and cannot be torn down). Caller must ensure no jobs are in flight.
void resetForTest() {
std::lock_guard lock{pool_mutex_};
// Swap with a fresh empty vector to release the capacity allocation
// (clear() destroys elements but retains capacity).
std::vector<SlotPtr, SlotVecAllocator>(SlotVecAllocator(allocator_)).swap(slots_);
deferred_size_.reset();
pending_jobs_ = 0;
has_attached_index_ = false;
}
#endif

// Atomically mark a logical job as pending and snapshot the current shared pool size.
size_t beginScheduledJob() {
Expand Down Expand Up @@ -657,8 +680,14 @@ class VecSimSVSThreadPoolImpl {
std::shared_ptr<VecSimAllocator> allocator_; // pool's own allocator for memory tracking
mutable std::mutex pool_mutex_;
std::vector<SlotPtr, SlotVecAllocator> slots_;
size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight
std::optional<size_t> deferred_size_; // resize target deferred until pending_jobs_ == 0
size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight
// Pending pool size to apply at the next safe point: either the first SVS index
// attaches (onIndexAttached()) or pending_jobs_ drops to 0 (endScheduledJob()).
// The two cases are sequential and never overlap.
std::optional<size_t> deferred_size_;
// Flips true on the first onIndexAttached() call; gates resize() between
// "record only" and "apply immediately".
bool has_attached_index_ = false;
};

// Per-index wrapper around the shared VecSimSVSThreadPoolImpl singleton.
Expand Down Expand Up @@ -688,12 +717,16 @@ class VecSimSVSThreadPool {
// explicit setParallelism() call.
// parallelism_ is allocated through the provided VecsimAllocator so that the
// allocation is tracked by the index's memory accounting.
// Notifies the shared pool that an index has attached — on the very first call this
// applies any size requested earlier via resize() (lazy thread spawn).
explicit VecSimSVSThreadPool(const std::shared_ptr<VecSimAllocator> &allocator,
void *log_ctx = nullptr)
: pool_(VecSimSVSThreadPoolImpl::instance()),
parallelism_(std::allocate_shared<std::atomic<size_t>>(
VecsimSTLAllocator<std::atomic<size_t>>(allocator), size_t{1})),
log_ctx_(log_ctx) {}
log_ctx_(log_ctx) {
pool_->onIndexAttached();
}

// Resize the shared pool singleton. Delegates to VecSimSVSThreadPoolImpl::instance().
static void resize(size_t new_size) { VecSimSVSThreadPoolImpl::instance()->resize(new_size); }
Expand Down
6 changes: 3 additions & 3 deletions src/VecSim/vec_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) {
} else {
VecSimIndex::setWriteMode(VecSim_WriteAsync);
}
// Resize the shared SVS thread pool. resize() clamps to minimum size 1.
// new_size == 0 → pool size 1 (only calling thread, write-in-place).
// new_size > 0 → pool size new_size (new_size - 1 worker threads).
// Resize the shared SVS pool. Clamped to a minimum of 1. OS threads are spawned
// lazily on first SVS index creation; once an index exists this resizes the
// shared pool immediately (cooperating with the deferred-shrink protocol).
VecSimSVSThreadPool::resize(new_size);
}

Expand Down
46 changes: 43 additions & 3 deletions tests/unit/test_svs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,10 @@ TEST(SVSTest, compute_distance) {
// the shared singleton. Setting it should log a warning but not affect the pool.
// ---------------------------------------------------------------------------
TEST(SVSTest, NumThreadsParamIgnored) {
// Mark the pool as attached so resize() applies eagerly (this test resizes
// before constructing the SVS index, so the lazy code path would otherwise
// just record the size without spawning threads).
VecSimSVSThreadPoolImpl::instance()->onIndexAttached();
// Resize the shared singleton pool to a known size.
VecSimSVSThreadPool::resize(2);
ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 2);
Expand Down Expand Up @@ -3369,15 +3373,15 @@ TEST(SVSTest, NumThreadsParamIgnored) {
// They are sourced from the same VecSimSVSThreadPool::getSharedAllocationSize()
// (the only contributor to global memory today), so their values must match.
TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) {
// Ensure the shared SVS thread pool singleton has allocated memory so both
// fields report a non-zero value. resize() always lazy-initializes the singleton.
// Request a non-trivial pool size; with lazy init the actual allocation only
// happens on first SVS index creation below.
VecSim_UpdateThreadPoolSize(2);
ASSERT_GT(VecSim_GetGlobalMemory(), 0u);

size_t dim = 4;
SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2};
VecSimIndex *index = this->CreateNewIndex(params);
ASSERT_INDEX(index);
ASSERT_GT(VecSim_GetGlobalMemory(), 0u);

VecSimDebugInfoIterator *infoIterator = VecSimIndex_DebugInfoIterator(index);

Expand Down Expand Up @@ -3416,6 +3420,42 @@ TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) {
VecSimSVSThreadPool::resize(1);
}

// ---------------------------------------------------------------------------
// Lazy thread-pool init: VecSim_UpdateThreadPoolSize must not allocate worker
// threads until the first SVS index attaches.
// ---------------------------------------------------------------------------
TEST(SVSTest, ThreadPoolLazyInit) {
// Reset the shared singleton to a clean state — earlier tests may have
// attached indexes and resized the pool. After reset, getAllocationSize()
// still reports sizeof(VecSimAllocator) (the allocator's self-accounting,
// see VecSimAllocator() ctor); assertions below compare against this
// baseline rather than absolute zero.
VecSimSVSThreadPoolImpl::instance()->resetForTest();
const size_t baseline_mem = VecSim_GetGlobalMemory();

// Recording a non-trivial requested size before any SVS index exists must
// not allocate any worker slots.
VecSim_UpdateThreadPoolSize(8);
EXPECT_EQ(VecSim_GetGlobalMemory(), baseline_mem)
<< "VecSim_UpdateThreadPoolSize must not allocate threads before any SVS index exists";
EXPECT_EQ(VecSimSVSThreadPool::poolSize(), 1u)
<< "Pool size must stay at 1 (no worker slots) until first index attaches";

// First SVS index creation triggers onIndexAttached(), which applies the
// recorded size and spawns 7 worker threads.
SVSParams params = {.type = VecSimType_FLOAT32, .dim = 4, .metric = VecSimMetric_L2};
VecSimParams vp{.algo = VecSimAlgo_SVS, .algoParams = {.svsParams = params}};
VecSimIndex *index = VecSimIndex_New(&vp);
ASSERT_NE(index, nullptr);
EXPECT_GT(VecSim_GetGlobalMemory(), baseline_mem)
<< "First SVS index creation must trigger lazy thread spawn";
EXPECT_EQ(VecSimSVSThreadPool::poolSize(), 8u)
<< "Pool size must reflect the most recent requested size after first attach";

VecSimIndex_Free(index);
VecSimSVSThreadPoolImpl::instance()->resetForTest();
}

#else // HAVE_SVS

TEST(SVSTest, svs_not_supported) {
Expand Down
21 changes: 18 additions & 3 deletions tests/unit/test_svs_threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class SVSThreadPoolTest : public ::testing::Test {
// don't assert on nullptr log_ctx (we don't have an index context).
saved_callback_ = VecSimIndexInterface::logCallback;
VecSimIndexInterface::logCallback = nullptr;
// Mark the shared pool as having an attached index so resize() behaves
// eagerly throughout the test (these unit tests exercise the pool in
// isolation, without constructing real SVS indexes). Idempotent.
VecSimSVSThreadPoolImpl::instance()->onIndexAttached();
// Reset the shared singleton pool to size 1 — earlier test suites may have
// resized it via VecSim_UpdateThreadPoolSize() and left it in that state.
VecSimSVSThreadPool::resize(1);
Expand Down Expand Up @@ -330,15 +334,26 @@ TEST_F(SVSThreadPoolTest, TwoIndexesIndependentParallelism) {

// ---------------------------------------------------------------------------
// Test 6: VecSim_UpdateThreadPoolSize mode transitions
// The C API sets write mode and resizes the shared singleton pool.
// The C API always sets write mode immediately. The pool resize is lazy: it is
// applied at first index attach if no index has attached yet, otherwise
// immediately.
// ---------------------------------------------------------------------------
TEST_F(SVSThreadPoolTest, UpdateThreadPoolSizeModeTransitions) {
// 0 → 4: switch to async mode, pool resizes to 4.
// Reset to a fresh state — previous tests may have attached indexes via
// VecSimSVSThreadPool wrappers, leaving has_attached_index_ set.
VecSimSVSThreadPoolImpl::instance()->resetForTest();

// 0 → 4: switch to async mode immediately. Pool size stays at 1 because
// no SVS index has attached yet (resize is deferred).
VecSim_UpdateThreadPoolSize(4);
ASSERT_EQ(VecSimIndex::asyncWriteMode, VecSim_WriteAsync);
ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 1);

// Attaching an index applies the deferred size.
VecSimSVSThreadPool wrapper{allocator_};
ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 4);

// 4 → 8: stay in async mode (N→M, both > 0), pool resizes to 8.
// 4 → 8: now that an index is attached, resize is immediate.
VecSim_UpdateThreadPoolSize(8);
ASSERT_EQ(VecSimIndex::asyncWriteMode, VecSim_WriteAsync);
ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 8);
Expand Down
Loading