Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import yaml

from ..time_utils import parse_utc_timestamp
from .llm import DemoStructuredCaseLlm

SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4}
Expand Down Expand Up @@ -1037,7 +1038,7 @@ def derive_pipeline_ts(raw_events: Sequence[Mapping[str, Any]]) -> str:


def parse_timestamp(raw_value: str) -> datetime:
    """Parse an ISO-8601 timestamp string into an aware UTC datetime.

    Delegates to the shared helper so naive and ``Z``-suffixed values are
    handled consistently across the demo pipelines.
    """
    # NOTE(review): the scraped diff fused the removed inline
    # ``datetime.fromisoformat`` line with this replacement; only the
    # delegating call is the post-change code.
    return parse_utc_timestamp(raw_value)


def format_timestamp(value: datetime) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import yaml

from ..time_utils import parse_utc_timestamp

SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4}
CHANGE_REQUIRED_FIELDS = (
"change_id",
Expand Down Expand Up @@ -35,6 +37,11 @@
"event_type",
"details",
)
CONFIG_INPUT_PATH_FIELDS = (
"config_changes",
"policy_denials",
"follow_on_events",
)


def default_demo_root() -> Path:
Expand All @@ -46,13 +53,14 @@ def run_demo(
artifacts_dir: Path | None = None,
) -> dict[str, Any]:
demo_root = Path(demo_root or default_demo_root()).resolve()
config = load_yaml(demo_root / "config" / "investigation.yaml")
input_paths = config.get("input_paths", {})
config = validate_demo_config(load_yaml(demo_root / "config" / "investigation.yaml"))
input_paths = config["input_paths"]
artifacts_dir = Path(
artifacts_dir
or resolve_demo_path(demo_root, str(config.get("artifacts_dir", "artifacts")))
or resolve_demo_path(demo_root, str(config["artifacts_dir"]))
).resolve()
artifacts_dir.mkdir(parents=True, exist_ok=True)
correlation_minutes = int(config["correlation_minutes"])

