From c68819047b67cb2ade18d511db210f37e3f6c747 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Thu, 14 May 2026 07:37:53 +0000 Subject: [PATCH 1/2] MOD-15579 Lazy initialization of the shared SVS thread pool --- src/VecSim/algorithms/svs/svs_utils.h | 83 +++++++++++++++++++-------- src/VecSim/vec_sim.cpp | 6 +- tests/unit/test_svs.cpp | 45 ++++++++++++++- tests/unit/test_svs_threadpool.cpp | 21 ++++++- 4 files changed, 121 insertions(+), 34 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index 0ec36849c..e49e1a41c 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -474,33 +474,56 @@ class VecSimSVSThreadPoolImpl { return instance()->getAllocationSize(); } - // Physically resize the pool. Creates new OS threads on grow, shuts down idle threads - // on shrink. new_size is total parallelism including the calling thread (minimum 1). - // Occupied threads (held by renters) survive shrink via the deferred-resize protocol — - // the pool defers shrink while jobs are in flight, so slots cannot be destroyed while rented. - // - // If jobs are in flight (pending_jobs_ > 0), shrink is deferred — the target size is - // stored and applied when the last job completes (see endScheduledJob()). Grow is - // always applied immediately so new jobs can use the extra threads right away. + // Resize the shared pool. The requested size is either applied immediately + // or stored in deferred_size_ and applied later: + // * no SVS index attached yet → recorded; applied on first + // onIndexAttached() (so no OS threads are spawned in deployments that + // never create an SVS index). + // * shrink while jobs are in flight (pending_jobs_ > 0) → recorded; + // applied when endScheduledJob() drops pending_jobs_ back to 0 (avoids + // destroying slots that already-reserved RediSearch workers will rent). 
+ // * otherwise (grows; shrinks with no jobs in flight) → applied + // immediately via resize_locked(). + // The two deferral cases never overlap — no jobs can be in flight before + // the first index attaches. Clamps to a minimum of 1. void resize(size_t new_size) { new_size = std::max(new_size, size_t{1}); std::lock_guard lock{pool_mutex_}; - resize_locked(new_size); + if (has_attached_index_) { + resize_locked(new_size); + } else { + deferred_size_ = new_size; + } } - // Deferred-resize protocol - // ======================== - // When a job is created via createScheduledJobs(), the pool size is snapshotted - // to determine how many reserve jobs to submit to the RediSearch worker pool. - // If resize() shrinks the SVS pool between that snapshot and when the job - // actually executes, the RediSearch workers would have checked in (reserved - // threads exist) but the SVS pool slots they need to rent from would have been - // destroyed — causing a failure. - // - // To prevent this, beginScheduledJob() increments pending_jobs_, and any shrink - // while pending_jobs_ > 0 is deferred (stored in deferred_size_) until the last - // in-flight job completes and its destructor calls endScheduledJob(). Grows are - // always applied immediately since extra threads don't break anything. + // Called from the per-index VecSimSVSThreadPool ctor. The first call flips + // has_attached_index_ and applies any size requested earlier via resize() + // — this is where OS threads are actually spawned in the lazy path. + void onIndexAttached() { + std::lock_guard lock{pool_mutex_}; + if (has_attached_index_) + return; + has_attached_index_ = true; + if (deferred_size_) + resize_locked(deferred_size_.value()); + } + +#ifdef BUILD_TESTS + // Restore the singleton to its as-constructed state: drop all worker slots + // (releasing vector capacity so getAllocationSize() drops to its baseline), clear + // has_attached_index_, deferred_size_, and pending_jobs_. 
Intended for unit + // tests that need a clean baseline (the singleton itself is process-wide + // and cannot be torn down). Caller must ensure no jobs are in flight. + void resetForTest() { + std::lock_guard lock{pool_mutex_}; + // Swap with a fresh empty vector to release the capacity allocation + // (clear() destroys elements but retains capacity). + std::vector(SlotVecAllocator(allocator_)).swap(slots_); + deferred_size_.reset(); + pending_jobs_ = 0; + has_attached_index_ = false; + } +#endif // Atomically mark a logical job as pending and snapshot the current shared pool size. size_t beginScheduledJob() { @@ -657,8 +680,14 @@ class VecSimSVSThreadPoolImpl { std::shared_ptr allocator_; // pool's own allocator for memory tracking mutable std::mutex pool_mutex_; std::vector slots_; - size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight - std::optional deferred_size_; // resize target deferred until pending_jobs_ == 0 + size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight + // Pending pool size to apply at the next safe point: either the first SVS index + // attaches (onIndexAttached()) or pending_jobs_ drops to 0 (endScheduledJob()). + // The two cases are sequential and never overlap. + std::optional deferred_size_; + // Flips true on the first onIndexAttached() call; gates resize() between + // "record only" and "apply immediately". + bool has_attached_index_ = false; }; // Per-index wrapper around the shared VecSimSVSThreadPoolImpl singleton. @@ -688,12 +717,16 @@ class VecSimSVSThreadPool { // explicit setParallelism() call. // parallelism_ is allocated through the provided VecsimAllocator so that the // allocation is tracked by the index's memory accounting. + // Notifies the shared pool that an index has attached — on the very first call this + // applies any size requested earlier via resize() (lazy thread spawn). 
explicit VecSimSVSThreadPool(const std::shared_ptr &allocator, void *log_ctx = nullptr) : pool_(VecSimSVSThreadPoolImpl::instance()), parallelism_(std::allocate_shared>( VecsimSTLAllocator>(allocator), size_t{1})), - log_ctx_(log_ctx) {} + log_ctx_(log_ctx) { + pool_->onIndexAttached(); + } // Resize the shared pool singleton. Delegates to VecSimSVSThreadPoolImpl::instance(). static void resize(size_t new_size) { VecSimSVSThreadPoolImpl::instance()->resize(new_size); } diff --git a/src/VecSim/vec_sim.cpp b/src/VecSim/vec_sim.cpp index 6e9699bd5..a26cf64ab 100644 --- a/src/VecSim/vec_sim.cpp +++ b/src/VecSim/vec_sim.cpp @@ -41,9 +41,9 @@ extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) { } else { VecSimIndex::setWriteMode(VecSim_WriteAsync); } - // Resize the shared SVS thread pool. resize() clamps to minimum size 1. - // new_size == 0 → pool size 1 (only calling thread, write-in-place). - // new_size > 0 → pool size new_size (new_size - 1 worker threads). + // Resize the shared SVS pool. Clamped to a minimum of 1. OS threads are spawned + // lazily on first SVS index creation; once an index exists this resizes the + // shared pool immediately (cooperating with the deferred-shrink protocol). VecSimSVSThreadPool::resize(new_size); } diff --git a/tests/unit/test_svs.cpp b/tests/unit/test_svs.cpp index 0ff7c1f0e..0841c90ef 100644 --- a/tests/unit/test_svs.cpp +++ b/tests/unit/test_svs.cpp @@ -3304,6 +3304,10 @@ TEST(SVSTest, compute_distance) { // the shared singleton. Setting it should log a warning but not affect the pool. // --------------------------------------------------------------------------- TEST(SVSTest, NumThreadsParamIgnored) { + // Mark the pool as attached so resize() applies eagerly (this test resizes + // before constructing the SVS index, so the lazy code path would otherwise + // just record the size without spawning threads). + VecSimSVSThreadPoolImpl::instance()->onIndexAttached(); // Resize the shared singleton pool to a known size. 
VecSimSVSThreadPool::resize(2); ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 2); @@ -3369,15 +3373,15 @@ TEST(SVSTest, NumThreadsParamIgnored) { // They are sourced from the same VecSimSVSThreadPool::getSharedAllocationSize() // (the only contributor to global memory today), so their values must match. TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) { - // Ensure the shared SVS thread pool singleton has allocated memory so both - // fields report a non-zero value. resize() always lazy-initializes the singleton. + // Request a non-trivial pool size; with lazy init the actual allocation only + // happens on first SVS index creation below. VecSim_UpdateThreadPoolSize(2); - ASSERT_GT(VecSim_GetGlobalMemory(), 0u); size_t dim = 4; SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; VecSimIndex *index = this->CreateNewIndex(params); ASSERT_INDEX(index); + ASSERT_GT(VecSim_GetGlobalMemory(), 0u); VecSimDebugInfoIterator *infoIterator = VecSimIndex_DebugInfoIterator(index); @@ -3416,6 +3420,41 @@ TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) { VecSimSVSThreadPool::resize(1); } +// --------------------------------------------------------------------------- +// Lazy thread-pool init: VecSim_UpdateThreadPoolSize must not allocate worker +// threads until the first SVS index attaches. +// --------------------------------------------------------------------------- +TEST(SVSTest, ThreadPoolLazyInit) { + // Reset the shared singleton to a clean state — earlier tests may have + // attached indexes and resized the pool. The allocator may still report a + // small non-zero baseline from internal bookkeeping; assertions below check + // deltas from this baseline rather than absolute zero. 
+ VecSimSVSThreadPoolImpl::instance()->resetForTest(); + const size_t baseline_mem = VecSim_GetGlobalMemory(); + + // Recording a non-trivial requested size before any SVS index exists must + // not allocate any worker slots. + VecSim_UpdateThreadPoolSize(8); + EXPECT_EQ(VecSim_GetGlobalMemory(), baseline_mem) + << "VecSim_UpdateThreadPoolSize must not allocate threads before any SVS index exists"; + EXPECT_EQ(VecSimSVSThreadPool::poolSize(), 1u) + << "Pool size must stay at 1 (no worker slots) until first index attaches"; + + // First SVS index creation triggers onIndexAttached(), which applies the + // recorded size and spawns 7 worker threads. + SVSParams params = {.type = VecSimType_FLOAT32, .dim = 4, .metric = VecSimMetric_L2}; + VecSimParams vp{.algo = VecSimAlgo_SVS, .algoParams = {.svsParams = params}}; + VecSimIndex *index = VecSimIndex_New(&vp); + ASSERT_NE(index, nullptr); + EXPECT_GT(VecSim_GetGlobalMemory(), baseline_mem) + << "First SVS index creation must trigger lazy thread spawn"; + EXPECT_EQ(VecSimSVSThreadPool::poolSize(), 8u) + << "Pool size must reflect the most recent requested size after first attach"; + + VecSimIndex_Free(index); + VecSimSVSThreadPoolImpl::instance()->resetForTest(); +} + #else // HAVE_SVS TEST(SVSTest, svs_not_supported) { diff --git a/tests/unit/test_svs_threadpool.cpp b/tests/unit/test_svs_threadpool.cpp index aac389336..cce1fc15c 100644 --- a/tests/unit/test_svs_threadpool.cpp +++ b/tests/unit/test_svs_threadpool.cpp @@ -44,6 +44,10 @@ class SVSThreadPoolTest : public ::testing::Test { // don't assert on nullptr log_ctx (we don't have an index context). saved_callback_ = VecSimIndexInterface::logCallback; VecSimIndexInterface::logCallback = nullptr; + // Mark the shared pool as having an attached index so resize() behaves + // eagerly throughout the test (these unit tests exercise the pool in + // isolation, without constructing real SVS indexes). Idempotent. 
+ VecSimSVSThreadPoolImpl::instance()->onIndexAttached(); // Reset the shared singleton pool to size 1 — earlier test suites may have // resized it via VecSim_UpdateThreadPoolSize() and left it in that state. VecSimSVSThreadPool::resize(1); @@ -330,15 +334,26 @@ TEST_F(SVSThreadPoolTest, TwoIndexesIndependentParallelism) { // --------------------------------------------------------------------------- // Test 6: VecSim_UpdateThreadPoolSize mode transitions -// The C API sets write mode and resizes the shared singleton pool. +// The C API always sets write mode immediately. The pool resize is lazy: it is +// applied at first index attach if no index has attached yet, otherwise +// immediately. // --------------------------------------------------------------------------- TEST_F(SVSThreadPoolTest, UpdateThreadPoolSizeModeTransitions) { - // 0 → 4: switch to async mode, pool resizes to 4. + // Reset to a fresh state — previous tests may have attached indexes via + // VecSimSVSThreadPool wrappers, leaving has_attached_index_ set. + VecSimSVSThreadPoolImpl::instance()->resetForTest(); + + // 0 → 4: switch to async mode immediately. Pool size stays at 1 because + // no SVS index has attached yet (resize is deferred). VecSim_UpdateThreadPoolSize(4); ASSERT_EQ(VecSimIndex::asyncWriteMode, VecSim_WriteAsync); + ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 1); + + // Attaching an index applies the deferred size. + VecSimSVSThreadPool wrapper{allocator_}; ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 4); - // 4 → 8: stay in async mode (N→M, both > 0), pool resizes to 8. + // 4 → 8: now that an index is attached, resize is immediate. 
VecSim_UpdateThreadPoolSize(8); ASSERT_EQ(VecSimIndex::asyncWriteMode, VecSim_WriteAsync); ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 8); From 7bc3e24da542736fb7d3b2ab0b691ca25d31922f Mon Sep 17 00:00:00 2001 From: meiravgri Date: Thu, 14 May 2026 08:51:07 +0000 Subject: [PATCH 2/2] comment --- tests/unit/test_svs.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_svs.cpp b/tests/unit/test_svs.cpp index 0841c90ef..afb177f1b 100644 --- a/tests/unit/test_svs.cpp +++ b/tests/unit/test_svs.cpp @@ -3426,9 +3426,10 @@ TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) { // --------------------------------------------------------------------------- TEST(SVSTest, ThreadPoolLazyInit) { // Reset the shared singleton to a clean state — earlier tests may have - // attached indexes and resized the pool. The allocator may still report a - // small non-zero baseline from internal bookkeeping; assertions below check - // deltas from this baseline rather than absolute zero. + // attached indexes and resized the pool. After reset, getAllocationSize() + // still reports sizeof(VecSimAllocator) (the allocator's self-accounting, + // see VecSimAllocator() ctor); assertions below compare against this + // baseline rather than absolute zero. VecSimSVSThreadPoolImpl::instance()->resetForTest(); const size_t baseline_mem = VecSim_GetGlobalMemory();