From 64e44dc01042ef55651baf54b799d5c17256edfc Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Tue, 14 Apr 2026 17:21:07 -0400 Subject: [PATCH] LOG-9314: Share a single state machine among instances of log stream groups --- .../detect_exceptions/exception_detector.rs | 11 ++++++----- src/transforms/detect_exceptions/mod.rs | 15 ++++++++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/transforms/detect_exceptions/exception_detector.rs b/src/transforms/detect_exceptions/exception_detector.rs index 2b3b9c54d035f..c991710c95576 100644 --- a/src/transforms/detect_exceptions/exception_detector.rs +++ b/src/transforms/detect_exceptions/exception_detector.rs @@ -6,6 +6,7 @@ use crate::{ use chrono::{DateTime, Utc}; use regex::Regex; use std::collections::HashMap; +use std::sync::Arc; use vector_lib::lookup::path::OwnedTargetPath; #[derive(Debug, Clone)] @@ -13,7 +14,7 @@ pub struct RuleTarget { regex: Regex, to_state: ExceptionState, } -type StateMachine = HashMap>; +pub type StateMachine = HashMap>; use rules::*; @@ -63,7 +64,7 @@ pub struct TraceAccumulator { impl TraceAccumulator { pub fn new( - languages: Vec, + state_machine: Arc, multiline_flush_interval: Duration, max_bytes: usize, max_lines: usize, @@ -79,7 +80,7 @@ impl TraceAccumulator { buffer_start_time: Utc::now(), accumulated_messages: vec![], detector: ExceptionDetector { - state_machine: get_state_machines(languages), + state_machine, current_state: ExceptionState::StartState, }, } @@ -195,7 +196,7 @@ impl TraceAccumulator { pub struct ExceptionDetectorConfig {} pub struct ExceptionDetector { - pub state_machine: StateMachine, + pub state_machine: Arc, pub current_state: ExceptionState, } @@ -259,7 +260,7 @@ mod exception_detector_tests { fn check_exception(line: &str, detects_end: bool) { let lines = split(line); let mut detector = ExceptionDetector { - state_machine: get_state_machines(default_programming_languages()), + state_machine: 
Arc::new(get_state_machines(default_programming_languages())), current_state: ExceptionState::StartState, }; let after_exc = if detects_end { EndTrace } else { InsideTrace }; diff --git a/src/transforms/detect_exceptions/mod.rs b/src/transforms/detect_exceptions/mod.rs index 551be71630f4f..cec7b2d02a644 100644 --- a/src/transforms/detect_exceptions/mod.rs +++ b/src/transforms/detect_exceptions/mod.rs @@ -15,8 +15,8 @@ use crate::{ transforms::{TaskTransform, Transform}, }; use async_stream::stream; -use futures::{Stream, StreamExt, stream}; -use std::{collections::HashMap, pin::Pin, time::Duration}; +use futures::{stream, Stream, StreamExt}; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use vector_lib::{ config::{clone_input_definitions, log_schema}, configurable::configurable_component, @@ -190,7 +190,7 @@ impl TransformConfig for DetectExceptionsConfig { pub struct DetectExceptions { accumulators: HashMap, - languages: Vec, + state_machine: Arc, expire_after: Duration, flush_period: Duration, multiline_flush_interval: Duration, @@ -208,9 +208,14 @@ impl DetectExceptions { let owned_target_path: OwnedTargetPath = parse_target_path(config.message_key.as_str())?; + // Create the state machine once and share it across all accumulators + let state_machine = Arc::new(exception_detector::get_state_machines( + config.languages.clone(), + )); + Ok(DetectExceptions { accumulators: HashMap::new(), - languages: config.languages.clone(), + state_machine, group_by: config.group_by.clone(), expire_after: config.expire_after_ms, multiline_flush_interval: config.multiline_flush_interval_ms, @@ -229,7 +234,7 @@ impl DetectExceptions { self.accumulators.insert( discriminant.clone(), TraceAccumulator::new( - self.languages.clone(), + Arc::clone(&self.state_machine), self.multiline_flush_interval, self.max_bytes, self.max_lines,