config_changes = normalize_config_changes(
load_jsonl(resolve_demo_path(demo_root, str(input_paths["config_changes"])))
Expand All @@ -69,17 +77,17 @@ def run_demo(
rule_hits,
policy_denials,
follow_on_events,
correlation_minutes=int(config.get("correlation_minutes", 15)),
correlation_minutes=correlation_minutes,
)
summary = build_investigation_summary(
investigations,
correlation_minutes=int(config.get("correlation_minutes", 15)),
correlation_minutes=correlation_minutes,
)
report_text = build_investigation_report(
config_changes=config_changes,
rule_hits=rule_hits,
investigations=investigations,
correlation_minutes=int(config.get("correlation_minutes", 15)),
correlation_minutes=correlation_minutes,
)

paths = {
Expand Down Expand Up @@ -119,6 +127,39 @@ def load_yaml(path: Path) -> dict[str, Any]:
return payload


def validate_demo_config(config: Mapping[str, Any]) -> dict[str, Any]:
    """Validate the raw demo config and return a normalized copy.

    Checks, in order: ``input_paths`` is a mapping with every required
    path present as a non-empty string, ``artifacts_dir`` is a non-empty
    string, ``correlation_minutes`` is a positive integer, and ``rules``
    is a non-empty list.

    Raises:
        ValueError: on the first field that fails validation.
    """
    raw_paths = config.get("input_paths")
    if not isinstance(raw_paths, Mapping):
        raise ValueError("Config field 'input_paths' must be a mapping.")

    checked_paths = {
        name: require_non_empty_string(raw_paths.get(name), f"input_paths.{name}")
        for name in CONFIG_INPUT_PATH_FIELDS
    }

    artifacts_dir = require_non_empty_string(
        config.get("artifacts_dir", "artifacts"),
        "artifacts_dir",
    )
    correlation_minutes = require_positive_int(
        config.get("correlation_minutes", 15),
        "correlation_minutes",
    )

    rules = config.get("rules")
    if not isinstance(rules, list) or not rules:
        raise ValueError("Config field 'rules' must be a non-empty list.")

    return {
        "input_paths": checked_paths,
        "artifacts_dir": artifacts_dir,
        "correlation_minutes": correlation_minutes,
        "rules": rules,
    }


def load_jsonl(path: Path) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
with path.open("r", encoding="utf-8") as handle:
Expand Down Expand Up @@ -306,13 +347,35 @@ def validate_rules(rules: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]:
return validated


def require_positive_int(value: Any, field_name: str) -> int:
    """Validate that *value* is a strictly positive integer and return it.

    ``bool`` is rejected explicitly because it is an ``int`` subclass.
    Non-``int`` values (floats, numeric strings, ...) are rejected rather
    than coerced: the previous ``int(value)`` coercion silently truncated
    a YAML value like ``correlation_minutes: 1.5`` to ``1``, shrinking the
    evidence window without any error.

    Raises:
        ValueError: if *value* is not a positive integer.
    """
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{field_name} must be a positive integer.")
    if value <= 0:
        raise ValueError(f"{field_name} must be a positive integer.")
    return value


def require_non_empty_string(value: Any, field_name: str) -> str:
    """Return *value* stripped of surrounding whitespace.

    Raises:
        ValueError: if *value* is not a string or is blank.
    """
    if isinstance(value, str):
        stripped = value.strip()
        if stripped:
            return stripped
    raise ValueError(f"Config field '{field_name}' must be a non-empty string.")


def build_investigations(
rule_hits: Sequence[Mapping[str, Any]],
policy_denials: Sequence[Mapping[str, Any]],
follow_on_events: Sequence[Mapping[str, Any]],
correlation_minutes: int,
) -> list[dict[str, Any]]:
investigations: list[dict[str, Any]] = []
correlation_minutes = require_positive_int(
correlation_minutes,
"correlation_minutes",
)
correlation_window = timedelta(minutes=correlation_minutes)

for hit in rule_hits:
Expand Down Expand Up @@ -464,7 +527,7 @@ def normalize_optional_text(value: Any) -> str | None:


def parse_timestamp(raw_value: str) -> datetime:
    """Parse an ISO-8601 timestamp string into an aware UTC datetime.

    Delegates to the shared helper so naive and ``Z``-suffixed values are
    handled consistently across the demo pipelines.
    """
    # NOTE(review): the scraped diff fused the removed inline
    # ``datetime.fromisoformat`` line with this replacement; only the
    # delegating call is the post-change code.
    return parse_utc_timestamp(raw_value)


def format_timestamp(value: Any) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import yaml

from ..time_utils import parse_utc_timestamp

SCOPE_FIELDS = ("entity", "source", "target", "host")
REQUIRED_HIT_FIELDS = (
"hit_id",
Expand Down Expand Up @@ -588,7 +590,7 @@ def write_text(content: str, path: Path) -> Path:


def parse_timestamp(raw_value: str) -> datetime:
    """Parse an ISO-8601 timestamp string into an aware UTC datetime.

    Delegates to the shared helper so naive and ``Z``-suffixed values are
    handled consistently across the demo pipelines.
    """
    # NOTE(review): the scraped diff fused the removed inline
    # ``datetime.fromisoformat`` line with this replacement; only the
    # delegating call is the post-change code.
    return parse_utc_timestamp(raw_value)


def format_timestamp(value: Any) -> str:
Expand Down
18 changes: 18 additions & 0 deletions src/telemetry_window_demo/time_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from datetime import UTC, datetime


def parse_utc_timestamp(raw_value: str) -> datetime:
text = str(raw_value).strip()
if not text:
raise ValueError("Timestamp must be non-empty.")

try:
timestamp = datetime.fromisoformat(text.replace("Z", "+00:00"))
except ValueError as exc:
raise ValueError(f"Invalid timestamp: {raw_value!r}") from exc

if timestamp.tzinfo is None:
timestamp = timestamp.replace(tzinfo=UTC)
return timestamp.astimezone(UTC)
7 changes: 7 additions & 0 deletions tests/test_ai_assisted_detection_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
load_jsonl,
load_yaml,
normalize_events,
parse_timestamp,
parse_and_validate_json_output,
)

Expand Down Expand Up @@ -109,6 +110,12 @@ def test_rules_trigger_expected_hits() -> None:
assert all(hit["attack_mapping"]["technique_id"] for hit in rule_hits)


def test_parse_timestamp_treats_naive_values_as_utc() -> None:
    # A timestamp with no offset must be interpreted as UTC, not local time.
    parsed = parse_timestamp("2026-03-10T10:00:00")
    assert parsed.isoformat() == "2026-03-10T10:00:00+00:00"


def test_grouping_merges_hits_by_entities_and_time() -> None:
_, _, _, _, grouped_cases, _ = _demo_inputs()

Expand Down
124 changes: 124 additions & 0 deletions tests/test_config_change_investigation_demo.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import annotations

import json
import shutil
from pathlib import Path

import pytest
import yaml

from telemetry_window_demo.config_change_investigation_demo import default_demo_root, run_demo
from telemetry_window_demo.config_change_investigation_demo.pipeline import (
build_investigations,
Expand All @@ -12,6 +16,8 @@
normalize_config_changes,
normalize_follow_on_events,
normalize_policy_denials,
parse_timestamp,
validate_demo_config,
)


Expand All @@ -34,6 +40,13 @@ def _load_json_file(path: Path):
return json.loads(path.read_text(encoding="utf-8"))


def _copy_demo_root(tmp_path: Path) -> Path:
    """Copy the committed demo root into *tmp_path* so tests can mutate it."""
    destination = tmp_path / "demo-copy"
    shutil.copytree(default_demo_root(), destination)
    return destination


def test_normalize_config_changes_is_sorted_and_complete() -> None:
_, _, config_changes, _, _ = _load_demo_inputs()

Expand All @@ -59,6 +72,12 @@ def test_evaluate_risky_config_changes_flags_expected_changes() -> None:
assert [hit["severity"] for hit in hits] == ["critical", "high", "high"]


def test_parse_timestamp_treats_naive_values_as_utc() -> None:
    # A timestamp with no offset must be interpreted as UTC, not local time.
    parsed = parse_timestamp("2026-03-20T09:15:00")
    assert parsed.isoformat() == "2026-03-20T09:15:00+00:00"


def test_build_investigations_uses_bounded_system_and_time_correlation() -> None:
_, config, config_changes, policy_denials, follow_on_events = _load_demo_inputs()
hits = evaluate_risky_config_changes(config_changes, config["rules"])
Expand Down Expand Up @@ -86,6 +105,111 @@ def test_build_investigations_uses_bounded_system_and_time_correlation() -> None
)


def test_build_investigations_includes_evidence_on_window_end_only() -> None:
    # Fixture: one risky change at 09:00 UTC with a 15-minute correlation
    # window, plus three denials — one second before the change, exactly on
    # the window end (09:15:00), and one second past the window end.
    change_time = parse_timestamp("2026-03-20T09:00:00Z")
    rule_hits = [
        {
            "investigation_id": "CCI-999",
            "severity": "high",
            "rule_id": "cfg_test",
            "reason": "test rule",
            "change_event": {
                "change_id": "cfg-test",
                "timestamp": change_time,
                "actor": "operator",
                "target_system": "identity-proxy",
                "config_key": "test_key",
                "old_value": "safe",
                "new_value": "risky",
                "change_result": "success",
            },
        }
    ]
    policy_denials = [
        {
            # One second before the change: must not be attached.
            "denial_id": "before",
            "timestamp": parse_timestamp("2026-03-20T08:59:59Z"),
            "actor": "operator",
            "target_system": "identity-proxy",
            "policy_name": "before",
            "decision": "deny",
            "reason": "too early",
        },
        {
            # Exactly on the window end: must be attached (inclusive bound).
            "denial_id": "on-end",
            "timestamp": parse_timestamp("2026-03-20T09:15:00Z"),
            "actor": "operator",
            "target_system": "identity-proxy",
            "policy_name": "boundary",
            "decision": "deny",
            "reason": "inside boundary",
        },
        {
            # One second past the window end: must not be attached.
            "denial_id": "after",
            "timestamp": parse_timestamp("2026-03-20T09:15:01Z"),
            "actor": "operator",
            "target_system": "identity-proxy",
            "policy_name": "after",
            "decision": "deny",
            "reason": "too late",
        },
    ]

    investigations = build_investigations(
        rule_hits,
        policy_denials,
        follow_on_events=[],
        correlation_minutes=15,
    )

    # Only the denial sitting exactly on the window end survives the filter.
    assert [item["denial_id"] for item in investigations[0]["attached_policy_denials"]] == [
        "on-end"
    ]


def test_build_investigations_rejects_non_positive_correlation_window() -> None:
    # A zero-minute window is a config error and must fail fast.
    with pytest.raises(ValueError, match="correlation_minutes"):
        build_investigations(
            rule_hits=[],
            policy_denials=[],
            follow_on_events=[],
            correlation_minutes=0,
        )


def test_validate_demo_config_reports_missing_input_path_key() -> None:
    # Dropping a required input path must surface a field-specific error.
    _, broken_config, _, _, _ = _load_demo_inputs()
    del broken_config["input_paths"]["policy_denials"]

    with pytest.raises(ValueError, match="input_paths.policy_denials"):
        validate_demo_config(broken_config)


def test_validate_demo_config_rejects_bad_correlation_window_type() -> None:
    _, broken_config, _, _, _ = _load_demo_inputs()
    # bool is an int subclass, so it must be rejected explicitly.
    broken_config["correlation_minutes"] = True

    with pytest.raises(ValueError, match="correlation_minutes"):
        validate_demo_config(broken_config)


def test_validate_demo_config_rejects_missing_rules() -> None:
    _, broken_config, _, _, _ = _load_demo_inputs()
    # An empty rule list leaves nothing to evaluate and is a config error.
    broken_config["rules"] = []

    with pytest.raises(ValueError, match="rules"):
        validate_demo_config(broken_config)


def test_run_demo_reports_config_errors_before_loading_inputs(tmp_path) -> None:
    # Break the on-disk config in a scratch copy of the demo root, then
    # confirm run_demo fails with the validation error for that field.
    demo_root = _copy_demo_root(tmp_path)
    config_path = demo_root / "config" / "investigation.yaml"
    broken_config = load_yaml(config_path)
    del broken_config["input_paths"]["config_changes"]
    config_path.write_text(
        yaml.safe_dump(broken_config, sort_keys=False),
        encoding="utf-8",
    )

    with pytest.raises(ValueError, match="input_paths.config_changes"):
        run_demo(demo_root=demo_root, artifacts_dir=tmp_path / "artifacts")


def test_run_demo_is_deterministic_and_matches_committed_artifacts(tmp_path) -> None:
demo_root, _, _, _, _ = _load_demo_inputs()
first_dir = tmp_path / "run-one"
Expand Down
Loading
Loading