From a7835176c6f5267885b7811952408d2a73bd4226 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:46:59 -0700 Subject: [PATCH 01/13] Fix HTTP client torn reads and response memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - HttpClient_Apple: scope Cancel() to m_dataTask only instead of blanket-cancelling every task on the shared session. Fix torn read on m_requests.empty() in CancelAllRequests spin loop. - HttpClientManager: fix torn read on m_httpCallbacks.empty() in cancelAllRequests spin loop — read under lock. - HttpResponseDecoder: add missing delete ctx->httpResponse before nullptr in Abort and RetryNetwork paths (memory leak). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/http/HttpClientManager.cpp | 9 ++++++++- lib/http/HttpClient_Apple.mm | 24 ++++++------------------ lib/http/HttpResponseDecoder.cpp | 5 +++-- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/lib/http/HttpClientManager.cpp b/lib/http/HttpClientManager.cpp index 58fa5fb4a..a1c228556 100644 --- a/lib/http/HttpClientManager.cpp +++ b/lib/http/HttpClientManager.cpp @@ -149,8 +149,15 @@ namespace MAT_NS_BEGIN { void HttpClientManager::cancelAllRequests() { cancelAllRequestsAsync(); - while (!m_httpCallbacks.empty()) + while (true) + { + { + LOCKGUARD(m_httpCallbacksMtx); + if (m_httpCallbacks.empty()) + break; + } std::this_thread::yield(); + } } // start async cancellation diff --git a/lib/http/HttpClient_Apple.mm b/lib/http/HttpClient_Apple.mm index 05817087a..579b05313 100644 --- a/lib/http/HttpClient_Apple.mm +++ b/lib/http/HttpClient_Apple.mm @@ -132,23 +132,6 @@ void HandleResponse(NSData* data, NSURLResponse* response, NSError* error) void Cancel() { [m_dataTask cancel]; - [session getTasksWithCompletionHandler:^(NSArray* dataTasks, NSArray* uploadTasks, NSArray* downloadTasks) - { - for (NSURLSessionTask* _task in dataTasks) - { - [_task cancel]; - } - - for 
(NSURLSessionTask* _task in downloadTasks) - { - [_task cancel]; - } - - for (NSURLSessionTask* _task in uploadTasks) - { - [_task cancel]; - } - }]; } private: @@ -214,8 +197,13 @@ void Cancel() for (const auto &id : ids) CancelRequestAsync(id); - while (!m_requests.empty()) + while (true) { + { + std::lock_guard lock(m_requestsMtx); + if (m_requests.empty()) + break; + } PAL::sleep(100); std::this_thread::yield(); } diff --git a/lib/http/HttpResponseDecoder.cpp b/lib/http/HttpResponseDecoder.cpp index 11e9d4096..2bb652fdf 100644 --- a/lib/http/HttpResponseDecoder.cpp +++ b/lib/http/HttpResponseDecoder.cpp @@ -67,13 +67,11 @@ namespace MAT_NS_BEGIN { break; case HttpResult_Aborted: - ctx->httpResponse = nullptr; outcome = Abort; break; case HttpResult_LocalFailure: case HttpResult_NetworkFailure: - ctx->httpResponse = nullptr; outcome = RetryNetwork; break; } @@ -129,6 +127,7 @@ namespace MAT_NS_BEGIN { evt.param1 = 0; // response.GetStatusCode(); DispatchEvent(evt); } + delete ctx->httpResponse; ctx->httpResponse = nullptr; // eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected requestAborted(ctx); @@ -159,6 +158,8 @@ namespace MAT_NS_BEGIN { evt.param1 = response.GetStatusCode(); DispatchEvent(evt); } + delete ctx->httpResponse; + ctx->httpResponse = nullptr; temporaryNetworkFailure(ctx); break; } From 28cf17d40082f4771f96d803a2463fa2b9f3dbd9 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:47:10 -0700 Subject: [PATCH 02/13] Fix WorkerThread shutdown: safe cleanup and diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Only delete queued tasks after successful join (not after detach, where the thread may still access them — undefined behavior) - Replace catch(...) 
with std::system_error and std::exception handlers that log error code and message - Log pending queue sizes in both join and detach paths Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/pal/WorkerThread.cpp | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 2bdbf6c67..5e843790d 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -6,6 +6,8 @@ #include "pal/WorkerThread.hpp" #include "pal/PAL.hpp" +#include + #if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32) /* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */ @@ -56,22 +58,40 @@ namespace PAL_NS_BEGIN { auto item = new WorkerThreadShutdownItem(); Queue(item); std::thread::id this_id = std::this_thread::get_id(); + bool joined = false; try { - if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) + if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) { m_hThread.join(); - else + joined = true; + } else { m_hThread.detach(); + } + } + catch (const std::system_error& e) { + LOG_ERROR("Thread join/detach failed: [%d] %s", e.code().value(), e.what()); + } + catch (const std::exception& e) { + LOG_ERROR("Thread join/detach failed: %s", e.what()); } - catch (...) {}; - // TODO: [MG] - investigate if we ever drop work items on shutdown. - if (!m_queue.empty()) - { - LOG_WARN("m_queue is not empty!"); + // Log pending work in both paths so operators can see if + // shutdown is dropping tasks. + if (!m_queue.empty()) { + LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size()); } - if (!m_timerQueue.empty()) - { - LOG_WARN("m_timerQueue is not empty!"); + if (!m_timerQueue.empty()) { + LOG_WARN("Shutdown with %zu timer(s) pending", m_timerQueue.size()); + } + + // Clean up any tasks remaining in the queues after shutdown. 
+ // Only safe after join() — the thread has fully exited. + // After detach(), the thread still needs the shutdown item + // and may still be accessing the queues. + if (joined) { + for (auto task : m_queue) { delete task; } + m_queue.clear(); + for (auto task : m_timerQueue) { delete task; } + m_timerQueue.clear(); } } From a355ec5cd6b773349437c9a5691035c4f2ec588f Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:47:24 -0700 Subject: [PATCH 03/13] Make m_runningLatency and m_scheduledUploadTime atomic Both variables are read and written from different threads during normal upload scheduling. Declare as std::atomic to eliminate data races per the C++ memory model. Add .load() for variadic LOG_TRACE calls. Add comment explaining why unlocked stores in uploadAsync are safe. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 10 ++++++---- lib/tpm/TransmissionPolicyManager.hpp | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 83b82cf2a..7f24344e3 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -147,14 +147,14 @@ namespace MAT_NS_BEGIN { m_runningLatency = latency; } auto now = PAL::getMonotonicTimeMs(); - auto delta = Abs64(m_scheduledUploadTime, now); + auto delta = Abs64(m_scheduledUploadTime.load(), now); if (delta <= static_cast(delay.count())) { // Don't need to cancel and reschedule if it's about to happen now anyways. // m_isUploadScheduled check does not have to be strictly atomic because // the completion of upload will schedule more uploads as-needed, we only // want to avoid the unnecessary wasteful rescheduling. 
- LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency); + LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency.load()); return; } } @@ -173,7 +173,7 @@ namespace MAT_NS_BEGIN { { m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count(); m_runningLatency = latency; - LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency); + LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency.load()); m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency); } } @@ -184,9 +184,11 @@ namespace MAT_NS_BEGIN { if (guard.isPaused()) { return; } + // These stores happen outside the lock but are safe: scheduleUpload + // only reads them when m_isUploadScheduled is true, and we don't + // clear that flag until inside the LOCKGUARD below. m_runningLatency = latency; m_scheduledUploadTime = std::numeric_limits::max(); - { LOCKGUARD(m_scheduledUploadMutex); m_isUploadScheduled = false; // Allow to schedule another uploadAsync diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index e1a91ad10..dc7f91cf9 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -91,7 +91,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; std::atomic m_isPaused { true }; std::atomic m_isUploadScheduled { false }; - uint64_t m_scheduledUploadTime { std::numeric_limits::max() }; + std::atomic m_scheduledUploadTime { std::numeric_limits::max() }; std::mutex m_scheduledUploadMutex; PAL::DeferredCallbackHandle m_scheduledUpload; bool m_scheduledUploadAborted { false }; @@ -131,7 +131,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; size_t uploadCount() const noexcept; std::chrono::milliseconds m_timerdelay { std::chrono::seconds { 2 } }; - EventLatency m_runningLatency { EventLatency_RealTime }; + std::atomic m_runningLatency 
{ EventLatency_RealTime }; TimerArray m_timers; public: From de46cb27cc44800d22bf957fbfcc257ab3ce3edc Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 28 Apr 2026 11:47:34 -0700 Subject: [PATCH 04/13] Fix static-destruction-order crash in Logger destructor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove LOG_TRACE from Logger destructor — it triggers a crash on iOS simulator when the recursive_mutex used by logging has already been destroyed during static destruction. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/api/Logger.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/api/Logger.cpp b/lib/api/Logger.cpp index 54d883664..f76f85734 100644 --- a/lib/api/Logger.cpp +++ b/lib/api/Logger.cpp @@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN Logger::~Logger() noexcept { - LOG_TRACE("%p: Destroyed", this); + // Intentionally empty — logging here triggers a static-destruction-order + // crash on iOS simulator (recursive_mutex used after teardown). } ISemanticContext* Logger::GetSemanticContext() const From 706a01ff8baa2710b460c78e9bbbb896ea1b8b9e Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Wed, 29 Apr 2026 18:04:57 -0700 Subject: [PATCH 05/13] Use cleaner shutdown and scheduler synchronization fixes Reject new worker-thread tasks once shutdown starts so queue cleanup cannot race with late producers, and move the TPM scheduled-upload state back under a single mutex so latency/next-upload decisions stay consistent without mixed atomic and mutex access. 
Files changed: - lib/pal/WorkerThread.cpp - lib/tpm/TransmissionPolicyManager.cpp - lib/tpm/TransmissionPolicyManager.hpp Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/pal/WorkerThread.cpp | 23 +++++++++-- lib/tpm/TransmissionPolicyManager.cpp | 55 ++++++++++++++++++--------- lib/tpm/TransmissionPolicyManager.hpp | 9 +++-- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 5e843790d..5eccbb5f2 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -37,6 +37,7 @@ namespace PAL_NS_BEGIN { std::list m_timerQueue; Event m_event; MAT::Task* m_itemInProgress; + bool m_shuttingDown = false; int count = 0; public: @@ -55,12 +56,22 @@ namespace PAL_NS_BEGIN { void Join() final { - auto item = new WorkerThreadShutdownItem(); - Queue(item); std::thread::id this_id = std::this_thread::get_id(); bool joined = false; + { + LOCKGUARD(m_lock); + if (!m_shuttingDown) { + m_shuttingDown = true; + m_queue.push_back(new WorkerThreadShutdownItem()); + count++; + m_event.post(); + } + } try { - if (m_hThread.joinable() && (m_hThread.get_id() != this_id)) { + if (!m_hThread.joinable()) { + return; + } + if (m_hThread.get_id() != this_id) { m_hThread.join(); joined = true; } else { @@ -76,6 +87,7 @@ namespace PAL_NS_BEGIN { // Log pending work in both paths so operators can see if // shutdown is dropping tasks. 
+ LOCKGUARD(m_lock); if (!m_queue.empty()) { LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size()); } @@ -99,6 +111,11 @@ namespace PAL_NS_BEGIN { { LOG_INFO("queue item=%p", &item); LOCKGUARD(m_lock); + if (m_shuttingDown) { + LOG_WARN("Dropping queued task %p during shutdown", item); + delete item; + return; + } if (item->Type == MAT::Task::TimedCall) { auto it = m_timerQueue.begin(); while (it != m_timerQueue.end() && (*it)->TargetTime < item->TargetTime) { diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 7f24344e3..e7421bc7f 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -147,14 +147,13 @@ namespace MAT_NS_BEGIN { m_runningLatency = latency; } auto now = PAL::getMonotonicTimeMs(); - auto delta = Abs64(m_scheduledUploadTime.load(), now); + auto delta = Abs64(m_scheduledUploadTime, now); if (delta <= static_cast(delay.count())) { // Don't need to cancel and reschedule if it's about to happen now anyways. - // m_isUploadScheduled check does not have to be strictly atomic because // the completion of upload will schedule more uploads as-needed, we only // want to avoid the unnecessary wasteful rescheduling. - LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency.load()); + LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency); return; } } @@ -162,18 +161,19 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. 
if (force || delay.count() == 0) { - if (!cancelUploadTask()) + if (!cancelUploadTaskLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); } } // Schedule new upload - if (!m_isUploadScheduled.exchange(true)) + if (!m_isUploadScheduled) { + m_isUploadScheduled = true; m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count(); m_runningLatency = latency; - LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency.load()); + LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency); m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency); } } @@ -184,18 +184,16 @@ namespace MAT_NS_BEGIN { if (guard.isPaused()) { return; } - // These stores happen outside the lock but are safe: scheduleUpload - // only reads them when m_isUploadScheduled is true, and we don't - // clear that flag until inside the LOCKGUARD below. - m_runningLatency = latency; - m_scheduledUploadTime = std::numeric_limits::max(); + EventLatency requestedLatency = latency; { LOCKGUARD(m_scheduledUploadMutex); + requestedLatency = m_runningLatency; + m_scheduledUploadTime = std::numeric_limits::max(); m_isUploadScheduled = false; // Allow to schedule another uploadAsync if ((m_isPaused) || (m_scheduledUploadAborted)) { LOG_TRACE("Paused or upload aborted: cancel pending upload task."); - cancelUploadTask(); // If there is a pending upload task, kill it + cancelUploadTaskLocked(); // If there is a pending upload task, kill it return; } } @@ -212,14 +210,14 @@ namespace MAT_NS_BEGIN { unsigned delayMs = 1000; LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later", proposedBandwidthBps, minimumBandwidthBps, delayMs); - scheduleUpload(delayMs, latency); // reschedule uploadAsync to run again 1000 ms later + scheduleUpload(delayMs, requestedLatency); // reschedule uploadAsync to run again 1000 ms 
later return; } } #endif auto ctx = m_system.createEventsUploadContext(); - ctx->requestedMinLatency = m_runningLatency; + ctx->requestedMinLatency = requestedLatency; addUpload(ctx); initiateUpload(ctx); } @@ -286,9 +284,9 @@ namespace MAT_NS_BEGIN { LOCKGUARD(m_scheduledUploadMutex); // Prevent execution of all upload tasks m_scheduledUploadAborted = true; - // Make sure we wait for completion of the upload scheduling task that may be running - cancelUploadTask(); } + // Make sure we wait for completion of the upload scheduling task that may be running + cancelUploadTask(); // Make sure we wait for all active upload callbacks to finish while (uploadCount() > 0) @@ -344,7 +342,12 @@ namespace MAT_NS_BEGIN { } // Schedule async upload if not scheduled yet - if (!m_isUploadScheduled || TransmitProfiles::isTimerUpdateRequired()) + bool isUploadScheduled = false; + { + LOCKGUARD(m_scheduledUploadMutex); + isUploadScheduled = m_isUploadScheduled; + } + if (!isUploadScheduled || TransmitProfiles::isTimerUpdateRequired()) { if (updateTimersIfNecessary()) { @@ -376,7 +379,13 @@ namespace MAT_NS_BEGIN { return EventLatency_RealTime; } - if (m_runningLatency == EventLatency_RealTime) + EventLatency runningLatency = EventLatency_RealTime; + { + LOCKGUARD(m_scheduledUploadMutex); + runningLatency = m_runningLatency; + } + + if (runningLatency == EventLatency_RealTime) { return EventLatency_Normal; } @@ -456,6 +465,12 @@ namespace MAT_NS_BEGIN { } bool TransmissionPolicyManager::cancelUploadTask() + { + LOCKGUARD(m_scheduledUploadMutex); + return cancelUploadTaskLocked(); + } + + bool TransmissionPolicyManager::cancelUploadTaskLocked() { bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count()); @@ -464,7 +479,8 @@ namespace MAT_NS_BEGIN { // ensure those tasks are canceled when the log manager is destroyed. 
Issue 388 if (result) { - m_isUploadScheduled.exchange(false); + m_isUploadScheduled = false; + m_scheduledUploadTime = std::numeric_limits::max(); } return result; } @@ -478,6 +494,7 @@ namespace MAT_NS_BEGIN { bool TransmissionPolicyManager::isUploadInProgress() const noexcept { // unfinished uploads that haven't processed callbacks or pending upload task + LOCKGUARD(m_scheduledUploadMutex); return (uploadCount() > 0) || m_isUploadScheduled; } diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index dc7f91cf9..029b6623f 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -90,9 +90,9 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; DeviceStateHandler m_deviceStateHandler; std::atomic m_isPaused { true }; - std::atomic m_isUploadScheduled { false }; - std::atomic m_scheduledUploadTime { std::numeric_limits::max() }; - std::mutex m_scheduledUploadMutex; + bool m_isUploadScheduled { false }; + uint64_t m_scheduledUploadTime { std::numeric_limits::max() }; + mutable std::mutex m_scheduledUploadMutex; PAL::DeferredCallbackHandle m_scheduledUpload; bool m_scheduledUploadAborted { false }; @@ -123,6 +123,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; /// Cancels pending upload task. /// bool cancelUploadTask(); + bool cancelUploadTaskLocked(); /// /// Calculate the number of pending upload contexts. 
@@ -131,7 +132,7 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; size_t uploadCount() const noexcept; std::chrono::milliseconds m_timerdelay { std::chrono::seconds { 2 } }; - std::atomic m_runningLatency { EventLatency_RealTime }; + EventLatency m_runningLatency { EventLatency_RealTime }; TimerArray m_timers; public: From 0b277171a9e2481fa54f4d5150d780ea69916bf6 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Thu, 30 Apr 2026 06:40:21 -0700 Subject: [PATCH 06/13] Avoid holding TPM scheduler mutex during cancel Keep the scheduled-upload state mutex-based, but stop holding m_scheduledUploadMutex across DeferredCallbackHandle::Cancel so shutdown and pause paths do not block uploadAsync behind the same lock. While touching the path, use std::chrono::milliseconds for the bandwidth-controller reschedule call so ENABLE_BW_CONTROLLER builds cleanly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 70 ++++++++++++++++----------- lib/tpm/TransmissionPolicyManager.hpp | 1 - 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index e7421bc7f..c52ccfc61 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -111,26 +111,35 @@ namespace MAT_NS_BEGIN { LOG_TRACE("Collector URL is not set, no upload."); return; } - LOCKGUARD(m_scheduledUploadMutex); - if (delay.count() < 0 || m_timerdelay.count() < 0) - { - LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); - return; - } - if (m_scheduledUploadAborted) + auto shouldSkipScheduling = [&delay, this]() -> bool { - LOG_TRACE("Scheduled upload aborted, no upload."); - return; - } - if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ]) ) - { - LOG_TRACE("Maximum number of HTTP requests reached"); - return; - } + if (delay.count() < 0 || 
m_timerdelay.count() < 0) + { + LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); + return true; + } + if (m_scheduledUploadAborted) + { + LOG_TRACE("Scheduled upload aborted, no upload."); + return true; + } + if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ])) + { + LOG_TRACE("Maximum number of HTTP requests reached"); + return true; + } + if (m_isPaused) + { + LOG_TRACE("Paused, not uploading anything until resumed"); + return true; + } - if (m_isPaused) + return false; + }; + + std::unique_lock scheduledUploadLock(m_scheduledUploadMutex); + if (shouldSkipScheduling()) { - LOG_TRACE("Paused, not uploading anything until resumed"); return; } @@ -161,10 +170,16 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. if (force || delay.count() == 0) { - if (!cancelUploadTaskLocked()) + scheduledUploadLock.unlock(); + if (!cancelUploadTask()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); } + scheduledUploadLock.lock(); + if (shouldSkipScheduling()) + { + return; + } } // Schedule new upload @@ -192,8 +207,7 @@ namespace MAT_NS_BEGIN { m_isUploadScheduled = false; // Allow to schedule another uploadAsync if ((m_isPaused) || (m_scheduledUploadAborted)) { - LOG_TRACE("Paused or upload aborted: cancel pending upload task."); - cancelUploadTaskLocked(); // If there is a pending upload task, kill it + LOG_TRACE("Paused or upload aborted: skip upload."); return; } } @@ -210,7 +224,7 @@ namespace MAT_NS_BEGIN { unsigned delayMs = 1000; LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later", proposedBandwidthBps, minimumBandwidthBps, delayMs); - scheduleUpload(delayMs, requestedLatency); // reschedule uploadAsync to run again 1000 ms later + scheduleUpload(std::chrono::milliseconds{delayMs}, requestedLatency); // reschedule uploadAsync to run again 1000 ms later return; } } @@ -466,19 +480,19 @@ namespace 
MAT_NS_BEGIN { bool TransmissionPolicyManager::cancelUploadTask() { - LOCKGUARD(m_scheduledUploadMutex); - return cancelUploadTaskLocked(); - } - - bool TransmissionPolicyManager::cancelUploadTaskLocked() - { - bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count()); + auto waitTime = std::chrono::milliseconds{}; + { + LOCKGUARD(m_scheduledUploadMutex); + waitTime = getCancelWaitTime(); + } + bool result = m_scheduledUpload.Cancel(waitTime.count()); // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to // ensure those tasks are canceled when the log manager is destroyed. Issue 388 if (result) { + LOCKGUARD(m_scheduledUploadMutex); m_isUploadScheduled = false; m_scheduledUploadTime = std::numeric_limits::max(); } diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index 029b6623f..a9cf39a23 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -123,7 +123,6 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; /// Cancels pending upload task. /// bool cancelUploadTask(); - bool cancelUploadTaskLocked(); /// /// Calculate the number of pending upload contexts. From 2cdf8177f4c43ac2b8d9b4b1aa8d9344f7514439 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 4 May 2026 07:26:26 -0500 Subject: [PATCH 07/13] Address runtime review comments Keep forced upload scheduling atomic around no-wait cancellation and preserve HTTP responses until downstream abort/network-failure handlers finish. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/http/HttpResponseDecoder.cpp | 5 - lib/tpm/TransmissionPolicyManager.cpp | 23 +++- lib/tpm/TransmissionPolicyManager.hpp | 7 +- tests/unittests/HttpResponseDecoderTests.cpp | 21 ++- .../TransmissionPolicyManagerTests.cpp | 122 +++++++++++++++++- 5 files changed, 162 insertions(+), 16 deletions(-) diff --git a/lib/http/HttpResponseDecoder.cpp b/lib/http/HttpResponseDecoder.cpp index 2bb652fdf..941931c1e 100644 --- a/lib/http/HttpResponseDecoder.cpp +++ b/lib/http/HttpResponseDecoder.cpp @@ -127,8 +127,6 @@ namespace MAT_NS_BEGIN { evt.param1 = 0; // response.GetStatusCode(); DispatchEvent(evt); } - delete ctx->httpResponse; - ctx->httpResponse = nullptr; // eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected requestAborted(ctx); break; @@ -158,8 +156,6 @@ namespace MAT_NS_BEGIN { evt.param1 = response.GetStatusCode(); DispatchEvent(evt); } - delete ctx->httpResponse; - ctx->httpResponse = nullptr; temporaryNetworkFailure(ctx); break; } @@ -254,4 +250,3 @@ namespace MAT_NS_BEGIN { } } MAT_NS_END - diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index c52ccfc61..100d2339a 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -170,12 +170,10 @@ namespace MAT_NS_BEGIN { // Cancel upload if already scheduled. if (force || delay.count() == 0) { - scheduledUploadLock.unlock(); - if (!cancelUploadTask()) + if (!cancelUploadTaskNoWaitLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); } - scheduledUploadLock.lock(); if (shouldSkipScheduling()) { return; @@ -478,12 +476,31 @@ namespace MAT_NS_BEGIN { return (m_scheduledUploadAborted) ? 
DefaultTaskCancelTime : std::chrono::milliseconds {}; } + bool TransmissionPolicyManager::cancelUploadTaskNoWaitLocked() + { + bool result = m_scheduledUpload.Cancel(std::chrono::milliseconds {}.count()); + + // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. + // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to + // ensure those tasks are canceled when the log manager is destroyed. Issue 388 + if (result) + { + m_isUploadScheduled = false; + m_scheduledUploadTime = std::numeric_limits::max(); + } + return result; + } + bool TransmissionPolicyManager::cancelUploadTask() { auto waitTime = std::chrono::milliseconds{}; { LOCKGUARD(m_scheduledUploadMutex); waitTime = getCancelWaitTime(); + if (waitTime.count() == 0) + { + return cancelUploadTaskNoWaitLocked(); + } } bool result = m_scheduledUpload.Cancel(waitTime.count()); diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp index a9cf39a23..d6c97beb0 100644 --- a/lib/tpm/TransmissionPolicyManager.hpp +++ b/lib/tpm/TransmissionPolicyManager.hpp @@ -119,6 +119,12 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; std::chrono::milliseconds getCancelWaitTime() const noexcept; + /// + /// Cancels a pending upload task without waiting for a running task to finish. + /// The caller must already hold m_scheduledUploadMutex. + /// + bool cancelUploadTaskNoWaitLocked(); + /// /// Cancels pending upload task. 
/// @@ -160,4 +166,3 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1"; } MAT_NS_END #endif // TRANSMISSIONPOLICYMANAGER_HPP - diff --git a/tests/unittests/HttpResponseDecoderTests.cpp b/tests/unittests/HttpResponseDecoderTests.cpp index 314cdb513..7d11ae4b8 100644 --- a/tests/unittests/HttpResponseDecoderTests.cpp +++ b/tests/unittests/HttpResponseDecoderTests.cpp @@ -88,20 +88,29 @@ TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryServerFailures) TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryNetworkFailures) { auto ctx = createContextWith(HttpResult_LocalFailure, -1, ""); - EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_LocalFailure); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); ctx = createContextWith(HttpResult_NetworkFailure, -1, ""); - EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_NetworkFailure); + EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); } TEST_F(HttpResponseDecoderTests, SkipsAbortedRequests) { auto ctx = createContextWith(HttpResult_Aborted, -1, ""); - EXPECT_CALL(*this, resultRequestAborted(ctx)) - .WillOnce(Return()); + EXPECT_CALL(*this, resultRequestAborted(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) { + ASSERT_THAT(routedCtx->httpResponse, NotNull()); + EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_Aborted); + 
EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1)); + })); decoder.decode(ctx); } diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index 6cbdb99f5..b961df15f 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -11,14 +11,24 @@ #include "tpm/TransmissionPolicyManager.hpp" #include "TransmitProfiles.hpp" +#include +#include +#include +#include + using namespace testing; using namespace MAT; class TransmissionPolicyManager4Test : public TransmissionPolicyManager { public: + TransmissionPolicyManager4Test(ITelemetrySystem& system, ITaskDispatcher& taskDispatcher, IBandwidthController* bandwidthController) + : TransmissionPolicyManager(system, taskDispatcher, bandwidthController) + { + } + TransmissionPolicyManager4Test(ITelemetrySystem& system, IBandwidthController* bandwidthController) - : TransmissionPolicyManager(system, *PAL::getDefaultTaskDispatcher(), bandwidthController) + : TransmissionPolicyManager4Test(system, *PAL::getDefaultTaskDispatcher(), bandwidthController) { } @@ -69,6 +79,82 @@ class TransmissionPolicyManager4Test : public TransmissionPolicyManager { } }; +class BlockingCancelTaskDispatcher : public ITaskDispatcher +{ +public: + ~BlockingCancelTaskDispatcher() override + { + Join(); + } + + void Join() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Queue(Task* task) override + { + std::lock_guard lock(m_tasksMutex); + m_tasks.push_back(task); + } + + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(waitTime); + + { + std::lock_guard lock(m_tasksMutex); + auto it = std::find(m_tasks.begin(), m_tasks.end(), task); + if (it == m_tasks.end()) + { + return false; + } + delete *it; + m_tasks.erase(it); + } + + { + std::lock_guard lock(m_cancelMutex); + m_cancelEntered = true; + 
} + m_cancelEnteredCv.notify_all(); + + std::unique_lock lock(m_cancelMutex); + m_cancelReleasedCv.wait(lock, [this]() { return m_cancelReleased; }); + return true; + } + + bool WaitForCancel(const std::chrono::milliseconds timeout) + { + std::unique_lock lock(m_cancelMutex); + return m_cancelEnteredCv.wait_for(lock, timeout, [this]() { return m_cancelEntered; }); + } + + void ReleaseCancel() + { + { + std::lock_guard lock(m_cancelMutex); + m_cancelReleased = true; + } + m_cancelReleasedCv.notify_all(); + } + +private: + std::mutex m_tasksMutex; + std::vector m_tasks; + + std::mutex m_cancelMutex; + std::condition_variable m_cancelEnteredCv; + std::condition_variable m_cancelReleasedCv; + bool m_cancelEntered = false; + bool m_cancelReleased = false; +}; + class TransmissionPolicyManagerTests : public StrictMock { protected: StrictMock runtimeConfigMock; @@ -608,6 +694,40 @@ TEST_F(TransmissionPolicyManagerTests, cancelUploadTask_ScheduledUpload_IsUpload ASSERT_FALSE(tpm.m_isUploadScheduled); } +TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCancelBlocks) +{ + BlockingCancelTaskDispatcher dispatcher; + TransmissionPolicyManager4Test blockingTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock); + blockingTpm.paused(false); + + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + + auto forceSchedule = std::async(std::launch::async, [&blockingTpm]() { + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); + }); + + ASSERT_TRUE(dispatcher.WaitForCancel(std::chrono::milliseconds{ 250 })); + + auto delayedSchedule = std::async(std::launch::async, [&blockingTpm]() { + blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + }); + + EXPECT_EQ(delayedSchedule.wait_for(std::chrono::milliseconds{ 100 }), std::future_status::timeout); + + dispatcher.ReleaseCancel(); + + forceSchedule.get(); + 
delayedSchedule.get(); + + ASSERT_TRUE(blockingTpm.m_isUploadScheduled); + + auto remainingDelayMs = + static_cast<int64_t>(blockingTpm.m_scheduledUploadTime) - static_cast<int64_t>(PAL::getMonotonicTimeMs()); + + EXPECT_GT(remainingDelayMs, -100); + EXPECT_LT(remainingDelayMs, 250); +} + TEST_F(TransmissionPolicyManagerTests, increaseBackoff_EmptyBackoffObject_ReturnZero) { tpm.m_backoff = nullptr; From 95519efd3239812498d9fad5475586b7a363f880 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 4 May 2026 10:34:15 -0500 Subject: [PATCH 08/13] Apply force-scheduled latency when running cancel fails When scheduleUpload is called with force=true (or zero delay) and the previously scheduled upload task is currently executing on the worker, the no-wait cancel returns false and m_isUploadScheduled stays set. The existing m_isUploadScheduled check then skipped scheduling a new task, silently dropping the requested latency for force-scheduled profile changes. Propagate the requested latency to m_runningLatency under the same mutex when this race occurs. uploadAsync re-reads m_runningLatency inside its own LOCKGUARD, so a task that hasn't yet entered that critical section will pick up the new latency. If uploadAsync has already cleared m_isUploadScheduled (past its LOCKGUARD), the existing fallthrough at line 184 schedules a fresh task with the new latency. Add a regression test using a fake dispatcher whose Cancel always returns false, asserting that a force-scheduled call updates m_runningLatency without enqueueing a duplicate task.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 12 +++ .../TransmissionPolicyManagerTests.cpp | 91 +++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 100d2339a..f4c1a800c 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -173,6 +173,18 @@ namespace MAT_NS_BEGIN { if (!cancelUploadTaskNoWaitLocked()) { LOG_TRACE("Upload either hasn't been scheduled or already done."); + // Cancel can return false when the previous upload task is + // currently executing on the worker. If uploadAsync hasn't + // yet entered its own LOCKGUARD (m_isUploadScheduled is + // still set under the mutex we hold), propagate the + // requested latency so the running task picks it up when + // it acquires m_scheduledUploadMutex. Otherwise the + // running task has already cleared the flag and the + // schedule below will queue a fresh task. 
+ if (m_isUploadScheduled) + { + m_runningLatency = latency; + } } if (shouldSkipScheduling()) { diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index b961df15f..23e72eeb7 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -155,6 +155,65 @@ class BlockingCancelTaskDispatcher : public ITaskDispatcher bool m_cancelReleased = false; }; +class RunningTaskDispatcher : public ITaskDispatcher +{ +public: + ~RunningTaskDispatcher() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Join() override + { + std::lock_guard lock(m_tasksMutex); + for (auto* task : m_tasks) + { + delete task; + } + m_tasks.clear(); + } + + void Queue(Task* task) override + { + std::lock_guard lock(m_tasksMutex); + m_tasks.push_back(task); + } + + bool Cancel(Task* task, uint64_t waitTime = 0) override + { + UNREFERENCED_PARAMETER(task); + UNREFERENCED_PARAMETER(waitTime); + // Simulate a task that is currently executing on the worker: + // cancellation can never proceed without waiting for the run + // to complete, so a no-wait cancel must return false. 
+ std::lock_guard lock(m_tasksMutex); + m_cancelCount++; + return false; + } + + size_t QueuedCount() const + { + std::lock_guard lock(m_tasksMutex); + return m_tasks.size(); + } + + size_t CancelCount() const + { + std::lock_guard lock(m_tasksMutex); + return m_cancelCount; + } + +private: + mutable std::mutex m_tasksMutex; + std::vector m_tasks; + size_t m_cancelCount = 0; +}; + class TransmissionPolicyManagerTests : public StrictMock { protected: StrictMock runtimeConfigMock; @@ -728,6 +787,38 @@ TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCa EXPECT_LT(remainingDelayMs, 250); } +TEST_F(TransmissionPolicyManagerTests, ForceScheduleAppliesLatencyWhenRunningCancelFails) +{ + RunningTaskDispatcher dispatcher; + TransmissionPolicyManager4Test runningTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock); + runningTpm.paused(false); + + // Queue an initial upload so m_scheduledUpload has a non-null task and + // m_isUploadScheduled is set; the dispatcher's Cancel will fail later + // (simulating the "task currently executing on worker" race). + runningTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + ASSERT_TRUE(runningTpm.m_isUploadScheduled); + ASSERT_EQ(dispatcher.QueuedCount(), 1u); + + auto scheduledTimeBefore = runningTpm.m_scheduledUploadTime; + // Reset m_runningLatency so we can observe the force path updating it + // (the initial schedule may have bumped it depending on the active + // profile's timers). + runningTpm.runningLatency(EventLatency_Normal); + + // Force a higher-priority schedule. The dispatcher's no-wait cancel + // returns false, so the previous task remains in flight. The fix in + // scheduleUpload must propagate the new latency to m_runningLatency + // so the running task picks it up under the same mutex. 
+ runningTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); + + EXPECT_GE(dispatcher.CancelCount(), 1u); + EXPECT_EQ(dispatcher.QueuedCount(), 1u); + EXPECT_TRUE(runningTpm.m_isUploadScheduled); + EXPECT_EQ(runningTpm.m_runningLatency, EventLatency_RealTime); + EXPECT_EQ(runningTpm.m_scheduledUploadTime, scheduledTimeBefore); +} + TEST_F(TransmissionPolicyManagerTests, increaseBackoff_EmptyBackoffObject_ReturnZero) { tpm.m_backoff = nullptr; From 68f4dd0c0787e1bd352a6536b1467561e16bd20d Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 11 May 2026 11:22:23 -0500 Subject: [PATCH 09/13] Simplify TPM cancellation cleanup Use the existing LOCKGUARD helper because scheduled upload cancellation does not need movable lock ownership. Consolidate the duplicated Issue 388 cancellation note so the PR keeps the remaining limitation documented without repeating the same TODO. Files changed: - lib/tpm/TransmissionPolicyManager.cpp Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index f4c1a800c..1db4e9d5e 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -137,7 +137,7 @@ namespace MAT_NS_BEGIN { return false; }; - std::unique_lock scheduledUploadLock(m_scheduledUploadMutex); + LOCKGUARD(m_scheduledUploadMutex); if (shouldSkipScheduling()) { return; @@ -492,9 +492,6 @@ namespace MAT_NS_BEGIN { { bool result = m_scheduledUpload.Cancel(std::chrono::milliseconds {}.count()); - // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. - // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to - // ensure those tasks are canceled when the log manager is destroyed. 
Issue 388 if (result) { m_isUploadScheduled = false; @@ -516,9 +513,8 @@ namespace MAT_NS_BEGIN { } bool result = m_scheduledUpload.Cancel(waitTime.count()); - // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. - // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to - // ensure those tasks are canceled when the log manager is destroyed. Issue 388 + // Cancel may still fail if the task runs past the wait timeout; + // stronger task lifetime guarantees are tracked by Issue 388. if (result) { LOCKGUARD(m_scheduledUploadMutex); From 4a8cc9de56857402df93ccd6d7689c9d6b59aca9 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 11 May 2026 11:39:12 -0500 Subject: [PATCH 10/13] Simplify TPM force scheduling test Replace tight current-time assertions with a direct comparison against the original delayed schedule time. This keeps coverage for the forced immediate upload race while reducing timing sensitivity in CI. 
Files changed: - tests/unittests/TransmissionPolicyManagerTests.cpp Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/unittests/TransmissionPolicyManagerTests.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index 23e72eeb7..99603c545 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -760,6 +760,7 @@ TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCa blockingTpm.paused(false); blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); + auto delayedUploadTime = blockingTpm.m_scheduledUploadTime; auto forceSchedule = std::async(std::launch::async, [&blockingTpm]() { blockingTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); @@ -779,12 +780,7 @@ TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCa delayedSchedule.get(); ASSERT_TRUE(blockingTpm.m_isUploadScheduled); - - auto remainingDelayMs = - static_cast(blockingTpm.m_scheduledUploadTime) - static_cast(PAL::getMonotonicTimeMs()); - - EXPECT_GT(remainingDelayMs, -100); - EXPECT_LT(remainingDelayMs, 250); + EXPECT_LT(blockingTpm.m_scheduledUploadTime, delayedUploadTime); } TEST_F(TransmissionPolicyManagerTests, ForceScheduleAppliesLatencyWhenRunningCancelFails) From 563897220b26475e11dfa5ae96a8b41ded348745 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 11 May 2026 11:52:56 -0500 Subject: [PATCH 11/13] Keep TPM cancellation comment wording Restore the existing Issue 388 wording in the remaining cancellation comment while keeping the duplicated helper comment removed. 
Files changed: - lib/tpm/TransmissionPolicyManager.cpp Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/tpm/TransmissionPolicyManager.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 1db4e9d5e..011cc8b83 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -513,8 +513,9 @@ namespace MAT_NS_BEGIN { } bool result = m_scheduledUpload.Cancel(waitTime.count()); - // Cancel may still fail if the task runs past the wait timeout; - // stronger task lifetime guarantees are tracked by Issue 388. + // TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for. + // We either need a stronger guarantee here (could impact SDK performance), or a mechanism to + // ensure those tasks are canceled when the log manager is destroyed. Issue 388 if (result) { LOCKGUARD(m_scheduledUploadMutex); From 05bd3776d4532a322262930b128b801e49616e4e Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Mon, 11 May 2026 12:40:14 -0500 Subject: [PATCH 12/13] Address runtime review comments Fix printf-style logging arguments for scheduled upload delays and queued worker task pointers. Ensure the blocking cancel test releases the dispatcher before failing so async futures cannot hang the test runner. 
Files changed: - lib/pal/WorkerThread.cpp - lib/tpm/TransmissionPolicyManager.cpp - tests/unittests/TransmissionPolicyManagerTests.cpp Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/pal/WorkerThread.cpp | 5 ++--- lib/tpm/TransmissionPolicyManager.cpp | 4 +++- tests/unittests/TransmissionPolicyManagerTests.cpp | 7 ++++++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 5eccbb5f2..3f4d43e79 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -109,10 +109,10 @@ namespace PAL_NS_BEGIN { void Queue(MAT::Task* item) final { - LOG_INFO("queue item=%p", &item); + LOG_INFO("queue item=%p", static_cast(item)); LOCKGUARD(m_lock); if (m_shuttingDown) { - LOG_WARN("Dropping queued task %p during shutdown", item); + LOG_WARN("Dropping queued task %p during shutdown", static_cast(item)); delete item; return; } @@ -298,4 +298,3 @@ namespace PAL_NS_BEGIN { } PAL_NS_END #endif - diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 011cc8b83..373a19d4f 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -115,7 +115,9 @@ namespace MAT_NS_BEGIN { { if (delay.count() < 0 || m_timerdelay.count() < 0) { - LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count()); + LOG_TRACE("Negative delay(%lld) or m_timerdelay(%lld), no upload", + static_cast(delay.count()), + static_cast(m_timerdelay.count())); return true; } if (m_scheduledUploadAborted) diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp index 99603c545..2f4a75ec1 100644 --- a/tests/unittests/TransmissionPolicyManagerTests.cpp +++ b/tests/unittests/TransmissionPolicyManagerTests.cpp @@ -766,7 +766,12 @@ TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCa 
blockingTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true); }); - ASSERT_TRUE(dispatcher.WaitForCancel(std::chrono::milliseconds{ 250 })); + if (!dispatcher.WaitForCancel(std::chrono::milliseconds{ 250 })) + { + dispatcher.ReleaseCancel(); + forceSchedule.get(); + FAIL() << "Timed out waiting for cancel to block"; + } auto delayedSchedule = std::async(std::launch::async, [&blockingTpm]() { blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false); From 2c559d0b716d6e00faf2d1f132173924d9c5d431 Mon Sep 17 00:00:00 2001 From: Bhagirath Mehta Date: Tue, 12 May 2026 18:28:44 -0500 Subject: [PATCH 13/13] Clean up runtime logging follow-ups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address pre-existing follow-ups surfaced during PR review: - TransmissionPolicyManager: fix three LOG_TRACE calls that used %d for uint64_t / chrono::milliseconds::rep values. Now uses %lld / %llu matching the existing codebase pattern (cf. TelemetrySystem.cpp, LogManagerImpl.cpp, OfflineStorage_SQLite.cpp). Also strip the unnecessary static_cast wrappers from the earlier negative-delay log fix at line 118 for consistency. - WorkerThread: remove the dead 'count' member. It was incremented in Queue() (and Join() before the prior shutdown refactor) but never read, returned, exposed via a getter, declared friend, or accessed from any derived class — the field is protected within a concrete class with a private factory, so there's nowhere it could be read. Validation: - Host UnitTests on macOS arm64: 488/488 pass. - TransmissionPolicyManagerTests + HttpClientManagerTests + HttpResponseDecoderTests --gtest_repeat=10: 46/46 each round. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- lib/pal/WorkerThread.cpp | 3 --- lib/tpm/TransmissionPolicyManager.cpp | 9 ++++----- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp index 3f4d43e79..50a7253e8 100644 --- a/lib/pal/WorkerThread.cpp +++ b/lib/pal/WorkerThread.cpp @@ -38,7 +38,6 @@ namespace PAL_NS_BEGIN { Event m_event; MAT::Task* m_itemInProgress; bool m_shuttingDown = false; - int count = 0; public: @@ -63,7 +62,6 @@ namespace PAL_NS_BEGIN { if (!m_shuttingDown) { m_shuttingDown = true; m_queue.push_back(new WorkerThreadShutdownItem()); - count++; m_event.post(); } } @@ -126,7 +124,6 @@ namespace PAL_NS_BEGIN { else { m_queue.push_back(item); } - count++; m_event.post(); } diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp index 373a19d4f..420f830c1 100644 --- a/lib/tpm/TransmissionPolicyManager.cpp +++ b/lib/tpm/TransmissionPolicyManager.cpp @@ -116,8 +116,7 @@ namespace MAT_NS_BEGIN { if (delay.count() < 0 || m_timerdelay.count() < 0) { LOG_TRACE("Negative delay(%lld) or m_timerdelay(%lld), no upload", - static_cast(delay.count()), - static_cast(m_timerdelay.count())); + delay.count(), m_timerdelay.count()); return true; } if (m_scheduledUploadAborted) @@ -164,7 +163,7 @@ namespace MAT_NS_BEGIN { // Don't need to cancel and reschedule if it's about to happen now anyways. // the completion of upload will schedule more uploads as-needed, we only // want to avoid the unnecessary wasteful rescheduling. 
- LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency); + LOG_TRACE("WAIT upload %llu ms for lat=%d", delta, m_runningLatency); return; } } @@ -200,7 +199,7 @@ namespace MAT_NS_BEGIN { m_isUploadScheduled = true; m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count(); m_runningLatency = latency; - LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency); + LOG_TRACE("SCHED upload %lld ms for lat=%d", delay.count(), m_runningLatency); m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency); } } @@ -264,7 +263,7 @@ namespace MAT_NS_BEGIN { // Rescheduling upload if (nextUpload.count() >= 0) { - LOG_TRACE("Scheduling upload in %d ms", nextUpload.count()); + LOG_TRACE("Scheduling upload in %lld ms", nextUpload.count()); EventLatency proposed = calculateNewPriority(); scheduleUpload(nextUpload, proposed); // reschedule uploadAsync again }