diff --git a/Cargo.lock b/Cargo.lock index 9a0139f110..adc5363702 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5838,6 +5838,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring 0.17.5", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "k8s-e2e-tests" version = "0.1.0" @@ -10604,6 +10619,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" +[[package]] +name = "simple_asn1" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.17", + "time", +] + [[package]] name = "siphasher" version = "0.3.11" @@ -12586,6 +12613,7 @@ dependencies = [ "inventory", "ipnet", "itertools 0.14.0", + "jsonwebtoken", "k8s-openapi 0.22.0", "kube", "lapin", diff --git a/Cargo.toml b/Cargo.toml index 2168524260..afe082c727 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,6 +163,7 @@ hyper-openssl = { version = "0.9.2", default-features = false } hyper-proxy = { version = "0.9.1", default-features = false, features = ["openssl-tls"] } indexmap = { version = "2.7.0", default-features = false, features = ["serde", "std"] } itertools = { version = "0.14.0", default-features = false, optional = false, features = ["use_alloc"] } +jsonwebtoken = { version = "9", default-features = false, features = ["use_pem"] } metrics = "0.24.1" metrics-tracing-context = { version = "0.17.0", default-features = false } metrics-util = { version = "0.18.0", default-features = false, features = ["registry"] } @@ -324,6 +325,7 @@ prost-types = { workspace = true, optional = true } # GCP goauth = { version = "0.14.0", optional = true } smpl_jwt = { version = "0.8.0", default-features = false, optional = true } +jsonwebtoken = { workspace = true, optional = true } object_store = { workspace = true, optional = true } # AMQP @@ -707,7 +709,8 @@ sources-utils-net-unix = [] sources-websocket = ["dep:tokio-tungstenite"] sources-wef = ["dep:wef"] -sources-vector = ["dep:prost", "dep:tonic", "protobuf-build"] +sources-vector = ["dep:prost", "dep:tonic", "protobuf-build", "sources-utils-jwt-auth"] +sources-utils-jwt-auth = ["dep:jsonwebtoken"] # Transforms transforms = ["transforms-logs", "transforms-metrics"] @@ -872,8 +875,9 @@ sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] sinks-socket = ["sinks-utils-udp"] sinks-splunk_hec = [] sinks-statsd = ["sinks-utils-udp", "tokio-util/net"] +sinks-utils-jwt-auth = ["dep:tonic"] sinks-utils-udp = [] -sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build", "dep:prost"] +sinks-vector = ["sinks-utils-udp", "sinks-utils-jwt-auth", "protobuf-build", "dep:prost"] sinks-websocket = ["dep:tokio-tungstenite"] sinks-webhdfs = ["dep:opendal"] diff --git a/src/sinks/util/jwt_auth.rs b/src/sinks/util/jwt_auth.rs new file mode 100644 index 0000000000..0b95799643 --- /dev/null +++ b/src/sinks/util/jwt_auth.rs @@ -0,0 +1,322 @@ +use vector_lib::configurable::configurable_component; + +/// Source of a JWT bearer token sent with outgoing requests. +/// +/// Exactly one variant must be configured. +/// +/// ## Examples +/// +/// Inline value (use Vector's `${VAR}` interpolation for env vars): +/// ```toml +/// jwt_token.type = "inline" +/// jwt_token.value = "${MY_JWT_TOKEN}" +/// ``` +/// +/// File path (re-read on every request for Kubernetes secret rotation): +/// ```toml +/// jwt_token.type = "file" +/// jwt_token.path = "/var/run/secrets/vector/token" +/// ``` +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum JwtTokenConfig { + /// Inline token value. + /// + /// Supports Vector's `${ENV_VAR}` interpolation. The value is resolved once at + /// config load time. + Inline { + /// JWT bearer token value. + value: String, + }, + + /// Path to a file containing the JWT bearer token. + /// + /// The file is re-read on **every request** so that a rotated Kubernetes secret + /// volume mount is picked up automatically without restarting the agent. + File { + /// Path to the token file. + path: String, + }, +} + +/// Pre-parsed auth state built once at sink construction time. +/// +/// `site_id` and, for inline tokens, the `Authorization` header value are parsed +/// into `MetadataValue` up front so the hot request path pays no allocation or +/// parse cost for those fields. +#[cfg(feature = "sinks-utils-jwt-auth")] +#[derive(Debug)] +pub struct JwtAuthState { + site_id: tonic::metadata::MetadataValue, + token: JwtAuthToken, +} + +#[cfg(feature = "sinks-utils-jwt-auth")] +#[derive(Debug)] +pub enum JwtAuthToken { + /// Fully-formatted `"Bearer "` ready to insert into gRPC metadata. + Static(tonic::metadata::MetadataValue), + /// Path to a file re-read on every request (K8s secret rotation). + File(String), +} + +#[cfg(feature = "sinks-utils-jwt-auth")] +impl JwtAuthState { + /// Builds a `JwtAuthState` from a raw `site_id` string and a [`JwtTokenConfig`]. + /// + /// Both the `site_id` and any inline token value are parsed into + /// [`tonic::metadata::MetadataValue`] here — once at construction — so the hot + /// request path has no parse or allocation work to do. + /// + /// Returns a `&'static str` error message on failure; the caller is responsible + /// for converting it to the appropriate error type. + pub fn from_config( + site_id: &str, + token: JwtTokenConfig, + ) -> Result { + let site_id = site_id + .parse() + .map_err(|_| "site_id contains characters invalid for gRPC metadata")?; + + let token = match token { + JwtTokenConfig::Inline { value } => { + let header = format!("Bearer {value}") + .parse() + .map_err(|_| "JWT token (inline) contains characters invalid for gRPC metadata")?; + JwtAuthToken::Static(header) + } + JwtTokenConfig::File { path } => JwtAuthToken::File(path), + }; + + Ok(Self { site_id, token }) + } + + /// Returns the pre-parsed `x-site-id` metadata value. + pub fn site_id(&self) -> &tonic::metadata::MetadataValue { + &self.site_id + } + + /// Returns the current bearer token as a gRPC `MetadataValue`. + /// + /// For [`JwtAuthToken::Static`] this is a cheap clone of the pre-built value. + /// For [`JwtAuthToken::File`] the token file is re-read on every call so that + /// a rotated Kubernetes secret is picked up without restarting the agent. + /// + /// Returns an error message string on failure; the caller is responsible for + /// converting it to the appropriate error type. + pub fn bearer_token( + &self, + ) -> Result, String> { + match &self.token { + JwtAuthToken::Static(value) => Ok(value.clone()), + JwtAuthToken::File(path) => { + let raw = std::fs::read_to_string(path) + .map_err(|err| format!("failed to read token file '{}': {err}", path))?; + let token = raw.trim(); + format!("Bearer {token}") + .parse::>() + .map_err(|err| format!("token file contains invalid characters: {err}")) + } + } + } +} + +#[cfg(all(test, feature = "sinks-utils-jwt-auth"))] +mod tests { + use std::io::Write; + + use super::*; + + // -- from_config -- + + #[test] + fn from_config_inline_builds_correctly() { + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::Inline { + value: "my-jwt-token".into(), + }, + ) + .expect("valid inline config should build"); + + assert_eq!(state.site_id().to_str().unwrap(), "site-abc"); + assert_eq!( + state.bearer_token().unwrap().to_str().unwrap(), + "Bearer my-jwt-token" + ); + } + + #[test] + fn from_config_file_stores_path() { + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::File { + path: "/some/path/token".into(), + }, + ) + .expect("valid file config should build"); + + // Construction succeeds without reading the file. + assert_eq!(state.site_id().to_str().unwrap(), "site-abc"); + assert!(matches!(state.token, JwtAuthToken::File(_))); + } + + #[test] + fn from_config_invalid_site_id_returns_error() { + let result = JwtAuthState::from_config( + "site\0bad", + JwtTokenConfig::Inline { + value: "token".into(), + }, + ); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .contains("site_id contains characters invalid")); + } + + #[test] + fn from_config_inline_invalid_token_returns_error() { + let result = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::Inline { + value: "bad\0token".into(), + }, + ); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .contains("JWT token (inline) contains characters invalid")); + } + + // -- bearer_token -- + + #[test] + fn bearer_token_inline_is_cheap_clone() { + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::Inline { + value: "tok".into(), + }, + ) + .unwrap(); + + // Both calls return the same value without I/O. + assert_eq!( + state.bearer_token().unwrap().to_str().unwrap(), + state.bearer_token().unwrap().to_str().unwrap() + ); + } + + #[test] + fn bearer_token_file_reads_content() { + let mut f = tempfile::NamedTempFile::new().unwrap(); + write!(f, "my-rotating-token").unwrap(); + + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::File { + path: f.path().to_str().unwrap().into(), + }, + ) + .unwrap(); + + assert_eq!( + state.bearer_token().unwrap().to_str().unwrap(), + "Bearer my-rotating-token" + ); + } + + #[test] + fn bearer_token_file_trims_trailing_whitespace() { + let mut f = tempfile::NamedTempFile::new().unwrap(); + write!(f, "token-with-newline\n").unwrap(); + + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::File { + path: f.path().to_str().unwrap().into(), + }, + ) + .unwrap(); + + assert_eq!( + state.bearer_token().unwrap().to_str().unwrap(), + "Bearer token-with-newline" + ); + } + + #[test] + fn bearer_token_file_rotation_reflected_on_next_call() { + let mut f = tempfile::NamedTempFile::new().unwrap(); + write!(f, "original-token").unwrap(); + + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::File { + path: f.path().to_str().unwrap().into(), + }, + ) + .unwrap(); + + assert_eq!( + state.bearer_token().unwrap().to_str().unwrap(), + "Bearer original-token" + ); + + // Simulate Kubernetes secret rotation. + f.reopen().unwrap(); + std::fs::write(f.path(), "rotated-token").unwrap(); + + assert_eq!( + state.bearer_token().unwrap().to_str().unwrap(), + "Bearer rotated-token" + ); + } + + #[test] + fn bearer_token_file_not_found_returns_error_with_path() { + let path = "/nonexistent/path/to/token"; + let state = JwtAuthState::from_config( + "site-abc", + JwtTokenConfig::File { path: path.into() }, + ) + .unwrap(); + + let err = state.bearer_token().unwrap_err(); + assert!(err.contains(path), "error should contain path: {err}"); + } + + // -- serde round-trips -- + + #[test] + fn jwt_token_config_inline_deserializes() { + let toml = r#"type = "inline" +value = "my-token""#; + let cfg: JwtTokenConfig = toml::from_str(toml).unwrap(); + assert!(matches!(cfg, JwtTokenConfig::Inline { value } if value == "my-token")); + } + + #[test] + fn jwt_token_config_file_deserializes() { + let toml = r#"type = "file" +path = "/var/run/secrets/token""#; + let cfg: JwtTokenConfig = toml::from_str(toml).unwrap(); + assert!( + matches!(cfg, JwtTokenConfig::File { path } if path == "/var/run/secrets/token") + ); + } + + #[test] + fn jwt_token_config_missing_type_fails() { + let toml = r#"value = "token""#; + assert!(toml::from_str::(toml).is_err()); + } + + #[test] + fn jwt_token_config_unknown_type_fails() { + let toml = r#"type = "env""#; + assert!(toml::from_str::(toml).is_err()); + } +} diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index d99af96e8d..c75712251a 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -1,5 +1,6 @@ pub mod adaptive_concurrency; pub mod auth; +pub mod jwt_auth; // https://github.com/mcarton/rust-derivative/issues/112 #[allow(clippy::non_canonical_clone_impl)] pub mod batch; @@ -43,6 +44,9 @@ pub use buffer::{ Buffer, Compression, PartitionBuffer, PartitionInnerBuffer, }; pub use builder::SinkBuilderExt; +pub use jwt_auth::JwtTokenConfig; +#[cfg(feature = "sinks-utils-jwt-auth")] +pub use jwt_auth::{JwtAuthState, JwtAuthToken}; pub use compressor::Compressor; pub use compressor::Decompressor; pub use normalizer::Normalizer; diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index 28d857dae4..35b1038ad4 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -20,14 +20,41 @@ use crate::{ proto::vector as proto, sinks::{ util::{ - retries::RetryLogic, BatchConfig, RealtimeEventBasedDefaultBatchSettings, - ServiceBuilderExt, TowerRequestConfig, + retries::RetryLogic, BatchConfig, JwtTokenConfig, + RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt, TowerRequestConfig, }, Healthcheck, VectorSink as VectorSinkType, }, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; +/// JWT authentication configuration for the `vector` sink. +/// +/// When present, each outgoing request includes an `authorization: Bearer ` header +/// and an `x-site-id` header. +/// +/// ## Example +/// +/// ```toml +/// [sinks.my_sink.auth] +/// jwt_token.type = "file" +/// jwt_token.path = "/var/run/secrets/vector/token" +/// site_id = "${SITE_ID}" +/// ``` +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct VectorSinkAuthConfig { + /// Source of the JWT bearer token attached to every request. + pub jwt_token: JwtTokenConfig, + + /// Site ID sent in the `x-site-id` metadata header on every request. + /// + /// Supports Vector's `${ENV_VAR}` interpolation, e.g. `site_id = "${SITE_ID}"`. + pub site_id: String, +} + + /// Configuration for the `vector` sink. #[configurable_component(sink("vector", "Relay observability data to a Vector instance."))] #[derive(Clone, Debug)] @@ -77,6 +104,12 @@ pub struct VectorConfig { skip_serializing_if = "crate::serde::is_default" )] pub(in crate::sinks::vector) acknowledgements: AcknowledgementsConfig, + + /// JWT authentication settings. + /// + /// When configured, each request carries a bearer token and site ID in its gRPC metadata. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub auth: Option, } impl VectorConfig { @@ -102,6 +135,7 @@ fn default_config(address: &str) -> VectorConfig { request: TowerRequestConfig::default(), tls: None, acknowledgements: Default::default(), + auth: None, } } @@ -120,9 +154,9 @@ impl SinkConfig for VectorConfig { .clone() .map(|uri| uri.uri) .unwrap_or_else(|| uri.clone()); - let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false); + let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false, None)?; let healthcheck = healthcheck(healthcheck_client, cx.healthcheck); - let service = VectorService::new(client, uri, self.compression); + let service = VectorService::new(client, uri, self.compression, self.auth.clone())?; let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; @@ -240,6 +274,9 @@ impl RetryLogic for VectorGrpcRetryLogic { | Unauthenticated | DataLoss ), + // A missing or unreadable token file is a local configuration error; + // retrying will not fix it and would block the batch indefinitely. + VectorSinkError::JwtTokenUnavailable { .. } => false, _ => true, } } diff --git a/src/sinks/vector/mod.rs b/src/sinks/vector/mod.rs index 1ed61448e2..845169a142 100644 --- a/src/sinks/vector/mod.rs +++ b/src/sinks/vector/mod.rs @@ -28,6 +28,9 @@ pub enum VectorSinkError { #[snafu(display("URL has no host."))] NoHost, + + #[snafu(display("JWT token unavailable: {}", message))] + JwtTokenUnavailable { message: String }, } #[cfg(test)] @@ -226,6 +229,201 @@ mod tests { .collect() } + // -- JWT auth integration tests -- + + fn make_ok_responder() -> impl Fn() -> hyper::Response + Clone + Send + Sync + 'static + { + move || { + hyper::Response::builder() + .header("grpc-status", "0") + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + } + } + + async fn run_auth_sink(config: VectorConfig, in_addr: std::net::SocketAddr) -> http::request::Parts { + let cx = SinkContext::default(); + let (sink, _) = config.build(cx).await.expect("sink should build"); + + let (rx, trigger, server) = build_test_server_generic(in_addr, make_ok_responder()); + tokio::spawn(server); + + let (_, events) = random_lines_with_stream(8, 1, None); + sink.run(events).await.expect("sink run failed"); + drop(trigger); + + let mut parts_list: Vec<_> = rx.collect().await; + assert_eq!(parts_list.len(), 1, "expected exactly one request"); + parts_list.remove(0).0 + } + + #[tokio::test] + async fn auth_inline_sends_authorization_and_site_id_headers() { + let in_addr = next_addr(); + let config: VectorConfig = toml::from_str(&format!( + r#" + address = "http://{}/" + [auth] + site_id = "site-123" + [auth.jwt_token] + type = "inline" + value = "my-jwt-token" + "#, + in_addr + )) + .unwrap(); + + let parts = run_auth_sink(config, in_addr).await; + + assert_eq!( + parts.headers.get("authorization").unwrap().to_str().unwrap(), + "Bearer my-jwt-token" + ); + assert_eq!( + parts.headers.get("x-site-id").unwrap().to_str().unwrap(), + "site-123" + ); + } + + #[tokio::test] + async fn auth_file_sends_authorization_and_site_id_headers() { + let mut f = tempfile::NamedTempFile::new().unwrap(); + std::io::Write::write_all(&mut f, b"file-token").unwrap(); + + let in_addr = next_addr(); + let config: VectorConfig = toml::from_str(&format!( + r#" + address = "http://{}/" + [auth] + site_id = "site-file" + [auth.jwt_token] + type = "file" + path = "{}" + "#, + in_addr, + f.path().display() + )) + .unwrap(); + + let parts = run_auth_sink(config, in_addr).await; + + assert_eq!( + parts.headers.get("authorization").unwrap().to_str().unwrap(), + "Bearer file-token" + ); + assert_eq!( + parts.headers.get("x-site-id").unwrap().to_str().unwrap(), + "site-file" + ); + } + + #[tokio::test] + async fn no_auth_sends_no_auth_headers() { + let in_addr = next_addr(); + let config: VectorConfig = + toml::from_str(&format!(r#"address = "http://{}/""#, in_addr)).unwrap(); + + let parts = run_auth_sink(config, in_addr).await; + + assert!( + parts.headers.get("authorization").is_none(), + "no authorization header expected" + ); + assert!( + parts.headers.get("x-site-id").is_none(), + "no x-site-id header expected" + ); + } + + #[tokio::test] + async fn auth_file_missing_token_file_fails_batch() { + let in_addr = next_addr(); + let config: VectorConfig = toml::from_str(&format!( + r#" + address = "http://{}/" + [auth] + site_id = "site-abc" + [auth.jwt_token] + type = "file" + path = "/nonexistent/path/to/token" + "#, + in_addr + )) + .unwrap(); + + let cx = SinkContext::default(); + let (sink, _) = config.build(cx).await.unwrap(); + + let (_rx, _trigger, server) = build_test_server_generic(in_addr, make_ok_responder()); + tokio::spawn(server); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_, events) = random_lines_with_stream(8, 1, Some(batch)); + + sink.run(events).await.expect("run itself should not error"); + assert_eq!( + receiver.try_recv(), + Ok(BatchStatus::Rejected), + "batch should be rejected when token file is missing" + ); + } + + #[tokio::test] + async fn build_rejects_invalid_site_id() { + let in_addr = next_addr(); + + // Parse a valid config, then inject an invalid site_id programmatically + // (NUL bytes cannot appear in TOML strings so we must bypass the parser). + let mut config: VectorConfig = toml::from_str(&format!( + r#" + address = "http://{}/" + [auth] + site_id = "valid-site" + [auth.jwt_token] + type = "inline" + value = "token" + "#, + in_addr + )) + .unwrap(); + config.auth.as_mut().unwrap().site_id = "bad\0site".into(); + + let cx = SinkContext::default(); + assert!( + config.build(cx).await.is_err(), + "build should fail for site_id with invalid characters" + ); + } + + #[tokio::test] + async fn build_rejects_invalid_inline_token() { + let in_addr = next_addr(); + + let mut config: VectorConfig = toml::from_str(&format!( + r#" + address = "http://{}/" + [auth] + site_id = "valid-site" + [auth.jwt_token] + type = "inline" + value = "token" + "#, + in_addr + )) + .unwrap(); + + use crate::sinks::util::JwtTokenConfig; + config.auth.as_mut().unwrap().jwt_token = + JwtTokenConfig::Inline { value: "bad\0token".into() }; + + let cx = SinkContext::default(); + assert!( + config.build(cx).await.is_err(), + "build should fail for inline token with invalid characters" + ); + } + // taken from fn encode_body(msg: T) -> Bytes where diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index 1a0b240e43..0362e112dd 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::task::{Context, Poll}; use futures::{future::BoxFuture, TryFutureExt}; @@ -11,12 +12,12 @@ use tower::Service; use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_lib::stream::DriverResponse; -use super::VectorSinkError; +use super::{config::VectorSinkAuthConfig, VectorSinkError}; use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, internal_events::EndpointBytesSent, proto::vector as proto_vector, - sinks::util::uri, + sinks::util::{uri, JwtAuthState}, Error, }; @@ -25,6 +26,7 @@ pub struct VectorService { pub client: proto_vector::Client, pub protocol: String, pub endpoint: String, + auth: Option>, } pub struct VectorResponse { @@ -69,7 +71,8 @@ impl VectorService { hyper_client: hyper::Client>, BoxBody>, uri: Uri, compression: bool, - ) -> Self { + auth: Option, + ) -> crate::Result { let (protocol, endpoint) = uri::protocol_endpoint(uri.clone()); let mut proto_client = proto_vector::Client::new(HyperSvc { uri, @@ -79,11 +82,21 @@ impl VectorService { if compression { proto_client = proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip); } - Self { + + let auth = auth + .map(|cfg| { + JwtAuthState::from_config(&cfg.site_id, cfg.jwt_token) + .map(|state| Arc::new(state)) + .map_err(|e| -> Error { e.into() }) + }) + .transpose()?; + + Ok(Self { client: proto_client, protocol, endpoint, - } + auth, + }) } } @@ -110,9 +123,22 @@ impl Service for VectorService { let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); let future = async move { + let mut request = list.request.into_request(); + + // Inject auth metadata on every call so a rotated token is always used. + if let Some(auth) = &service.auth { + let bearer = auth.bearer_token().map_err(|message| { + VectorSinkError::JwtTokenUnavailable { message } + })?; + request.metadata_mut().insert("authorization", bearer); + request + .metadata_mut() + .insert("x-site-id", auth.site_id().clone()); + } + service .client - .push_events(list.request.into_request()) + .push_events(request) .map_ok(|_response| { emit!(EndpointBytesSent { byte_size, diff --git a/src/sources/util/jwt_auth.rs b/src/sources/util/jwt_auth.rs new file mode 100644 index 0000000000..aecc198c4e --- /dev/null +++ b/src/sources/util/jwt_auth.rs @@ -0,0 +1,565 @@ +use std::sync::Arc; + +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; +use serde_json::Value; +use vector_lib::configurable::configurable_component; + +/// Errors returned by [`JwtAuth::validate`]. +#[derive(Debug)] +pub enum JwtAuthError { + /// The `authorization` header was present but the token is invalid, malformed, expired, + /// or failed signature verification. Callers should respond with HTTP 401 / + /// gRPC `Unauthenticated`. + InvalidToken(&'static str), + + /// The token was valid but the companion membership value header (`x-site-id`) was + /// absent or contained non-ASCII bytes. Callers should respond with HTTP 401 / + /// gRPC `Unauthenticated`. + MissingMembershipValue, + + /// The membership value is not listed in the token's [`JwtAuthConfig::membership_claim`]. + /// Callers should respond with HTTP 403 / gRPC `PermissionDenied`. + MembershipNotAuthorized, +} + +/// Source of the RSA public key PEM used to verify JWT signatures. +/// +/// Exactly one variant must be configured. +/// +/// ## Examples +/// +/// Inline PEM (use Vector's `${VAR}` interpolation for env vars): +/// ```toml +/// public_key.type = "inline" +/// public_key.value = "${AUTH0_RSA_PUBLIC_KEY}" +/// ``` +/// +/// File path (preferred for Kubernetes ConfigMap mounts — key is read once at startup): +/// ```toml +/// public_key.type = "file" +/// public_key.path = "/etc/certs/auth0.pem" +/// ``` +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum JwtPublicKey { + /// Inline PEM value. + /// + /// Supports Vector's `${ENV_VAR}` interpolation, e.g. + /// `value = "${AUTH0_RSA_PUBLIC_KEY}"`. The value is read once at startup. + Inline { + /// RSA public key in PEM format. + value: String, + }, + + /// Path to a file containing the RSA public key in PEM format. + /// + /// Preferred for Kubernetes ConfigMap or secret volume mounts. + /// The file is read once at source startup. + File { + /// Path to the PEM file. + path: String, + }, +} + +/// JWT authentication configuration for sources. +/// +/// Can be embedded in any source — HTTP-based (Splunk HEC, http_server, …) or gRPC-based +/// (vector, opentelemetry, …). The validation logic is transport-agnostic: callers extract +/// the `authorization` and membership-value header strings themselves and pass them to +/// [`JwtAuth::validate`]. +/// +/// Tokens are Auth0-issued RS256 JWTs. The RSA public key is parsed **once** at source +/// startup; per-request validation is purely in-process with no network calls. +/// +/// ## Backward-compatible fallback +/// +/// When the `authorization` header is absent, [`JwtAuth::validate`] returns `Ok(None)`. +/// Agents that predate authentication continue to be accepted during a rolling upgrade. +/// +/// ## Example +/// +/// ```toml +/// [sources.my_source.auth] +/// public_key.type = "inline" +/// public_key.value = "${AUTH0_RSA_PUBLIC_KEY}" +/// jwt_issuer = "https://your-tenant.example.com/" +/// jwt_audience = ["https://your-api-identifier"] +/// membership_claim = "https://your-domain.com/site_ids" +/// ``` +#[configurable_component] +#[derive(Clone, Debug)] +pub struct JwtAuthConfig { + /// Source of the RSA public key PEM used to verify JWT signatures. + /// + /// Set exactly one of `type = "inline"` (with `value`) or `type = "file"` (with `path`). + pub public_key: JwtPublicKey, + + /// Expected `iss` (issuer) claim, e.g. `"https://your-tenant.example.com/"`. + /// + /// When set, tokens whose issuer does not match are rejected. + #[serde(skip_serializing_if = "Option::is_none")] + pub jwt_issuer: Option, + + /// Expected `aud` (audience) claim values. + /// + /// When set, tokens that do not include at least one of these audiences are rejected. + #[serde(skip_serializing_if = "Option::is_none")] + pub jwt_audience: Option>, + + /// Name of the JWT claim whose array value the incoming membership value is checked + /// against. + /// + /// Auth0 custom claims must be namespaced, e.g. + /// `"https://your-domain.com/site_ids"`. Defaults to `"site_ids"`. + #[serde(default = "default_membership_claim")] + pub membership_claim: String, +} + +fn default_membership_claim() -> String { + "site_ids".to_string() +} + +impl JwtAuthConfig { + /// Builds the runtime [`JwtAuth`] by loading and parsing the RSA public key. + /// + /// All I/O and PEM parsing happen here — once at startup. + /// The resulting [`JwtAuth`] is cheap to clone and holds no file handles. + pub fn build(&self) -> crate::Result { + let pem = self.load_pem()?; + + let decoding_key = DecodingKey::from_rsa_pem(pem.as_bytes()) + .map_err(|e| format!("Failed to parse RSA public key PEM: {e}"))?; + + let mut validation = Validation::new(Algorithm::RS256); + + if let Some(issuer) = &self.jwt_issuer { + validation.set_issuer(&[issuer]); + } + + if let Some(audiences) = &self.jwt_audience { + validation.set_audience(audiences); + } else { + validation.validate_aud = false; + } + + Ok(JwtAuth(Arc::new(Inner { + decoding_key, + validation, + membership_claim: self.membership_claim.clone(), + }))) + } + + fn load_pem(&self) -> crate::Result { + match &self.public_key { + JwtPublicKey::Inline { value } => Ok(value.clone()), + JwtPublicKey::File { path } => std::fs::read_to_string(path) + .map_err(|e| format!("Failed to read JWT public key from '{path}': {e}").into()), + } + } +} + +// Private — holds the parsed key and validation config behind Arc so JwtAuth is +// cheap to clone across tokio tasks without copying the RSA key bytes. +struct Inner { + decoding_key: DecodingKey, + validation: Validation, + membership_claim: String, +} + +/// Runtime JWT authentication handle built from [`JwtAuthConfig`]. +/// +/// Cheap to clone — all state is held behind an [`Arc`]. +#[derive(Clone)] +pub struct JwtAuth(Arc); + +impl std::fmt::Debug for JwtAuth { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JwtAuth") + .field("membership_claim", &self.0.membership_claim) + .finish_non_exhaustive() + } +} + +impl JwtAuth { + /// Validate JWT authentication from raw header string values. + /// + /// # Parameters + /// + /// * `authorization` — value of the `authorization` / `Authorization` header, if present. + /// Expected format: `"Bearer "`. + /// * `membership_value` — the value to look up inside the token's + /// [`JwtAuthConfig::membership_claim`] array (e.g. the `x-site-id` header value). + /// + /// # Returns + /// + /// * `Ok(None)` — `authorization` was absent; request is from a legacy sender and is + /// allowed through for backwards compatibility. + /// * `Ok(Some(value))` — token is valid and `value` was found in the membership claim. + /// The returned `&str` borrows from the `membership_value` argument. + /// * `Err(JwtAuthError::InvalidToken)` — token is malformed, expired, has a bad + /// signature, wrong issuer/audience, or the claim is missing. + /// * `Err(JwtAuthError::MissingMembershipValue)` — token is valid but + /// `membership_value` was `None`. + /// * `Err(JwtAuthError::MembershipNotAuthorized)` — value is not in the claim array. + pub fn validate<'a>( + &self, + authorization: Option<&str>, + membership_value: Option<&'a str>, + ) -> Result, JwtAuthError> { + let Some(auth_value) = authorization else { + debug!(message = "No authorization header; allowing request (legacy client fallback)."); + return Ok(None); + }; + + let token = auth_value + .strip_prefix("Bearer ") + .ok_or(JwtAuthError::InvalidToken("authorization must use Bearer scheme"))?; + + let inner = &self.0; + + let token_data = + decode::>(token, &inner.decoding_key, &inner.validation) + .map_err(|err| { + warn!(message = "JWT validation failed.", error = %err); + JwtAuthError::InvalidToken("invalid or expired token") + })?; + + let value = membership_value.ok_or(JwtAuthError::MissingMembershipValue)?; + + let allowed = token_data + .claims + .get(&inner.membership_claim) + .and_then(Value::as_array) + .ok_or(JwtAuthError::InvalidToken("token missing membership claim"))?; + + if !allowed.iter().any(|v| v.as_str() == Some(value)) { + warn!( + message = "JWT membership check failed.", + value = value, + claim = %inner.membership_claim, + ); + return Err(JwtAuthError::MembershipNotAuthorized); + } + + Ok(Some(value)) + } +} + +#[cfg(all(test, feature = "sources-utils-jwt-auth"))] +mod tests { + use std::collections::HashMap; + use std::io::Write; + use std::time::{SystemTime, UNIX_EPOCH}; + + use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; + + use super::*; + + // RSA-2048 test key pair (not used outside of tests). + const TEST_PRIVATE_KEY: &str = "-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDJ5D7lpMrGJpl7 +zCcZ73XqbzBaagaPa9QDoGmypTbOoiysnnmcTHfy+wcP2aBlDTC8aB+7iPdZr0tA +ENdzIQ0/kZFBWCdwqAtQYDyfGuZx9y+3E9I8RFleDqDSwA6aUrSoesC9OBHztebX +0m4T9dAWzn8Vr3CYKVpp4XcYwfX6iWszCm43zv4fCJu/qYX67IvOP8h66OMBZ8s7 +A4K15z1n8ScI3R6v6amc94iB7z2B9hdvuoTKk89dF5XGxE1ZVnIzSPr/8/oQQJgG +RaYqQAViy4kPmctW4uaI9ajQPIQe58LpNh1lDw+aLRHO/e0SCqbUNARTLSdSIwNV +3dltWgS9AgMBAAECggEAHPo4NuDYw+kdZYHvaM8QdyYfZBLMv0AkTaL0GNKS08S+ +McaLQO5O1x7FrDY5yddDU/+D8nhdvE8nN1pTejBXxPSBS0Y6XvaXrSErAlErm1b1 +z8q2BbVvuErUNXugfPD7AiWgTWhjVz4YFIkdCJtjEyrvXa7xM73XvtPAMtsAEcXv +MgeRaZVdIledQUozu72RfPuG0yYWG5j+1W1IjNDcuLvld+RrZZ6JqyedhHMwlsFU +bi1DDGaBvp7jkDr6hDp81dqUVposvq+yw3THoyDnQCNxrSCfDpRkYk7DWJKVD8XS +6GvFHuHfaktzm+KkUHBQAebGn6qM+3QBIOWXZkHBdwKBgQDwhVtLUNnz7LLOlAxH +/IF5WM96DoPilOG548yMt/81Zez9QzgJXhxefhCpl2ZQDUCWr9CFvn+98XFai8jt +voVQMV23AGi6nJJ+jGw9koQUt/uYAxZ4U8tG0KqxVGhmrab1MfTpLp2mQWkJN7y1 +Hk5moPHwpQhxW73qlzwR8Ug8FwKBgQDW4nX8ZvFfmyJcrckquh0KMpILe5i+klmd +ENU7TmlQ8Sq1QX2j+w4gOWpUR6/bnij1XeEsI21z10Sv3yEgu2E8V7Cqf9mJX0in ++H5+WpEbTHqgfWhA8wXoZIizRfHDKOsOnhNmTFMBBrcp0zd4V1N1xH+APkw1q3jF +YxnmMAMmSwKBgBH5xYLxffiO/iYWRnyy0HJjQs5ae1zZx6z+63Cw56/z+CxNc8iv +cetV/KTQHeNpuiQI68qzHBT0EIa138R08r21ks10iF86CHDQyd4oLxrlTTZlNK61 +hIG8YqVyK4NRAyNcInOy+jFMvi7kLYRTyYQ+DxbvHpxqQN1hhCnLIJztAoGAakX9 +zCKtZXc3+1YHk5YQHqb8C6nI1RdUMpXMn1QcSee8E4CcPqk/RzieGaiKlLcX0qHn +ZwjubMgeNEzJ+YIyiMFloi0wzPvO1yPSi3MHKNUeIJllIhoO5ewyn1cMRlTKS6Rq +O8Grm2pS0+CeImot4KSZ2jb1QeXYCOcGPA2qwRkCgYEAnCI12DQuInN8nLEo4qtq +XEgyvUZ0fGaezcmeT4hhY94l0/HXS0D0qXs/f/rvfFFnvRYlEyiycA4pClkNRNkM +TM9RBaFTEKw9NQP895KUx6hHIAM/LB1Qyf7cDixtwf8ly7Gqhx4vU9tCiiDGSr9Z +T+QEb2Rxj5SJ8cGbNr+NAEI= +-----END PRIVATE KEY-----"; + + const TEST_PUBLIC_KEY: &str = "-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyeQ+5aTKxiaZe8wnGe91 +6m8wWmoGj2vUA6BpsqU2zqIsrJ55nEx38vsHD9mgZQ0wvGgfu4j3Wa9LQBDXcyEN +P5GRQVgncKgLUGA8nxrmcfcvtxPSPERZXg6g0sAOmlK0qHrAvTgR87Xm19JuE/XQ +Fs5/Fa9wmClaaeF3GMH1+olrMwpuN87+Hwibv6mF+uyLzj/IeujjAWfLOwOCtec9 +Z/EnCN0er+mpnPeIge89gfYXb7qEypPPXReVxsRNWVZyM0j6//P6EECYBkWmKkAF +YsuJD5nLVuLmiPWo0DyEHufC6TYdZQ8Pmi0Rzv3tEgqm1DQEUy0nUiMDVd3ZbVoE +vQIDAQAB +-----END PUBLIC KEY-----"; + + fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + } + + /// Signs a JWT with the test private key. `extra` is merged into the claims. + fn make_token(extra: HashMap<&str, serde_json::Value>) -> String { + let mut claims = serde_json::Map::new(); + claims.insert("sub".into(), serde_json::json!("test-user")); + claims.insert("exp".into(), serde_json::json!(now_secs() + 3600)); + claims.insert("site_ids".into(), serde_json::json!(["site-123", "site-456"])); + for (k, v) in extra { + claims.insert(k.into(), v); + } + let key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY.as_bytes()).unwrap(); + encode(&Header::new(Algorithm::RS256), &claims, &key).unwrap() + } + + fn bearer(token: &str) -> String { + format!("Bearer {token}") + } + + /// Builds a `JwtAuth` from the test public key with optional issuer/audience. + fn build_auth(issuer: Option<&str>, audience: Option>) -> JwtAuth { + JwtAuthConfig { + public_key: JwtPublicKey::Inline { + value: TEST_PUBLIC_KEY.to_string(), + }, + jwt_issuer: issuer.map(str::to_string), + jwt_audience: audience.map(|v| v.iter().map(|s| s.to_string()).collect()), + membership_claim: "site_ids".to_string(), + } + .build() + .unwrap() + } + + // ── JwtAuthConfig::build ───────────────────────────────────────────────── + + #[test] + fn build_from_inline_pem_succeeds() { + let cfg = JwtAuthConfig { + public_key: JwtPublicKey::Inline { + value: TEST_PUBLIC_KEY.to_string(), + }, + jwt_issuer: None, + jwt_audience: None, + membership_claim: "site_ids".to_string(), + }; + assert!(cfg.build().is_ok()); + } + + #[test] + fn build_from_file_pem_succeeds() { + let mut f = tempfile::NamedTempFile::new().unwrap(); + f.write_all(TEST_PUBLIC_KEY.as_bytes()).unwrap(); + + let cfg = JwtAuthConfig { + public_key: JwtPublicKey::File { + path: f.path().to_str().unwrap().into(), + }, + jwt_issuer: None, + jwt_audience: None, + membership_claim: "site_ids".to_string(), + }; + assert!(cfg.build().is_ok()); + } + + #[test] + fn build_with_invalid_pem_fails() { + let cfg = JwtAuthConfig { + public_key: JwtPublicKey::Inline { + value: "this is not a PEM".to_string(), + }, + jwt_issuer: None, + jwt_audience: None, + membership_claim: "site_ids".to_string(), + }; + assert!(cfg.build().is_err()); + } + + #[test] + fn build_with_missing_pem_file_fails() { + let cfg = JwtAuthConfig { + public_key: JwtPublicKey::File { + path: "/nonexistent/path/key.pem".to_string(), + }, + jwt_issuer: None, + jwt_audience: None, + membership_claim: "site_ids".to_string(), + }; + assert!(cfg.build().is_err()); + } + + // ── JwtAuth::validate ──────────────────────────────────────────────────── + + #[test] + fn no_auth_header_allows_legacy_client() { + let auth = build_auth(None, None); + let result = auth.validate(None, Some("site-123")); + assert!(matches!(result, Ok(None))); + } + + #[test] + fn valid_token_with_authorized_site_id_passes() { + let auth = build_auth(None, None); + let token = make_token(HashMap::new()); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert_eq!(result.unwrap(), Some("site-123")); + } + + #[test] + fn non_bearer_scheme_rejected() { + let auth = build_auth(None, None); + let token = make_token(HashMap::new()); + let result = auth.validate(Some(&format!("Basic {token}")), Some("site-123")); + assert!(matches!(result, Err(JwtAuthError::InvalidToken(_)))); + } + + #[test] + fn malformed_token_rejected() { + let auth = build_auth(None, None); + let result = auth.validate(Some("Bearer not.a.jwt"), Some("site-123")); + assert!(matches!(result, Err(JwtAuthError::InvalidToken(_)))); + } + + #[test] + fn expired_token_rejected() { + let auth = build_auth(None, None); + let mut extra = HashMap::new(); + // exp in the past + extra.insert("exp", serde_json::json!(now_secs() - 3600)); + let token = make_token(extra); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert!(matches!(result, Err(JwtAuthError::InvalidToken(_)))); + } + + #[test] + fn wrong_issuer_rejected() { + let auth = build_auth(Some("https://expected.example.com/"), None); + let mut extra = HashMap::new(); + extra.insert("iss", serde_json::json!("https://other.example.com/")); + let token = make_token(extra); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert!(matches!(result, Err(JwtAuthError::InvalidToken(_)))); + } + + #[test] + fn matching_issuer_passes() { + let auth = build_auth(Some("https://expected.example.com/"), None); + let mut extra = HashMap::new(); + extra.insert("iss", serde_json::json!("https://expected.example.com/")); + let token = make_token(extra); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert!(result.is_ok()); + } + + #[test] + fn wrong_audience_rejected() { + let auth = build_auth(None, Some(vec!["https://expected-api/"])); + let mut extra = HashMap::new(); + extra.insert("aud", serde_json::json!(["https://other-api/"])); + let token = make_token(extra); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert!(matches!(result, Err(JwtAuthError::InvalidToken(_)))); + } + + #[test] + fn matching_audience_passes() { + let auth = build_auth(None, Some(vec!["https://expected-api/"])); + let mut extra = HashMap::new(); + extra.insert("aud", serde_json::json!(["https://expected-api/"])); + let token = make_token(extra); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert!(result.is_ok()); + } + + #[test] + fn missing_site_id_header_returns_missing_membership_value() { + let auth = build_auth(None, None); + let token = make_token(HashMap::new()); + let result = auth.validate(Some(&bearer(&token)), None); + assert!(matches!(result, Err(JwtAuthError::MissingMembershipValue))); + } + + #[test] + fn unauthorized_site_id_returns_membership_not_authorized() { + let auth = build_auth(None, None); + let token = make_token(HashMap::new()); + let result = auth.validate(Some(&bearer(&token)), Some("site-not-in-token")); + assert!(matches!( + result, + Err(JwtAuthError::MembershipNotAuthorized) + )); + } + + #[test] + fn missing_membership_claim_in_token_rejected() { + let auth = build_auth(None, None); + // Token has no site_ids claim at all. + let mut claims = serde_json::Map::new(); + claims.insert("sub".into(), serde_json::json!("user")); + claims.insert("exp".into(), serde_json::json!(now_secs() + 3600)); + let key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY.as_bytes()).unwrap(); + let token = encode(&Header::new(Algorithm::RS256), &claims, &key).unwrap(); + let result = auth.validate(Some(&bearer(&token)), Some("site-123")); + assert!(matches!(result, Err(JwtAuthError::InvalidToken(_)))); + } + + #[test] + fn custom_membership_claim_is_checked() { + let cfg = JwtAuthConfig { + public_key: JwtPublicKey::Inline { + value: TEST_PUBLIC_KEY.to_string(), + }, + jwt_issuer: None, + jwt_audience: None, + membership_claim: "allowed_tenants".to_string(), + }; + let auth = cfg.build().unwrap(); + + let mut extra = HashMap::new(); + extra.insert("allowed_tenants", serde_json::json!(["tenant-abc"])); + let token = make_token(extra); + + // "site-123" is in site_ids but not in allowed_tenants — membership check uses the + // configured claim, so this should be unauthorised rather than an invalid token. + assert!(matches!( + auth.validate(Some(&bearer(&token)), Some("site-123")), + Err(JwtAuthError::MembershipNotAuthorized) + )); + assert!(auth + .validate(Some(&bearer(&token)), Some("tenant-abc")) + .is_ok()); + } + + // ── JwtPublicKey serde ─────────────────────────────────────────────────── + + #[test] + fn public_key_inline_deserializes() { + let toml = r#"type = "inline" +value = "my-pem-value""#; + let key: JwtPublicKey = toml::from_str(toml).unwrap(); + assert!(matches!(key, JwtPublicKey::Inline { value } if value == "my-pem-value")); + } + + #[test] + fn public_key_file_deserializes() { + let toml = r#"type = "file" +path = "/etc/certs/auth0.pem""#; + let key: JwtPublicKey = toml::from_str(toml).unwrap(); + assert!( + matches!(key, JwtPublicKey::File { path } if path == "/etc/certs/auth0.pem") + ); + } + + #[test] + fn public_key_missing_type_fails() { + assert!(toml::from_str::(r#"value = "pem""#).is_err()); + } + + #[test] + fn public_key_unknown_type_fails() { + assert!(toml::from_str::(r#"type = "env""#).is_err()); + } +} diff --git a/src/sources/util/mod.rs b/src/sources/util/mod.rs index 20a527501e..4c1537c34b 100644 --- a/src/sources/util/mod.rs +++ b/src/sources/util/mod.rs @@ -1,6 +1,8 @@ #![allow(missing_docs)] #[cfg(feature = "sources-http_server")] mod body_decoding; +#[cfg(feature = "sources-utils-jwt-auth")] +pub mod jwt_auth; mod encoding_config; #[cfg(all(unix, feature = "sources-dnstap"))] pub mod framestream; @@ -69,6 +71,8 @@ pub use self::http::ErrorMessage; pub use self::http::HttpSource; #[cfg(feature = "sources-utils-http-auth")] pub use self::http::HttpSourceAuthConfig; +#[cfg(feature = "sources-utils-jwt-auth")] +pub use self::jwt_auth::{JwtAuth, JwtAuthConfig, JwtAuthError}; #[cfg(any( feature = "sources-aws_sqs", feature = "sources-gcp_pubsub", diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index 77020c618d..227753162d 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -21,7 +21,10 @@ use crate::{ internal_events::{EventsReceived, StreamClosedError}, proto::vector as proto, serde::bool_or_struct, - sources::{util::grpc::run_grpc_server, Source}, + sources::{ + util::{grpc::run_grpc_server, JwtAuth, JwtAuthConfig, JwtAuthError}, + Source, + }, tls::{MaybeTlsSettings, TlsEnableableConfig}, SourceSender, }; @@ -40,6 +43,8 @@ struct Service { pipeline: SourceSender, acknowledgements: bool, log_namespace: LogNamespace, + /// Present when JWT authentication is enabled. + auth: Option, } #[tonic::async_trait] @@ -48,6 +53,21 @@ impl proto::Service for Service { &self, request: Request, ) -> Result, Status> { + if let Some(auth) = &self.auth { + let metadata = request.metadata(); + let authorization = metadata.get("authorization").and_then(|v| v.to_str().ok()); + let site_id = metadata.get("x-site-id").and_then(|v| v.to_str().ok()); + auth.validate(authorization, site_id).map_err(|e| match e { + JwtAuthError::InvalidToken(msg) => Status::unauthenticated(msg), + JwtAuthError::MissingMembershipValue => { + Status::unauthenticated("missing x-site-id metadata header") + } + JwtAuthError::MembershipNotAuthorized => { + Status::permission_denied("site ID not authorized by this token") + } + })?; + } + let mut events: Vec = request .into_inner() .events @@ -138,6 +158,13 @@ pub struct VectorConfig { #[serde(default)] #[configurable(metadata(docs::hidden))] pub log_namespace: Option, + + /// JWT authentication settings. + /// + /// When omitted, all incoming requests are accepted without authentication. + /// See [`JwtAuthConfig`] for full documentation. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub auth: Option, } impl VectorConfig { @@ -158,6 +185,7 @@ impl Default for VectorConfig { tls: None, acknowledgements: Default::default(), log_namespace: None, + auth: None, } } } @@ -176,10 +204,13 @@ impl SourceConfig for VectorConfig { let acknowledgements = cx.do_acknowledgements(self.acknowledgements); let log_namespace = cx.log_namespace(self.log_namespace); + let auth = self.auth.as_ref().map(|cfg| cfg.build()).transpose()?; + let service = proto::Server::new(Service { pipeline: cx.out, acknowledgements, log_namespace, + auth, }) .accept_compressed(tonic::codec::CompressionEncoding::Gzip) // Tonic added a default of 4MB in 0.9. This replaces the old behavior. @@ -339,4 +370,198 @@ mod tests { ); run_test(&config, addr).await; } + + // ── JWT auth integration tests ─────────────────────────────────────────── + // + // These tests require `sources-utils-jwt-auth` (for jsonwebtoken) in addition + // to the `sinks-vector` feature already guarding this module. + + #[cfg(feature = "sources-utils-jwt-auth")] + mod jwt_auth_tests { + use std::collections::HashMap; + use std::time::{SystemTime, UNIX_EPOCH}; + + use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; + use vector_lib::event::{BatchNotifier, BatchStatus}; + + use super::*; + + const TEST_PRIVATE_KEY: &str = "-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDJ5D7lpMrGJpl7 +zCcZ73XqbzBaagaPa9QDoGmypTbOoiysnnmcTHfy+wcP2aBlDTC8aB+7iPdZr0tA +ENdzIQ0/kZFBWCdwqAtQYDyfGuZx9y+3E9I8RFleDqDSwA6aUrSoesC9OBHztebX +0m4T9dAWzn8Vr3CYKVpp4XcYwfX6iWszCm43zv4fCJu/qYX67IvOP8h66OMBZ8s7 +A4K15z1n8ScI3R6v6amc94iB7z2B9hdvuoTKk89dF5XGxE1ZVnIzSPr/8/oQQJgG +RaYqQAViy4kPmctW4uaI9ajQPIQe58LpNh1lDw+aLRHO/e0SCqbUNARTLSdSIwNV +3dltWgS9AgMBAAECggEAHPo4NuDYw+kdZYHvaM8QdyYfZBLMv0AkTaL0GNKS08S+ +McaLQO5O1x7FrDY5yddDU/+D8nhdvE8nN1pTejBXxPSBS0Y6XvaXrSErAlErm1b1 +z8q2BbVvuErUNXugfPD7AiWgTWhjVz4YFIkdCJtjEyrvXa7xM73XvtPAMtsAEcXv +MgeRaZVdIledQUozu72RfPuG0yYWG5j+1W1IjNDcuLvld+RrZZ6JqyedhHMwlsFU +bi1DDGaBvp7jkDr6hDp81dqUVposvq+yw3THoyDnQCNxrSCfDpRkYk7DWJKVD8XS +6GvFHuHfaktzm+KkUHBQAebGn6qM+3QBIOWXZkHBdwKBgQDwhVtLUNnz7LLOlAxH +/IF5WM96DoPilOG548yMt/81Zez9QzgJXhxefhCpl2ZQDUCWr9CFvn+98XFai8jt +voVQMV23AGi6nJJ+jGw9koQUt/uYAxZ4U8tG0KqxVGhmrab1MfTpLp2mQWkJN7y1 +Hk5moPHwpQhxW73qlzwR8Ug8FwKBgQDW4nX8ZvFfmyJcrckquh0KMpILe5i+klmd +ENU7TmlQ8Sq1QX2j+w4gOWpUR6/bnij1XeEsI21z10Sv3yEgu2E8V7Cqf9mJX0in ++H5+WpEbTHqgfWhA8wXoZIizRfHDKOsOnhNmTFMBBrcp0zd4V1N1xH+APkw1q3jF +YxnmMAMmSwKBgBH5xYLxffiO/iYWRnyy0HJjQs5ae1zZx6z+63Cw56/z+CxNc8iv +cetV/KTQHeNpuiQI68qzHBT0EIa138R08r21ks10iF86CHDQyd4oLxrlTTZlNK61 +hIG8YqVyK4NRAyNcInOy+jFMvi7kLYRTyYQ+DxbvHpxqQN1hhCnLIJztAoGAakX9 +zCKtZXc3+1YHk5YQHqb8C6nI1RdUMpXMn1QcSee8E4CcPqk/RzieGaiKlLcX0qHn +ZwjubMgeNEzJ+YIyiMFloi0wzPvO1yPSi3MHKNUeIJllIhoO5ewyn1cMRlTKS6Rq +O8Grm2pS0+CeImot4KSZ2jb1QeXYCOcGPA2qwRkCgYEAnCI12DQuInN8nLEo4qtq +XEgyvUZ0fGaezcmeT4hhY94l0/HXS0D0qXs/f/rvfFFnvRYlEyiycA4pClkNRNkM +TM9RBaFTEKw9NQP895KUx6hHIAM/LB1Qyf7cDixtwf8ly7Gqhx4vU9tCiiDGSr9Z +T+QEb2Rxj5SJ8cGbNr+NAEI= +-----END PRIVATE KEY-----"; + + const TEST_PUBLIC_KEY: &str = "-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyeQ+5aTKxiaZe8wnGe91 +6m8wWmoGj2vUA6BpsqU2zqIsrJ55nEx38vsHD9mgZQ0wvGgfu4j3Wa9LQBDXcyEN +P5GRQVgncKgLUGA8nxrmcfcvtxPSPERZXg6g0sAOmlK0qHrAvTgR87Xm19JuE/XQ +Fs5/Fa9wmClaaeF3GMH1+olrMwpuN87+Hwibv6mF+uyLzj/IeujjAWfLOwOCtec9 +Z/EnCN0er+mpnPeIge89gfYXb7qEypPPXReVxsRNWVZyM0j6//P6EECYBkWmKkAF +YsuJD5nLVuLmiPWo0DyEHufC6TYdZQ8Pmi0Rzv3tEgqm1DQEUy0nUiMDVd3ZbVoE +vQIDAQAB +-----END PUBLIC KEY-----"; + + fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + } + + fn make_token(extra: HashMap<&str, serde_json::Value>) -> String { + let mut claims = serde_json::Map::new(); + claims.insert("sub".into(), serde_json::json!("test-agent")); + claims.insert("exp".into(), serde_json::json!(now_secs() + 3600)); + claims.insert("site_ids".into(), serde_json::json!(["site-123"])); + for (k, v) in extra { + claims.insert(k.into(), v); + } + let key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY.as_bytes()).unwrap(); + encode(&Header::new(Algorithm::RS256), &claims, &key).unwrap() + } + + /// Builds a source and sink, runs `events` through them, and returns the + /// final `BatchStatus` seen by the sender. + async fn run_auth_pair( + source_auth_toml: &str, + sink_auth_toml: &str, + ) -> BatchStatus { + let addr = test_util::next_addr(); + + let source: VectorConfig = toml::from_str(&format!( + "address = \"{addr}\"\n{source_auth_toml}" + )) + .unwrap(); + + let (tx, _rx) = SourceSender::new_test(); + let server = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(server); + test_util::wait_for_tcp(addr).await; + + let sink_toml = format!("address = \"http://{addr}/\"\n{sink_auth_toml}"); + let sink_cfg: SinkConfig = toml::from_str(&sink_toml).unwrap(); + let (sink, _) = sink_cfg.build(SinkContext::default()).await.unwrap(); + + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let (_, stream) = test_util::random_lines_with_stream(8, 5, Some(batch)); + sink.run(stream).await.unwrap(); + + receiver.await + } + + #[tokio::test] + async fn valid_token_and_site_id_delivers() { + let token = make_token(HashMap::new()); + let source_auth = format!( + r#"[auth] +public_key.type = "inline" +public_key.value = "{}" +membership_claim = "site_ids""#, + TEST_PUBLIC_KEY.replace('\n', "\\n") + ); + let sink_auth = format!( + r#"[auth] +site_id = "site-123" +jwt_token.type = "inline" +jwt_token.value = "{token}""# + ); + assert_eq!( + run_auth_pair(&source_auth, &sink_auth).await, + BatchStatus::Delivered + ); + } + + #[tokio::test] + async fn legacy_sink_without_auth_is_accepted() { + // Source has auth configured, sink sends no token → legacy fallback allows through. + let source_auth = format!( + r#"[auth] +public_key.type = "inline" +public_key.value = "{}" +membership_claim = "site_ids""#, + TEST_PUBLIC_KEY.replace('\n', "\\n") + ); + assert_eq!( + run_auth_pair(&source_auth, "").await, + BatchStatus::Delivered + ); + } + + #[tokio::test] + async fn invalid_token_is_rejected() { + let source_auth = format!( + r#"[auth] +public_key.type = "inline" +public_key.value = "{}" +membership_claim = "site_ids""#, + TEST_PUBLIC_KEY.replace('\n', "\\n") + ); + let sink_auth = r#"[auth] +site_id = "site-123" +jwt_token.type = "inline" +jwt_token.value = "not.a.valid.jwt""#; + assert_eq!( + run_auth_pair(&source_auth, sink_auth).await, + BatchStatus::Rejected + ); + } + + #[tokio::test] + async fn unauthorized_site_id_is_rejected() { + let token = make_token(HashMap::new()); // site_ids = ["site-123"] + let source_auth = format!( + r#"[auth] +public_key.type = "inline" +public_key.value = "{}" +membership_claim = "site_ids""#, + TEST_PUBLIC_KEY.replace('\n', "\\n") + ); + let sink_auth = format!( + r#"[auth] +site_id = "site-not-in-token" +jwt_token.type = "inline" +jwt_token.value = "{token}""# + ); + assert_eq!( + run_auth_pair(&source_auth, &sink_auth).await, + BatchStatus::Rejected + ); + } + + #[tokio::test] + async fn source_without_auth_accepts_all_requests() { + // Source has no auth config; any sink (with or without a token) is accepted. + assert_eq!( + run_auth_pair("", "").await, + BatchStatus::Delivered + ); + } + } }