From 3d6fce157963deacc3953e18cc8262a96ec837df Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 22 Apr 2026 21:09:17 -0400 Subject: [PATCH 01/31] Add InvocationProcessorHandle::noop() for test-mode binary The forthcoming bottlecap-testmode binary (APMSVLS-511) reuses TraceAgent and handle_traces but has no Lambda lifecycle to drive. Adds a noop constructor that spawns a background task acknowledging every ProcessorCommand with a sensible default, so callers never block on their response oneshots. The match is exhaustive: a new ProcessorCommand variant forces a compile error here, keeping test-mode behavior explicit rather than silently dropping responses. --- .../lifecycle/invocation/processor_service.rs | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index a41a95b26..e898b6ac9 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -135,6 +135,58 @@ pub struct InvocationProcessorHandle { } impl InvocationProcessorHandle { + /// Returns a handle backed by a background task that acknowledges every + /// command with a sensible default. Intended for the `bottlecap-testmode` + /// binary, which reuses `TraceAgent` / `handle_traces` but has no Lambda + /// lifecycle state to drive. + /// + /// The match below is exhaustive: adding a new `ProcessorCommand` variant + /// will fail to compile here, so test-mode's behavior for it must be + /// decided explicitly rather than silently dropping a response and + /// hanging the caller on its oneshot. + #[must_use] + pub fn noop() -> Self { + let (sender, mut receiver) = mpsc::channel::(32); + tokio::spawn(async move { + while let Some(command) = receiver.recv().await { + match command { + // Request-response commands: reply with a default so the + // caller doesn't block on the oneshot forever. 
+ ProcessorCommand::GetReparentingInfo { response } => { + let _ = response.send(Ok(std::collections::VecDeque::new())); + } + ProcessorCommand::UpdateReparenting { response, .. } => { + let _ = response.send(Ok(Vec::new())); + } + ProcessorCommand::SetColdStartSpanTraceId { response, .. } => { + let _ = response.send(Ok(None)); + } + ProcessorCommand::PlatformRuntimeDone { response, .. } + | ProcessorCommand::PlatformReport { response, .. } => { + let _ = response.send(()); + } + // Fire-and-forget commands: drop silently. + ProcessorCommand::InvokeEvent { .. } + | ProcessorCommand::PlatformInitStart { .. } + | ProcessorCommand::PlatformInitReport { .. } + | ProcessorCommand::PlatformRestoreStart { .. } + | ProcessorCommand::PlatformRestoreReport { .. } + | ProcessorCommand::PlatformStart { .. } + | ProcessorCommand::UniversalInstrumentationStart { .. } + | ProcessorCommand::UniversalInstrumentationEnd { .. } + | ProcessorCommand::AddReparenting { .. } + | ProcessorCommand::AddTracerSpan { .. } + | ProcessorCommand::ForwardDurableContext { .. } + | ProcessorCommand::OnOutOfMemoryError { .. } + | ProcessorCommand::OnShutdownEvent + | ProcessorCommand::SendCtxSpans { .. 
} + | ProcessorCommand::Shutdown => {} + } + } + }); + InvocationProcessorHandle { sender } + } + pub async fn on_invoke_event( &self, request_id: String, @@ -657,3 +709,34 @@ impl InvocationProcessorService { debug!("InvocationProcessorService stopped"); } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn noop_request_response_methods_return_defaults() { + let handle = InvocationProcessorHandle::noop(); + + let info = handle.get_reparenting_info().await.unwrap(); + assert!(info.is_empty()); + + let contexts = handle + .update_reparenting(std::collections::VecDeque::new()) + .await + .unwrap(); + assert!(contexts.is_empty()); + + let cold_start = handle.set_cold_start_span_trace_id(42).await.unwrap(); + assert!(cold_start.is_none()); + } + + #[tokio::test] + async fn noop_fire_and_forget_commands_do_not_panic() { + let handle = InvocationProcessorHandle::noop(); + + handle.on_invoke_event("rid".to_string()).await.unwrap(); + handle.on_shutdown_event().await.unwrap(); + handle.on_out_of_memory_error(0).await.unwrap(); + } +} From 62122d00e9ae37c29a3a182c2f67aed2b60ce417 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 22 Apr 2026 21:31:11 -0400 Subject: [PATCH 02/31] fix(bottlecap): replace unwrap() with expect() in noop() tests to satisfy clippy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Co-Authored-By: Claude Code --- .../lifecycle/invocation/processor_service.rs | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index e898b6ac9..562b5325c 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -718,16 +718,22 @@ mod tests { async fn noop_request_response_methods_return_defaults() { let handle = InvocationProcessorHandle::noop(); - let info = 
handle.get_reparenting_info().await.unwrap(); + let info = handle + .get_reparenting_info() + .await + .expect("noop get_reparenting_info"); assert!(info.is_empty()); let contexts = handle .update_reparenting(std::collections::VecDeque::new()) .await - .unwrap(); + .expect("noop update_reparenting"); assert!(contexts.is_empty()); - let cold_start = handle.set_cold_start_span_trace_id(42).await.unwrap(); + let cold_start = handle + .set_cold_start_span_trace_id(42) + .await + .expect("noop set_cold_start_span_trace_id"); assert!(cold_start.is_none()); } @@ -735,8 +741,17 @@ mod tests { async fn noop_fire_and_forget_commands_do_not_panic() { let handle = InvocationProcessorHandle::noop(); - handle.on_invoke_event("rid".to_string()).await.unwrap(); - handle.on_shutdown_event().await.unwrap(); - handle.on_out_of_memory_error(0).await.unwrap(); + handle + .on_invoke_event("rid".to_string()) + .await + .expect("noop on_invoke_event"); + handle + .on_shutdown_event() + .await + .expect("noop on_shutdown_event"); + handle + .on_out_of_memory_error(0) + .await + .expect("noop on_out_of_memory_error"); } } From 8a70de8df622b376d84c6cae48219e466f6445ed Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 22 Apr 2026 23:07:00 -0400 Subject: [PATCH 03/31] refactor(bottlecap): promote start_trace_agent to library crate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves the start_trace_agent helper from the private bin/bottlecap/main.rs into a new public module, bottlecap::traces::startup, so callers outside the Lambda binary (notably the forthcoming bottlecap-testmode binary) can construct the full trace-processing pipeline through a single call instead of duplicating ~110 lines of wiring. No behavior change. The Lambda binary's call site is unchanged: same function name, same signature, same internal spawn. 
🤖 Co-Authored-By: Claude Code --- bottlecap/src/bin/bottlecap/main.rs | 130 +----------------------- bottlecap/src/traces/mod.rs | 3 + bottlecap/src/traces/startup.rs | 148 ++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 128 deletions(-) create mode 100644 bottlecap/src/traces/startup.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 8ee88bb26..84aa8c30c 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -61,21 +61,10 @@ use bottlecap::{ provider::Provider as TagProvider, }, traces::{ - http_client as trace_http_client, propagation::DatadogCompositePropagator, - proxy_aggregator, - proxy_flusher::Flusher as ProxyFlusher, - span_dedup_service, - stats_aggregator::StatsAggregator, - stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService}, - stats_flusher, + stats_concentrator_service::StatsConcentratorHandle, stats_generator::StatsGenerator, - stats_processor, trace_agent, trace_aggregator::SendDataBuilderInfo, - trace_aggregator_service::{ - AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService, - }, - trace_flusher, trace_processor::{self, SendingTraceProcessor}, }, }; @@ -95,7 +84,6 @@ use dogstatsd::{ flusher::{Flusher as MetricsFlusher, FlusherConfig as MetricsFlusherConfig}, metric::{EMPTY_TAGS, SortedTags}, }; -use libdd_trace_obfuscation::obfuscation_config; use reqwest::Client; use std::{collections::hash_map, env, path::Path, str::FromStr, sync::Arc}; use tokio::time::Instant; @@ -365,7 +353,7 @@ async fn extension_loop_active( trace_agent_shutdown_token, stats_concentrator, trace_aggregator_handle, - ) = start_trace_agent( + ) = bottlecap::traces::start_trace_agent( config, &api_key_factory, &tags_provider, @@ -1092,120 +1080,6 @@ fn start_logs_agent( ) } -#[allow(clippy::type_complexity)] -fn start_trace_agent( - config: &Arc, - api_key_factory: &Arc, - tags_provider: &Arc, - 
invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, - client: &Client, -) -> ( - Sender, - Arc, - Arc, - Arc, - Arc, - tokio_util::sync::CancellationToken, - StatsConcentratorHandle, - TraceAggregatorHandle, -) { - // Build one shared hyper-based HTTP client for trace and stats flushing. - // This client type is required by libdd_trace_utils for SendData::send(). - let trace_http_client = trace_http_client::create_client( - config.proxy_https.as_ref(), - config.tls_cert_file.as_ref(), - config.skip_ssl_validation, - ) - .expect("Failed to create trace HTTP client"); - - // Stats - let (stats_concentrator_service, stats_concentrator_handle) = - StatsConcentratorService::new(Arc::clone(config)); - tokio::spawn(stats_concentrator_service.run()); - let stats_aggregator: Arc> = Arc::new(TokioMutex::new( - StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()), - )); - let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new( - api_key_factory.clone(), - stats_aggregator.clone(), - Arc::clone(config), - trace_http_client.clone(), - )); - - let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); - - // Traces - let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default(); - tokio::spawn(trace_aggregator_service.run()); - - let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new( - trace_aggregator_handle.clone(), - config.clone(), - api_key_factory.clone(), - trace_http_client, - )); - - let obfuscation_config = obfuscation_config::ObfuscationConfig { - tag_replace_rules: config.apm_replace_tags.clone(), - http_remove_path_digits: config.apm_config_obfuscation_http_remove_paths_with_digits, - http_remove_query_string: config.apm_config_obfuscation_http_remove_query_string, - obfuscate_memcached: false, - obfuscation_redis_enabled: false, - obfuscation_redis_remove_all_args: false, - }; - - let trace_processor = 
Arc::new(trace_processor::ServerlessTraceProcessor { - obfuscation_config: Arc::new(obfuscation_config), - }); - - let (span_dedup_service, span_dedup_handle) = span_dedup_service::DedupService::new(); - tokio::spawn(span_dedup_service.run()); - - // Proxy - let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default())); - let proxy_flusher = Arc::new(ProxyFlusher::new( - api_key_factory.clone(), - Arc::clone(&proxy_aggregator), - Arc::clone(tags_provider), - Arc::clone(config), - client.clone(), - )); - - let trace_agent = trace_agent::TraceAgent::new( - Arc::clone(config), - trace_aggregator_handle.clone(), - trace_processor.clone(), - stats_aggregator, - stats_processor, - proxy_aggregator, - invocation_processor_handle, - appsec_processor, - Arc::clone(tags_provider), - stats_concentrator_handle.clone(), - span_dedup_handle, - ); - let trace_agent_channel = trace_agent.get_sender_copy(); - let shutdown_token = trace_agent.shutdown_token(); - - tokio::spawn(async move { - if let Err(e) = trace_agent.start().await { - error!("Error starting trace agent: {e:?}"); - } - }); - - ( - trace_agent_channel, - trace_flusher, - trace_processor, - stats_flusher, - proxy_flusher, - shutdown_token, - stats_concentrator_handle, - trace_aggregator_handle, - ) -} - async fn start_dogstatsd( tags_provider: Arc, api_key_factory: Arc, diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 41ee7f064..8e62d96de 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -8,6 +8,7 @@ pub mod proxy_flusher; pub mod span_dedup; pub mod span_dedup_service; pub mod span_pointers; +pub mod startup; pub mod stats_aggregator; pub mod stats_concentrator_service; pub mod stats_flusher; @@ -19,6 +20,8 @@ pub mod trace_aggregator_service; pub mod trace_flusher; pub mod trace_processor; +pub use startup::start_trace_agent; + // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. 
const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs new file mode 100644 index 000000000..84c1fc0cc --- /dev/null +++ b/bottlecap/src/traces/startup.rs @@ -0,0 +1,148 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use dogstatsd::api_key::ApiKeyFactory; +use libdd_trace_obfuscation::obfuscation_config; +use tokio::sync::Mutex as TokioMutex; +use tokio::sync::mpsc::Sender; +use tokio_util::sync::CancellationToken; +use tracing::error; + +use crate::appsec::processor::Processor as AppSecProcessor; +use crate::config::Config; +use crate::lifecycle::invocation::processor_service::InvocationProcessorHandle; +use crate::tags::provider::Provider as TagProvider; +use crate::traces::{ + http_client as trace_http_client, proxy_aggregator, + proxy_flusher::Flusher as ProxyFlusher, + span_dedup_service, + stats_aggregator::StatsAggregator, + stats_concentrator_service, stats_flusher, stats_processor, trace_agent, + trace_aggregator::SendDataBuilderInfo, + trace_aggregator_service::{self, AggregatorHandle as TraceAggregatorHandle}, + trace_flusher, trace_processor, +}; + +pub use crate::traces::stats_concentrator_service::StatsConcentratorHandle; + +/// Wires up the full trace-processing pipeline (trace + stats + proxy +/// aggregators, services, flushers, and the [`trace_agent::TraceAgent`] HTTP +/// listener) and spawns each background task onto the current tokio runtime. +/// Returns the handles callers need to drive flushing and cancel the +/// listener. 
+#[allow(clippy::type_complexity)] +pub fn start_trace_agent( + config: &Arc, + api_key_factory: &Arc, + tags_provider: &Arc, + invocation_processor_handle: InvocationProcessorHandle, + appsec_processor: Option>>, + client: &reqwest::Client, +) -> ( + Sender, + Arc, + Arc, + Arc, + Arc, + CancellationToken, + StatsConcentratorHandle, + TraceAggregatorHandle, +) { + // Build one shared hyper-based HTTP client for trace and stats flushing. + // This client type is required by libdd_trace_utils for SendData::send(). + let trace_http_client = trace_http_client::create_client( + config.proxy_https.as_ref(), + config.tls_cert_file.as_ref(), + config.skip_ssl_validation, + ) + .expect("Failed to create trace HTTP client"); + + // Stats + let (stats_concentrator_service, stats_concentrator_handle) = + stats_concentrator_service::StatsConcentratorService::new(Arc::clone(config)); + tokio::spawn(stats_concentrator_service.run()); + let stats_aggregator: Arc> = Arc::new(TokioMutex::new( + StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()), + )); + let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new( + api_key_factory.clone(), + stats_aggregator.clone(), + Arc::clone(config), + trace_http_client.clone(), + )); + + let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); + + // Traces + let (trace_aggregator_service, trace_aggregator_handle) = + trace_aggregator_service::AggregatorService::default(); + tokio::spawn(trace_aggregator_service.run()); + + let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new( + trace_aggregator_handle.clone(), + config.clone(), + api_key_factory.clone(), + trace_http_client, + )); + + let obfuscation_config = obfuscation_config::ObfuscationConfig { + tag_replace_rules: config.apm_replace_tags.clone(), + http_remove_path_digits: config.apm_config_obfuscation_http_remove_paths_with_digits, + http_remove_query_string: config.apm_config_obfuscation_http_remove_query_string, + 
obfuscate_memcached: false, + obfuscation_redis_enabled: false, + obfuscation_redis_remove_all_args: false, + }; + + let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new(obfuscation_config), + }); + + let (span_dedup_service, span_dedup_handle) = span_dedup_service::DedupService::new(); + tokio::spawn(span_dedup_service.run()); + + // Proxy + let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default())); + let proxy_flusher = Arc::new(ProxyFlusher::new( + api_key_factory.clone(), + Arc::clone(&proxy_aggregator), + Arc::clone(tags_provider), + Arc::clone(config), + client.clone(), + )); + + let trace_agent = trace_agent::TraceAgent::new( + Arc::clone(config), + trace_aggregator_handle.clone(), + trace_processor.clone(), + stats_aggregator, + stats_processor, + proxy_aggregator, + invocation_processor_handle, + appsec_processor, + Arc::clone(tags_provider), + stats_concentrator_handle.clone(), + span_dedup_handle, + ); + let trace_agent_channel = trace_agent.get_sender_copy(); + let shutdown_token = trace_agent.shutdown_token(); + + tokio::spawn(async move { + if let Err(e) = trace_agent.start().await { + error!("Error starting trace agent: {e:?}"); + } + }); + + ( + trace_agent_channel, + trace_flusher, + trace_processor, + stats_flusher, + proxy_flusher, + shutdown_token, + stats_concentrator_handle, + trace_aggregator_handle, + ) +} From f49e75a970153fb4ee9b39e3fe5b0357ff8d4723 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 22 Apr 2026 23:10:05 -0400 Subject: [PATCH 04/31] refactor(bottlecap): add TraceAgent::with_flushing_service for test-mode /flush hook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in POST /flush route to the trace-agent listener on 8126, backed by a caller-supplied FlushingService. 
The route is only registered when the caller calls TraceAgent::with_flushing_service; otherwise the Lambda binary's HTTP surface is unchanged. - Add an optional flushing_service field to TraceAgent with a consuming builder method with_flushing_service(self, fs) -> Self. - In make_router, conditionally merge a /flush sub-router that calls FlushingService::flush_blocking_final when the field is set. - Split the library helper in src/traces/startup.rs into build_trace_agent (returns an unspawned TraceAgent plus the pipeline handles) and a thin start_trace_agent wrapper that spawns. Test-mode will use build_trace_agent so it can attach its FlushingService before spawning; Lambda's call site is unchanged. - No behavior change for the Lambda binary: it never attaches a flushing service, so /flush remains unexposed and start_trace_agent still spawns internally. Preparatory for APMSVLS-511 (bottlecap-testmode binary). 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/mod.rs | 2 +- bottlecap/src/traces/startup.rs | 87 +++++++++++++++++++++-------- bottlecap/src/traces/trace_agent.rs | 42 +++++++++++++- 3 files changed, 106 insertions(+), 25 deletions(-) diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 8e62d96de..860178c61 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -20,7 +20,7 @@ pub mod trace_aggregator_service; pub mod trace_flusher; pub mod trace_processor; -pub use startup::start_trace_agent; +pub use startup::{build_trace_agent, start_trace_agent}; // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. 
const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs index 84c1fc0cc..65cc620ec 100644 --- a/bottlecap/src/traces/startup.rs +++ b/bottlecap/src/traces/startup.rs @@ -27,20 +27,11 @@ use crate::traces::{ pub use crate::traces::stats_concentrator_service::StatsConcentratorHandle; -/// Wires up the full trace-processing pipeline (trace + stats + proxy -/// aggregators, services, flushers, and the [`trace_agent::TraceAgent`] HTTP -/// listener) and spawns each background task onto the current tokio runtime. -/// Returns the handles callers need to drive flushing and cancel the -/// listener. +/// Return tuple common to [`build_trace_agent`] and [`start_trace_agent`]. +/// Kept as a `type` alias to avoid re-declaring the same eight-tuple in two +/// places. #[allow(clippy::type_complexity)] -pub fn start_trace_agent( - config: &Arc, - api_key_factory: &Arc, - tags_provider: &Arc, - invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, - client: &reqwest::Client, -) -> ( +pub type TraceAgentPipeline = ( Sender, Arc, Arc, @@ -49,7 +40,27 @@ pub fn start_trace_agent( CancellationToken, StatsConcentratorHandle, TraceAggregatorHandle, -) { +); + +/// Builds the full trace-processing pipeline (trace + stats + proxy +/// aggregators, services, flushers) and the [`trace_agent::TraceAgent`] that +/// owns the HTTP listener. Spawns the aggregator/concentrator/dedup services +/// onto the current tokio runtime, but does **not** spawn the `TraceAgent` +/// itself. The caller owns `trace_agent` and is responsible for spawning +/// `trace_agent.start()` (optionally after calling +/// [`trace_agent::TraceAgent::with_flushing_service`] to enable the +/// test-mode `POST /flush` route). +/// +/// Most callers want [`start_trace_agent`] instead, which handles the spawn. 
+#[allow(clippy::type_complexity)] +pub fn build_trace_agent( + config: &Arc, + api_key_factory: &Arc, + tags_provider: &Arc, + invocation_processor_handle: InvocationProcessorHandle, + appsec_processor: Option>>, + client: &reqwest::Client, +) -> (trace_agent::TraceAgent, TraceAgentPipeline) { // Build one shared hyper-based HTTP client for trace and stats flushing. // This client type is required by libdd_trace_utils for SendData::send(). let trace_http_client = trace_http_client::create_client( @@ -129,13 +140,7 @@ pub fn start_trace_agent( let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); - tokio::spawn(async move { - if let Err(e) = trace_agent.start().await { - error!("Error starting trace agent: {e:?}"); - } - }); - - ( + let pipeline = ( trace_agent_channel, trace_flusher, trace_processor, @@ -144,5 +149,43 @@ pub fn start_trace_agent( shutdown_token, stats_concentrator_handle, trace_aggregator_handle, - ) + ); + + (trace_agent, pipeline) +} + +/// Builds the trace-processing pipeline with [`build_trace_agent`] and spawns +/// the [`trace_agent::TraceAgent`] HTTP listener onto the current tokio +/// runtime. This is the convenience entry point used by the Lambda binary. +/// +/// Callers that need to attach a +/// [`crate::flushing::FlushingService`](crate::flushing::FlushingService), +/// notably the `bottlecap-testmode` binary (which uses it to back a +/// `POST /flush` route), should use [`build_trace_agent`] directly and spawn +/// the returned `TraceAgent` themselves after calling +/// [`trace_agent::TraceAgent::with_flushing_service`]. 
+pub fn start_trace_agent( + config: &Arc, + api_key_factory: &Arc, + tags_provider: &Arc, + invocation_processor_handle: InvocationProcessorHandle, + appsec_processor: Option>>, + client: &reqwest::Client, +) -> TraceAgentPipeline { + let (trace_agent, pipeline) = build_trace_agent( + config, + api_key_factory, + tags_provider, + invocation_processor_handle, + appsec_processor, + client, + ); + + tokio::spawn(async move { + if let Err(e) = trace_agent.start().await { + error!("Error starting trace agent: {e:?}"); + } + }); + + pipeline } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index fb7e8343a..4c7c16e2e 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -25,6 +25,7 @@ use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ appsec::processor::Processor as AppSecProcessor, config, + flushing::FlushingService, http::{extract_request_body, handler_not_found}, lifecycle::invocation::{ context::ReparentingInfo, processor_service::InvocationProcessorHandle, @@ -67,6 +68,10 @@ const V2_DEBUGGER_ENDPOINT_PATH: &str = "/debugger/v2/input"; const DEBUGGER_DIAGNOSTICS_ENDPOINT_PATH: &str = "/debugger/v1/diagnostics"; const INSTRUMENTATION_ENDPOINT_PATH: &str = "/telemetry/proxy/api/v2/apmtelemetry"; +// Test-mode only: exposed when a FlushingService is attached via +// TraceAgent::with_flushing_service. +const FLUSH_ENDPOINT_PATH: &str = "/flush"; + // Intake endpoints const DSM_INTAKE_PATH: &str = "/api/v0.1/pipeline_stats"; const LLM_OBS_SPANS_INTAKE_PATH: &str = "/api/v2/llmobs"; @@ -118,6 +123,11 @@ pub struct TraceAgent { tx: Sender, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, + /// Optional flushing service that, when attached via + /// [`TraceAgent::with_flushing_service`], backs a `POST /flush` route on + /// the same listener as the trace endpoints. Used by the + /// `bottlecap-testmode` binary; never attached by the Lambda binary. 
+ flushing_service: Option>, } #[derive(Clone, Copy)] @@ -170,9 +180,21 @@ impl TraceAgent { shutdown_token: CancellationToken::new(), stats_concentrator, span_deduper, + flushing_service: None, } } + /// Attaches a [`FlushingService`] that will back a `POST /flush` route + /// registered on the same listener as the trace endpoints. Intended for + /// the `bottlecap-testmode` binary, which needs a deterministic drain + /// hook for the APM parity harness. The Lambda binary does not attach a + /// flushing service; `POST /flush` is not exposed in Lambda mode. + #[must_use] + pub fn with_flushing_service(mut self, flushing_service: Arc) -> Self { + self.flushing_service = Some(flushing_service); + self + } + #[allow(clippy::cast_possible_truncation)] pub async fn start(&self) -> Result<(), Box> { let now = Instant::now(); @@ -281,16 +303,32 @@ impl TraceAgent { let info_router = Router::new().route(INFO_ENDPOINT_PATH, any(Self::info)); - Router::new() + let mut router = Router::new() .merge(trace_router) .merge(stats_router) .merge(proxy_router) - .merge(info_router) + .merge(info_router); + + // POST /flush is only registered when a FlushingService has been + // attached via TraceAgent::with_flushing_service (test-mode only). 
+ if let Some(flushing_service) = &self.flushing_service { + let flush_router = Router::new() + .route(FLUSH_ENDPOINT_PATH, post(Self::flush)) + .with_state(Arc::clone(flushing_service)); + router = router.merge(flush_router); + } + + router .fallback(handler_not_found) // Disable the default body limit so we can use our own limit .layer(DefaultBodyLimit::disable()) } + async fn flush(State(flushing_service): State>) -> StatusCode { + flushing_service.flush_blocking_final().await; + StatusCode::NO_CONTENT + } + async fn graceful_shutdown(shutdown_token: CancellationToken) { shutdown_token.cancelled().await; debug!("TRACE_AGENT | Shutdown signal received, shutting down"); From 3af7ae919abff0787aa17ce9fe010ac582708a7e Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 15:41:14 -0400 Subject: [PATCH 05/31] fix: return 500 from /flush handler on panic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/trace_agent.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 4c7c16e2e..34a4975c2 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -325,8 +325,14 @@ impl TraceAgent { } async fn flush(State(flushing_service): State>) -> StatusCode { - flushing_service.flush_blocking_final().await; - StatusCode::NO_CONTENT + match tokio::task::spawn(async move { + flushing_service.flush_blocking_final().await; + }) + .await + { + Ok(()) => StatusCode::NO_CONTENT, + Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + } } async fn graceful_shutdown(shutdown_token: CancellationToken) { From fb67a5d3b5de64db47d4b7ac286f2974ffc18a5d Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 15:42:03 -0400 Subject: [PATCH 06/31] refactor(startup): remove redundant re-export and allow attr MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/startup.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs index 65cc620ec..f8af4fd6b 100644 --- a/bottlecap/src/traces/startup.rs +++ b/bottlecap/src/traces/startup.rs @@ -25,12 +25,9 @@ use crate::traces::{ trace_flusher, trace_processor, }; -pub use crate::traces::stats_concentrator_service::StatsConcentratorHandle; - /// Return tuple common to [`build_trace_agent`] and [`start_trace_agent`]. /// Kept as a `type` alias to avoid re-declaring the same eight-tuple in two /// places. -#[allow(clippy::type_complexity)] pub type TraceAgentPipeline = ( Sender, Arc, @@ -38,7 +35,7 @@ pub type TraceAgentPipeline = ( Arc, Arc, CancellationToken, - StatsConcentratorHandle, + stats_concentrator_service::StatsConcentratorHandle, TraceAggregatorHandle, ); From 7c02abf68ec2576c067938d27c8e793e938a46bf Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 16:12:59 -0400 Subject: [PATCH 07/31] refactor(startup): remove unnecessary clippy::type_complexity allow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit build_trace_agent returns (TraceAgent, TraceAgentPipeline), a 2-tuple whose complexity is hidden behind the type alias — the lint never fires. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/startup.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs index f8af4fd6b..11e37175b 100644 --- a/bottlecap/src/traces/startup.rs +++ b/bottlecap/src/traces/startup.rs @@ -49,7 +49,6 @@ pub type TraceAgentPipeline = ( /// test-mode `POST /flush` route). /// /// Most callers want [`start_trace_agent`] instead, which handles the spawn. 
-#[allow(clippy::type_complexity)] pub fn build_trace_agent( config: &Arc, api_key_factory: &Arc, From d15e2e5b30e48448df73bf0b68ab65b0a1d403a8 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 16:28:56 -0400 Subject: [PATCH 08/31] refactor(traces): re-export TraceAgentPipeline from traces module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Callers using build_trace_agent need the return type alias accessible via the same crate::traces path as the function itself. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 860178c61..4b9ce75c0 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -20,7 +20,7 @@ pub mod trace_aggregator_service; pub mod trace_flusher; pub mod trace_processor; -pub use startup::{build_trace_agent, start_trace_agent}; +pub use startup::{build_trace_agent, start_trace_agent, TraceAgentPipeline}; // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; From 49a9a80c87e86c7a59db73b5a6b388b68c7e5bb4 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 16:38:33 -0400 Subject: [PATCH 09/31] refactor(noop): match real service channel capacity --- bottlecap/src/lifecycle/invocation/processor_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 562b5325c..2395792d9 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -146,7 +146,7 @@ impl InvocationProcessorHandle { /// hanging the caller on its oneshot. 
#[must_use] pub fn noop() -> Self { - let (sender, mut receiver) = mpsc::channel::<ProcessorCommand>(32); + let (sender, mut receiver) = mpsc::channel::<ProcessorCommand>(1000); tokio::spawn(async move { while let Some(command) = receiver.recv().await { match command { From bab8963770f36a92241d0560376b4b5ed08eab6d Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 16:39:41 -0400 Subject: [PATCH 10/31] docs(trace_agent): explain /flush panic isolation --- bottlecap/src/traces/trace_agent.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 34a4975c2..234c5044f 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -325,6 +325,9 @@ impl TraceAgent { } async fn flush(State(flushing_service): State<Arc<FlushingService>>) -> StatusCode { + // Isolate panics: run the flush on a spawned task so an + // `.expect(...)` or other panic inside `flush_blocking_final` + // surfaces as a 500 instead of tearing down the handler task. match tokio::task::spawn(async move { flushing_service.flush_blocking_final().await; }) From 1be55e7ae3eb29518b4466891e3c1d1691c82112 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 16:40:10 -0400 Subject: [PATCH 11/31] docs(startup): warn about leaked tasks if agent not spawned --- bottlecap/src/traces/startup.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs index 11e37175b..fdc843372 100644 --- a/bottlecap/src/traces/startup.rs +++ b/bottlecap/src/traces/startup.rs @@ -48,6 +48,12 @@ pub type TraceAgentPipeline = ( /// [`trace_agent::TraceAgent::with_flushing_service`] to enable the /// test-mode `POST /flush` route). /// +/// Note: the aggregator/concentrator/dedup tasks spawned here have no +/// external shutdown signal; they run until their command channels are +/// dropped.
Callers that abandon the returned `TraceAgent` without either +/// spawning it or dropping the pipeline handles will leak those background +/// tasks for the lifetime of the process. +/// /// Most callers want [`start_trace_agent`] instead, which handles the spawn. pub fn build_trace_agent( config: &Arc, From ecafa3a0b3d2416d69145f5778ca704f0cfae139 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 16:41:16 -0400 Subject: [PATCH 12/31] refactor(startup): consolidate tokio::sync imports --- bottlecap/src/traces/startup.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs index fdc843372..2961bba63 100644 --- a/bottlecap/src/traces/startup.rs +++ b/bottlecap/src/traces/startup.rs @@ -5,8 +5,7 @@ use std::sync::Arc; use dogstatsd::api_key::ApiKeyFactory; use libdd_trace_obfuscation::obfuscation_config; -use tokio::sync::Mutex as TokioMutex; -use tokio::sync::mpsc::Sender; +use tokio::sync::{Mutex as TokioMutex, mpsc::Sender}; use tokio_util::sync::CancellationToken; use tracing::error; From 445b60193d1ce029c694b440a9b67d082765ce77 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 18:00:07 -0400 Subject: [PATCH 13/31] fix(trace_agent): add timeout and panic isolation to POST /flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap flush_blocking_final in a spawned task so panics return 500 instead of tearing down the handler, and add a 30-second timeout so a stuck flush returns 504 rather than hanging the test harness. 
🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/trace_agent.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 234c5044f..2f4bb4788 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -325,16 +325,16 @@ impl TraceAgent { } async fn flush(State(flushing_service): State>) -> StatusCode { - // Isolate panics: run the flush on a spawned task so an - // `.expect(...)` or other panic inside `flush_blocking_final` - // surfaces as a 500 instead of tearing down the handler task. - match tokio::task::spawn(async move { + // Isolate panics and bound execution time: spawn so a panic in + // flush_blocking_final surfaces as 500, timeout so a stuck flush + // returns 504 instead of hanging the test harness. + let task = tokio::task::spawn(async move { flushing_service.flush_blocking_final().await; - }) - .await - { - Ok(()) => StatusCode::NO_CONTENT, - Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + }); + match tokio::time::timeout(std::time::Duration::from_secs(30), task).await { + Ok(Ok(())) => StatusCode::NO_CONTENT, + Ok(Err(_)) => StatusCode::INTERNAL_SERVER_ERROR, + Err(_) => StatusCode::GATEWAY_TIMEOUT, } } From 6b8fbc8ac6ec5a3cd774ac933a083b971337562e Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 18:01:06 -0400 Subject: [PATCH 14/31] test(noop): cover PlatformRuntimeDone and PlatformReport variants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a test that sends PlatformRuntimeDone and PlatformReport commands directly through the noop handle's internal channel and verifies the response oneshots are fulfilled without blocking. 
🤖 Co-Authored-By: Claude Code --- .../lifecycle/invocation/processor_service.rs | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 2395792d9..83667ac35 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -754,4 +754,86 @@ mod tests { .await .expect("noop on_out_of_memory_error"); } + + #[tokio::test] + async fn noop_platform_runtime_done_and_report_respond_without_blocking() { + use std::collections::HashMap; + use crate::{ + LAMBDA_RUNTIME_SLUG, + config, + extension::telemetry::events::OnDemandReportMetrics, + traces::{ + stats_concentrator_service::StatsConcentratorService, + stats_generator::StatsGenerator, + trace_processor::ServerlessTraceProcessor, + }, + }; + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + + let config = Arc::new(config::Config::default()); + let (svc, concentrator) = StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(svc.run()); + let trace_sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("ObfuscationConfig"), + ), + }), + trace_tx: tokio::sync::mpsc::channel(1).0, + stats_generator: Arc::new(StatsGenerator::new(concentrator)), + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let handle = InvocationProcessorHandle::noop(); + + let (tx, rx) = oneshot::channel::<()>(); + handle + .sender + .send(ProcessorCommand::PlatformRuntimeDone { + request_id: "rid".to_string(), + metrics: RuntimeDoneMetrics { + duration_ms: 0.0, + produced_bytes: None, + }, + status: Status::Success, + error_type: None, + tags_provider: 
Arc::clone(&tags_provider), + trace_sender: Arc::clone(&trace_sender), + timestamp: 0, + response: tx, + }) + .await + .expect("send PlatformRuntimeDone"); + rx.await.expect("noop must respond to PlatformRuntimeDone"); + + let (tx, rx) = oneshot::channel::<()>(); + handle + .sender + .send(ProcessorCommand::PlatformReport { + request_id: "rid".to_string(), + metrics: ReportMetrics::OnDemand(OnDemandReportMetrics { + duration_ms: 0.0, + billed_duration_ms: 0, + memory_size_mb: 0, + max_memory_used_mb: 0, + init_duration_ms: None, + restore_duration_ms: None, + }), + timestamp: 0, + status: Status::Success, + error_type: None, + spans: None, + tags_provider, + trace_sender, + response: tx, + }) + .await + .expect("send PlatformReport"); + rx.await.expect("noop must respond to PlatformReport"); + } } From 88c8e30b1bbb3c34e1f330c07e379ee1bd82ecf4 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 18:13:09 -0400 Subject: [PATCH 15/31] style: apply rustfmt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Co-Authored-By: Claude Code --- .../src/lifecycle/invocation/processor_service.rs | 12 ++++-------- bottlecap/src/traces/mod.rs | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 83667ac35..a29d58b57 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -757,18 +757,16 @@ mod tests { #[tokio::test] async fn noop_platform_runtime_done_and_report_respond_without_blocking() { - use std::collections::HashMap; use crate::{ - LAMBDA_RUNTIME_SLUG, - config, + LAMBDA_RUNTIME_SLUG, config, extension::telemetry::events::OnDemandReportMetrics, traces::{ stats_concentrator_service::StatsConcentratorService, - stats_generator::StatsGenerator, - trace_processor::ServerlessTraceProcessor, + 
stats_generator::StatsGenerator, trace_processor::ServerlessTraceProcessor, }, }; use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + use std::collections::HashMap; let config = Arc::new(config::Config::default()); let (svc, concentrator) = StatsConcentratorService::new(Arc::clone(&config)); @@ -776,9 +774,7 @@ mod tests { let trace_sender = Arc::new(SendingTraceProcessor { appsec: None, processor: Arc::new(ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("ObfuscationConfig"), - ), + obfuscation_config: Arc::new(ObfuscationConfig::new().expect("ObfuscationConfig")), }), trace_tx: tokio::sync::mpsc::channel(1).0, stats_generator: Arc::new(StatsGenerator::new(concentrator)), diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 4b9ce75c0..911e4c762 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -20,7 +20,7 @@ pub mod trace_aggregator_service; pub mod trace_flusher; pub mod trace_processor; -pub use startup::{build_trace_agent, start_trace_agent, TraceAgentPipeline}; +pub use startup::{TraceAgentPipeline, build_trace_agent, start_trace_agent}; // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. 
const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; From 02630c651e9833c401ec83557e43dd5d9a1dacc2 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Thu, 23 Apr 2026 18:39:18 -0400 Subject: [PATCH 16/31] fix(trace_agent): abort timed-out flush task to bound execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/trace_agent.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 2f4bb4788..54310656a 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -328,13 +328,20 @@ impl TraceAgent { // Isolate panics and bound execution time: spawn so a panic in // flush_blocking_final surfaces as 500, timeout so a stuck flush // returns 504 instead of hanging the test harness. - let task = tokio::task::spawn(async move { + let mut task = tokio::task::spawn(async move { flushing_service.flush_blocking_final().await; }); - match tokio::time::timeout(std::time::Duration::from_secs(30), task).await { + match tokio::time::timeout(std::time::Duration::from_secs(30), &mut task).await { Ok(Ok(())) => StatusCode::NO_CONTENT, Ok(Err(_)) => StatusCode::INTERNAL_SERVER_ERROR, - Err(_) => StatusCode::GATEWAY_TIMEOUT, + Err(_) => { + task.abort(); + match task.await { + Err(join_err) if join_err.is_cancelled() => StatusCode::GATEWAY_TIMEOUT, + Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + Ok(()) => StatusCode::GATEWAY_TIMEOUT, + } + } } } From 9b6c1d2e790f514d6e0c314a44c36526f8c2081f Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 11:46:55 -0400 Subject: [PATCH 17/31] refactor(startup): replace TraceAgentPipeline tuple with named struct Tuples beyond 3-4 fields are fragile: every call site destructures by position, and any reordering (or two fields of the same type swapping) breaks silently. 
Now that the pipeline is a pub API type shared between the Lambda binary and future test-mode binary, promote it to a struct with named pub fields. Addresses review feedback on PR #1201. --- bottlecap/src/bin/bottlecap/main.rs | 8 +++--- bottlecap/src/traces/startup.rs | 43 ++++++++++++++--------------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 84aa8c30c..65ca33a23 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -344,16 +344,16 @@ async fn extension_loop_active( } }; - let ( - trace_agent_channel, + let bottlecap::traces::TraceAgentPipeline { + trace_tx: trace_agent_channel, trace_flusher, trace_processor, stats_flusher, proxy_flusher, - trace_agent_shutdown_token, + shutdown_token: trace_agent_shutdown_token, stats_concentrator, trace_aggregator_handle, - ) = bottlecap::traces::start_trace_agent( + } = bottlecap::traces::start_trace_agent( config, &api_key_factory, &tags_provider, diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/traces/startup.rs index 2961bba63..ae7f03e2a 100644 --- a/bottlecap/src/traces/startup.rs +++ b/bottlecap/src/traces/startup.rs @@ -18,25 +18,27 @@ use crate::traces::{ proxy_flusher::Flusher as ProxyFlusher, span_dedup_service, stats_aggregator::StatsAggregator, - stats_concentrator_service, stats_flusher, stats_processor, trace_agent, + stats_concentrator_service::{self, StatsConcentratorHandle}, + stats_flusher, stats_processor, trace_agent, trace_aggregator::SendDataBuilderInfo, trace_aggregator_service::{self, AggregatorHandle as TraceAggregatorHandle}, trace_flusher, trace_processor, }; -/// Return tuple common to [`build_trace_agent`] and [`start_trace_agent`]. -/// Kept as a `type` alias to avoid re-declaring the same eight-tuple in two -/// places. 
-pub type TraceAgentPipeline = ( - Sender, - Arc, - Arc, - Arc, - Arc, - CancellationToken, - stats_concentrator_service::StatsConcentratorHandle, - TraceAggregatorHandle, -); +/// Handles produced by [`build_trace_agent`] / [`start_trace_agent`]. Holds +/// the trace-channel sender, the per-domain flushers, the shutdown token, and +/// the aggregator/concentrator handles the caller needs to drive flushes and +/// shut the pipeline down. +pub struct TraceAgentPipeline { + pub trace_tx: Sender, + pub trace_flusher: Arc, + pub trace_processor: Arc, + pub stats_flusher: Arc, + pub proxy_flusher: Arc, + pub shutdown_token: CancellationToken, + pub stats_concentrator: StatsConcentratorHandle, + pub trace_aggregator_handle: TraceAggregatorHandle, +} /// Builds the full trace-processing pipeline (trace + stats + proxy /// aggregators, services, flushers) and the [`trace_agent::TraceAgent`] that @@ -138,19 +140,16 @@ pub fn build_trace_agent( stats_concentrator_handle.clone(), span_dedup_handle, ); - let trace_agent_channel = trace_agent.get_sender_copy(); - let shutdown_token = trace_agent.shutdown_token(); - - let pipeline = ( - trace_agent_channel, + let pipeline = TraceAgentPipeline { + trace_tx: trace_agent.get_sender_copy(), trace_flusher, trace_processor, stats_flusher, proxy_flusher, - shutdown_token, - stats_concentrator_handle, + shutdown_token: trace_agent.shutdown_token(), + stats_concentrator: stats_concentrator_handle, trace_aggregator_handle, - ); + }; (trace_agent, pipeline) } From c5f4a707441d0506b6ca6c54b3929b46eb96cb86 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 11:56:19 -0400 Subject: [PATCH 18/31] refactor(startup): move out of traces to top-level bottlecap::startup This module wires trace, stats, proxy, config, lifecycle, tags, appsec, and flushing pieces together. 
It is cross-cutting orchestration, not trace-domain code, so placing it under traces/ misrepresents its role and makes callers treat bottlecap::traces::start_trace_agent as if it were a trace-domain API. Promote it to bottlecap::startup at the crate root so both the Lambda binary and the forthcoming test-mode binary can consume it without reaching into a domain module. Addresses review feedback on PR #1201. --- bottlecap/src/bin/bottlecap/main.rs | 4 ++-- bottlecap/src/lib.rs | 1 + bottlecap/src/{traces => }/startup.rs | 0 bottlecap/src/traces/mod.rs | 3 --- 4 files changed, 3 insertions(+), 5 deletions(-) rename bottlecap/src/{traces => }/startup.rs (100%) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 65ca33a23..59de10197 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -344,7 +344,7 @@ async fn extension_loop_active( } }; - let bottlecap::traces::TraceAgentPipeline { + let bottlecap::startup::TraceAgentPipeline { trace_tx: trace_agent_channel, trace_flusher, trace_processor, @@ -353,7 +353,7 @@ async fn extension_loop_active( shutdown_token: trace_agent_shutdown_token, stats_concentrator, trace_aggregator_handle, - } = bottlecap::traces::start_trace_agent( + } = bottlecap::startup::start_trace_agent( config, &api_key_factory, &tags_provider, diff --git a/bottlecap/src/lib.rs b/bottlecap/src/lib.rs index df94fd246..97784956c 100644 --- a/bottlecap/src/lib.rs +++ b/bottlecap/src/lib.rs @@ -35,6 +35,7 @@ pub mod otlp; pub mod proc; pub mod proxy; pub mod secrets; +pub mod startup; pub mod tags; pub mod traces; diff --git a/bottlecap/src/traces/startup.rs b/bottlecap/src/startup.rs similarity index 100% rename from bottlecap/src/traces/startup.rs rename to bottlecap/src/startup.rs diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 911e4c762..41ee7f064 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -8,7 +8,6 @@ pub mod 
proxy_flusher; pub mod span_dedup; pub mod span_dedup_service; pub mod span_pointers; -pub mod startup; pub mod stats_aggregator; pub mod stats_concentrator_service; pub mod stats_flusher; @@ -20,8 +19,6 @@ pub mod trace_aggregator_service; pub mod trace_flusher; pub mod trace_processor; -pub use startup::{TraceAgentPipeline, build_trace_agent, start_trace_agent}; - // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; From 5a56b750ca1caec08b21db2f0ea7dc4d6880adde Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 13:37:58 -0400 Subject: [PATCH 19/31] fix(trace_agent): return 504 directly after aborting flush task After task.abort(), the awaited JoinError is effectively always Cancelled, and all three arms of the post-abort match returned (or should have returned) 504 for the test harness. The extra match adds branches to reason about without changing the result, and the lone Err(_) -> 500 arm is unreachable in practice (a panic surfaces through the Ok(Err(_)) branch of the outer timeout). Drop the await-after-abort and return 504 directly. Addresses review feedback on PR #1201. 
--- bottlecap/src/traces/trace_agent.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 54310656a..676a86e9f 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -336,11 +336,7 @@ impl TraceAgent { Ok(Err(_)) => StatusCode::INTERNAL_SERVER_ERROR, Err(_) => { task.abort(); - match task.await { - Err(join_err) if join_err.is_cancelled() => StatusCode::GATEWAY_TIMEOUT, - Err(_) => StatusCode::INTERNAL_SERVER_ERROR, - Ok(()) => StatusCode::GATEWAY_TIMEOUT, - } + StatusCode::GATEWAY_TIMEOUT } } } From 2ad997c4b39519ff56dfe9ff4c0c32b54f0a2d24 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 13:47:03 -0400 Subject: [PATCH 20/31] test(noop): call on_platform_runtime_done/on_platform_report via handle The test previously built ProcessorCommand::PlatformRuntimeDone and PlatformReport by hand and pushed them through handle.sender, making the reader mentally map raw command construction to the handler behavior and coupling the test to the channel internals. Use the already-public InvocationProcessorHandle methods, which exercise the same code path, drop ~15 lines of field plumbing per variant, and stay resilient to future refactors of the command channel. Addresses review feedback on PR #1201. 
--- .../lifecycle/invocation/processor_service.rs | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index a29d58b57..9a26f2b0d 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -787,32 +787,26 @@ mod tests { let handle = InvocationProcessorHandle::noop(); - let (tx, rx) = oneshot::channel::<()>(); handle - .sender - .send(ProcessorCommand::PlatformRuntimeDone { - request_id: "rid".to_string(), - metrics: RuntimeDoneMetrics { + .on_platform_runtime_done( + "rid".to_string(), + RuntimeDoneMetrics { duration_ms: 0.0, produced_bytes: None, }, - status: Status::Success, - error_type: None, - tags_provider: Arc::clone(&tags_provider), - trace_sender: Arc::clone(&trace_sender), - timestamp: 0, - response: tx, - }) + Status::Success, + None, + Arc::clone(&tags_provider), + Arc::clone(&trace_sender), + 0, + ) .await - .expect("send PlatformRuntimeDone"); - rx.await.expect("noop must respond to PlatformRuntimeDone"); + .expect("noop on_platform_runtime_done"); - let (tx, rx) = oneshot::channel::<()>(); handle - .sender - .send(ProcessorCommand::PlatformReport { - request_id: "rid".to_string(), - metrics: ReportMetrics::OnDemand(OnDemandReportMetrics { + .on_platform_report( + "rid", + ReportMetrics::OnDemand(OnDemandReportMetrics { duration_ms: 0.0, billed_duration_ms: 0, memory_size_mb: 0, @@ -820,16 +814,14 @@ mod tests { init_duration_ms: None, restore_duration_ms: None, }), - timestamp: 0, - status: Status::Success, - error_type: None, - spans: None, + 0, + Status::Success, + &None, + &None, tags_provider, trace_sender, - response: tx, - }) + ) .await - .expect("send PlatformReport"); - rx.await.expect("noop must respond to PlatformReport"); + .expect("noop on_platform_report"); } } From d9ac24dd1df419fa233ebe320fc6302cc4b4bbee Mon Sep 17 
00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 13:55:33 -0400 Subject: [PATCH 21/31] docs(startup,trace_agent,noop): describe behavior, not the consumer Library doc comments named the bottlecap-testmode binary and the POST /flush route it registers. Library APIs should document what they do and why a caller might reach for them, not which specific caller today happens to. Rewrite the affected doc comments (and one internal impl comment) to describe behavior and motivation generically: "a deterministic on-demand drain hook", "a handle that has no Lambda lifecycle state to drive", and so on. Addresses review feedback on PR #1201. --- .../lifecycle/invocation/processor_service.rs | 8 ++++---- bottlecap/src/startup.rs | 16 ++++++++-------- bottlecap/src/traces/trace_agent.rs | 14 +++++++------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 9a26f2b0d..4b48333e0 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -136,12 +136,12 @@ pub struct InvocationProcessorHandle { impl InvocationProcessorHandle { /// Returns a handle backed by a background task that acknowledges every - /// command with a sensible default. Intended for the `bottlecap-testmode` - /// binary, which reuses `TraceAgent` / `handle_traces` but has no Lambda - /// lifecycle state to drive. + /// command with a sensible default. Use this for callers that need an + /// `InvocationProcessorHandle` (for example, to reuse `TraceAgent` or + /// `handle_traces`) but have no Lambda lifecycle state to drive. 
/// /// The match below is exhaustive: adding a new `ProcessorCommand` variant - /// will fail to compile here, so test-mode's behavior for it must be + /// will fail to compile here, so the noop behavior for it must be /// decided explicitly rather than silently dropping a response and /// hanging the caller on its oneshot. #[must_use] diff --git a/bottlecap/src/startup.rs b/bottlecap/src/startup.rs index ae7f03e2a..d0fc9df31 100644 --- a/bottlecap/src/startup.rs +++ b/bottlecap/src/startup.rs @@ -45,9 +45,8 @@ pub struct TraceAgentPipeline { /// owns the HTTP listener. Spawns the aggregator/concentrator/dedup services /// onto the current tokio runtime, but does **not** spawn the `TraceAgent` /// itself. The caller owns `trace_agent` and is responsible for spawning -/// `trace_agent.start()` (optionally after calling -/// [`trace_agent::TraceAgent::with_flushing_service`] to enable the -/// test-mode `POST /flush` route). +/// `trace_agent.start()`, optionally after further configuring it (for +/// example, via [`trace_agent::TraceAgent::with_flushing_service`]). /// /// Note: the aggregator/concentrator/dedup tasks spawned here have no /// external shutdown signal; they run until their command channels are @@ -156,13 +155,14 @@ pub fn build_trace_agent( /// Builds the trace-processing pipeline with [`build_trace_agent`] and spawns /// the [`trace_agent::TraceAgent`] HTTP listener onto the current tokio -/// runtime. This is the convenience entry point used by the Lambda binary. +/// runtime. Convenience entry point for callers that do not need to +/// further configure the `TraceAgent` before spawning it. 
/// /// Callers that need to attach a -/// [`crate::flushing::FlushingService`](crate::flushing::FlushingService), -/// notably the `bottlecap-testmode` binary (which uses it to back a -/// `POST /flush` route), should use [`build_trace_agent`] directly and spawn -/// the returned `TraceAgent` themselves after calling +/// [`crate::flushing::FlushingService`](crate::flushing::FlushingService) +/// (or otherwise customize the `TraceAgent`) should use +/// [`build_trace_agent`] directly and spawn the returned `TraceAgent` +/// themselves after applying the extra configuration, e.g. via /// [`trace_agent::TraceAgent::with_flushing_service`]. pub fn start_trace_agent( config: &Arc, diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 676a86e9f..fe47d2271 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -125,8 +125,8 @@ pub struct TraceAgent { span_deduper: DedupHandle, /// Optional flushing service that, when attached via /// [`TraceAgent::with_flushing_service`], backs a `POST /flush` route on - /// the same listener as the trace endpoints. Used by the - /// `bottlecap-testmode` binary; never attached by the Lambda binary. + /// the same listener as the trace endpoints. `None` when the caller + /// wants no external flush trigger. flushing_service: Option>, } @@ -185,10 +185,10 @@ impl TraceAgent { } /// Attaches a [`FlushingService`] that will back a `POST /flush` route - /// registered on the same listener as the trace endpoints. Intended for - /// the `bottlecap-testmode` binary, which needs a deterministic drain - /// hook for the APM parity harness. The Lambda binary does not attach a - /// flushing service; `POST /flush` is not exposed in Lambda mode. + /// registered on the same listener as the trace endpoints. Use this for + /// callers that need a deterministic, on-demand drain hook (for example, + /// a test harness that flushes between requests). 
Without this call, + /// the agent never registers a `POST /flush` route. #[must_use] pub fn with_flushing_service(mut self, flushing_service: Arc<FlushingService>) -> Self { self.flushing_service = Some(flushing_service); @@ -310,7 +310,7 @@ impl TraceAgent { .merge(info_router); // POST /flush is only registered when a FlushingService has been - // attached via TraceAgent::with_flushing_service (test-mode only). + // attached via TraceAgent::with_flushing_service. if let Some(flushing_service) = &self.flushing_service { let flush_router = Router::new() .route(FLUSH_ENDPOINT_PATH, post(Self::flush)) From 45e70ea7a723c7e7a5b7b0149fe6590db7c522cf Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 14:58:45 -0400 Subject: [PATCH 22/31] refactor(trace_agent): replace flushing_service Option with RouterExtension trait The flushing_service: Option<Arc<FlushingService>> field was test-mode scaffolding on the production struct. Every TraceAgent in production carried a None that existed solely for a future test binary, and any additional test-mode hook would have had to add another Option field with the same shape, each pulling another consumer-specific dependency into trace_agent.rs. Replace it with a single generic extension seam: - pub trait RouterExtension { fn extend(&self, router: Router) -> Router; } - TraceAgent stores Option<Arc<dyn RouterExtension>> and exposes with_router_extension(self, ext) -> Self in place of with_flushing_service. - make_router calls extension.extend(router) when set, before applying the outer fallback and body-limit layers. - FLUSH_ENDPOINT_PATH, the flush handler, and the crate::flushing FlushingService import are removed from trace_agent.rs. The trace-agent module now knows nothing about FlushingService. Future test-mode hooks add new methods (with defaults) to the trait or new impls behind it, not new Option fields on TraceAgent.
A #[cfg(test)]-only SpyExtension validates the seam end-to-end: trait shape compiles, state is carried via Arc, and merged routes are reachable through the composed Router. Addresses review feedback on PR #1201 (deferred comment 6). --- bottlecap/src/startup.rs | 12 +-- bottlecap/src/traces/trace_agent.rs | 130 +++++++++++++++++++--------- 2 files changed, 94 insertions(+), 48 deletions(-) diff --git a/bottlecap/src/startup.rs b/bottlecap/src/startup.rs index d0fc9df31..4080c9bd7 100644 --- a/bottlecap/src/startup.rs +++ b/bottlecap/src/startup.rs @@ -46,7 +46,7 @@ pub struct TraceAgentPipeline { /// onto the current tokio runtime, but does **not** spawn the `TraceAgent` /// itself. The caller owns `trace_agent` and is responsible for spawning /// `trace_agent.start()`, optionally after further configuring it (for -/// example, via [`trace_agent::TraceAgent::with_flushing_service`]). +/// example, via [`trace_agent::TraceAgent::with_router_extension`]). /// /// Note: the aggregator/concentrator/dedup tasks spawned here have no /// external shutdown signal; they run until their command channels are @@ -159,11 +159,11 @@ pub fn build_trace_agent( /// further configure the `TraceAgent` before spawning it. /// /// Callers that need to attach a -/// [`crate::flushing::FlushingService`](crate::flushing::FlushingService) -/// (or otherwise customize the `TraceAgent`) should use -/// [`build_trace_agent`] directly and spawn the returned `TraceAgent` -/// themselves after applying the extra configuration, e.g. via -/// [`trace_agent::TraceAgent::with_flushing_service`]. +/// [`trace_agent::RouterExtension`](trace_agent::RouterExtension) (or +/// otherwise customize the `TraceAgent`) should use [`build_trace_agent`] +/// directly and spawn the returned `TraceAgent` themselves after applying +/// the extra configuration, e.g. via +/// [`trace_agent::TraceAgent::with_router_extension`]. 
pub fn start_trace_agent( config: &Arc, api_key_factory: &Arc, diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index fe47d2271..e2d7a5a6c 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -25,7 +25,6 @@ use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ appsec::processor::Processor as AppSecProcessor, config, - flushing::FlushingService, http::{extract_request_body, handler_not_found}, lifecycle::invocation::{ context::ReparentingInfo, processor_service::InvocationProcessorHandle, @@ -68,10 +67,6 @@ const V2_DEBUGGER_ENDPOINT_PATH: &str = "/debugger/v2/input"; const DEBUGGER_DIAGNOSTICS_ENDPOINT_PATH: &str = "/debugger/v1/diagnostics"; const INSTRUMENTATION_ENDPOINT_PATH: &str = "/telemetry/proxy/api/v2/apmtelemetry"; -// Test-mode only: exposed when a FlushingService is attached via -// TraceAgent::with_flushing_service. -const FLUSH_ENDPOINT_PATH: &str = "/flush"; - // Intake endpoints const DSM_INTAKE_PATH: &str = "/api/v0.1/pipeline_stats"; const LLM_OBS_SPANS_INTAKE_PATH: &str = "/api/v2/llmobs"; @@ -110,6 +105,20 @@ pub struct ProxyState { pub proxy_aggregator: Arc>, } +/// Extension seam for the [`TraceAgent`] HTTP router. Implementors receive +/// the fully-assembled production router and return a router with any +/// additional routes merged in. Used to attach optional routes (for example, +/// a deterministic drain endpoint) without adding a dedicated field per +/// route to [`TraceAgent`]. +/// +/// Implementors must apply `.with_state(...)` to their sub-router before +/// returning, so the type parameter of the returned [`Router`] is the same +/// as the one passed in (axum's `Router::merge` requires matching state +/// types). 
+pub trait RouterExtension: Send + Sync { + fn extend(&self, router: Router) -> Router; +} + pub struct TraceAgent { pub config: Arc, pub trace_processor: Arc, @@ -123,11 +132,11 @@ pub struct TraceAgent { tx: Sender, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, - /// Optional flushing service that, when attached via - /// [`TraceAgent::with_flushing_service`], backs a `POST /flush` route on - /// the same listener as the trace endpoints. `None` when the caller - /// wants no external flush trigger. - flushing_service: Option>, + /// Optional router extension that, when attached via + /// [`TraceAgent::with_router_extension`], adds routes to the listener + /// alongside the trace endpoints. `None` when the caller wants no + /// extra routes. + router_extension: Option>, } #[derive(Clone, Copy)] @@ -180,18 +189,19 @@ impl TraceAgent { shutdown_token: CancellationToken::new(), stats_concentrator, span_deduper, - flushing_service: None, + router_extension: None, } } - /// Attaches a [`FlushingService`] that will back a `POST /flush` route - /// registered on the same listener as the trace endpoints. Use this for - /// callers that need a deterministic, on-demand drain hook (for example, - /// a test harness that flushes between requests). Without this call, - /// the agent never registers a `POST /flush` route. + /// Attaches a [`RouterExtension`] that will be applied to the + /// fully-assembled production router before the outer fallback and + /// body-limit layers. Use this to add optional routes (for example, a + /// deterministic on-demand drain endpoint) without wiring a dedicated + /// field per route into [`TraceAgent`]. Without this call, the agent + /// exposes only its production routes. 
#[must_use] - pub fn with_flushing_service(mut self, flushing_service: Arc) -> Self { - self.flushing_service = Some(flushing_service); + pub fn with_router_extension(mut self, extension: Arc) -> Self { + self.router_extension = Some(extension); self } @@ -309,13 +319,8 @@ impl TraceAgent { .merge(proxy_router) .merge(info_router); - // POST /flush is only registered when a FlushingService has been - // attached via TraceAgent::with_flushing_service. - if let Some(flushing_service) = &self.flushing_service { - let flush_router = Router::new() - .route(FLUSH_ENDPOINT_PATH, post(Self::flush)) - .with_state(Arc::clone(flushing_service)); - router = router.merge(flush_router); + if let Some(extension) = &self.router_extension { + router = extension.extend(router); } router @@ -324,23 +329,6 @@ impl TraceAgent { .layer(DefaultBodyLimit::disable()) } - async fn flush(State(flushing_service): State>) -> StatusCode { - // Isolate panics and bound execution time: spawn so a panic in - // flush_blocking_final surfaces as 500, timeout so a stuck flush - // returns 504 instead of hanging the test harness. 
- let mut task = tokio::task::spawn(async move { - flushing_service.flush_blocking_final().await; - }); - match tokio::time::timeout(std::time::Duration::from_secs(30), &mut task).await { - Ok(Ok(())) => StatusCode::NO_CONTENT, - Ok(Err(_)) => StatusCode::INTERNAL_SERVER_ERROR, - Err(_) => { - task.abort(); - StatusCode::GATEWAY_TIMEOUT - } - } - } - async fn graceful_shutdown(shutdown_token: CancellationToken) { shutdown_token.cancelled().await; debug!("TRACE_AGENT | Shutdown signal received, shutting down"); @@ -815,3 +803,61 @@ fn success_response(message: &str) -> Response { debug!("{}", message); (StatusCode::OK, json!({"rate_by_service": {}}).to_string()).into_response() } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use axum::body::Body; + use axum::http::Request; + use std::sync::atomic::{AtomicUsize, Ordering}; + use tower::ServiceExt; + + /// Test extension that records how many times its route was hit, to + /// prove the [`RouterExtension`] seam end-to-end: trait shape compiles, + /// state is carried through an [`Arc`], and merged routes are reachable + /// via the composed [`Router`]. 
+ struct SpyExtension { + hits: Arc, + } + + impl RouterExtension for SpyExtension { + fn extend(&self, router: Router) -> Router { + router.merge( + Router::new() + .route( + "/spy", + post(|State(hits): State>| async move { + hits.fetch_add(1, Ordering::SeqCst); + StatusCode::NO_CONTENT + }), + ) + .with_state(Arc::clone(&self.hits)), + ) + } + } + + #[tokio::test] + async fn router_extension_adds_reachable_route_with_state() { + let hits = Arc::new(AtomicUsize::new(0)); + let extension: Arc = Arc::new(SpyExtension { + hits: Arc::clone(&hits), + }); + + let router = extension.extend(Router::new()); + + let response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/spy") + .body(Body::empty()) + .expect("build request"), + ) + .await + .expect("route response"); + + assert_eq!(response.status(), StatusCode::NO_CONTENT); + assert_eq!(hits.load(Ordering::SeqCst), 1); + } +} From a9f1c846c8708a08384a19f311b2fb2e02b9ae16 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 15:55:33 -0400 Subject: [PATCH 23/31] test(trace_agent): exercise RouterExtension seam through make_router The previous SpyExtension test called `extension.extend(Router::new())` directly, which only proves axum's Router::merge works. Build an actual TraceAgent via a small test helper, attach the spy via with_router_extension, and hit /spy through the full router returned by make_router. Add a second test confirming that with no extension attached, /spy falls through to the 404 handler. Also trim the "deterministic drain endpoint" example from the with_router_extension and start_trace_agent doc blocks; keep it only in the trait doc to avoid repetition across three locations. 
--- bottlecap/src/startup.rs | 10 ++-- bottlecap/src/traces/trace_agent.rs | 92 +++++++++++++++++++++++------ 2 files changed, 79 insertions(+), 23 deletions(-) diff --git a/bottlecap/src/startup.rs b/bottlecap/src/startup.rs index 4080c9bd7..4a82756ed 100644 --- a/bottlecap/src/startup.rs +++ b/bottlecap/src/startup.rs @@ -158,12 +158,10 @@ pub fn build_trace_agent( /// runtime. Convenience entry point for callers that do not need to /// further configure the `TraceAgent` before spawning it. /// -/// Callers that need to attach a -/// [`trace_agent::RouterExtension`](trace_agent::RouterExtension) (or -/// otherwise customize the `TraceAgent`) should use [`build_trace_agent`] -/// directly and spawn the returned `TraceAgent` themselves after applying -/// the extra configuration, e.g. via -/// [`trace_agent::TraceAgent::with_router_extension`]. +/// Callers that need to customize the `TraceAgent` (for example via +/// [`trace_agent::TraceAgent::with_router_extension`]) should use +/// [`build_trace_agent`] directly and spawn the returned `TraceAgent` +/// themselves after applying the extra configuration. pub fn start_trace_agent( config: &Arc, api_key_factory: &Arc, diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index e2d7a5a6c..5fa995f73 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -132,10 +132,8 @@ pub struct TraceAgent { tx: Sender, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, - /// Optional router extension that, when attached via - /// [`TraceAgent::with_router_extension`], adds routes to the listener - /// alongside the trace endpoints. `None` when the caller wants no - /// extra routes. + /// `None` when the caller wants no extra routes. See + /// [`TraceAgent::with_router_extension`]. 
router_extension: Option>, } @@ -195,10 +193,8 @@ impl TraceAgent { /// Attaches a [`RouterExtension`] that will be applied to the /// fully-assembled production router before the outer fallback and - /// body-limit layers. Use this to add optional routes (for example, a - /// deterministic on-demand drain endpoint) without wiring a dedicated - /// field per route into [`TraceAgent`]. Without this call, the agent - /// exposes only its production routes. + /// body-limit layers. Without this call, the agent exposes only its + /// production routes. #[must_use] pub fn with_router_extension(mut self, extension: Arc) -> Self { self.router_extension = Some(extension); @@ -808,15 +804,21 @@ fn success_response(message: &str) -> Response { #[allow(clippy::unwrap_used)] mod tests { use super::*; + use crate::{ + LAMBDA_RUNTIME_SLUG, config, + traces::{ + span_dedup_service::DedupService, stats_concentrator_service::StatsConcentratorService, + trace_aggregator_service::AggregatorService, + }, + }; use axum::body::Body; use axum::http::Request; + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use tower::ServiceExt; - /// Test extension that records how many times its route was hit, to - /// prove the [`RouterExtension`] seam end-to-end: trait shape compiles, - /// state is carried through an [`Arc`], and merged routes are reachable - /// via the composed [`Router`]. + /// Test extension that records how many times its route was hit. 
struct SpyExtension { hits: Arc, } @@ -837,14 +839,50 @@ mod tests { } } + fn build_test_agent() -> TraceAgent { + let config = Arc::new(config::Config::default()); + let (concentrator_svc, concentrator) = StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(concentrator_svc.run()); + let (aggregator_svc, aggregator_handle) = AggregatorService::default(); + tokio::spawn(aggregator_svc.run()); + let (dedup_svc, dedup_handle) = DedupService::new(); + tokio::spawn(dedup_svc.run()); + let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new(ObfuscationConfig::new().expect("ObfuscationConfig")), + }); + let stats_aggregator = Arc::new(Mutex::new( + stats_aggregator::StatsAggregator::new_with_concentrator(concentrator.clone()), + )); + let proxy_aggregator = Arc::new(Mutex::new(proxy_aggregator::Aggregator::default())); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + TraceAgent::new( + config, + aggregator_handle, + trace_processor, + stats_aggregator, + Arc::new(stats_processor::ServerlessStatsProcessor {}), + proxy_aggregator, + InvocationProcessorHandle::noop(), + None, + tags_provider, + concentrator, + dedup_handle, + ) + } + #[tokio::test] - async fn router_extension_adds_reachable_route_with_state() { + async fn with_router_extension_adds_reachable_route_to_make_router() { let hits = Arc::new(AtomicUsize::new(0)); - let extension: Arc = Arc::new(SpyExtension { + let agent = build_test_agent().with_router_extension(Arc::new(SpyExtension { hits: Arc::clone(&hits), - }); - - let router = extension.extend(Router::new()); + })); + let (stats_tx, _stats_rx) = mpsc::channel::(1); + let router = agent.make_router(stats_tx); let response = router .oneshot( @@ -860,4 +898,24 @@ mod tests { assert_eq!(response.status(), StatusCode::NO_CONTENT); 
assert_eq!(hits.load(Ordering::SeqCst), 1); } + + #[tokio::test] + async fn make_router_returns_404_for_extension_route_when_none_attached() { + let agent = build_test_agent(); + let (stats_tx, _stats_rx) = mpsc::channel::(1); + let router = agent.make_router(stats_tx); + + let response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/spy") + .body(Body::empty()) + .expect("build request"), + ) + .await + .expect("route response"); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } } From 05c8d550e2f4619eb980477dd4bd53eab9295a34 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 17:54:59 -0400 Subject: [PATCH 24/31] docs(noop): fix inaccurate consequence described in exhaustive-match comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dropping a oneshot sender causes the receiver to return ProcessorError::ChannelReceive, not a hang. Update the comment to describe the actual observable behavior. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/lifecycle/invocation/processor_service.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 4b48333e0..8a5d7bd2b 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -142,8 +142,10 @@ impl InvocationProcessorHandle { /// /// The match below is exhaustive: adding a new `ProcessorCommand` variant /// will fail to compile here, so the noop behavior for it must be - /// decided explicitly rather than silently dropping a response and - /// hanging the caller on its oneshot. + /// decided explicitly. A response-carrying variant placed in the + /// fire-and-forget arm would silently drop its sender, causing the + /// caller to receive `ProcessorError::ChannelReceive` instead of the + /// intended default. 
#[must_use] pub fn noop() -> Self { let (sender, mut receiver) = mpsc::channel::(1000); From 6c9bcbc03ef248ca1200b675a4c11fd3f6ddda24 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 17:56:04 -0400 Subject: [PATCH 25/31] docs(startup): count all four background tasks in build_trace_agent comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous note mentioned only the three tasks spawned directly in build_trace_agent (aggregator, concentrator, dedup). TraceAgent::new unconditionally spawns a fourth task: the trace-payload drain loop. Update the summary line and the leak warning to reflect all four tasks. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/startup.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/bottlecap/src/startup.rs b/bottlecap/src/startup.rs index 4a82756ed..fc8ecba4b 100644 --- a/bottlecap/src/startup.rs +++ b/bottlecap/src/startup.rs @@ -43,16 +43,18 @@ pub struct TraceAgentPipeline { /// Builds the full trace-processing pipeline (trace + stats + proxy /// aggregators, services, flushers) and the [`trace_agent::TraceAgent`] that /// owns the HTTP listener. Spawns the aggregator/concentrator/dedup services -/// onto the current tokio runtime, but does **not** spawn the `TraceAgent` -/// itself. The caller owns `trace_agent` and is responsible for spawning +/// onto the current tokio runtime; `TraceAgent::new` additionally spawns a +/// trace-payload drain task. Does **not** spawn the `TraceAgent` itself. +/// The caller owns `trace_agent` and is responsible for spawning /// `trace_agent.start()`, optionally after further configuring it (for /// example, via [`trace_agent::TraceAgent::with_router_extension`]). /// -/// Note: the aggregator/concentrator/dedup tasks spawned here have no -/// external shutdown signal; they run until their command channels are -/// dropped. 
Callers that abandon the returned `TraceAgent` without either -/// spawning it or dropping the pipeline handles will leak those background -/// tasks for the lifetime of the process. +/// Note: the four background tasks started during this call (aggregator, +/// concentrator, dedup, and the trace-payload drain task inside +/// `TraceAgent::new`) have no external shutdown signal; they run until +/// their command channels are dropped. Callers that abandon the returned +/// `TraceAgent` without either spawning it or dropping the pipeline handles +/// will leak those background tasks for the lifetime of the process. /// /// Most callers want [`start_trace_agent`] instead, which handles the spawn. pub fn build_trace_agent( From 68477beff21b22e697ac923a17056a33143ffc5e Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 18:00:32 -0400 Subject: [PATCH 26/31] fix(trace_agent): RouterExtension::extend returns Result to surface errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, a panic or error inside extend() would propagate as a panic through make_router() and start(), then be silently dropped by the tokio::spawn wrapper in start_trace_agent (which only logged Err returns, not panics). Changing the return type to Result> means implementors must return errors explicitly. Those errors propagate through make_router via ?, then through start() to the spawn wrapper, where they are already logged. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/trace_agent.rs | 45 +++++++++++++++++------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 5fa995f73..300b49a3b 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -106,17 +106,20 @@ pub struct ProxyState { } /// Extension seam for the [`TraceAgent`] HTTP router. 
Implementors receive -/// the fully-assembled production router and return a router with any -/// additional routes merged in. Used to attach optional routes (for example, -/// a deterministic drain endpoint) without adding a dedicated field per +/// the fully-assembled production router and return it with any additional +/// routes merged in. Used to attach optional routes (for example, a +/// deterministic drain endpoint) without adding a dedicated field per /// route to [`TraceAgent`]. /// -/// Implementors must apply `.with_state(...)` to their sub-router before -/// returning, so the type parameter of the returned [`Router`] is the same -/// as the one passed in (axum's `Router::merge` requires matching state -/// types). +/// Returning `Err` aborts agent startup: the error propagates through +/// [`TraceAgent::start`]. Implementors that carry state must call +/// `.with_state(...)` on their sub-router before merging, because +/// `Router::merge` requires both routers to share the same state type. 
pub trait RouterExtension: Send + Sync { - fn extend(&self, router: Router) -> Router; + fn extend( + &self, + router: Router, + ) -> Result>; } pub struct TraceAgent { @@ -220,7 +223,7 @@ impl TraceAgent { } }); - let router = self.make_router(stats_tx); + let router = self.make_router(stats_tx)?; let port = u16::try_from(TRACE_AGENT_PORT).expect("TRACE_AGENT_PORT is too large"); let socket = SocketAddr::from(([127, 0, 0, 1], port)); @@ -240,7 +243,10 @@ impl TraceAgent { Ok(()) } - fn make_router(&self, stats_tx: Sender) -> Router { + fn make_router( + &self, + stats_tx: Sender, + ) -> Result> { let stats_generator = Arc::new(StatsGenerator::new(self.stats_concentrator.clone())); let trace_state = TraceState { config: Arc::clone(&self.config), @@ -316,13 +322,13 @@ impl TraceAgent { .merge(info_router); if let Some(extension) = &self.router_extension { - router = extension.extend(router); + router = extension.extend(router)?; } - router + Ok(router .fallback(handler_not_found) // Disable the default body limit so we can use our own limit - .layer(DefaultBodyLimit::disable()) + .layer(DefaultBodyLimit::disable())) } async fn graceful_shutdown(shutdown_token: CancellationToken) { @@ -824,8 +830,11 @@ mod tests { } impl RouterExtension for SpyExtension { - fn extend(&self, router: Router) -> Router { - router.merge( + fn extend( + &self, + router: Router, + ) -> Result> { + Ok(router.merge( Router::new() .route( "/spy", @@ -835,7 +844,7 @@ mod tests { }), ) .with_state(Arc::clone(&self.hits)), - ) + )) } } @@ -882,7 +891,7 @@ mod tests { hits: Arc::clone(&hits), })); let (stats_tx, _stats_rx) = mpsc::channel::(1); - let router = agent.make_router(stats_tx); + let router = agent.make_router(stats_tx).expect("make_router"); let response = router .oneshot( @@ -903,7 +912,7 @@ mod tests { async fn make_router_returns_404_for_extension_route_when_none_attached() { let agent = build_test_agent(); let (stats_tx, _stats_rx) = mpsc::channel::(1); - let router = 
agent.make_router(stats_tx); + let router = agent.make_router(stats_tx).expect("make_router"); let response = router .oneshot( From 3727b31e3be520b07d2320398a85d1c51c237054 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 18:04:21 -0400 Subject: [PATCH 27/31] test(noop): guard all five request-response variants with an explicit timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The exhaustive match in noop() prevents unhandled variants at compile time, but it cannot prevent a future contributor from placing a response-carrying variant in the fire-and-forget arm. In that case, the sender is dropped and rx.await returns ProcessorError::ChannelReceive instead of the intended default; it does not hang. Add a test that wraps every request-response call in a 500ms timeout and asserts an Ok result, so such a regression fails on the returned error, and a handler that never replies fails on the timeout instead of hanging the test suite. 🤖 Co-Authored-By: Claude Code --- .../lifecycle/invocation/processor_service.rs | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 8a5d7bd2b..aab7bec04 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -826,4 +826,106 @@ mod tests { .await .expect("noop on_platform_report"); } + + /// Guards against a future `ProcessorCommand` variant with a `response` + /// field being accidentally placed in the fire-and-forget arm: that would + /// silently drop the sender, causing `rx.await` to return + /// `ProcessorError::ChannelReceive`. With an explicit timeout, any such + /// regression fails fast instead of hanging the test suite.
+ #[tokio::test] + async fn noop_request_response_variants_complete_within_timeout() { + use crate::{ + LAMBDA_RUNTIME_SLUG, config, + extension::telemetry::events::OnDemandReportMetrics, + traces::{ + stats_concentrator_service::StatsConcentratorService, + stats_generator::StatsGenerator, trace_processor::ServerlessTraceProcessor, + }, + }; + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + use std::collections::HashMap; + use tokio::time::{Duration, timeout}; + + let timeout_dur = Duration::from_millis(500); + + let config = Arc::new(config::Config::default()); + let (svc, concentrator) = StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(svc.run()); + let trace_sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(ServerlessTraceProcessor { + obfuscation_config: Arc::new(ObfuscationConfig::new().expect("ObfuscationConfig")), + }), + trace_tx: tokio::sync::mpsc::channel(1).0, + stats_generator: Arc::new(StatsGenerator::new(concentrator)), + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let handle = InvocationProcessorHandle::noop(); + + timeout(timeout_dur, handle.get_reparenting_info()) + .await + .expect("get_reparenting_info timed out") + .expect("get_reparenting_info"); + + timeout( + timeout_dur, + handle.update_reparenting(std::collections::VecDeque::new()), + ) + .await + .expect("update_reparenting timed out") + .expect("update_reparenting"); + + timeout(timeout_dur, handle.set_cold_start_span_trace_id(42)) + .await + .expect("set_cold_start_span_trace_id timed out") + .expect("set_cold_start_span_trace_id"); + + timeout( + timeout_dur, + handle.on_platform_runtime_done( + "rid".to_string(), + RuntimeDoneMetrics { + duration_ms: 0.0, + produced_bytes: None, + }, + Status::Success, + None, + Arc::clone(&tags_provider), + 
Arc::clone(&trace_sender), + 0, + ), + ) + .await + .expect("on_platform_runtime_done timed out") + .expect("on_platform_runtime_done"); + + timeout( + timeout_dur, + handle.on_platform_report( + "rid", + ReportMetrics::OnDemand(OnDemandReportMetrics { + duration_ms: 0.0, + billed_duration_ms: 0, + memory_size_mb: 0, + max_memory_used_mb: 0, + init_duration_ms: None, + restore_duration_ms: None, + }), + 0, + Status::Success, + &None, + &None, + tags_provider, + trace_sender, + ), + ) + .await + .expect("on_platform_report timed out") + .expect("on_platform_report"); + } } From 70f790639c051dd85f7f5fa618d2259a67e5ea35 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Fri, 24 Apr 2026 18:37:28 -0400 Subject: [PATCH 28/31] style(trace_agent): apply rustfmt to RouterExtension::extend signatures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Co-Authored-By: Claude Code --- bottlecap/src/traces/trace_agent.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 300b49a3b..c862e159e 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -116,10 +116,7 @@ pub struct ProxyState { /// `.with_state(...)` on their sub-router before merging, because /// `Router::merge` requires both routers to share the same state type. 
pub trait RouterExtension: Send + Sync { - fn extend( - &self, - router: Router, - ) -> Result>; + fn extend(&self, router: Router) -> Result>; } pub struct TraceAgent { @@ -830,10 +827,7 @@ mod tests { } impl RouterExtension for SpyExtension { - fn extend( - &self, - router: Router, - ) -> Result> { + fn extend(&self, router: Router) -> Result> { Ok(router.merge( Router::new() .route( From 240674e607099af9fbc1313e628a5082792da82d Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Tue, 28 Apr 2026 18:36:49 -0400 Subject: [PATCH 29/31] docs(trace_agent,startup): clarify RouterExtension err propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The RouterExtension::extend doc claimed errors abort agent startup, but the production caller start_trace_agent only logs and discards the error from TraceAgent::start; the surrounding pipeline keeps running with a dead trace channel. Soften the trait doc to describe this honestly, extend the start_trace_agent doc to point callers needing reactive error handling at build_trace_agent, and add a FailingExtension test that locks in the Err propagation contract through make_router. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/startup.rs | 10 ++++++++ bottlecap/src/traces/trace_agent.rs | 40 ++++++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/startup.rs b/bottlecap/src/startup.rs index fc8ecba4b..88365e941 100644 --- a/bottlecap/src/startup.rs +++ b/bottlecap/src/startup.rs @@ -160,6 +160,13 @@ pub fn build_trace_agent( /// runtime. Convenience entry point for callers that do not need to /// further configure the `TraceAgent` before spawning it. /// +/// Errors from `TraceAgent::start` (TCP bind failures, router-extension +/// failures, axum serve errors) are logged and discarded; this preserves +/// the pre-extraction behavior from `main.rs` and means the surrounding +/// pipeline keeps running with a dead trace channel. 
Callers that need to +/// react to startup errors should use [`build_trace_agent`] and spawn the +/// agent themselves. +/// /// Callers that need to customize the `TraceAgent` (for example via /// [`trace_agent::TraceAgent::with_router_extension`]) should use /// [`build_trace_agent`] directly and spawn the returned `TraceAgent` @@ -181,6 +188,9 @@ pub fn start_trace_agent( client, ); + // Log-only error handling preserved from the pre-extraction code in + // main.rs. See the doc comment above for callers that need reactive + // error handling. tokio::spawn(async move { if let Err(e) = trace_agent.start().await { error!("Error starting trace agent: {e:?}"); diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index c862e159e..c2205b027 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -111,10 +111,16 @@ pub struct ProxyState { /// deterministic drain endpoint) without adding a dedicated field per /// route to [`TraceAgent`]. /// -/// Returning `Err` aborts agent startup: the error propagates through -/// [`TraceAgent::start`]. Implementors that carry state must call -/// `.with_state(...)` on their sub-router before merging, because -/// `Router::merge` requires both routers to share the same state type. +/// Returning `Err` propagates out of [`TraceAgent::start`], aborting the +/// HTTP listener task. Note that the production convenience entry point +/// [`crate::startup::start_trace_agent`] spawns `start` and only logs its +/// error; the surrounding pipeline does not observe the failure. Callers +/// that need to react to startup errors must use +/// [`crate::startup::build_trace_agent`] and spawn the agent themselves. +/// +/// Implementors that carry state must call `.with_state(...)` on their +/// sub-router before merging, because `Router::merge` requires both +/// routers to share the same state type. 
pub trait RouterExtension: Send + Sync { fn extend(&self, router: Router) -> Result>; } @@ -842,6 +848,16 @@ mod tests { } } + /// Test extension that always fails, used to assert that `make_router` + /// surfaces extension errors instead of swallowing them. + struct FailingExtension; + + impl RouterExtension for FailingExtension { + fn extend(&self, _router: Router) -> Result> { + Err("extension failed during make_router".into()) + } + } + fn build_test_agent() -> TraceAgent { let config = Arc::new(config::Config::default()); let (concentrator_svc, concentrator) = StatsConcentratorService::new(Arc::clone(&config)); @@ -902,6 +918,22 @@ mod tests { assert_eq!(hits.load(Ordering::SeqCst), 1); } + #[tokio::test] + async fn make_router_propagates_extension_error() { + let agent = build_test_agent().with_router_extension(Arc::new(FailingExtension)); + let (stats_tx, _stats_rx) = mpsc::channel::(1); + + let err = agent + .make_router(stats_tx) + .expect_err("make_router should surface extension error"); + + assert!( + err.to_string() + .contains("extension failed during make_router"), + "unexpected error: {err}" + ); + } + #[tokio::test] async fn make_router_returns_404_for_extension_route_when_none_attached() { let agent = build_test_agent(); From 53b246cd2342400badb883414178e6dabcd8158c Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Tue, 28 Apr 2026 18:37:05 -0400 Subject: [PATCH 30/31] feat(bottlecap): gate InvocationProcessorHandle::noop() behind test-mode feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The noop constructor exists for tests and the upcoming testmode binary but was a public function on a struct used pervasively in production code, so nothing prevented a future caller from reaching for it from main. Add a test-mode cargo feature (matching the existing fips pattern) and gate noop with cfg(any(test, feature = "test-mode")) so it does not exist in default or fips builds. 
🤖 Co-Authored-By: Claude Code --- bottlecap/Cargo.toml | 5 +++++ bottlecap/src/lifecycle/invocation/processor_service.rs | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index b77902083..68a9af7bc 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -132,6 +132,11 @@ fips = [ "rustls/fips", "rustls-native-certs", ] +# Exposes test-only constructors (for example, +# `InvocationProcessorHandle::noop()`) to the upcoming testmode binary. +# Not enabled in `default` or `fips`, so the items it gates do not appear +# in production builds. +test-mode = [] [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)'] } diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index aab7bec04..5aa116487 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -146,6 +146,10 @@ impl InvocationProcessorHandle { /// fire-and-forget arm would silently drop its sender, causing the /// caller to receive `ProcessorError::ChannelReceive` instead of the /// intended default. + /// + /// Compiled only under unit tests and when the `test-mode` feature is + /// enabled, so this constructor does not exist in production builds. + #[cfg(any(test, feature = "test-mode"))] #[must_use] pub fn noop() -> Self { let (sender, mut receiver) = mpsc::channel::(1000); From 149d9d404ad58734bdfa7c8645a969b403f04891 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Tue, 28 Apr 2026 18:47:24 -0400 Subject: [PATCH 31/31] docs(processor_service): document tokio runtime requirement on noop() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The constructor calls tokio::spawn to start its draining task, which panics if there is no Tokio runtime registered for the current thread. 
Make that requirement explicit in the doc so future callers do not hit a surprise panic. 🤖 Co-Authored-By: Claude Code --- bottlecap/src/lifecycle/invocation/processor_service.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 5aa116487..1f48f1960 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -149,6 +149,10 @@ impl InvocationProcessorHandle { /// /// Compiled only under unit tests and when the `test-mode` feature is /// enabled, so this constructor does not exist in production builds. + /// + /// Must be called from within an active Tokio runtime: the constructor + /// uses `tokio::spawn` to start the draining task, which panics if no + /// runtime is registered for the current thread. #[cfg(any(test, feature = "test-mode"))] #[must_use] pub fn noop() -> Self {