From 5365e1a8e59a24acb559e53459c18c2324787b75 Mon Sep 17 00:00:00 2001 From: dianaKhortiuk-frontegg Date: Tue, 14 Apr 2026 20:43:45 +0400 Subject: [PATCH] Add comprehensive unit and e2e test suite (193 tests) Covers the entire SDK surface: config, context, authenticator (sync+async), identity mixin (sync+async), token resolvers (JWT + access token), cache services (local + Redis + negative caching), HTTP client, audits client, Flask/FastAPI secure access decorators, and live API integration tests. Test infrastructure includes session-scoped RSA keypair for real RS256 signature verification, vendor session fakes, and singleton resetters. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/conftest.py | 205 +++++++- tests/e2e/__init__.py | 0 tests/e2e/test_fastapi_secure_access.py | 96 ++++ tests/e2e/test_flask_secure_access.py | 90 ++++ tests/e2e/test_live_vendor_auth.py | 497 ++++++++++++++++++ tests/unit/__init__.py | 0 tests/unit/test_access_token_resolver.py | 168 ++++++ tests/unit/test_access_token_services.py | 140 +++++ tests/unit/test_async_identity_mixin.py | 109 ++++ tests/unit/test_audits_client.py | 40 ++ tests/unit/test_cache_access_token_service.py | 177 +++++++ tests/unit/test_exceptions.py | 28 + tests/unit/test_fastapi_frontegg_wrapper.py | 34 ++ tests/unit/test_fastapi_user_model.py | 75 +++ tests/unit/test_flask_frontegg_wrapper.py | 30 ++ tests/unit/test_flask_get_auth_header.py | 40 ++ .../unit/test_frontegg_async_authenticator.py | 38 ++ tests/unit/test_frontegg_authenticator.py | 42 ++ tests/unit/test_frontegg_config.py | 19 + tests/unit/test_frontegg_context.py | 57 ++ tests/unit/test_frontegg_urls.py | 42 ++ tests/unit/test_http_client.py | 57 ++ tests/unit/test_identity_mixin.py | 96 ++++ tests/unit/test_local_cache_manager.py | 41 ++ tests/unit/test_package_utils.py | 14 + tests/unit/test_redis_cache_manager.py | 38 ++ tests/unit/test_retry.py | 45 ++ tests/unit/test_token_resolver.py | 135 +++++ tests/unit/test_token_type_mismatch.py | 87 +++ 29 files changed, 2429 insertions(+), 11 deletions(-) create mode 100644 tests/e2e/__init__.py create mode 100644 tests/e2e/test_fastapi_secure_access.py create mode 100644 tests/e2e/test_flask_secure_access.py create mode 100644 tests/e2e/test_live_vendor_auth.py create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/test_access_token_resolver.py create mode 100644 tests/unit/test_access_token_services.py create mode 100644 tests/unit/test_async_identity_mixin.py create mode 100644 tests/unit/test_audits_client.py create mode 100644 tests/unit/test_cache_access_token_service.py create mode 100644 tests/unit/test_exceptions.py create mode 100644 tests/unit/test_fastapi_frontegg_wrapper.py create mode 100644 tests/unit/test_fastapi_user_model.py create mode 100644 tests/unit/test_flask_frontegg_wrapper.py create mode 100644 tests/unit/test_flask_get_auth_header.py create mode 100644 tests/unit/test_frontegg_async_authenticator.py create mode 100644 tests/unit/test_frontegg_authenticator.py create mode 100644 tests/unit/test_frontegg_config.py create mode 100644 tests/unit/test_frontegg_context.py create mode 100644 tests/unit/test_frontegg_urls.py create mode 100644 tests/unit/test_http_client.py create mode 100644 tests/unit/test_identity_mixin.py create mode 100644 tests/unit/test_local_cache_manager.py create mode 100644 tests/unit/test_package_utils.py create mode 100644 tests/unit/test_redis_cache_manager.py create mode 100644 tests/unit/test_retry.py create mode 100644 tests/unit/test_token_resolver.py create mode 100644 tests/unit/test_token_type_mismatch.py diff --git a/tests/conftest.py b/tests/conftest.py index f2c9aed..06becef 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,22 +1,205 @@ +"""Shared test fixtures for the Frontegg Python SDK test suite. + +Provides: +- An RSA keypair + helpers to mint RS256 JWTs (so tests exercise real signature + verification instead of mocking out PyJWT). +- Fakes for the vendor-token refresh HTTP call so instantiating + ``FronteggAuthenticator``/``FronteggAsyncAuthenticator`` never hits the network. +- Singleton resetters for ``FronteggContext`` and the Flask/FastAPI ``frontegg`` + modules so tests stay isolated. +""" +from __future__ import annotations + +import time +from typing import Any, Dict, Iterator, Optional +from unittest.mock import MagicMock + +import jwt import pytest -from flask import Flask +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + + +# --------------------------------------------------------------------------- +# RSA keypair / JWT helpers +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def rsa_keypair(): + """Generate an RSA keypair once per session; return (private_pem, public_pem).""" + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + private_pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode() + public_pem = ( + key.public_key() + .public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + .decode() + ) + return private_pem, public_pem + + +@pytest.fixture +def private_key(rsa_keypair) -> str: + return rsa_keypair[0] + + +@pytest.fixture +def public_key(rsa_keypair) -> str: + return rsa_keypair[1] + + +def _default_payload() -> Dict[str, Any]: + now = int(time.time()) + return { + "sub": "user-123", + "tenantId": "tenant-abc", + "applicationId": "app-xyz", + "type": "userToken", + "roles": ["admin"], + "permissions": ["fe.read"], + "email": "user@example.com", + "name": "Test User", + "iat": now, + "exp": now + 3600, + } + + +@pytest.fixture +def make_jwt(private_key): + """Return a factory that signs a JWT with the session RSA key.""" + + def _make(overrides: Optional[Dict[str, Any]] = None, algorithm: str = "RS256") -> str: + payload = _default_payload() + if overrides: + payload.update(overrides) + return jwt.encode(payload, private_key, algorithm=algorithm) + + return _make + + +@pytest.fixture +def valid_jwt(make_jwt) -> str: + return make_jwt() -from frontegg.flask2.flask import Frontegg +# --------------------------------------------------------------------------- +# Vendor token / public key mocking +# --------------------------------------------------------------------------- -def create_app(): - app = Flask('test') - app.config['FRONTEGG_CLIENT_ID'] = 'the-client-id' - app.config['FRONTEGG_API_KEY'] = 'my-api-key' - return app +class _FakeResponse: + def __init__(self, json_data: Any, status_code: int = 200): + self._json = json_data + self.status_code = status_code + self.headers: Dict[str, str] = {} + + def json(self) -> Any: + return self._json + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise RuntimeError(f"HTTP {self.status_code}") + + +@pytest.fixture +def fake_vendor_session(public_key) -> Iterator[MagicMock]: + """Patch ``FronteggAuthenticator.vendor_session_request`` so no real HTTP fires. + + The patched session: + - Responds to POST (vendor auth) with a fake token + 1h expiry. + - Responds to GET (public key fetch) with the session public key. + """ + from frontegg.common import frontegg_authenticator as mod + + mock_session = MagicMock(name="vendor_session_request") + mock_session.post.return_value = _FakeResponse( + {"token": "fake-vendor-token", "expiresIn": 3600} + ) + mock_session.get.return_value = _FakeResponse({"publicKey": public_key}) + mock_session.headers = {} + + original = mod.FronteggAuthenticator.vendor_session_request + mod.FronteggAuthenticator.vendor_session_request = mock_session + try: + yield mock_session + finally: + mod.FronteggAuthenticator.vendor_session_request = original @pytest.fixture -def app(): - return create_app() +def fake_async_vendor_session(public_key): + """Async equivalent: patch ``FronteggAsyncAuthenticator.vendor_session_request``.""" + from frontegg.common import frontegg_async_authenticator as mod + + class _AsyncFake: + def __init__(self): + self.headers: Dict[str, str] = {} + self.post_calls = [] + self.get_calls = [] + + async def post(self, url, json=None, timeout=None): + self.post_calls.append({"url": url, "json": json}) + return _FakeResponse({"token": "fake-vendor-token", "expiresIn": 3600}) + + async def get(self, url, timeout=None): + self.get_calls.append({"url": url}) + return _FakeResponse({"publicKey": public_key}) + + fake = _AsyncFake() + original = mod.FronteggAsyncAuthenticator.vendor_session_request + mod.FronteggAsyncAuthenticator.vendor_session_request = fake + try: + yield fake + finally: + mod.FronteggAsyncAuthenticator.vendor_session_request = original + + +# --------------------------------------------------------------------------- +# Singleton resetters +# --------------------------------------------------------------------------- @pytest.fixture(autouse=True) -def frontegg(app): - return Frontegg(app) +def reset_frontegg_context(): + """``FronteggContext`` is a process-wide singleton; wipe it between tests.""" + from frontegg.common.frontegg_context import FronteggContext + + FronteggContext._instance = None + FronteggContext.options = {} + yield + FronteggContext._instance = None + FronteggContext.options = {} + + +@pytest.fixture +def reset_flask_frontegg(): + """Reset the module-level Flask ``frontegg`` singleton.""" + import frontegg.flask as fe_flask + + saved_auth = fe_flask.frontegg.authenticator + saved_id = fe_flask.frontegg.identity_client + fe_flask.frontegg.authenticator = None + fe_flask.frontegg.identity_client = None + yield fe_flask.frontegg + fe_flask.frontegg.authenticator = saved_auth + fe_flask.frontegg.identity_client = saved_id + + +@pytest.fixture +def reset_fastapi_frontegg(): + import frontegg.fastapi as fe_fastapi + + saved_auth = fe_fastapi.frontegg.async_authenticator + saved_id = fe_fastapi.frontegg.async_identity_client + fe_fastapi.frontegg.async_authenticator = None + fe_fastapi.frontegg.async_identity_client = None + yield fe_fastapi.frontegg + fe_fastapi.frontegg.async_authenticator = saved_auth + fe_fastapi.frontegg.async_identity_client = saved_id diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/test_fastapi_secure_access.py b/tests/e2e/test_fastapi_secure_access.py new file mode 100644 index 0000000..ac8ab5a --- /dev/null +++ b/tests/e2e/test_fastapi_secure_access.py @@ -0,0 +1,96 @@ +"""End-to-end tests for FastAPI ``FronteggSecurity`` dependency. + +Builds a real FastAPI app wired to the async ``frontegg`` singleton with +fake vendor session + real RSA keys. Exercises protected endpoints via +FastAPI's ``TestClient`` (which runs the async handlers synchronously). +""" +import asyncio + +import pytest +from fastapi import Depends, FastAPI +from fastapi.testclient import TestClient + +from frontegg.fastapi import frontegg +from frontegg.fastapi.secure_access import FronteggSecurity, User + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +@pytest.fixture +def fastapi_app(fake_async_vendor_session, reset_fastapi_frontegg): + _run(reset_fastapi_frontegg.init_app("client-id", "api-key")) + + app = FastAPI() + + @app.get("/public") + def public(): + return {"ok": True} + + @app.get("/me") + def me(user: User = Depends(FronteggSecurity())): + return {"sub": user.sub, "tenant": user.tenant_id} + + @app.get("/admin-only") + def admin_only(user: User = Depends(FronteggSecurity(roles=["admin"]))): + return {"ok": True} + + @app.get("/needs-write") + def needs_write( + user: User = Depends(FronteggSecurity(permissions=["fe.write"])) + ): + return {"ok": True} + + return app + + +@pytest.fixture +def client(fastapi_app): + return TestClient(fastapi_app) + + +class TestFastAPISecureAccess: + def test_public_route(self, client): + assert client.get("/public").json() == {"ok": True} + + def test_missing_auth_returns_401(self, client): + assert client.get("/me").status_code == 401 + + def test_invalid_jwt_returns_401(self, client): + r = client.get("/me", headers={"Authorization": "Bearer garbage"}) + assert r.status_code == 401 + + def test_valid_jwt_returns_user(self, client, valid_jwt): + r = client.get( + "/me", headers={"Authorization": "Bearer " + valid_jwt} + ) + assert r.status_code == 200 + assert r.json() == {"sub": "user-123", "tenant": "tenant-abc"} + + def test_role_guard_allows_admin(self, client, valid_jwt): + r = client.get( + "/admin-only", headers={"Authorization": "Bearer " + valid_jwt} + ) + assert r.status_code == 200 + + def test_role_guard_rejects_non_admin(self, client, make_jwt): + token = make_jwt({"roles": ["reader"]}) + r = client.get( + "/admin-only", headers={"Authorization": "Bearer " + token} + ) + assert r.status_code == 403 + + def test_permission_guard_rejects_missing_perm(self, client, make_jwt): + token = make_jwt({"permissions": ["fe.read"]}) + r = client.get( + "/needs-write", headers={"Authorization": "Bearer " + token} + ) + assert r.status_code == 403 + + def test_permission_guard_allows_with_perm(self, client, make_jwt): + token = make_jwt({"permissions": ["fe.write"]}) + r = client.get( + "/needs-write", headers={"Authorization": "Bearer " + token} + ) + assert r.status_code == 200 diff --git a/tests/e2e/test_flask_secure_access.py b/tests/e2e/test_flask_secure_access.py new file mode 100644 index 0000000..324c8dd --- /dev/null +++ b/tests/e2e/test_flask_secure_access.py @@ -0,0 +1,90 @@ +"""End-to-end tests for Flask ``@with_authentication``. + +Builds a real Flask app, wires the ``frontegg`` singleton with a fake vendor +session + real RSA keys, and exercises protected endpoints through the +Flask test client — the same path a real request would travel. +""" +import pytest +from flask import Flask, g, jsonify + +from frontegg.flask import frontegg +from frontegg.flask.secure_access import with_authentication + + +@pytest.fixture +def app(fake_vendor_session, reset_flask_frontegg): + app = Flask("e2e") + reset_flask_frontegg.init_app("client-id", "api-key") + + @app.route("/public") + def public(): + return jsonify({"ok": True}) + + @app.route("/me") + @with_authentication() + def me(): + return jsonify({"sub": g.user["sub"], "tenantId": g.user["tenantId"]}) + + @app.route("/admin-only") + @with_authentication(role_keys=["admin"]) + def admin_only(): + return jsonify({"ok": True}) + + @app.route("/needs-write") + @with_authentication(permission_keys=["fe.write"]) + def needs_write(): + return jsonify({"ok": True}) + + return app + + +@pytest.fixture +def client(app): + return app.test_client() + + +class TestFlaskSecureAccess: + def test_public_route_unchanged(self, client): + r = client.get("/public") + assert r.status_code == 200 + assert r.get_json() == {"ok": True} + + def test_missing_auth_header_returns_401(self, client): + r = client.get("/me") + assert r.status_code == 401 + + def test_invalid_jwt_returns_401(self, client): + r = client.get("/me", headers={"Authorization": "Bearer garbage"}) + assert r.status_code == 401 + + def test_valid_jwt_returns_entity(self, client, valid_jwt): + r = client.get("/me", headers={"Authorization": "Bearer " + valid_jwt}) + assert r.status_code == 200 + assert r.get_json() == {"sub": "user-123", "tenantId": "tenant-abc"} + + def test_role_guard_allows_admin(self, client, valid_jwt): + r = client.get( + "/admin-only", headers={"Authorization": "Bearer " + valid_jwt} + ) + assert r.status_code == 200 + + def test_role_guard_rejects_non_admin(self, client, make_jwt): + token = make_jwt({"roles": ["reader"]}) + r = client.get( + "/admin-only", headers={"Authorization": "Bearer " + token} + ) + assert r.status_code == 403 + + def test_permission_guard_rejects_missing_perm(self, client, make_jwt): + token = make_jwt({"permissions": ["fe.read"]}) + r = client.get( + "/needs-write", headers={"Authorization": "Bearer " + token} + ) + assert r.status_code == 403 + + def test_permission_guard_allows_with_perm(self, client, make_jwt): + token = make_jwt({"permissions": ["fe.write"]}) + r = client.get( + "/needs-write", headers={"Authorization": "Bearer " + token} + ) + assert r.status_code == 200 diff --git a/tests/e2e/test_live_vendor_auth.py b/tests/e2e/test_live_vendor_auth.py new file mode 100644 index 0000000..89b9d1b --- /dev/null +++ b/tests/e2e/test_live_vendor_auth.py @@ -0,0 +1,497 @@ +"""Live e2e tests against the real Frontegg API. + +These tests hit the live Frontegg API — they validate that the SDK's +vendor authentication, public-key fetching, JWT validation, and +framework-level secure access actually work end-to-end with real HTTP. + +Run with: + FRONTEGG_TEST_CLIENT_ID=... FRONTEGG_TEST_API_KEY=... \ + pytest tests/e2e/test_live_vendor_auth.py -v + +Skip when credentials are absent (CI without secrets, local dev, etc.). +""" +from __future__ import annotations + +import asyncio +import os +import time + +import jwt +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +LIVE_CLIENT_ID = os.environ.get("FRONTEGG_TEST_CLIENT_ID") +LIVE_API_KEY = os.environ.get("FRONTEGG_TEST_API_KEY") + +pytestmark = pytest.mark.skipif( + not LIVE_CLIENT_ID or not LIVE_API_KEY, + reason="FRONTEGG_TEST_CLIENT_ID / FRONTEGG_TEST_API_KEY not set", +) + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _get_live_public_key(): + """Fetch the real Frontegg public key via the SDK.""" + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + return identity.fetch_public_key() + + +def _make_fake_jwt(overrides=None): + """Sign a JWT with a random RSA key (NOT Frontegg's) — always invalid.""" + key = rsa.generate_private_key(65537, 2048) + priv = key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode() + now = int(time.time()) + payload = { + "sub": "fake-user", + "tenantId": "fake-tenant", + "applicationId": "fake-app", + "type": "userToken", + "roles": ["admin"], + "permissions": ["fe.read"], + "iat": now, + "exp": now + 3600, + } + if overrides: + payload.update(overrides) + return jwt.encode(payload, priv, algorithm="RS256") + + +# =========================================================================== +# 1. VENDOR AUTHENTICATION +# =========================================================================== + + +class TestLiveSyncAuth: + def test_vendor_token_is_returned(self): + from frontegg.common import FronteggAuthenticator + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + assert auth.access_token is not None + assert len(auth.access_token) > 100 + assert auth.should_refresh_vendor_token is False + + def test_refresh_updates_token(self): + from frontegg.common import FronteggAuthenticator + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + auth.refresh_vendor_token() + assert auth.access_token is not None + assert auth.should_refresh_vendor_token is False + + def test_bad_credentials_raise(self): + from frontegg.common import FronteggAuthenticator + + with pytest.raises(Exception): + FronteggAuthenticator(LIVE_CLIENT_ID, "wrong-api-key") + + def test_bad_client_id_raises(self): + from frontegg.common import FronteggAuthenticator + + with pytest.raises(Exception): + FronteggAuthenticator("00000000-0000-0000-0000-000000000000", LIVE_API_KEY) + + +class TestLiveAsyncAuth: + def _fresh_auth(self, client_id, api_key): + from httpx import AsyncClient as AC + from frontegg.common import FronteggAsyncAuthenticator + + auth = FronteggAsyncAuthenticator(client_id, api_key) + auth.vendor_session_request = AC() # fresh client per test + return auth + + def test_vendor_token_is_returned(self): + auth = self._fresh_auth(LIVE_CLIENT_ID, LIVE_API_KEY) + assert auth.access_token is None + _run(auth.refresh_vendor_token()) + assert auth.access_token is not None + assert len(auth.access_token) > 100 + assert auth.should_refresh_vendor_token is False + + def test_bad_credentials_raise_async(self): + auth = self._fresh_auth(LIVE_CLIENT_ID, "wrong-api-key") + with pytest.raises(Exception): + _run(auth.refresh_vendor_token()) + + +# =========================================================================== +# 2. PUBLIC KEY FETCH +# =========================================================================== + + +class TestLivePublicKeySync: + def test_fetch_public_key(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + public_key = identity.fetch_public_key() + assert public_key.startswith("-----BEGIN PUBLIC KEY-----") + + def test_get_public_key_caches(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + k1 = identity.get_public_key() + k2 = identity.get_public_key() + assert k1 == k2 + + def test_public_key_is_valid_rsa(self): + pk_pem = _get_live_public_key() + from cryptography.hazmat.primitives.serialization import load_pem_public_key + + key = load_pem_public_key(pk_pem.encode()) + assert key.key_size >= 2048 + + +class TestLivePublicKeyAsync: + def test_fetch_public_key_async(self): + from httpx import AsyncClient as AC + from frontegg.common import FronteggAsyncAuthenticator, IdentityAsyncClientMixin + + async def _test(): + auth = FronteggAsyncAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + auth.vendor_session_request = AC() + await auth.refresh_vendor_token() + identity = IdentityAsyncClientMixin(auth) + return await identity.fetch_public_key() + + public_key = _run(_test()) + assert public_key.startswith("-----BEGIN PUBLIC KEY-----") + + +# =========================================================================== +# 3. IDENTITY VALIDATION WITH LIVE PUBLIC KEY +# =========================================================================== + + +class TestLiveIdentityValidation: + """Validate that the SDK correctly rejects forged tokens when using + the real Frontegg public key.""" + + def test_fake_jwt_rejected_by_live_public_key(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + from frontegg.helpers.exceptions import UnauthenticatedException + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + fake_token = _make_fake_jwt() + + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token( + "Bearer " + fake_token, None, "JWT" + ) + + def test_expired_fake_jwt_rejected(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + from frontegg.helpers.exceptions import UnauthenticatedException + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + expired = _make_fake_jwt({"exp": int(time.time()) - 3600}) + + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token( + "Bearer " + expired, None, "JWT" + ) + + def test_garbage_token_rejected(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + from frontegg.helpers.exceptions import UnauthenticatedException + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token( + "Bearer not.a.real.jwt.at.all", None, "JWT" + ) + + def test_empty_bearer_rejected(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + from frontegg.helpers.exceptions import UnauthenticatedException + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token("Bearer ", None, "JWT") + + def test_decode_jwt_rejects_missing_header(self): + from frontegg.common import FronteggAuthenticator, IdentityClientMixin + + auth = FronteggAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + identity = IdentityClientMixin(auth) + + with pytest.raises(Exception): + identity.decode_jwt(None) + + +# =========================================================================== +# 4. FLASK E2E WITH LIVE VENDOR TOKEN + REJECTION OF FORGED JWTs +# =========================================================================== + + +class TestLiveFlaskSecureAccess: + @pytest.fixture + def flask_client(self): + from flask import Flask, g, jsonify + from frontegg.flask.frontegg import Frontegg + from frontegg.flask.secure_access import with_authentication + + fe = Frontegg() + fe.init_app(LIVE_CLIENT_ID, LIVE_API_KEY) + app = Flask("live-flask-e2e") + + @app.route("/public") + def public(): + return jsonify({"ok": True}) + + @app.route("/protected") + @with_authentication() + def protected(): + return jsonify({"sub": g.user["sub"]}) + + @app.route("/admin-only") + @with_authentication(role_keys=["admin"]) + def admin_only(): + return jsonify({"ok": True}) + + # Monkey-patch the module-level frontegg used by with_authentication + import frontegg.flask as flask_mod + old = flask_mod.frontegg + flask_mod.frontegg = fe + yield app.test_client() + flask_mod.frontegg = old + + def test_public_endpoint_works(self, flask_client): + assert flask_client.get("/public").status_code == 200 + + def test_no_header_returns_401(self, flask_client): + assert flask_client.get("/protected").status_code == 401 + + def test_forged_jwt_returns_401(self, flask_client): + token = _make_fake_jwt() + r = flask_client.get( + "/protected", headers={"Authorization": f"Bearer {token}"} + ) + assert r.status_code == 401 + + def test_garbage_token_returns_401(self, flask_client): + r = flask_client.get( + "/protected", + headers={"Authorization": "Bearer xyz.abc.def"}, + ) + assert r.status_code == 401 + + def test_forged_admin_token_returns_401(self, flask_client): + token = _make_fake_jwt({"roles": ["admin"]}) + r = flask_client.get( + "/admin-only", headers={"Authorization": f"Bearer {token}"} + ) + assert r.status_code == 401 # signature mismatch, not 403 + + +# =========================================================================== +# 5. FASTAPI E2E WITH LIVE VENDOR TOKEN + REJECTION OF FORGED JWTs +# =========================================================================== + + +class TestLiveFastAPISecureAccess: + @pytest.fixture + def fastapi_client(self): + from httpx import AsyncClient as AC + from fastapi import Depends, FastAPI + from fastapi.testclient import TestClient + from frontegg.fastapi.frontegg import Frontegg + from frontegg.fastapi.secure_access import FronteggSecurity, User + + fe = Frontegg() + fe.async_authenticator = None + fe.async_identity_client = None + + async def _init(): + from frontegg.common import FronteggAsyncAuthenticator, IdentityAsyncClientMixin + from frontegg.common.frontegg_context import FronteggContext + FronteggContext.init({}) + auth = FronteggAsyncAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + auth.vendor_session_request = AC() + await auth.refresh_vendor_token() + fe.async_authenticator = auth + fe.async_identity_client = IdentityAsyncClientMixin(auth) + + _run(_init()) + app = FastAPI() + + @app.get("/public") + def public(): + return {"ok": True} + + @app.get("/protected") + def protected(user: User = Depends(FronteggSecurity())): + return {"sub": user.sub} + + @app.get("/admin-only") + def admin_only(user: User = Depends(FronteggSecurity(roles=["admin"]))): + return {"ok": True} + + import frontegg.fastapi as fastapi_mod + old = fastapi_mod.frontegg + fastapi_mod.frontegg = fe + yield TestClient(app) + fastapi_mod.frontegg = old + + def test_public_endpoint_works(self, fastapi_client): + assert fastapi_client.get("/public").status_code == 200 + + def test_no_header_returns_401(self, fastapi_client): + assert fastapi_client.get("/protected").status_code == 401 + + def test_forged_jwt_returns_401(self, fastapi_client): + token = _make_fake_jwt() + r = fastapi_client.get( + "/protected", headers={"Authorization": f"Bearer {token}"} + ) + assert r.status_code == 401 + + def test_garbage_token_returns_401(self, fastapi_client): + r = fastapi_client.get( + "/protected", + headers={"Authorization": "Bearer xyz.abc.def"}, + ) + assert r.status_code == 401 + + def test_forged_admin_token_returns_401(self, fastapi_client): + token = _make_fake_jwt({"roles": ["admin"]}) + r = fastapi_client.get( + "/admin-only", headers={"Authorization": f"Bearer {token}"} + ) + assert r.status_code == 401 + + +# =========================================================================== +# 6. SDK WRAPPER INIT +# =========================================================================== + + +class TestLiveFlaskSDKInit: + def test_init_app_and_get_public_key(self): + from frontegg.flask.frontegg import Frontegg + + fe = Frontegg() + fe.init_app(LIVE_CLIENT_ID, LIVE_API_KEY) + assert fe.access_token is not None + pk = fe.get_public_key() + assert pk.startswith("-----BEGIN PUBLIC KEY-----") + + +class TestLiveFastAPISDKInit: + def test_init_app_and_get_public_key(self): + from httpx import AsyncClient as AC + from frontegg.common import FronteggAsyncAuthenticator, IdentityAsyncClientMixin + from frontegg.common.frontegg_context import FronteggContext + from frontegg.fastapi.frontegg import Frontegg + + fe = Frontegg() + + async def _init(): + FronteggContext.init({}) + auth = FronteggAsyncAuthenticator(LIVE_CLIENT_ID, LIVE_API_KEY) + auth.vendor_session_request = AC() + await auth.refresh_vendor_token() + fe.async_authenticator = auth + fe.async_identity_client = IdentityAsyncClientMixin(auth) + + _run(_init()) + assert fe.access_token is not None + pk = _run(fe.get_public_key()) + assert pk.startswith("-----BEGIN PUBLIC KEY-----") + + +# =========================================================================== +# 7. HTTP CLIENT +# =========================================================================== + + +class TestLiveHttpClient: + def test_get_vendor_config(self): + from frontegg.common.clients.http_client import HttpClient + + client = HttpClient( + LIVE_CLIENT_ID, + LIVE_API_KEY, + "https://api.frontegg.com/identity/", + ) + resp = client.get("resources/configurations/v1/") + assert resp.status_code == 200 + assert "publicKey" in resp.json() + + def test_get_with_invalid_endpoint_returns_404(self): + from frontegg.common.clients.http_client import HttpClient + + client = HttpClient( + LIVE_CLIENT_ID, + LIVE_API_KEY, + "https://api.frontegg.com/identity/", + ) + resp = client.get("resources/does-not-exist/") + assert resp.status_code in (404, 403) + + def test_post_audits_endpoint(self): + from frontegg.common.clients.http_client import HttpClient + + client = HttpClient( + LIVE_CLIENT_ID, + LIVE_API_KEY, + "https://api.frontegg.com/audits/", + ) + resp = client.post( + {"severity": "Info", "action": "sdk-e2e-test"}, + tenant_id="test-tenant-id", + ) + # Audit post may succeed or fail depending on tenant config, + # but it must not crash the SDK. + assert resp.status_code in (200, 201, 202, 400, 403, 404) + + +class TestLiveAsyncHttpClient: + def test_get_vendor_config_async(self): + from httpx import AsyncClient as AC + from frontegg.common.clients.async_http_client import HttpAsyncClient + + client = HttpAsyncClient( + LIVE_CLIENT_ID, + LIVE_API_KEY, + "https://api.frontegg.com/identity/", + ) + client.vendor_session_request = AC() + client.client = AC() + + async def _test(): + await client.refresh_vendor_token() + return await client.get("resources/configurations/v1/") + + resp = _run(_test()) + assert resp.status_code == 200 + assert "publicKey" in resp.json() diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_access_token_resolver.py b/tests/unit/test_access_token_resolver.py new file mode 100644 index 0000000..1184ff8 --- /dev/null +++ b/tests/unit/test_access_token_resolver.py @@ -0,0 +1,168 @@ +"""Unit tests for ``AccessTokenResolver`` — mirrors nodejs-sdk access-token-resolver.spec.ts. + +Covers: +- shouldHandle routing (AccessToken yes, JWT no) +- RS256 JWT verification against public key +- Active token ID validation (rejects revoked tokens) +- Service routing based on token type (User vs Tenant) +- Role/permission fetching via get_entity when options require it +- UnauthenticatedException when no service matches token type +- Token not in active IDs → UnauthenticatedException +""" +from unittest.mock import MagicMock, patch + +import pytest + +from frontegg.common.clients.types import AuthHeaderType, TokenTypes +from frontegg.helpers.exceptions import UnauthenticatedException, UnauthorizedException + + +@pytest.fixture +def resolver(fake_vendor_session): + """Build an ``AccessTokenResolver`` with mocked access token services.""" + from frontegg.common import FronteggAuthenticator + from frontegg.common.clients.token_resolvers.access_token_resolver import AccessTokenResolver + + auth = FronteggAuthenticator("c", "s") + resolver = AccessTokenResolver(auth) + return resolver + + +def _make_mock_service(token_type, entity_data=None, active_ids=None): + svc = MagicMock() + svc.should_handle = lambda t: t == token_type + svc.get_entity.return_value = entity_data or {} + svc.get_active_access_token_ids.return_value = active_ids or [] + return svc + + +class TestAccessTokenResolverShouldHandle: + def test_handles_access_token(self, resolver): + assert resolver.should_handle(AuthHeaderType.AccessToken.value) is True + + def test_does_not_handle_jwt(self, resolver): + assert resolver.should_handle(AuthHeaderType.JWT.value) is False + + +class TestAccessTokenResolverValidateToken: + def test_valid_token_active_id_passes(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.UserAccessToken.value, "sub": "tok-123"}) + svc = _make_mock_service( + TokenTypes.UserAccessToken.value, active_ids=["tok-123"] + ) + resolver._AccessTokenResolver__access_token_services = [svc] + + entity = resolver.validate_token(token, public_key) + assert entity["sub"] == "tok-123" + svc.get_active_access_token_ids.assert_called_once() + + def test_token_not_in_active_ids_raises(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.UserAccessToken.value, "sub": "tok-123"}) + svc = _make_mock_service( + TokenTypes.UserAccessToken.value, active_ids=["other-tok"] + ) + resolver._AccessTokenResolver__access_token_services = [svc] + + with pytest.raises(UnauthenticatedException): + resolver.validate_token(token, public_key) + + def test_fetches_entity_roles_when_roles_required(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.TenantAccessToken.value, "sub": "tok-456"}) + entity_with_roles = { + "sub": "tok-456", + "tenantId": "t", + "type": TokenTypes.TenantAccessToken.value, + "roles": ["admin"], + "permissions": ["read"], + } + svc = _make_mock_service( + TokenTypes.TenantAccessToken.value, + entity_data=entity_with_roles, + active_ids=["tok-456"], + ) + resolver._AccessTokenResolver__access_token_services = [svc] + + result = resolver.validate_token( + token, public_key, options={"roles": ["admin"], "permissions": None} + ) + svc.get_entity.assert_called_once() + assert result["roles"] == ["admin"] + + def test_fetches_entity_when_permissions_required(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.UserAccessToken.value, "sub": "tok-789"}) + entity_with_roles = { + "sub": "tok-789", + "roles": [], + "permissions": ["write"], + } + svc = _make_mock_service( + TokenTypes.UserAccessToken.value, + entity_data=entity_with_roles, + ) + resolver._AccessTokenResolver__access_token_services = [svc] + + result = resolver.validate_token( + token, public_key, options={"permissions": ["write"], "roles": None} + ) + svc.get_entity.assert_called_once() + + def test_insufficient_role_raises_unauthorized(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.TenantAccessToken.value, "sub": "tok-x"}) + entity_with_roles = { + "sub": "tok-x", + "roles": ["viewer"], + "permissions": [], + } + svc = _make_mock_service( + TokenTypes.TenantAccessToken.value, + entity_data=entity_with_roles, + ) + resolver._AccessTokenResolver__access_token_services = [svc] + + with pytest.raises(UnauthorizedException): + resolver.validate_token( + token, public_key, options={"roles": ["admin"], "permissions": None} + ) + + def test_routes_to_correct_service_by_token_type(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.TenantAccessToken.value, "sub": "tok-t"}) + + tenant_svc = _make_mock_service( + TokenTypes.TenantAccessToken.value, active_ids=["tok-t"] + ) + user_svc = _make_mock_service( + TokenTypes.UserAccessToken.value, active_ids=[] + ) + resolver._AccessTokenResolver__access_token_services = [user_svc, tenant_svc] + + resolver.validate_token(token, public_key) + tenant_svc.get_active_access_token_ids.assert_called_once() + user_svc.get_active_access_token_ids.assert_not_called() + + def test_no_matching_service_raises(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.TenantAccessToken.value, "sub": "tok-t"}) + no_match = MagicMock() + no_match.should_handle = lambda t: False + resolver._AccessTokenResolver__access_token_services = [no_match] + + with pytest.raises(UnauthenticatedException): + resolver.validate_token(token, public_key) + + def test_invalid_signature_raises(self, resolver, public_key): + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + import jwt + + bad_key = rsa.generate_private_key(65537, 2048) + bad_pem = bad_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode() + token = jwt.encode( + {"sub": "x", "type": TokenTypes.UserAccessToken.value}, + bad_pem, + algorithm="RS256", + ) + with pytest.raises(UnauthenticatedException): + resolver.validate_token(token, public_key) diff --git a/tests/unit/test_access_token_services.py b/tests/unit/test_access_token_services.py new file mode 100644 index 0000000..5329468 --- /dev/null +++ b/tests/unit/test_access_token_services.py @@ -0,0 +1,140 @@ +"""Unit tests for ``UserAccessTokenService`` and ``TenantAccessTokenService``. + +Mirrors nodejs-sdk user-access-token.service.spec.ts and +tenant-access-token.service.spec.ts. + +Covers: +- shouldHandle routing per token type +- getEntity calls correct identity endpoint +- getActiveAccessTokenIds calls correct endpoint +- "Api tokens are disabled" 403 error → UnauthenticatedException +""" +from unittest.mock import MagicMock + +import pytest + +from frontegg.common.clients.types import TokenTypes +from frontegg.helpers.exceptions import UnauthenticatedException + + +def _mock_http_client(): + return MagicMock(name="HttpClient") + + +def _make_response(json_data=None, status_code=200, text=None): + resp = MagicMock() + resp.json.return_value = json_data or {} + resp.status_code = status_code + resp.raise_for_status.return_value = None + resp.text = text or "" + return resp + + +def _make_403_api_tokens_disabled(): + import json + resp = MagicMock() + resp.status_code = 403 + resp.text = json.dumps({"errors": ["Api tokens are disabled"]}) + resp.response = resp # for nested attribute access in __is_api_tokens_disabled + + exc = Exception("HTTP 403") + exc.response = resp + resp.raise_for_status.side_effect = exc + return resp, exc + + +# ========================================================================= +# UserAccessTokenService +# ========================================================================= + +class TestUserAccessTokenService: + @pytest.fixture + def service(self): + from frontegg.common.clients.token_resolvers.access_token_services.services.user_access_token_service import ( + UserAccessTokenService, + ) + http = _mock_http_client() + return UserAccessTokenService(http), http + + def test_should_handle_user_access_token(self, service): + svc, _ = service + assert svc.should_handle(TokenTypes.UserAccessToken.value) is True + assert svc.should_handle(TokenTypes.TenantAccessToken.value) is False + + def test_get_entity_from_identity_calls_correct_endpoint(self, service): + svc, http = service + http.get.return_value = _make_response( + {"roles": ["viewer"], "permissions": ["read"]} + ) + entity = {"sub": "tok-u1", "tenantId": "t"} + result = svc.get_entity_from_identity(entity) + + url_called = http.get.call_args[0][0] + assert "users/access-tokens/v1/" in url_called + assert result["roles"] == ["viewer"] + assert result["permissions"] == ["read"] + + def test_get_active_token_ids_calls_correct_endpoint(self, service): + svc, http = service + http.get.return_value = _make_response(["id-1", "id-2"]) + result = svc.get_active_access_token_ids_from_identity() + + url_called = http.get.call_args[0][0] + assert "users/access-tokens/v1/active" in url_called + assert result == ["id-1", "id-2"] + + def test_api_tokens_disabled_raises_unauthenticated(self, service): + svc, http = service + _, exc = _make_403_api_tokens_disabled() + http.get.side_effect = exc + + with pytest.raises(UnauthenticatedException): + svc.get_entity({"sub": "tok-u"}) + + +# ========================================================================= +# TenantAccessTokenService +# ========================================================================= + +class TestTenantAccessTokenService: + @pytest.fixture + def service(self): + from frontegg.common.clients.token_resolvers.access_token_services.services.tenant_access_token_service import ( + TenantAccessTokenService, + ) + http = _mock_http_client() + return TenantAccessTokenService(http), http + + def test_should_handle_tenant_access_token(self, service): + svc, _ = service + assert svc.should_handle(TokenTypes.TenantAccessToken.value) is True + assert svc.should_handle(TokenTypes.UserAccessToken.value) is False + + def test_get_entity_from_identity_calls_correct_endpoint(self, service): + svc, http = service + http.get.return_value = _make_response( + {"roles": ["admin"], "permissions": ["write"]} + ) + entity = {"sub": "tok-t1", "tenantId": "t"} + result = svc.get_entity_from_identity(entity) + + url_called = http.get.call_args[0][0] + assert "tenants/access-tokens/v1/" in url_called + assert result["roles"] == ["admin"] + + def test_get_active_token_ids_calls_correct_endpoint(self, service): + svc, http = service + http.get.return_value = _make_response(["tid-1", "tid-2"]) + result = svc.get_active_access_token_ids_from_identity() + + url_called = http.get.call_args[0][0] + assert "tenants/access-tokens/v1/active" in url_called + assert result == ["tid-1", "tid-2"] + + def test_api_tokens_disabled_raises_unauthenticated(self, service): + svc, http = service + _, exc = _make_403_api_tokens_disabled() + http.get.side_effect = exc + + with pytest.raises(UnauthenticatedException): + svc.get_entity({"sub": "tok-t"}) diff --git a/tests/unit/test_async_identity_mixin.py b/tests/unit/test_async_identity_mixin.py new file mode 100644 index 0000000..28d1b0d --- /dev/null +++ b/tests/unit/test_async_identity_mixin.py @@ -0,0 +1,109 @@ +"""Unit tests for ``IdentityAsyncClientMixin``. + +Mirrors the sync identity_mixin tests but exercises the async path: +public key fetch, caching, token validation, and JWT decode. +""" +import asyncio + +import jwt +import pytest + +from frontegg.common import FronteggAsyncAuthenticator, IdentityAsyncClientMixin +from frontegg.common.clients.types import AuthHeaderType, TokenTypes +from frontegg.helpers.exceptions import UnauthenticatedException + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +@pytest.fixture +def identity(fake_async_vendor_session): + auth = FronteggAsyncAuthenticator("c", "s") + _run(auth.refresh_vendor_token()) + return IdentityAsyncClientMixin(auth) + + +class TestAsyncFetchPublicKey: + def test_fetches_public_key(self, identity, public_key): + key = _run(identity.fetch_public_key()) + assert key == public_key + + def test_caches_public_key(self, identity, fake_async_vendor_session): + _run(identity.get_public_key()) + _run(identity.get_public_key()) + _run(identity.get_public_key()) + # Only one GET call + assert len(fake_async_vendor_session.get_calls) == 1 + + +class TestAsyncValidateIdentityOnToken: + def test_valid_jwt_returns_entity(self, identity, valid_jwt): + entity = _run( + identity.validate_identity_on_token( + "Bearer " + valid_jwt, None, AuthHeaderType.JWT.value + ) + ) + assert entity["sub"] == "user-123" + assert entity["tenantId"] == "tenant-abc" + + def test_strips_bearer_prefix(self, identity, valid_jwt): + a = _run( + identity.validate_identity_on_token( + "Bearer " + valid_jwt, None, AuthHeaderType.JWT.value + ) + ) + b = _run( + identity.validate_identity_on_token( + valid_jwt, None, AuthHeaderType.JWT.value + ) + ) + assert a == b + + def test_invalid_token_raises_unauthenticated(self, identity): + with pytest.raises(UnauthenticatedException): + _run( + identity.validate_identity_on_token( + "Bearer garbage", None, AuthHeaderType.JWT.value + ) + ) + + def test_unknown_resolver_type_raises(self, identity, valid_jwt): + with pytest.raises(UnauthenticatedException): + _run( + identity.validate_identity_on_token( + valid_jwt, None, "NOPE" + ) + ) + + def test_role_enforcement(self, identity, make_jwt): + from frontegg.helpers.exceptions import UnauthorizedException + + token = make_jwt({"roles": ["reader"]}) + with pytest.raises(UnauthorizedException): + _run( + identity.validate_identity_on_token( + "Bearer " + token, + {"roles": ["admin"], "permissions": None}, + AuthHeaderType.JWT.value, + ) + ) + + +class TestAsyncDecodeJwt: + def test_missing_header_raises(self, identity): + with pytest.raises(jwt.InvalidTokenError): + _run(identity.decode_jwt(None)) + + def test_decodes_valid_jwt(self, identity, valid_jwt): + decoded = _run(identity.decode_jwt("Bearer " + valid_jwt)) + assert decoded["sub"] == "user-123" + + def test_decode_without_verification(self, identity, make_jwt): + token = make_jwt() + decoded = _run(identity.decode_jwt("Bearer " + token, verify=False)) + assert decoded["sub"] == "user-123" diff --git a/tests/unit/test_audits_client.py b/tests/unit/test_audits_client.py new file mode 100644 index 0000000..f32d91f --- /dev/null +++ b/tests/unit/test_audits_client.py @@ -0,0 +1,40 @@ +"""Unit tests for ``AuditsClient``.""" +from unittest.mock import MagicMock + +import pytest + +from frontegg.common.clients.audits_client import AuditsClient, Severity + + +class TestSeverityEnum: + def test_values(self): + assert Severity.INFO.value == "Info" + assert Severity.CRITICAL.value == "Critical" + assert Severity.ERROR.value == "Error" + + +class TestAuditsClient: + def test_send_audit_posts_and_returns_json(self): + http = MagicMock() + response = MagicMock() + response.json.return_value = {"id": "aud-1", "severity": "Info"} + http.post.return_value = response + + client = AuditsClient(http) + result = client.send_audit( + {"severity": "Info", "username": "u"}, tenant_id="tenant-1" + ) + + http.post.assert_called_once_with( + data={"severity": "Info", "username": "u"}, tenant_id="tenant-1" + ) + assert result == {"id": "aud-1", "severity": "Info"} + + def test_send_audit_raises_on_http_error(self): + http = MagicMock() + response = MagicMock() + response.raise_for_status.side_effect = RuntimeError("boom") + http.post.return_value = response + + with pytest.raises(RuntimeError, match="boom"): + AuditsClient(http).send_audit({"severity": "Info"}, tenant_id="t") diff --git a/tests/unit/test_cache_access_token_service.py b/tests/unit/test_cache_access_token_service.py new file mode 100644 index 0000000..aa49348 --- /dev/null +++ b/tests/unit/test_cache_access_token_service.py @@ -0,0 +1,177 @@ +"""Unit tests for ``CacheAccessTokenService`` variants — mirrors +nodejs-sdk cache-access-token.service.spec.ts. + +Covers: +- Cache hit returns cached value without calling inner service +- Cache miss calls inner service and caches result with 10s TTL +- Negative caching: ``{empty: True}`` sentinel on UnauthenticatedException +- Cached sentinel raises UnauthenticatedException +- Non-auth errors are NOT cached +- Active token ID caching (hit/miss/empty on error) +- Both CacheUserAccessTokenService and CacheTenantAccessTokenService +""" +from unittest.mock import MagicMock + +import pytest + +from frontegg.common.cache.local_cache_manager import LocalCacheManager +from frontegg.common.clients.types import TokenTypes +from frontegg.helpers.exceptions import UnauthenticatedException + + +def _build_service(cls, inner_mock): + entity_cache = LocalCacheManager() + ids_cache = LocalCacheManager() + service = cls(entity_cache, ids_cache, inner_mock) + return service, entity_cache, ids_cache + + +def _inner(): + m = MagicMock() + m.type = None + return m + + +# ========================================================================= +# CacheTenantAccessTokenService +# ========================================================================= + + +class TestCacheTenantAccessTokenService: + @pytest.fixture + def setup(self): + from frontegg.common.clients.token_resolvers.access_token_services.cache_services.cache_tenant_access_token_service import ( + CacheTenantAccessTokenService, + ) + + inner = _inner() + svc, e_cache, id_cache = _build_service(CacheTenantAccessTokenService, inner) + return svc, inner, e_cache, id_cache + + def test_should_handle_tenant(self, setup): + svc, *_ = setup + assert svc.should_handle(TokenTypes.TenantAccessToken.value) is True + assert svc.should_handle(TokenTypes.UserAccessToken.value) is False + + def test_get_entity_cache_hit_skips_inner(self, setup): + svc, inner, e_cache, _ = setup + entity = {"sub": "tok-1", "tenantId": "t"} + e_cache.set( + "frontegg_sdk_v1_tenant_access_tokens_tok-1", + {"sub": "tok-1", "roles": ["admin"], "permissions": ["r"]}, + ) + result = svc.get_entity(entity) + assert result["roles"] == ["admin"] + inner.get_entity.assert_not_called() + + def test_get_entity_cache_miss_calls_inner_and_caches(self, setup): + svc, inner, e_cache, _ = setup + entity_data = {"sub": "tok-2", "roles": ["v"], "permissions": ["r"]} + inner.get_entity.return_value = entity_data + entity = {"sub": "tok-2", "tenantId": "t"} + + result = svc.get_entity(entity) + assert result == entity_data + inner.get_entity.assert_called_once_with(entity) + # Verify it was cached + cached = e_cache.get("frontegg_sdk_v1_tenant_access_tokens_tok-2") + assert cached == entity_data + + def test_get_entity_caches_empty_on_unauth(self, setup): + svc, inner, e_cache, _ = setup + inner.get_entity.side_effect = UnauthenticatedException() + entity = {"sub": "tok-3", "tenantId": "t"} + + # The Python implementation doesn't re-raise — it returns None. + # (Unlike the Node SDK which throws.) We test what the code does. + svc.get_entity(entity) + cached = e_cache.get("frontegg_sdk_v1_tenant_access_tokens_tok-3") + assert cached == {"empty": True} + + def test_get_entity_raises_for_cached_empty_sentinel(self, setup): + svc, inner, e_cache, _ = setup + e_cache.set( + "frontegg_sdk_v1_tenant_access_tokens_tok-4", {"empty": True} + ) + with pytest.raises(UnauthenticatedException): + svc.get_entity({"sub": "tok-4", "tenantId": "t"}) + inner.get_entity.assert_not_called() + + def test_get_active_ids_cache_hit_skips_inner(self, setup): + svc, inner, _, id_cache = setup + id_cache.set("frontegg_sdk_v1_tenant_access_tokens_ids", ["id-1", "id-2"]) + result = svc.get_active_access_token_ids() + assert result == ["id-1", "id-2"] + inner.get_active_access_token_ids.assert_not_called() + + def test_get_active_ids_cache_miss_calls_inner(self, setup): + svc, inner, _, id_cache = setup + inner.get_active_access_token_ids.return_value = ["id-3"] + result = svc.get_active_access_token_ids() + assert result == ["id-3"] + inner.get_active_access_token_ids.assert_called_once() + assert id_cache.get("frontegg_sdk_v1_tenant_access_tokens_ids") == ["id-3"] + + def test_get_active_ids_caches_empty_on_unauth(self, setup): + svc, inner, _, id_cache = setup + inner.get_active_access_token_ids.side_effect = UnauthenticatedException() + result = svc.get_active_access_token_ids() + assert result == [] + assert id_cache.get("frontegg_sdk_v1_tenant_access_tokens_ids") == [] + + +# ========================================================================= +# CacheUserAccessTokenService +# ========================================================================= + + +class TestCacheUserAccessTokenService: + @pytest.fixture + def setup(self): + from frontegg.common.clients.token_resolvers.access_token_services.cache_services.cache_user_access_token_service import ( + CacheUserAccessTokenService, + ) + + inner = _inner() + svc, e_cache, id_cache = _build_service(CacheUserAccessTokenService, inner) + return svc, inner, e_cache, id_cache + + def test_should_handle_user(self, setup): + svc, *_ = setup + assert svc.should_handle(TokenTypes.UserAccessToken.value) is True + assert svc.should_handle(TokenTypes.TenantAccessToken.value) is False + + def test_get_entity_cache_hit(self, setup): + svc, inner, e_cache, _ = setup + e_cache.set( + "frontegg_sdk_v1_user_access_tokens_tok-u1", + {"sub": "tok-u1", "roles": ["r"], "permissions": ["p"]}, + ) + result = svc.get_entity({"sub": "tok-u1"}) + assert result["roles"] == ["r"] + inner.get_entity.assert_not_called() + + def test_get_entity_cache_miss(self, setup): + svc, inner, e_cache, _ = setup + inner.get_entity.return_value = {"sub": "tok-u2", "roles": [], "permissions": []} + result = svc.get_entity({"sub": "tok-u2"}) + inner.get_entity.assert_called_once() + assert e_cache.get("frontegg_sdk_v1_user_access_tokens_tok-u2") is not None + + def test_cached_empty_sentinel_raises(self, setup): + svc, inner, e_cache, _ = setup + e_cache.set("frontegg_sdk_v1_user_access_tokens_tok-u3", {"empty": True}) + with pytest.raises(UnauthenticatedException): + svc.get_entity({"sub": "tok-u3"}) + + def test_active_ids_cache_hit(self, setup): + svc, inner, _, id_cache = setup + id_cache.set("frontegg_sdk_v1_user_access_tokens_ids", ["u-id-1"]) + assert svc.get_active_access_token_ids() == ["u-id-1"] + inner.get_active_access_token_ids.assert_not_called() + + def test_active_ids_cache_miss(self, setup): + svc, inner, _, id_cache = setup + inner.get_active_access_token_ids.return_value = ["u-id-2"] + assert svc.get_active_access_token_ids() == ["u-id-2"] + inner.get_active_access_token_ids.assert_called_once() diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py new file mode 100644 index 0000000..f195bd4 --- /dev/null +++ b/tests/unit/test_exceptions.py @@ -0,0 +1,28 @@ +"""Unit tests for the ``frontegg.helpers.exceptions`` hierarchy.""" +from frontegg.helpers.exceptions import ( + HttpException, + UnauthenticatedException, + UnauthorizedException, +) + + +class TestExceptions: + def test_http_exception_stores_fields(self): + exc = HttpException("boom", 500, headers={"X-Foo": "1"}) + assert exc.content == "boom" + assert exc.status_code == 500 + assert exc.headers == {"X-Foo": "1"} + + def test_unauthenticated_defaults(self): + exc = UnauthenticatedException() + assert exc.status_code == 401 + assert exc.content == "Unauthenticated" + + def test_unauthorized_defaults(self): + exc = UnauthorizedException() + assert exc.status_code == 403 + assert exc.content == "Unauthorized" + + def test_inheritance(self): + assert isinstance(UnauthenticatedException(), HttpException) + assert isinstance(UnauthorizedException(), HttpException) diff --git a/tests/unit/test_fastapi_frontegg_wrapper.py b/tests/unit/test_fastapi_frontegg_wrapper.py new file mode 100644 index 0000000..e848b81 --- /dev/null +++ b/tests/unit/test_fastapi_frontegg_wrapper.py @@ -0,0 +1,34 @@ +"""Unit tests for the FastAPI-side ``Frontegg`` wrapper (async).""" +import asyncio + + +def run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +class TestFastAPIFronteggWrapper: + def test_init_app_creates_async_authenticator( + self, fake_async_vendor_session, reset_fastapi_frontegg + ): + run(reset_fastapi_frontegg.init_app("client", "secret")) + assert reset_fastapi_frontegg.async_authenticator is not None + assert reset_fastapi_frontegg.async_identity_client is not None + assert reset_fastapi_frontegg.access_token == "fake-vendor-token" + + def test_validate_identity_on_token_async( + self, fake_async_vendor_session, reset_fastapi_frontegg, valid_jwt + ): + run(reset_fastapi_frontegg.init_app("c", "s")) + entity = run( + reset_fastapi_frontegg.validate_identity_on_token( + "Bearer " + valid_jwt, None, "JWT" + ) + ) + assert entity["sub"] == "user-123" + + def test_decode_jwt_async( + self, fake_async_vendor_session, reset_fastapi_frontegg, valid_jwt + ): + run(reset_fastapi_frontegg.init_app("c", "s")) + decoded = run(reset_fastapi_frontegg.decode_jwt("Bearer " + valid_jwt)) + assert decoded["tenantId"] == "tenant-abc" diff --git a/tests/unit/test_fastapi_user_model.py b/tests/unit/test_fastapi_user_model.py new file mode 100644 index 0000000..eac15ba --- /dev/null +++ b/tests/unit/test_fastapi_user_model.py @@ -0,0 +1,75 @@ +"""Unit tests for the FastAPI ``User`` Pydantic model and ``get_auth_header``.""" +from unittest.mock import MagicMock + +from frontegg.fastapi.secure_access.frontegg_security import ( + TokenType, + User, + get_auth_header, +) + + +def _user(**overrides): + base = { + "sub": "user-123", + "tenantId": "tenant-abc", + "applicationId": "app-xyz", + "type": "userToken", + "roles": ["admin"], + "permissions": ["fe.read"], + "access_token": "t", + } + base.update(overrides) + return User(**base) + + +class TestUserModel: + def test_builds_from_minimal_claims(self): + u = _user() + assert u.sub == "user-123" + assert u.tenant_id == "tenant-abc" + assert u.token_type == TokenType.UserToken + + def test_has_permissions_all_required(self): + u = _user(permissions=["a", "b"]) + assert u.has_permissions(["a"]) is True + assert u.has_permissions(["a", "b"]) is True + assert u.has_permissions(["a", "c"]) is False + assert u.has_permissions([]) is False # empty list ⇒ False + + def test_has_roles_all_required(self): + u = _user(roles=["admin", "ops"]) + assert u.has_roles(["admin"]) is True + assert u.has_roles(["admin", "ops"]) is True + assert u.has_roles(["admin", "missing"]) is False + + def test_id_for_user_token_returns_sub(self): + u = _user(type="userToken") + assert u.id == "user-123" + + def test_id_for_user_api_token_prefers_created_by(self): + u = _user(type="userApiToken", createdByUserId="creator-9") + assert u.id == "creator-9" + + def test_id_for_tenant_api_token_is_none(self): + u = _user(type="tenantApiToken") + assert u.id is None + + +class TestGetAuthHeader: + def _req(self, headers): + req = MagicMock() + req.headers = headers + return req + + def test_bearer_returns_jwt(self): + assert get_auth_header( + self._req({"Authorization": "Bearer xyz"}) + ) == {"token": "xyz", "type": "JWT"} + + def test_x_api_key_returns_access_token(self): + assert get_auth_header( + self._req({"x-api-key": "k"}) + ) == {"token": "k", "type": "AccessToken"} + + def test_no_auth_headers_returns_none(self): + assert get_auth_header(self._req({})) is None diff --git a/tests/unit/test_flask_frontegg_wrapper.py b/tests/unit/test_flask_frontegg_wrapper.py new file mode 100644 index 0000000..025d365 --- /dev/null +++ b/tests/unit/test_flask_frontegg_wrapper.py @@ -0,0 +1,30 @@ +"""Unit tests for the Flask-side ``Frontegg`` wrapper.""" +import pytest + +from frontegg.flask import frontegg as fe_flask_singleton + + +class TestFlaskFronteggWrapper: + def test_init_app_creates_authenticator_and_identity( + self, fake_vendor_session, reset_flask_frontegg + ): + reset_flask_frontegg.init_app("client", "secret") + assert reset_flask_frontegg.authenticator is not None + assert reset_flask_frontegg.identity_client is not None + assert reset_flask_frontegg.access_token == "fake-vendor-token" + + def test_validate_identity_on_token( + self, fake_vendor_session, reset_flask_frontegg, valid_jwt + ): + reset_flask_frontegg.init_app("c", "s") + entity = reset_flask_frontegg.validate_identity_on_token( + "Bearer " + valid_jwt, None, "JWT" + ) + assert entity["sub"] == "user-123" + + def test_decode_jwt_roundtrip( + self, fake_vendor_session, reset_flask_frontegg, valid_jwt + ): + reset_flask_frontegg.init_app("c", "s") + decoded = reset_flask_frontegg.decode_jwt("Bearer " + valid_jwt) + assert decoded["tenantId"] == "tenant-abc" diff --git a/tests/unit/test_flask_get_auth_header.py b/tests/unit/test_flask_get_auth_header.py new file mode 100644 index 0000000..d47cb28 --- /dev/null +++ b/tests/unit/test_flask_get_auth_header.py @@ -0,0 +1,40 @@ +"""Unit tests for Flask ``get_auth_header`` helper.""" +from flask import Flask + +from frontegg.flask.secure_access.with_authentication import get_auth_header + + +def _ctx(headers): + app = Flask(__name__) + return app.test_request_context("/", headers=headers) + + +class TestGetAuthHeader: + def test_extracts_bearer_as_jwt(self): + with _ctx({"Authorization": "Bearer abc.def.ghi"}): + from flask import request + + assert get_auth_header(request) == {"token": "abc.def.ghi", "type": "JWT"} + + def test_extracts_x_api_key_as_access_token(self): + with _ctx({"x-api-key": "secret-access-token"}): + from flask import request + + assert get_auth_header(request) == { + "token": "secret-access-token", + "type": "AccessToken", + } + + def test_none_when_no_headers(self): + with _ctx({}): + from flask import request + + assert get_auth_header(request) is None + + def test_authorization_preferred_over_api_key(self): + with _ctx({"Authorization": "Bearer jwt-t", "x-api-key": "at-t"}): + from flask import request + + result = get_auth_header(request) + assert result["type"] == "JWT" + assert result["token"] == "jwt-t" diff --git a/tests/unit/test_frontegg_async_authenticator.py b/tests/unit/test_frontegg_async_authenticator.py new file mode 100644 index 0000000..a846ab0 --- /dev/null +++ b/tests/unit/test_frontegg_async_authenticator.py @@ -0,0 +1,38 @@ +"""Unit tests for ``FronteggAsyncAuthenticator``.""" +import asyncio + +import pytest + +from frontegg.common import FronteggAsyncAuthenticator + + +def run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +class TestFronteggAsyncAuthenticator: + def test_init_does_not_trigger_refresh(self, fake_async_vendor_session): + # Unlike sync, async __init__ does NOT refresh (must be awaited). + FronteggAsyncAuthenticator("client", "secret") + assert fake_async_vendor_session.post_calls == [] + + def test_refresh_populates_access_token(self, fake_async_vendor_session): + auth = FronteggAsyncAuthenticator("client", "secret") + assert auth.access_token is None + run(auth.refresh_vendor_token()) + assert auth.access_token == "fake-vendor-token" + assert len(fake_async_vendor_session.post_calls) == 1 + + def test_refresh_sends_credentials_in_body(self, fake_async_vendor_session): + auth = FronteggAsyncAuthenticator("c", "s") + run(auth.refresh_vendor_token()) + assert fake_async_vendor_session.post_calls[0]["json"] == { + "clientId": "c", + "secret": "s", + } + + def test_should_refresh_starts_true(self, fake_async_vendor_session): + auth = FronteggAsyncAuthenticator("c", "s") + assert auth.should_refresh_vendor_token is True + run(auth.refresh_vendor_token()) + assert auth.should_refresh_vendor_token is False diff --git a/tests/unit/test_frontegg_authenticator.py b/tests/unit/test_frontegg_authenticator.py new file mode 100644 index 0000000..4e74c53 --- /dev/null +++ b/tests/unit/test_frontegg_authenticator.py @@ -0,0 +1,42 @@ +"""Unit tests for ``FronteggAuthenticator`` (sync vendor token refresh).""" +import arrow +import pytest + +from frontegg.common import FronteggAuthenticator + + +class TestFronteggAuthenticator: + def test_init_triggers_refresh(self, fake_vendor_session): + FronteggAuthenticator("client", "secret") + assert fake_vendor_session.post.call_count == 1 + # Request body carries clientId/secret + _, kwargs = fake_vendor_session.post.call_args + assert kwargs["json"] == {"clientId": "client", "secret": "secret"} + + def test_access_token_populated_after_init(self, fake_vendor_session): + auth = FronteggAuthenticator("client", "secret") + assert auth.access_token == "fake-vendor-token" + + def test_should_refresh_false_right_after_refresh(self, fake_vendor_session): + auth = FronteggAuthenticator("client", "secret") + assert auth.should_refresh_vendor_token is False + + def test_should_refresh_true_when_expired(self, fake_vendor_session, monkeypatch): + auth = FronteggAuthenticator("client", "secret") + # Force "now" to be 2h in the future — past the 1h vendor token. + future = arrow.utcnow().shift(hours=2) + monkeypatch.setattr("frontegg.common.frontegg_authenticator.arrow.utcnow", lambda: future) + assert auth.should_refresh_vendor_token is True + + def test_refresh_is_idempotent(self, fake_vendor_session): + auth = FronteggAuthenticator("client", "secret") + auth.refresh_vendor_token() + assert fake_vendor_session.post.call_count == 2 + assert auth.access_token == "fake-vendor-token" + + def test_http_error_propagates(self, fake_vendor_session): + from tests.conftest import _FakeResponse + + fake_vendor_session.post.return_value = _FakeResponse({}, status_code=500) + with pytest.raises(RuntimeError, match="HTTP 500"): + FronteggAuthenticator("client", "secret") diff --git a/tests/unit/test_frontegg_config.py b/tests/unit/test_frontegg_config.py new file mode 100644 index 0000000..b29d678 --- /dev/null +++ b/tests/unit/test_frontegg_config.py @@ -0,0 +1,19 @@ +"""Unit tests for ``frontegg.common.FronteggConfig``.""" +import pytest + +from frontegg.common import FronteggConfig + + +class TestFronteggConfig: + def test_stores_credentials(self): + cfg = FronteggConfig("client-id", "api-key") + assert cfg.client_id == "client-id" + assert cfg.api_key == "api-key" + + def test_rejects_missing_client_id(self): + with pytest.raises(Exception, match="client_id is required"): + FronteggConfig(None, "api-key") + + def test_rejects_missing_api_key(self): + with pytest.raises(Exception, match="api_key is required"): + FronteggConfig("client-id", None) diff --git a/tests/unit/test_frontegg_context.py b/tests/unit/test_frontegg_context.py new file mode 100644 index 0000000..f02402f --- /dev/null +++ b/tests/unit/test_frontegg_context.py @@ -0,0 +1,57 @@ +"""Unit tests for the ``FronteggContext`` singleton and its option validation.""" +import pytest + +from frontegg.common.frontegg_context import FronteggContext + + +class TestFronteggContext: + def test_is_singleton(self): + a = FronteggContext() + b = FronteggContext() + assert a is b + + def test_init_stores_options(self): + FronteggContext.init({"foo": "bar"}) + assert FronteggContext().options == {"foo": "bar"} + + def test_init_without_access_tokens_options_is_ok(self): + FronteggContext.init({}) # must not raise + assert FronteggContext().options == {} + + def test_init_rejects_missing_cache_in_access_tokens_options(self): + with pytest.raises(Exception, match="cache"): + FronteggContext.init({"access_tokens_options": {}}) + + def test_local_cache_is_accepted(self): + FronteggContext.init( + {"access_tokens_options": {"cache": {"type": "local"}}} + ) + assert FronteggContext().options["access_tokens_options"]["cache"]["type"] == "local" + + def test_redis_cache_requires_all_properties(self): + bad = { + "access_tokens_options": { + "cache": { + "type": "redis", + "options": {"host": "h", "port": 6379, "db": 0}, # missing password + } + } + } + with pytest.raises(Exception, match="password"): + FronteggContext.init(bad) + + def test_redis_cache_full_options_accepted(self): + good = { + "access_tokens_options": { + "cache": { + "type": "redis", + "options": { + "host": "h", + "port": 6379, + "db": 0, + "password": "pw", + }, + } + } + } + FronteggContext.init(good) # must not raise diff --git a/tests/unit/test_frontegg_urls.py b/tests/unit/test_frontegg_urls.py new file mode 100644 index 0000000..e63255b --- /dev/null +++ b/tests/unit/test_frontegg_urls.py @@ -0,0 +1,42 @@ +"""Unit tests for ``frontegg.helpers.frontegg_urls.FronteggUrls``.""" +import importlib + + +def _reload_urls(monkeypatch, env): + for k, v in env.items(): + monkeypatch.setenv(k, v) + module = importlib.import_module("frontegg.helpers.frontegg_urls") + return importlib.reload(module) + + +class TestFronteggUrls: + def test_default_base_url(self): + from frontegg.helpers.frontegg_urls import FronteggUrls + + urls = FronteggUrls() + assert urls.base_url.endswith("/") + assert "api.frontegg.com" in urls.base_url + + def test_authenticate_vendor_endpoint(self): + from frontegg.helpers.frontegg_urls import FronteggUrls + + urls = FronteggUrls() + vendor = urls.authentication_service["authenticate_vendor"] + assert vendor.endswith("/auth/vendor/") + + def test_identity_vendor_config_endpoint(self): + from frontegg.helpers.frontegg_urls import FronteggUrls + + urls = FronteggUrls() + cfg = urls.identity_service["vendor_config"] + assert cfg.endswith("/identity/resources/configurations/v1/") + + def test_gateway_env_override(self, monkeypatch): + mod = _reload_urls( + monkeypatch, {"FRONTEGG_API_GATEWAY_URL": "https://eu.frontegg.com"} + ) + urls = mod.FronteggUrls() + assert urls.base_url == "https://eu.frontegg.com/" + assert urls.authentication_service["authenticate_vendor"].startswith( + "https://eu.frontegg.com/auth/" + ) diff --git a/tests/unit/test_http_client.py b/tests/unit/test_http_client.py new file mode 100644 index 0000000..96b2e65 --- /dev/null +++ b/tests/unit/test_http_client.py @@ -0,0 +1,57 @@ +"""Unit tests for ``HttpClient`` (Frontegg API HTTP wrapper).""" +from unittest.mock import MagicMock + +import pytest + +from frontegg.common.clients.http_client import HttpClient, prepare_headers + + +class TestPrepareHeaders: + def test_merges_tenant_and_host(self): + h = prepare_headers(tenant_id="t", host="h", headers={"X": "1"}) + assert h == { + "frontegg-tenant-id": "t", + "frontegg-vendor-host": "h", + "X": "1", + } + + def test_omits_missing_fields(self): + h = prepare_headers(headers={"X": "1"}) + assert h == {"X": "1"} + + +@pytest.fixture +def http_client(fake_vendor_session): + client = HttpClient("client", "secret", "https://api.frontegg.com/audits/") + client.client = MagicMock(name="inner_session") + client.client.get.return_value = MagicMock(status_code=200) + client.client.post.return_value = MagicMock(status_code=201) + client.client.put.return_value = MagicMock(status_code=200) + client.client.delete.return_value = MagicMock(status_code=204) + client.client.patch.return_value = MagicMock(status_code=200) + client.client.headers = {} + return client + + +class TestHttpClient: + def test_get_sets_vendor_token_header(self, http_client): + http_client.get("items", params={"q": "x"}, tenant_id="t") + assert http_client.client.headers["x-access-token"] == "fake-vendor-token" + http_client.client.get.assert_called_once() + args, kwargs = http_client.client.get.call_args + assert "items" in args[0] + assert kwargs["params"] == {"q": "x"} + assert kwargs["headers"]["frontegg-tenant-id"] == "t" + + def test_post_sends_json(self, http_client): + http_client.post({"a": 1}, url="things") + _, kwargs = http_client.client.post.call_args + assert kwargs["json"] == {"a": 1} + + def test_put_delete_patch_fire_corresponding_methods(self, http_client): + http_client.put({"a": 1}, url="t") + http_client.delete(url="t") + http_client.patch({"b": 2}, url="t") + http_client.client.put.assert_called_once() + http_client.client.delete.assert_called_once() + http_client.client.patch.assert_called_once() diff --git a/tests/unit/test_identity_mixin.py b/tests/unit/test_identity_mixin.py new file mode 100644 index 0000000..3755f7b --- /dev/null +++ b/tests/unit/test_identity_mixin.py @@ -0,0 +1,96 @@ +"""Unit tests for ``IdentityClientMixin``. + +These test the full token-validation path: fetching the public key, caching +it in-memory, picking the right resolver, and returning the decoded entity. +""" +import jwt +import pytest + +from frontegg.common import FronteggAuthenticator, IdentityClientMixin +from frontegg.common.clients.types import AuthHeaderType, TokenTypes +from frontegg.helpers.exceptions import ( + UnauthenticatedException, + UnauthorizedException, +) + + +@pytest.fixture +def identity(fake_vendor_session): + auth = FronteggAuthenticator("client", "secret") + return IdentityClientMixin(auth) + + +class TestFetchPublicKey: + def test_fetches_via_vendor_session(self, identity, fake_vendor_session, public_key): + key = identity.fetch_public_key() + assert key == public_key + fake_vendor_session.get.assert_called() + + def test_get_public_key_caches_result(self, identity, fake_vendor_session): + identity.get_public_key() + identity.get_public_key() + identity.get_public_key() + # Only one GET call — subsequent calls return cached key. + assert fake_vendor_session.get.call_count == 1 + + +class TestValidateIdentityOnToken: + def test_valid_jwt_returns_entity(self, identity, valid_jwt): + entity = identity.validate_identity_on_token( + "Bearer " + valid_jwt, None, AuthHeaderType.JWT.value + ) + assert entity["sub"] == "user-123" + assert entity["tenantId"] == "tenant-abc" + + def test_strips_bearer_prefix(self, identity, valid_jwt): + # With and without the 'Bearer ' prefix must behave identically. + a = identity.validate_identity_on_token( + "Bearer " + valid_jwt, None, AuthHeaderType.JWT.value + ) + b = identity.validate_identity_on_token( + valid_jwt, None, AuthHeaderType.JWT.value + ) + assert a == b + + def test_invalid_token_raises_unauthenticated(self, identity): + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token( + "Bearer garbage.not.a.jwt", None, AuthHeaderType.JWT.value + ) + + def test_role_enforcement_raises_unauthorized(self, identity, make_jwt): + token = make_jwt({"roles": ["reader"]}) + with pytest.raises(UnauthorizedException): + identity.validate_identity_on_token( + "Bearer " + token, + {"roles": ["admin"], "permissions": None}, + AuthHeaderType.JWT.value, + ) + + def test_permission_enforcement_passes_with_match(self, identity, make_jwt): + token = make_jwt({"permissions": ["fe.read", "fe.write"]}) + entity = identity.validate_identity_on_token( + "Bearer " + token, + {"roles": None, "permissions": ["fe.write"]}, + AuthHeaderType.JWT.value, + ) + assert "fe.write" in entity["permissions"] + + def test_unknown_resolver_type_raises(self, identity, valid_jwt): + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token(valid_jwt, None, "NopeType") + + +class TestDecodeJwt: + def test_missing_authorization_header_raises(self, identity): + with pytest.raises(jwt.InvalidTokenError): + identity.decode_jwt(None) + + def test_decodes_with_bearer_prefix(self, identity, valid_jwt): + decoded = identity.decode_jwt("Bearer " + valid_jwt) + assert decoded["sub"] == "user-123" + + def test_decode_unsigned_when_verify_false(self, identity, make_jwt): + token = make_jwt() + decoded = identity.decode_jwt("Bearer " + token, verify=False) + assert decoded["sub"] == "user-123" diff --git a/tests/unit/test_local_cache_manager.py b/tests/unit/test_local_cache_manager.py new file mode 100644 index 0000000..673190d --- /dev/null +++ b/tests/unit/test_local_cache_manager.py @@ -0,0 +1,41 @@ +"""Unit tests for the in-memory cache manager.""" +import time + +from frontegg.common.cache.local_cache_manager import LocalCacheManager + + +class TestLocalCacheManager: + def test_set_and_get_without_expiry(self): + cache = LocalCacheManager() + cache.set("key", {"v": 1}) + assert cache.get("key") == {"v": 1} + + def test_get_missing_key_returns_none(self): + cache = LocalCacheManager() + assert cache.get("nope") is None + + def test_set_with_ttl_returns_value_before_expiry(self): + cache = LocalCacheManager() + cache.set("k", "v", {"expires_in_seconds": 60}) + assert cache.get("k") == "v" + + def test_set_with_ttl_expires(self): + cache = LocalCacheManager() + cache.set("k", "v", {"expires_in_seconds": 0.01}) + time.sleep(0.02) + assert cache.get("k") is None + assert "k" not in cache.cache # deleted on expiry + + def test_delete_removes_keys(self): + cache = LocalCacheManager() + cache.set("a", 1) + cache.set("b", 2) + cache.delete(["a", "missing"]) + assert cache.get("a") is None + assert cache.get("b") == 2 + + def test_overwrite_replaces_value(self): + cache = LocalCacheManager() + cache.set("k", "first") + cache.set("k", "second") + assert cache.get("k") == "second" diff --git a/tests/unit/test_package_utils.py b/tests/unit/test_package_utils.py new file mode 100644 index 0000000..4435180 --- /dev/null +++ b/tests/unit/test_package_utils.py @@ -0,0 +1,14 @@ +"""Unit tests for ``frontegg.common.package_utils.PackageUtils``.""" +import pytest + +from frontegg.common.package_utils import PackageUtils + + +class TestPackageUtils: + def test_loads_existing_package(self): + mod = PackageUtils.load_package("json") + assert mod.__name__ == "json" + + def test_missing_package_raises_with_name(self): + with pytest.raises(Exception, match="definitely_not_a_real_package_xyz"): + PackageUtils.load_package("definitely_not_a_real_package_xyz") diff --git a/tests/unit/test_redis_cache_manager.py b/tests/unit/test_redis_cache_manager.py new file mode 100644 index 0000000..7d6ba87 --- /dev/null +++ b/tests/unit/test_redis_cache_manager.py @@ -0,0 +1,38 @@ +"""Unit tests for the Redis cache manager, backed by ``fakeredis``.""" +from unittest.mock import patch + +import fakeredis +import pytest + + +@pytest.fixture +def redis_cache(): + """Build a ``RedisCacheManager`` whose underlying client is a fakeredis instance.""" + from frontegg.common.cache.redis_cache_manager import RedisCacheManager + + fake = fakeredis.FakeStrictRedis() + + class FakeRedisModule: + Redis = lambda **kwargs: fake # noqa: E731 + + with patch( + "frontegg.common.package_utils.PackageUtils.load_package", + return_value=FakeRedisModule, + ): + yield RedisCacheManager( + {"host": "h", "port": 6379, "db": 0, "password": "pw"} + ) + + +class TestRedisCacheManager: + def test_set_and_get(self, redis_cache): + redis_cache.set("k", {"v": 1}) + assert redis_cache.get("k") == {"v": 1} + + def test_get_missing_returns_none(self, redis_cache): + assert redis_cache.get("missing") is None + + def test_set_with_ttl(self, redis_cache): + # fakeredis supports expire(); just assert no crash and value readable. + redis_cache.set("k", [1, 2, 3], {"expires_in_seconds": 60}) + assert redis_cache.get("k") == [1, 2, 3] diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py new file mode 100644 index 0000000..a4c3821 --- /dev/null +++ b/tests/unit/test_retry.py @@ -0,0 +1,45 @@ +"""Unit tests for ``frontegg.helpers.retry.retry`` decorator.""" +import pytest + +from frontegg.helpers.retry import retry + + +class TestRetry: + def test_returns_value_on_first_success(self): + @retry(action="test", total_tries=3) + def f(): + return 42 + + assert f() == 42 + + def test_retries_then_succeeds(self): + calls = {"n": 0} + + @retry(action="test", total_tries=3, retry_delay=0) + def flaky(): + calls["n"] += 1 + if calls["n"] < 2: + raise ValueError("nope") + return "ok" + + assert flaky() == "ok" + assert calls["n"] == 2 + + def test_raises_after_exhausting_retries(self): + calls = {"n": 0} + + @retry(action="test", total_tries=3, retry_delay=0) + def always_fails(): + calls["n"] += 1 + raise RuntimeError("bad") + + with pytest.raises(RuntimeError, match="bad"): + always_fails() + assert calls["n"] == 3 + + def test_preserves_function_name(self): + @retry(action="test", total_tries=1) + def my_function(): + return None + + assert my_function.__name__ == "my_function" diff --git a/tests/unit/test_token_resolver.py b/tests/unit/test_token_resolver.py new file mode 100644 index 0000000..f682361 --- /dev/null +++ b/tests/unit/test_token_resolver.py @@ -0,0 +1,135 @@ +"""Unit tests for ``TokenResolver`` / ``AuthorizationJWTResolver``. + +These tests use a real RSA keypair and PyJWT so signature verification is +actually exercised — no ``jwt.decode`` mocking. +""" +import jwt +import pytest + +from frontegg.common.clients.token_resolvers.authorization_header_resolver import ( + AuthorizationJWTResolver, +) +from frontegg.common.clients.types import AuthHeaderType, TokenTypes +from frontegg.helpers.exceptions import ( + UnauthenticatedException, + UnauthorizedException, +) + + +@pytest.fixture +def resolver(): + return AuthorizationJWTResolver() + + +class TestAuthorizationJWTResolver: + def test_should_handle_jwt_type(self, resolver): + assert resolver.should_handle(AuthHeaderType.JWT.value) is True + assert resolver.should_handle(AuthHeaderType.AccessToken.value) is False + + def test_validates_user_token(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.UserToken.value}) + entity = resolver.validate_token(token, public_key) + assert entity["sub"] == "user-123" + assert entity["tenantId"] == "tenant-abc" + + def test_validates_user_api_token(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.UserApiToken.value}) + entity = resolver.validate_token(token, public_key) + assert entity["type"] == TokenTypes.UserApiToken.value + + def test_rejects_disallowed_token_type(self, resolver, make_jwt, public_key): + token = make_jwt({"type": TokenTypes.UserAccessToken.value}) + # UserAccessToken is not in the allowed types of the JWT resolver + with pytest.raises(Exception): + resolver.validate_token(token, public_key) + + def test_invalid_signature_raises_unauthenticated( + self, resolver, make_jwt, public_key + ): + # Sign with a different key + other_key = jwt.utils.force_bytes # sentinel; we'll generate a new key + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import rsa + + bad = rsa.generate_private_key(65537, 2048) + bad_pem = bad.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode() + token = jwt.encode({"sub": "x", "type": "userToken"}, bad_pem, algorithm="RS256") + + with pytest.raises(UnauthenticatedException): + resolver.validate_token(token, public_key) + + def test_malformed_token_raises_unauthenticated(self, resolver, public_key): + with pytest.raises(UnauthenticatedException): + resolver.validate_token("not-a-jwt", public_key) + + def test_role_check_passes_with_matching_role( + self, resolver, make_jwt, public_key + ): + token = make_jwt({"roles": ["admin", "reader"]}) + entity = resolver.validate_token( + token, public_key, options={"roles": ["admin"]} + ) + assert entity["roles"] == ["admin", "reader"] + + def test_role_check_fails_when_no_overlap( + self, resolver, make_jwt, public_key + ): + token = make_jwt({"roles": ["reader"]}) + with pytest.raises(UnauthorizedException): + resolver.validate_token( + token, public_key, options={"roles": ["admin"]} + ) + + def test_permission_check_passes(self, resolver, make_jwt, public_key): + token = make_jwt({"permissions": ["fe.read", "fe.write"]}) + resolver.validate_token( + token, public_key, options={"permissions": ["fe.write"]} + ) + + def test_permission_check_fails(self, resolver, make_jwt, public_key): + token = make_jwt({"permissions": ["fe.read"]}) + with pytest.raises(UnauthorizedException): + resolver.validate_token( + token, public_key, options={"permissions": ["fe.admin"]} + ) + + def test_empty_options_does_not_check(self, resolver, make_jwt, public_key): + token = make_jwt({"roles": [], "permissions": []}) + entity = resolver.validate_token( + token, public_key, options={"roles": [], "permissions": []} + ) + assert entity["sub"] == "user-123" + + def test_get_entity_is_identity(self, resolver): + entity = {"sub": "abc", "roles": [], "permissions": []} + assert resolver.get_entity(entity) is entity + + +class TestValidateRolesAndPermissions: + """Exercise the static helper directly for boundary cases.""" + + def test_no_options_is_noop(self): + from frontegg.common.clients.token_resolvers.token_resolver import TokenResolver + + TokenResolver.validate_roles_and_permissions( + {"roles": [], "permissions": []}, None + ) + + def test_at_least_one_role_suffices(self): + from frontegg.common.clients.token_resolvers.token_resolver import TokenResolver + + TokenResolver.validate_roles_and_permissions( + {"roles": ["a"], "permissions": []}, {"roles": ["a", "b"]} + ) + + def test_insufficient_roles_raises_unauthorized(self): + from frontegg.common.clients.token_resolvers.token_resolver import TokenResolver + + with pytest.raises(UnauthorizedException): + TokenResolver.validate_roles_and_permissions( + {"roles": ["other"], "permissions": []}, {"roles": ["a"]} + ) diff --git a/tests/unit/test_token_type_mismatch.py b/tests/unit/test_token_type_mismatch.py new file mode 100644 index 0000000..2fc1e59 --- /dev/null +++ b/tests/unit/test_token_type_mismatch.py @@ -0,0 +1,87 @@ +"""Unit tests for token type mismatch detection — mirrors +nodejs-sdk identity-client.spec.ts "should throw if provided token in the +wrong header" tests. + +The Python SDK should reject: +- A user JWT token presented via x-api-key (AccessToken header type) +- An access token presented via Authorization header (JWT header type) +""" +import pytest + +from frontegg.common import FronteggAuthenticator, IdentityClientMixin +from frontegg.common.clients.types import AuthHeaderType, TokenTypes +from frontegg.helpers.exceptions import UnauthenticatedException + + +@pytest.fixture +def identity(fake_vendor_session): + auth = FronteggAuthenticator("c", "s") + return IdentityClientMixin(auth) + + +class TestTokenTypeMismatch: + def test_user_token_in_access_token_header_raises(self, identity, make_jwt): + """User JWT token sent via x-api-key should be rejected. + + The AccessToken resolver only allows UserAccessToken/TenantAccessToken + types, so a userToken JWT will fail type validation. + """ + token = make_jwt({"type": TokenTypes.UserToken.value}) + with pytest.raises(Exception): + identity.validate_identity_on_token( + token, None, AuthHeaderType.AccessToken.value + ) + + def test_tenant_api_token_in_access_token_header_raises(self, identity, make_jwt): + """TenantApiToken sent via x-api-key should be rejected by the + AccessToken resolver (wrong token type).""" + token = make_jwt({"type": TokenTypes.TenantApiToken.value}) + with pytest.raises(Exception): + identity.validate_identity_on_token( + token, None, AuthHeaderType.AccessToken.value + ) + + def test_user_access_token_in_jwt_header_raises(self, identity, make_jwt): + """UserAccessToken sent via Authorization header should be rejected + by the JWT resolver (type not in allowed list).""" + token = make_jwt({"type": TokenTypes.UserAccessToken.value}) + with pytest.raises(Exception): + identity.validate_identity_on_token( + "Bearer " + token, None, AuthHeaderType.JWT.value + ) + + def test_tenant_access_token_in_jwt_header_raises(self, identity, make_jwt): + """TenantAccessToken sent via Authorization header should be rejected.""" + token = make_jwt({"type": TokenTypes.TenantAccessToken.value}) + with pytest.raises(Exception): + identity.validate_identity_on_token( + "Bearer " + token, None, AuthHeaderType.JWT.value + ) + + def test_unknown_header_type_raises(self, identity, make_jwt): + """Completely unknown header type should raise UnauthenticatedException.""" + token = make_jwt() + with pytest.raises(UnauthenticatedException): + identity.validate_identity_on_token(token, None, "UNKNOWN_TYPE") + + def test_correct_user_token_in_jwt_header_passes(self, identity, make_jwt): + """Sanity: userToken via Authorization header should work.""" + token = make_jwt({"type": TokenTypes.UserToken.value}) + entity = identity.validate_identity_on_token( + "Bearer " + token, None, AuthHeaderType.JWT.value + ) + assert entity["type"] == TokenTypes.UserToken.value + + def test_correct_user_api_token_in_jwt_header_passes(self, identity, make_jwt): + token = make_jwt({"type": TokenTypes.UserApiToken.value}) + entity = identity.validate_identity_on_token( + "Bearer " + token, None, AuthHeaderType.JWT.value + ) + assert entity["type"] == TokenTypes.UserApiToken.value + + def test_correct_tenant_api_token_in_jwt_header_passes(self, identity, make_jwt): + token = make_jwt({"type": TokenTypes.TenantApiToken.value}) + entity = identity.validate_identity_on_token( + "Bearer " + token, None, AuthHeaderType.JWT.value + ) + assert entity["type"] == TokenTypes.TenantApiToken.value