diff --git a/src/telemetry_window_demo/ai_assisted_detection_demo/pipeline.py b/src/telemetry_window_demo/ai_assisted_detection_demo/pipeline.py index 6b8a74e..265a7eb 100644 --- a/src/telemetry_window_demo/ai_assisted_detection_demo/pipeline.py +++ b/src/telemetry_window_demo/ai_assisted_detection_demo/pipeline.py @@ -11,6 +11,7 @@ import yaml +from ..time_utils import parse_utc_timestamp from .llm import DemoStructuredCaseLlm SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4} @@ -1037,7 +1038,7 @@ def derive_pipeline_ts(raw_events: Sequence[Mapping[str, Any]]) -> str: def parse_timestamp(raw_value: str) -> datetime: - return datetime.fromisoformat(raw_value.replace("Z", "+00:00")).astimezone(UTC) + return parse_utc_timestamp(raw_value) def format_timestamp(value: datetime) -> str: diff --git a/src/telemetry_window_demo/config_change_investigation_demo/pipeline.py b/src/telemetry_window_demo/config_change_investigation_demo/pipeline.py index 7cd756a..2fe4c57 100644 --- a/src/telemetry_window_demo/config_change_investigation_demo/pipeline.py +++ b/src/telemetry_window_demo/config_change_investigation_demo/pipeline.py @@ -8,6 +8,8 @@ import yaml +from ..time_utils import parse_utc_timestamp + SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4} CHANGE_REQUIRED_FIELDS = ( "change_id", @@ -35,6 +37,11 @@ "event_type", "details", ) +CONFIG_INPUT_PATH_FIELDS = ( + "config_changes", + "policy_denials", + "follow_on_events", +) def default_demo_root() -> Path: @@ -46,13 +53,14 @@ def run_demo( artifacts_dir: Path | None = None, ) -> dict[str, Any]: demo_root = Path(demo_root or default_demo_root()).resolve() - config = load_yaml(demo_root / "config" / "investigation.yaml") - input_paths = config.get("input_paths", {}) + config = validate_demo_config(load_yaml(demo_root / "config" / "investigation.yaml")) + input_paths = config["input_paths"] artifacts_dir = Path( artifacts_dir - or resolve_demo_path(demo_root, str(config.get("artifacts_dir", "artifacts"))) + or resolve_demo_path(demo_root, str(config["artifacts_dir"])) ).resolve() artifacts_dir.mkdir(parents=True, exist_ok=True) + correlation_minutes = int(config["correlation_minutes"]) config_changes = normalize_config_changes( load_jsonl(resolve_demo_path(demo_root, str(input_paths["config_changes"]))) @@ -69,17 +77,17 @@ def run_demo( rule_hits, policy_denials, follow_on_events, - correlation_minutes=int(config.get("correlation_minutes", 15)), + correlation_minutes=correlation_minutes, ) summary = build_investigation_summary( investigations, - correlation_minutes=int(config.get("correlation_minutes", 15)), + correlation_minutes=correlation_minutes, ) report_text = build_investigation_report( config_changes=config_changes, rule_hits=rule_hits, investigations=investigations, - correlation_minutes=int(config.get("correlation_minutes", 15)), + correlation_minutes=correlation_minutes, ) paths = { @@ -119,6 +127,39 @@ def load_yaml(path: Path) -> dict[str, Any]: return payload +def validate_demo_config(config: Mapping[str, Any]) -> dict[str, Any]: + input_paths = config.get("input_paths") + if not isinstance(input_paths, Mapping): + raise ValueError("Config field 'input_paths' must be a mapping.") + + validated_input_paths: dict[str, str] = {} + for field in CONFIG_INPUT_PATH_FIELDS: + validated_input_paths[field] = require_non_empty_string( + input_paths.get(field), + f"input_paths.{field}", + ) + + artifacts_dir = require_non_empty_string( + config.get("artifacts_dir", "artifacts"), + "artifacts_dir", + ) + correlation_minutes = require_positive_int( + config.get("correlation_minutes", 15), + "correlation_minutes", + ) + + rules = config.get("rules") + if not isinstance(rules, list) or not rules: + raise ValueError("Config field 'rules' must be a non-empty list.") + + return { + "input_paths": validated_input_paths, + "artifacts_dir": artifacts_dir, + "correlation_minutes": correlation_minutes, + "rules": rules, + } + + def load_jsonl(path: Path) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] with path.open("r", encoding="utf-8") as handle: @@ -306,6 +347,24 @@ def validate_rules(rules: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]: return validated +def require_positive_int(value: Any, field_name: str) -> int: + if isinstance(value, bool): + raise ValueError(f"{field_name} must be a positive integer.") + try: + parsed = int(value) + except (TypeError, ValueError) as exc: + raise ValueError(f"{field_name} must be a positive integer.") from exc + if parsed <= 0: + raise ValueError(f"{field_name} must be a positive integer.") + return parsed + + +def require_non_empty_string(value: Any, field_name: str) -> str: + if not isinstance(value, str) or not value.strip(): + raise ValueError(f"Config field '{field_name}' must be a non-empty string.") + return value.strip() + + def build_investigations( rule_hits: Sequence[Mapping[str, Any]], policy_denials: Sequence[Mapping[str, Any]], @@ -313,6 +372,10 @@ def build_investigations( correlation_minutes: int, ) -> list[dict[str, Any]]: investigations: list[dict[str, Any]] = [] + correlation_minutes = require_positive_int( + correlation_minutes, + "correlation_minutes", + ) correlation_window = timedelta(minutes=correlation_minutes) for hit in rule_hits: @@ -464,7 +527,7 @@ def normalize_optional_text(value: Any) -> str | None: def parse_timestamp(raw_value: str) -> datetime: - return datetime.fromisoformat(raw_value.replace("Z", "+00:00")).astimezone(UTC) + return parse_utc_timestamp(raw_value) def format_timestamp(value: Any) -> str: diff --git a/src/telemetry_window_demo/rule_evaluation_and_dedup_demo/pipeline.py b/src/telemetry_window_demo/rule_evaluation_and_dedup_demo/pipeline.py index 91c0d3b..27a9345 100644 --- a/src/telemetry_window_demo/rule_evaluation_and_dedup_demo/pipeline.py +++ b/src/telemetry_window_demo/rule_evaluation_and_dedup_demo/pipeline.py @@ -9,6 +9,8 @@ import yaml +from ..time_utils import parse_utc_timestamp + SCOPE_FIELDS = ("entity", "source", "target", "host") REQUIRED_HIT_FIELDS = ( "hit_id", @@ -588,7 +590,7 @@ def write_text(content: str, path: Path) -> Path: def parse_timestamp(raw_value: str) -> datetime: - return datetime.fromisoformat(raw_value.replace("Z", "+00:00")).astimezone(UTC) + return parse_utc_timestamp(raw_value) def format_timestamp(value: Any) -> str: diff --git a/src/telemetry_window_demo/time_utils.py b/src/telemetry_window_demo/time_utils.py new file mode 100644 index 0000000..cba5c89 --- /dev/null +++ b/src/telemetry_window_demo/time_utils.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from datetime import UTC, datetime + + +def parse_utc_timestamp(raw_value: str) -> datetime: + text = str(raw_value).strip() + if not text: + raise ValueError("Timestamp must be non-empty.") + + try: + timestamp = datetime.fromisoformat(text.replace("Z", "+00:00")) + except ValueError as exc: + raise ValueError(f"Invalid timestamp: {raw_value!r}") from exc + + if timestamp.tzinfo is None: + timestamp = timestamp.replace(tzinfo=UTC) + return timestamp.astimezone(UTC) diff --git a/tests/test_ai_assisted_detection_demo.py b/tests/test_ai_assisted_detection_demo.py index 3687bad..ecbf995 100644 --- a/tests/test_ai_assisted_detection_demo.py +++ b/tests/test_ai_assisted_detection_demo.py @@ -22,6 +22,7 @@ load_jsonl, load_yaml, normalize_events, + parse_timestamp, parse_and_validate_json_output, ) @@ -109,6 +110,12 @@ def test_rules_trigger_expected_hits() -> None: assert all(hit["attack_mapping"]["technique_id"] for hit in rule_hits) +def test_parse_timestamp_treats_naive_values_as_utc() -> None: + assert parse_timestamp("2026-03-10T10:00:00").isoformat() == ( + "2026-03-10T10:00:00+00:00" + ) + + def test_grouping_merges_hits_by_entities_and_time() -> None: _, _, _, _, grouped_cases, _ = _demo_inputs() diff --git a/tests/test_config_change_investigation_demo.py b/tests/test_config_change_investigation_demo.py index 9defee7..c097341 100644 --- a/tests/test_config_change_investigation_demo.py +++ b/tests/test_config_change_investigation_demo.py @@ -1,8 +1,12 @@ from __future__ import annotations import json +import shutil from pathlib import Path +import pytest +import yaml + from telemetry_window_demo.config_change_investigation_demo import default_demo_root, run_demo from telemetry_window_demo.config_change_investigation_demo.pipeline import ( build_investigations, @@ -12,6 +16,8 @@ normalize_config_changes, normalize_follow_on_events, normalize_policy_denials, + parse_timestamp, + validate_demo_config, ) @@ -34,6 +40,13 @@ def _load_json_file(path: Path): return json.loads(path.read_text(encoding="utf-8")) +def _copy_demo_root(tmp_path: Path) -> Path: + source_root = default_demo_root() + target_root = tmp_path / "demo-copy" + shutil.copytree(source_root, target_root) + return target_root + + def test_normalize_config_changes_is_sorted_and_complete() -> None: _, _, config_changes, _, _ = _load_demo_inputs() @@ -59,6 +72,12 @@ def test_evaluate_risky_config_changes_flags_expected_changes() -> None: assert [hit["severity"] for hit in hits] == ["critical", "high", "high"] +def test_parse_timestamp_treats_naive_values_as_utc() -> None: + assert parse_timestamp("2026-03-20T09:15:00").isoformat() == ( + "2026-03-20T09:15:00+00:00" + ) + + def test_build_investigations_uses_bounded_system_and_time_correlation() -> None: _, config, config_changes, policy_denials, follow_on_events = _load_demo_inputs() hits = evaluate_risky_config_changes(config_changes, config["rules"]) @@ -86,6 +105,111 @@ def test_build_investigations_uses_bounded_system_and_time_correlation() -> None ) +def test_build_investigations_includes_evidence_on_window_end_only() -> None: + change_time = parse_timestamp("2026-03-20T09:00:00Z") + rule_hits = [ + { + "investigation_id": "CCI-999", + "severity": "high", + "rule_id": "cfg_test", + "reason": "test rule", + "change_event": { + "change_id": "cfg-test", + "timestamp": change_time, + "actor": "operator", + "target_system": "identity-proxy", + "config_key": "test_key", + "old_value": "safe", + "new_value": "risky", + "change_result": "success", + }, + } + ] + policy_denials = [ + { + "denial_id": "before", + "timestamp": parse_timestamp("2026-03-20T08:59:59Z"), + "actor": "operator", + "target_system": "identity-proxy", + "policy_name": "before", + "decision": "deny", + "reason": "too early", + }, + { + "denial_id": "on-end", + "timestamp": parse_timestamp("2026-03-20T09:15:00Z"), + "actor": "operator", + "target_system": "identity-proxy", + "policy_name": "boundary", + "decision": "deny", + "reason": "inside boundary", + }, + { + "denial_id": "after", + "timestamp": parse_timestamp("2026-03-20T09:15:01Z"), + "actor": "operator", + "target_system": "identity-proxy", + "policy_name": "after", + "decision": "deny", + "reason": "too late", + }, + ] + + investigations = build_investigations( + rule_hits, + policy_denials, + follow_on_events=[], + correlation_minutes=15, + ) + + assert [item["denial_id"] for item in investigations[0]["attached_policy_denials"]] == [ + "on-end" + ] + + +def test_build_investigations_rejects_non_positive_correlation_window() -> None: + with pytest.raises(ValueError, match="correlation_minutes"): + build_investigations([], [], [], correlation_minutes=0) + + +def test_validate_demo_config_reports_missing_input_path_key() -> None: + _, config, _, _, _ = _load_demo_inputs() + config["input_paths"].pop("policy_denials") + + with pytest.raises(ValueError, match="input_paths.policy_denials"): + validate_demo_config(config) + + +def test_validate_demo_config_rejects_bad_correlation_window_type() -> None: + _, config, _, _, _ = _load_demo_inputs() + config["correlation_minutes"] = True + + with pytest.raises(ValueError, match="correlation_minutes"): + validate_demo_config(config) + + +def test_validate_demo_config_rejects_missing_rules() -> None: + _, config, _, _, _ = _load_demo_inputs() + config["rules"] = [] + + with pytest.raises(ValueError, match="rules"): + validate_demo_config(config) + + +def test_run_demo_reports_config_errors_before_loading_inputs(tmp_path) -> None: + demo_root = _copy_demo_root(tmp_path) + config_path = demo_root / "config" / "investigation.yaml" + config = load_yaml(config_path) + config["input_paths"].pop("config_changes") + config_path.write_text( + yaml.safe_dump(config, sort_keys=False), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="input_paths.config_changes"): + run_demo(demo_root=demo_root, artifacts_dir=tmp_path / "artifacts") + + def test_run_demo_is_deterministic_and_matches_committed_artifacts(tmp_path) -> None: demo_root, _, _, _, _ = _load_demo_inputs() first_dir = tmp_path / "run-one" diff --git a/tests/test_rule_evaluation_and_dedup_demo.py b/tests/test_rule_evaluation_and_dedup_demo.py index b04a756..681bec8 100644 --- a/tests/test_rule_evaluation_and_dedup_demo.py +++ b/tests/test_rule_evaluation_and_dedup_demo.py @@ -9,6 +9,7 @@ group_rule_hits_by_cooldown_key, load_json, normalize_rule_hits, + parse_timestamp, ) @@ -53,6 +54,12 @@ def test_group_rule_hits_by_rule_and_resolved_scope() -> None: } +def test_parse_timestamp_treats_naive_values_as_utc() -> None: + assert parse_timestamp("2026-03-18T10:00:00").isoformat() == ( + "2026-03-18T10:00:00+00:00" + ) + + def test_deduplicate_rule_hits_respects_cooldown_boundary() -> None: hits = normalize_rule_hits( [