diff --git a/pyproject.toml b/pyproject.toml
index 4e9eb27..916f8d8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -70,6 +70,10 @@ target-version = "py314"
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP", "B", "C4", "SIM"]
+[tool.ruff.lint.per-file-ignores]
+# Inline HTML/CSS/JS template has unavoidably long lines
+"s3proxy/admin/templates.py" = ["E501"]
+
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
diff --git a/s3proxy/admin/__init__.py b/s3proxy/admin/__init__.py
new file mode 100644
index 0000000..1534196
--- /dev/null
+++ b/s3proxy/admin/__init__.py
@@ -0,0 +1,6 @@
+"""Admin dashboard for S3Proxy."""
+
+from .collectors import record_request
+from .router import create_admin_router
+
+__all__ = ["create_admin_router", "record_request"]
diff --git a/s3proxy/admin/auth.py b/s3proxy/admin/auth.py
new file mode 100644
index 0000000..5d1030d
--- /dev/null
+++ b/s3proxy/admin/auth.py
@@ -0,0 +1,44 @@
+"""HTTP Basic Auth for admin dashboard."""
+
+from __future__ import annotations
+
+import secrets
+from typing import TYPE_CHECKING
+
+from fastapi import Depends, HTTPException, status
+from fastapi.security import HTTPBasic, HTTPBasicCredentials
+
+if TYPE_CHECKING:
+ from ..config import Settings
+
+security = HTTPBasic(realm="S3Proxy Admin")
+_security_dep = Depends(security)
+
+
+def create_auth_dependency(settings: Settings, credentials_store: dict[str, str]):
+ """Build a Basic Auth dependency using configured or AWS-derived credentials."""
+ if settings.admin_username and settings.admin_password:
+ valid_username = settings.admin_username
+ valid_password = settings.admin_password
+ elif credentials_store:
+ valid_username = next(iter(credentials_store.keys()))
+ valid_password = credentials_store[valid_username]
+ else:
+ raise RuntimeError("No credentials configured for admin auth")
+
+ async def verify(credentials: HTTPBasicCredentials = _security_dep):
+ ok_user = secrets.compare_digest(
+ credentials.username.encode(), valid_username.encode()
+ )
+ ok_pass = secrets.compare_digest(
+ credentials.password.encode(), valid_password.encode()
+ )
+ if not (ok_user and ok_pass):
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Invalid credentials",
+ headers={"WWW-Authenticate": 'Basic realm="S3Proxy Admin"'},
+ )
+ return credentials
+
+ return verify
diff --git a/s3proxy/admin/collectors.py b/s3proxy/admin/collectors.py
new file mode 100644
index 0000000..92c973d
--- /dev/null
+++ b/s3proxy/admin/collectors.py
@@ -0,0 +1,380 @@
+"""Data collectors for the admin dashboard."""
+
+from __future__ import annotations
+
+import hashlib
+import os
+import time
+from collections import defaultdict, deque
+from dataclasses import asdict, dataclass
+from typing import TYPE_CHECKING
+
+from .. import metrics
+
+if TYPE_CHECKING:
+ from ..config import Settings
+
+
+# ---------------------------------------------------------------------------
+# Sliding-window rate tracker over Prometheus counters
+# ---------------------------------------------------------------------------
+
+
+class RateTracker:
+ """Sample counter values on a schedule, then compute deltas over the window."""
+
+ def __init__(self, window_seconds: int = 3600, max_samples: int = 180):
+ self._window = window_seconds
+ self._max_samples = max_samples
+ self._snapshots: deque[tuple[float, dict[str, float]]] = deque(maxlen=max_samples)
+
+ def record(self, counters: dict[str, float]) -> None:
+ now = time.monotonic()
+ self._snapshots.append((now, dict(counters)))
+ cutoff = now - self._window
+ while len(self._snapshots) > 2 and self._snapshots[0][0] < cutoff:
+ self._snapshots.popleft()
+
+ def rate_per_second(self, key: str) -> float:
+ if len(self._snapshots) < 2:
+ return 0.0
+ t0, v0 = self._snapshots[0]
+ t1, v1 = self._snapshots[-1]
+ elapsed = t1 - t0
+ if elapsed < 0.5:
+ return 0.0
+ delta = v1.get(key, 0.0) - v0.get(key, 0.0)
+ return max(0.0, delta / elapsed)
+
+ def total(self, key: str) -> float:
+ if not self._snapshots:
+ return 0.0
+ _, v0 = self._snapshots[0]
+ _, v1 = self._snapshots[-1]
+ return max(0.0, v1.get(key, 0.0) - v0.get(key, 0.0))
+
+ def sparkline(self, key: str, points: int = 30) -> list[float]:
+ """Return per-bucket deltas suitable for a sparkline."""
+ if len(self._snapshots) < 2:
+ return []
+ deltas: list[float] = []
+ snaps = list(self._snapshots)
+ for prev, curr in zip(snaps, snaps[1:], strict=False):
+ elapsed = curr[0] - prev[0]
+ if elapsed <= 0:
+ continue
+ deltas.append(max(0.0, curr[1].get(key, 0.0) - prev[1].get(key, 0.0)))
+ if len(deltas) > points:
+ step = len(deltas) / points
+ deltas = [deltas[int(i * step)] for i in range(points)]
+ return [round(v, 2) for v in deltas]
+
+ def earliest_value(self, key: str) -> float | None:
+ if not self._snapshots:
+ return None
+ return self._snapshots[0][1].get(key, 0.0)
+
+
+_rate_tracker = RateTracker()
+
+
+# ---------------------------------------------------------------------------
+# Request log — ring buffer for the activity feed
+# ---------------------------------------------------------------------------
+
+
+@dataclass(slots=True, frozen=True)
+class RequestEntry:
+ timestamp: float
+ method: str
+ operation: str
+ bucket: str
+ key: str
+ status: int
+ duration_ms: float
+ size: int
+ client_ip: str
+
+
+class RequestLog:
+ def __init__(self, maxlen: int = 200):
+ self._entries: deque[RequestEntry] = deque(maxlen=maxlen)
+
+ def record(self, entry: RequestEntry) -> None:
+ self._entries.append(entry)
+
+ def recent(self, limit: int = 10) -> list[dict]:
+ entries = list(self._entries)
+ entries.reverse()
+ return [asdict(e) for e in entries[:limit]]
+
+ def all(self) -> list[RequestEntry]:
+ return list(self._entries)
+
+
+_request_log = RequestLog(maxlen=200)
+
+
+def record_request(
+ method: str,
+ path: str,
+ operation: str,
+ status: int,
+ duration: float,
+ size: int,
+ client_ip: str = "",
+) -> None:
+ """Append a completed request to the ring buffer."""
+ bucket, key = _split_bucket_key(path)
+ _request_log.record(
+ RequestEntry(
+ timestamp=time.time(),
+ method=method,
+ operation=operation,
+ bucket=bucket,
+ key=key,
+ status=status,
+ duration_ms=round(duration * 1000, 1),
+ size=size,
+ client_ip=client_ip,
+ )
+ )
+
+
+def _split_bucket_key(path: str) -> tuple[str, str]:
+ stripped = path.lstrip("/")
+ if not stripped:
+ return "", ""
+ if "/" not in stripped:
+ return stripped, ""
+ bucket, _, key = stripped.partition("/")
+ return bucket, key
+
+
+# ---------------------------------------------------------------------------
+# Prometheus helpers
+# ---------------------------------------------------------------------------
+
+
+def _read_counter(counter) -> float:
+ return float(counter._value.get())
+
+
+def _read_labeled_counter_sum(counter) -> float:
+ total = 0.0
+ for sample in counter.collect()[0].samples:
+ if sample.name.endswith("_total"):
+ total += sample.value
+ return total
+
+
+def _read_errors_total() -> float:
+ errs = 0.0
+ for sample in metrics.REQUEST_COUNT.collect()[0].samples:
+ if not sample.name.endswith("_total"):
+ continue
+ status = str(sample.labels.get("status", ""))
+ if status.startswith(("4", "5")):
+ errs += sample.value
+ return errs
+
+
+# ---------------------------------------------------------------------------
+# Formatters
+# ---------------------------------------------------------------------------
+
+
+def _format_bytes(n: float) -> tuple[str, str]:
+ """Return (number, unit) pair so the UI can render them distinctly."""
+ value = float(n)
+ for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
+ if abs(value) < 1024 or unit == "PB":
+ if unit == "B":
+ return f"{int(value)}", unit
+ return f"{value:.1f}" if value < 100 else f"{value:.0f}", unit
+ value /= 1024
+ return f"{value:.0f}", "PB"
+
+
+def _format_uptime(seconds: float) -> str:
+ s = int(seconds)
+ days, rem = divmod(s, 86400)
+ hours, rem = divmod(rem, 3600)
+ minutes = rem // 60
+ parts: list[str] = []
+ if days:
+ parts.append(f"{days}d")
+ if hours or days:
+ parts.append(f"{hours}h")
+ parts.append(f"{minutes}m")
+ return " ".join(parts)
+
+
+def _format_relative(ts: float, now: float | None = None) -> str:
+ delta = max(0.0, (now or time.time()) - ts)
+ if delta < 60:
+ return f"{int(delta)}s ago"
+ if delta < 3600:
+ return f"{int(delta // 60)}m ago"
+ if delta < 86400:
+ return f"{int(delta // 3600)}h ago"
+ return f"{int(delta // 86400)}d ago"
+
+
+def _format_size(n: int) -> str:
+ if n <= 0:
+ return "—"
+ num, unit = _format_bytes(n)
+ return f"{num} {unit}"
+
+
+# ---------------------------------------------------------------------------
+# Derived aggregations from the request log
+# ---------------------------------------------------------------------------
+
+
+def _derive_buckets(entries: list[RequestEntry]) -> list[dict]:
+ by_bucket: dict[str, dict] = defaultdict(
+ lambda: {"objects": set(), "bytes": 0, "last_seen": 0.0}
+ )
+ for e in entries:
+ if not e.bucket:
+ continue
+ info = by_bucket[e.bucket]
+ if e.key:
+ info["objects"].add(e.key)
+ if e.size > 0:
+ info["bytes"] += e.size
+ if e.timestamp > info["last_seen"]:
+ info["last_seen"] = e.timestamp
+
+ out: list[dict] = []
+ for name, info in by_bucket.items():
+ num, unit = _format_bytes(info["bytes"])
+ out.append(
+ {
+ "name": name,
+ "encrypted": True,
+ "objects": len(info["objects"]),
+ "size": f"{num} {unit}" if info["bytes"] > 0 else "—",
+ "last_seen": info["last_seen"],
+ }
+ )
+ out.sort(key=lambda b: b["last_seen"], reverse=True)
+ return out
+
+
+def _derive_keys(settings: Settings) -> list[dict]:
+ fp = hashlib.sha256(settings.kek).hexdigest()[:8]
+ return [
+ {
+ "id": f"key-{fp}",
+ "type": "Local (KEK)",
+ "status": "Active",
+ "created": "—",
+ }
+ ]
+
+
+# ---------------------------------------------------------------------------
+# Aggregate collector
+# ---------------------------------------------------------------------------
+
+
+def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") -> dict:
+ """Gather everything the dashboard renders in a single JSON blob."""
+ now = time.time()
+ uptime_s = max(0.0, time.monotonic() - start_time)
+
+ total_requests = _read_labeled_counter_sum(metrics.REQUEST_COUNT)
+ bytes_encrypted = _read_counter(metrics.BYTES_ENCRYPTED)
+ bytes_decrypted = _read_counter(metrics.BYTES_DECRYPTED)
+ errors_total = _read_errors_total()
+
+ counters = {
+ "requests": total_requests,
+ "bytes_crypto": bytes_encrypted + bytes_decrypted,
+ "errors": errors_total,
+ }
+ _rate_tracker.record(counters)
+
+ req_rate = _rate_tracker.rate_per_second("requests")
+ crypto_rate = _rate_tracker.rate_per_second("bytes_crypto")
+
+ num_enc, unit_enc = _format_bytes(bytes_encrypted)
+ num_thr, unit_thr = _format_bytes(crypto_rate)
+
+ entries = _request_log.all()
+ buckets = _derive_buckets(entries)
+ last_error_ts = next((e.timestamp for e in reversed(entries) if e.status >= 400), None)
+
+ return {
+ "header": {
+ "title": "S3 Encryption Proxy",
+ "status": "Running",
+ "uptime": _format_uptime(uptime_s),
+ "pod": os.environ.get("HOSTNAME", "local"),
+ "version": version,
+ },
+ "cards": {
+ "requests": {
+ "label": "Requests",
+ "value": f"{int(total_requests):,}",
+ "unit": "",
+ "spark": _rate_tracker.sparkline("requests"),
+ },
+ "data_encrypted": {
+ "label": "Data Encrypted",
+ "value": num_enc,
+ "unit": unit_enc,
+ "spark": _rate_tracker.sparkline("bytes_crypto"),
+ },
+ "errors": {
+ "label": "Errors",
+ "value": f"{int(errors_total):,}",
+ "unit": "",
+ "spark": _rate_tracker.sparkline("errors"),
+ },
+ "active_buckets": {
+ "label": "Active Buckets",
+ "value": str(len(buckets)),
+ "unit": "",
+ "detail": f"seen in last {len(entries)} reqs",
+ },
+ },
+ "activity": [
+ {
+ "time": _format_relative(e["timestamp"], now),
+ "operation": _operation_display(e["method"], e["operation"]),
+ "bucket": e["bucket"] or "—",
+ "object": e["key"] or "—",
+ "status": "Success" if e["status"] < 400 else "Error",
+ "status_code": e["status"],
+ "size": _format_size(e["size"]),
+ "client_ip": e["client_ip"] or "—",
+ "latency": f"{e['duration_ms']:.0f} ms",
+ }
+ for e in _request_log.recent(10)
+ ],
+ "buckets": [
+ {
+ "name": b["name"],
+ "encrypted": b["encrypted"],
+ "objects": f"{b['objects']:,}",
+ "size": b["size"],
+ }
+ for b in buckets[:8]
+ ],
+ "keys": _derive_keys(settings),
+ "footer": {
+ "version": version,
+ "req_per_s": f"{req_rate:.0f}",
+ "throughput": f"{num_thr} {unit_thr}/s" if crypto_rate > 0 else f"0 {unit_thr}/s",
+ "last_error": _format_relative(last_error_ts, now) if last_error_ts else "never",
+ },
+ }
+
+
+def _operation_display(method: str, operation: str) -> str:
+ """Shorten operation names for the feed (GET, PUT, DELETE, etc.)."""
+ return method or operation
diff --git a/s3proxy/admin/router.py b/s3proxy/admin/router.py
new file mode 100644
index 0000000..6fd9b95
--- /dev/null
+++ b/s3proxy/admin/router.py
@@ -0,0 +1,40 @@
+"""Admin dashboard router."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import APIRouter, Depends, Request
+from fastapi.responses import HTMLResponse, JSONResponse
+
+from .auth import create_auth_dependency
+from .collectors import collect_all
+from .templates import render_dashboard
+
+if TYPE_CHECKING:
+ from ..config import Settings
+
+
+def create_admin_router(
+ settings: Settings,
+ credentials_store: dict[str, str],
+ version: str = "1.0.0",
+) -> APIRouter:
+ """Build the admin dashboard router with Basic Auth."""
+ verify = create_auth_dependency(settings, credentials_store)
+ router = APIRouter(dependencies=[Depends(verify)])
+
+ @router.get("/", response_class=HTMLResponse)
+ async def dashboard() -> HTMLResponse:
+ return HTMLResponse(render_dashboard(admin_path=settings.admin_path))
+
+ @router.get("/api/status")
+ async def status(request: Request) -> JSONResponse:
+ data = collect_all(
+ request.app.state.settings,
+ request.app.state.start_time,
+ version=version,
+ )
+ return JSONResponse(data)
+
+ return router
diff --git a/s3proxy/admin/templates.py b/s3proxy/admin/templates.py
new file mode 100644
index 0000000..632825d
--- /dev/null
+++ b/s3proxy/admin/templates.py
@@ -0,0 +1,560 @@
+"""HTML template for the admin dashboard."""
+
+from __future__ import annotations
+
+_DASHBOARD_HTML = """
+
+
+
+
+S3 Encryption Proxy
+
+
+
+
+
+
+
+
+
+
+
+
S3 Encryption Proxy
+
+
+ Running
+ Uptime: —
+
+
+
+
+
+
+
+
+
+
+
+
Data Encrypted
+
+
—
+
+
+
+
+
+
+
+
+
+
+
+
Active Buckets
+
+
—
+
+
+
+
+
+
+
+
+
+ | Time | Operation | Bucket | Object |
+ Status | Size | Client IP | Latency |
+
+
+
+ |
+ No requests yet — traffic will appear here.
+ |
+
+
+
+
+
+
+
+
Buckets
+
+
+
+
+ | Name | Encryption | Objects | Size |
+
+
+ |
+ No buckets observed yet.
+ |
+
+
+
View all buckets →
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+def render_dashboard(admin_path: str = "/admin") -> str:
+ """Return the dashboard HTML with the API URL substituted."""
+ api_url = admin_path.rstrip("/") + "/api/status"
+ return _DASHBOARD_HTML.replace("__API_URL__", api_url)
diff --git a/s3proxy/app.py b/s3proxy/app.py
index ec755e6..eaa3fb2 100644
--- a/s3proxy/app.py
+++ b/s3proxy/app.py
@@ -5,6 +5,7 @@
import logging
import os
import sys
+import time
import uuid
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
@@ -78,8 +79,10 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
handler = S3ProxyHandler(settings, credentials_store, multipart_manager)
# Store in app.state for route access
+ app.state.settings = settings
app.state.handler = handler
app.state.verifier = verifier
+ app.state.start_time = time.monotonic()
yield
@@ -106,6 +109,15 @@ def create_app(settings: Settings | None = None) -> FastAPI:
app = FastAPI(title="S3Proxy", lifespan=lifespan, docs_url=None, redoc_url=None)
_register_exception_handlers(app)
+
+ if settings.admin_ui:
+ from .admin import create_admin_router
+
+ app.include_router(
+ create_admin_router(settings, credentials_store),
+ prefix=settings.admin_path,
+ )
+
_register_routes(app)
return app
@@ -116,7 +128,20 @@ def _register_exception_handlers(app: FastAPI) -> None:
@app.exception_handler(HTTPException)
async def s3_exception_handler(request: Request, exc: HTTPException):
- """Return S3-compatible error response with request ID."""
+ """Return S3-compatible error response with request ID.
+
+ Non-S3 exceptions that carry their own headers (e.g. admin auth 401 with
+ WWW-Authenticate) are passed through so browsers can prompt for credentials.
+ """
+ if not isinstance(exc, S3Error) and getattr(exc, "headers", None):
+ from fastapi.responses import JSONResponse
+
+ return JSONResponse(
+ status_code=exc.status_code,
+ content={"detail": exc.detail},
+ headers=exc.headers,
+ )
+
request_id = str(uuid.uuid4()).replace("-", "").upper()[:16]
if isinstance(exc, S3Error):
diff --git a/s3proxy/config.py b/s3proxy/config.py
index 5d6b5d1..6cf9c90 100644
--- a/s3proxy/config.py
+++ b/s3proxy/config.py
@@ -49,6 +49,18 @@ class Settings(BaseSettings):
# Logging
log_level: str = Field(default="INFO", description="Log level (DEBUG, INFO, WARNING, ERROR)")
+ # Admin dashboard
+ admin_ui: bool = Field(default=False, description="Enable the admin dashboard at admin_path")
+ admin_path: str = Field(default="/admin", description="URL path prefix for the admin UI")
+ admin_username: str = Field(
+ default="",
+ description="Admin dashboard username (falls back to AWS access key when blank)",
+ )
+ admin_password: str = Field(
+ default="",
+ description="Admin dashboard password (falls back to AWS secret key when blank)",
+ )
+
# Cached KEK derived from encrypt_key (computed once in model_post_init)
_kek: bytes = PrivateAttr()
diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py
index 763bc53..4e322c6 100644
--- a/s3proxy/request_handler.py
+++ b/s3proxy/request_handler.py
@@ -13,6 +13,7 @@
from structlog.stdlib import BoundLogger
from . import concurrency, crypto
+from .admin import record_request
from .errors import S3Error, raise_for_client_error, raise_for_exception
from .handlers import S3ProxyHandler
from .metrics import (
@@ -134,6 +135,13 @@ async def handle_proxy_request(
REQUEST_COUNT.labels(method=method, operation=operation, status=status_code).inc()
REQUEST_DURATION.labels(method=method, operation=operation).observe(duration)
+ try:
+ size = int(request.headers.get("content-length", "0"))
+ except ValueError:
+ size = 0
+ client_ip = request.client.host if request.client else ""
+ record_request(method, path, operation, status_code, duration, size, client_ip)
+
if reserved_memory > 0:
await concurrency.release_memory(reserved_memory)
logger.info(
diff --git a/tests/unit/test_admin.py b/tests/unit/test_admin.py
new file mode 100644
index 0000000..c57bf73
--- /dev/null
+++ b/tests/unit/test_admin.py
@@ -0,0 +1,151 @@
+"""Tests for the admin dashboard."""
+
+from __future__ import annotations
+
+import time
+
+import pytest
+from fastapi.testclient import TestClient
+
+from s3proxy import metrics
+from s3proxy.admin import collectors, record_request
+from s3proxy.admin.auth import create_auth_dependency
+from s3proxy.admin.router import create_admin_router
+from s3proxy.admin.templates import render_dashboard
+from s3proxy.config import Settings
+
+
+def _reset_collector_state() -> None:
+ collectors._request_log._entries.clear()
+ collectors._rate_tracker._snapshots.clear()
+
+
+@pytest.fixture(autouse=True)
+def _clean_state():
+ _reset_collector_state()
+ yield
+ _reset_collector_state()
+
+
+@pytest.fixture
+def admin_settings():
+ return Settings(
+ host="http://localhost:9000",
+ encrypt_key="test-kek-32bytes!!!!!!!!!!!!!!!!",
+ admin_ui=True,
+ admin_username="admin",
+ admin_password="secret",
+ )
+
+
+def test_record_request_splits_bucket_and_key() -> None:
+ record_request("GET", "/my-bucket/path/to/file.txt", "GetObject", 200, 0.042, 1024, "10.0.0.1")
+ entries = collectors._request_log.all()
+ assert len(entries) == 1
+ e = entries[0]
+ assert e.bucket == "my-bucket"
+ assert e.key == "path/to/file.txt"
+ assert e.status == 200
+ assert e.duration_ms == pytest.approx(42.0)
+ assert e.client_ip == "10.0.0.1"
+
+
+def test_collect_all_builds_expected_sections(admin_settings) -> None:
+ record_request("PUT", "/customer-data/invoice.pdf", "PutObject", 200, 0.05, 2048, "10.0.0.1")
+ record_request("GET", "/archives/log.gz", "GetObject", 500, 0.1, 0, "10.0.0.2")
+
+ start = time.monotonic() - 120 # 2 minutes
+ data = collectors.collect_all(admin_settings, start_time=start, version="9.9.9")
+
+ assert data["header"]["title"] == "S3 Encryption Proxy"
+ assert data["header"]["status"] == "Running"
+ assert "m" in data["header"]["uptime"]
+
+ assert set(data["cards"].keys()) == {"requests", "data_encrypted", "errors", "active_buckets"}
+ assert data["cards"]["active_buckets"]["value"] == "2"
+
+ ops = [row["operation"] for row in data["activity"]]
+ assert ops == ["GET", "PUT"] # newest first
+ assert data["activity"][0]["status"] == "Error"
+ assert data["activity"][1]["status"] == "Success"
+ assert data["activity"][1]["bucket"] == "customer-data"
+ assert data["activity"][1]["size"] == "2.0 KB"
+
+ bucket_names = {b["name"] for b in data["buckets"]}
+ assert bucket_names == {"customer-data", "archives"}
+
+ assert data["keys"][0]["status"] == "Active"
+ assert data["footer"]["version"] == "9.9.9"
+
+
+def test_render_dashboard_injects_api_url() -> None:
+ html = render_dashboard(admin_path="/ops")
+ assert '"/ops/api/status"' in html
+ assert "__API_URL__" not in html
+
+
+def _make_app(settings: Settings):
+ from fastapi import FastAPI
+
+ app = FastAPI()
+ router = create_admin_router(settings, credentials_store={}, version="1.2.3")
+ app.include_router(router, prefix=settings.admin_path)
+ app.state.settings = settings
+ app.state.start_time = time.monotonic()
+ return app
+
+
+def test_dashboard_requires_auth(admin_settings) -> None:
+ client = TestClient(_make_app(admin_settings))
+ r = client.get("/admin/")
+ assert r.status_code == 401
+ assert r.headers.get("WWW-Authenticate", "").lower().startswith("basic")
+
+
+def test_dashboard_html_served_with_auth(admin_settings) -> None:
+ client = TestClient(_make_app(admin_settings))
+ r = client.get("/admin/", auth=("admin", "secret"))
+ assert r.status_code == 200
+ assert "S3 Encryption Proxy" in r.text
+ assert "Recent Activity" in r.text
+
+
+def test_status_api_returns_expected_shape(admin_settings) -> None:
+ client = TestClient(_make_app(admin_settings))
+ r = client.get("/admin/api/status", auth=("admin", "secret"))
+ assert r.status_code == 200
+ payload = r.json()
+ assert payload["header"]["status"] == "Running"
+ assert payload["footer"]["version"] == "1.2.3"
+ for key in ("requests", "data_encrypted", "errors", "active_buckets"):
+ assert key in payload["cards"]
+
+
+def test_auth_falls_back_to_aws_credentials() -> None:
+ settings = Settings(
+ host="http://localhost:9000",
+ encrypt_key="test-kek",
+ admin_ui=True,
+ )
+ dep = create_auth_dependency(settings, {"AKIAEXAMPLE": "secret-key"})
+ assert callable(dep)
+
+
+def test_auth_raises_when_no_credentials() -> None:
+ settings = Settings(
+ host="http://localhost:9000",
+ encrypt_key="test-kek",
+ admin_ui=True,
+ )
+ with pytest.raises(RuntimeError):
+ create_auth_dependency(settings, {})
+
+
+def test_collector_does_not_crash_on_empty_metrics(admin_settings) -> None:
+ """collect_all must work even before any request has been recorded."""
+ # Ensure we don't blow up on cold start
+ data = collectors.collect_all(admin_settings, start_time=time.monotonic(), version="x")
+ expected = f"{int(collectors._read_labeled_counter_sum(metrics.REQUEST_COUNT)):,}"
+ assert data["cards"]["requests"]["value"] == expected
+ assert data["activity"] == []
+ assert data["buckets"] == []