diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx index 8d50dac63a67b..fd10668d18a7d 100644 --- a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx @@ -257,14 +257,24 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con helper->totalFetchedBytes += size; helper->totalRequestedBytes += size; api.appendFlatHeader(v, headers); + // Adopt the new SHM message BEFORE pruning the old cached one. The + // DPL CacheId is the SHM payload pointer (see addToCache and + // ObjectCache::Id::fromRef). If we pruned first the SHM allocator + // could recycle the just-freed address, the new CacheId would + // numerically equal the old one, and the consumer's ObjectCache + // would skip the deserialisation as "same id" — silently dropping + // every CCDB update from that point onwards. + auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) { + allocator.pruneFromCache(oldDPLCacheIt->second); + } helper->mapURL2DPLCache[path] = cacheId; responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size); continue; } - if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called + if (v.size()) { // but should be overridden by fresh object helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); @@ -276,12 +286,15 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con helper->totalFetchedBytes += size; helper->totalRequestedBytes += size; api.appendFlatHeader(v, headers); + // Adopt-before-prune; see comment in the etag.empty() branch above. + auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) { + allocator.pruneFromCache(oldDPLCacheIt->second); + } helper->mapURL2DPLCache[path] = cacheId; responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); continue; } else { // Only once the etag is actually used, we get the information on how long the object is valid diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index fd78594e365bf..76d26ff63b227 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -347,13 +347,26 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->totalFetchedBytes += v.size(); helper->totalRequestedBytes += v.size(); api.appendFlatHeader(v, headers); + // IMPORTANT: adopt the fresh SHM message BEFORE pruning the previously + // cached one. The DPL CacheId is the SHM payload pointer (see + // MessageContext::addToCache and ObjectCache::Id::fromRef). If we + // pruned first, the SHM allocator could immediately recycle the just- + // freed address for the new allocation, producing a CacheId numerically + // equal to the previous one. The consumer-side ObjectCache then thinks + // the object hasn't changed, returns the stale deserialised heap + // pointer, and CCDB updates silently stop propagating. Allocating + // first guarantees the old chunk is still alive, forcing the allocator + // to choose a different address for the new one. + auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) { + allocator.pruneFromCache(oldDPLCacheIt->second); + } helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); continue; } - if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called + if (v.size()) { // but should be overridden by fresh object helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); @@ -364,11 +377,16 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->totalFetchedBytes += v.size(); helper->totalRequestedBytes += v.size(); api.appendFlatHeader(v, headers); + // Adopt-before-prune: see comment in the etag.empty() branch above + // about why this ordering matters (SHM-address-as-CacheId collision + // would otherwise make the consumer skip the replacement). + auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) { + allocator.pruneFromCache(oldDPLCacheIt->second); + } helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); continue; } else { // Only once the etag is actually used, we get the information on how long the object is valid @@ -448,11 +466,18 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() helper->totalRequestedBytes += v.size(); newOrbitResetTime = getOrbitResetTime(v); api.appendFlatHeader(v, headers); + // Adopt-before-prune; see comment in populateCacheWith() about + // why pruning first would let the SHM allocator recycle the same + // address and produce a CacheId that collides with the consumer's + // ObjectCache id (which is also the SHM payload pointer). + auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); + if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) { + allocator.pruneFromCache(oldDPLCacheIt->second); + } helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - } else if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called + } else if (v.size()) { // but should be overridden by fresh object helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cacheMiss++; helper->mapURL2UUID[path].size = v.size(); @@ -462,11 +487,14 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() helper->totalRequestedBytes += v.size(); newOrbitResetTime = getOrbitResetTime(v); api.appendFlatHeader(v, headers); + // Adopt-before-prune; see analogous comment above. + auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); + if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) { + allocator.pruneFromCache(oldDPLCacheIt->second); + } helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); } // cached object is fine } diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index ed9a31ca2857c..87a502c3fafd6 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -525,6 +525,15 @@ class DataAllocator /// Adopt an already cached message, using an already provided CacheId. void adoptFromCache(Output const& spec, CacheId id, header::SerializationMethod method = header::gSerializationMethodNone); + /// Prune a previously cached message identified by @a id from the message cache. + /// The cached shallow-clone is dropped; if no other in-flight reference exists + /// the underlying shared-memory region will be released by FairMQ. Calling this + /// with an unknown id is a no-op. + /// This is intended to be used when an entry in a user-managed map of CacheIds + /// is about to be overwritten by a fresh adoptContainer() call (see e.g. the + /// CCDB cache replacement in CCDBHelpers). + void pruneFromCache(CacheId id); + /// snapshot object and route to output specified by OutputRef /// Framework makes a (serialized) copy of object content. /// diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index d2e152c1bcacc..cbc403c4d6116 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -415,37 +415,41 @@ class InputRecord auto id = ObjectCache::Id::fromRef(ref); ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification}; // If the matcher does not have an entry in the cache, deserialise it - // and cache the deserialised object at the given id. + // and cache the deserialised object alongside its id, keyed by path. + // The (id, obj) pair is stored per path so that a later cross-path + // SHM-address recycle cannot overwrite another path's cached + // pointer (which would turn the next replace-branch `delete` into a + // destructor call on a wrong-typed object — see ObjectCache.h). auto path = fmt::format("{}", DataSpecUtils::describe(matcher)); LOGP(debug, "{}", path); auto& cache = mRegistry.get(); auto& callbacks = mRegistry.get(); - auto cacheEntry = cache.matcherToId.find(path); - if (cacheEntry == cache.matcherToId.end()) { - cache.matcherToId.insert(std::make_pair(path, id)); + auto cacheEntry = cache.matcherToEntry.find(path); + if (cacheEntry == cache.matcherToEntry.end()) { std::unique_ptr> result(DataRefUtils::as>(ref).release(), false); void* obj = (void*)result.get(); callbacks.call((ConcreteDataMatcher&)matcher, (void*)obj); - cache.idToObject[id] = obj; + cache.matcherToEntry.emplace(path, ObjectCache::Entry{id, obj}); LOGP(info, "Caching in {} ptr to {} ({})", id.value, path, obj); return result; } - auto& oldId = cacheEntry->second; + auto& entry = cacheEntry->second; // The id in the cache is the same, let's simply return it. - if (oldId.value == id.value) { - std::unique_ptr> result((ValueT const*)cache.idToObject[id], false); + if (entry.id.value == id.value) { + std::unique_ptr> result((ValueT const*)entry.obj, false); LOGP(debug, "Returning cached entry {} for {} ({})", id.value, path, (void*)result.get()); return result; } - // The id in the cache is different. Let's destroy the old cached entry - // and create a new one. - delete reinterpret_cast(cache.idToObject[oldId]); + // The id in the cache is different. Destroy this path's previously + // cached object (entry.obj is guaranteed to be the object we created + // for this path — no other path can write into entry) and replace it. + delete reinterpret_cast(entry.obj); std::unique_ptr> result(DataRefUtils::as>(ref).release(), false); void* obj = (void*)result.get(); callbacks.call((ConcreteDataMatcher&)matcher, (void*)obj); - cache.idToObject[id] = obj; - LOGP(info, "Replacing cached entry {} with {} for {} ({})", oldId.value, id.value, path, obj); - oldId.value = id.value; + LOGP(info, "Replacing cached entry {} with {} for {} ({})", entry.id.value, id.value, path, obj); + entry.id = id; + entry.obj = obj; return result; } else { throw runtime_error("Attempt to extract object from message with unsupported serialization type"); @@ -497,29 +501,29 @@ class InputRecord auto id = ObjectCache::Id::fromRef(ref); ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification}; // If the matcher does not have an entry in the cache, deserialise it - // and cache the deserialised object at the given id. + // and cache it per path. Same per-path-keyed structure as the object + // cache above; see ObjectCache.h for the rationale. auto path = fmt::format("{}", DataSpecUtils::describe(matcher)); LOGP(debug, "{}", path); auto& cache = mRegistry.get(); - auto cacheEntry = cache.matcherToMetadataId.find(path); - if (cacheEntry == cache.matcherToMetadataId.end()) { - cache.matcherToMetadataId.insert(std::make_pair(path, id)); - cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref); + auto cacheEntry = cache.matcherToMetadata.find(path); + if (cacheEntry == cache.matcherToMetadata.end()) { + auto [it, inserted] = cache.matcherToMetadata.emplace( + path, ObjectCache::MetadataEntry{id, DataRefUtils::extractCCDBHeaders(ref)}); LOGP(info, "Caching CCDB metadata {}: {}", id.value, path); - return cache.idToMetadata[id]; + return it->second.metadata; } - auto& oldId = cacheEntry->second; + auto& entry = cacheEntry->second; // The id in the cache is the same, let's simply return it. - if (oldId.value == id.value) { + if (entry.id.value == id.value) { LOGP(debug, "Returning cached CCDB metatada {}: {}", id.value, path); - return cache.idToMetadata[id]; + return entry.metadata; } - // The id in the cache is different. Let's destroy the old cached entry - // and create a new one. - LOGP(info, "Replacing cached entry {} with {} for {}", oldId.value, id.value, path); - cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref); - oldId.value = id.value; - return cache.idToMetadata[id]; + // The id in the cache is different. Replace this path's metadata. + LOGP(info, "Replacing cached entry {} with {} for {}", entry.id.value, id.value, path); + entry.id = id; + entry.metadata = DataRefUtils::extractCCDBHeaders(ref); + return entry.metadata; } template diff --git a/Framework/Core/include/Framework/ObjectCache.h b/Framework/Core/include/Framework/ObjectCache.h index a6873aec8a1ac..62c8e59b91955 100644 --- a/Framework/Core/include/Framework/ObjectCache.h +++ b/Framework/Core/include/Framework/ObjectCache.h @@ -14,12 +14,25 @@ #include "Framework/DataRef.h" #include #include +#include namespace o2::framework { /// A cache for CCDB objects or objects in general /// which have more than one timeframe of lifetime. +/// +/// The cache is keyed *per path* rather than by a global id-derived hash. +/// Earlier versions stored a `matcherToId` (path -> id) map alongside an +/// `idToObject` (id -> deserialised object) map keyed by the SHM payload +/// pointer of the incoming message. Because SHM addresses are recycled by +/// the FairMQ allocator once a chunk is freed, two distinct CCDB paths could +/// transiently share the same id at different points in time. Within a single +/// timeframe an earlier path's deserialisation could then overwrite the +/// `idToObject` slot a later path's `matcherToId` was still pointing at, +/// turning the next `delete reinterpret_cast(idToObject[oldId])` into a +/// destructor call on the wrong object type. Storing the (id, object) pair +/// per path closes that hole: every path looks up its own slot only. struct ObjectCache { struct Id { int64_t value; @@ -39,20 +52,29 @@ struct ObjectCache { } }; }; - /// A cache for deserialised objects. - /// This keeps a mapping so that we can tell if a given - /// path was already received and it's blob stored in - /// .second. - std::unordered_map matcherToId; - /// A map from a CacheId (which is the void* ptr of the previous map). - /// to an actual (type erased) pointer to the deserialised object. - std::unordered_map idToObject; - - /// A cache to the deserialised metadata - /// We keep it separate because we want to avoid that looking up - /// the metadata also pollutes the object cache. - std::unordered_map matcherToMetadataId; - std::unordered_map, Id::hash_fn> idToMetadata; + + /// Per-path cache entry for a deserialised CCDB object. + /// `id` is the version marker — compared against the incoming message's id + /// to detect "did this object change?". `obj` is the heap-owned, type- + /// erased pointer to the deserialised value; the path that put it here is + /// the only one allowed to delete it. + struct Entry { + Id id{0}; + void* obj{nullptr}; + }; + + /// Per-path cache entry for the CCDB metadata map. + struct MetadataEntry { + Id id{0}; + std::map metadata; + }; + + /// Path -> (id, object). Replaces the former matcherToId / idToObject pair. + std::unordered_map matcherToEntry; + + /// Path -> (id, metadata). Replaces the former matcherToMetadataId / + /// idToMetadata pair. + std::unordered_map matcherToMetadata; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index d7bfff0dbf19d..6afeb3a3bfad8 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -392,6 +392,15 @@ void DataAllocator::adoptFromCache(const Output& spec, CacheId id, header::Seria context.add(std::move(headerMessage), std::move(payloadMessage), routeIndex); } +void DataAllocator::pruneFromCache(CacheId id) +{ + // Drop the cached shallow-clone for @a id from the message cache. If no other + // outstanding reference is held the underlying SHM region will be released. + // Erasing an unknown id is a no-op (std::unordered_map::erase semantics). + auto& context = mRegistry.get(); + context.pruneFromCache(id.value); +} + void DataAllocator::cookDeadBeef(const Output& spec) { auto& proxy = mRegistry.get();