Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,28 @@ Ensure strict type checking is isolated to paths where subclassing is intentiona
2024-05-11 — DAG Execution Memory Optimization
Learning: Passing a mutable dictionary of `asyncio.Task` objects through execution hot paths (like `_run_node`) creates a memory-leaking reference cycle (`tasks` dict -> `Task` object -> `Coroutine` -> `tasks` dict).
Action: Use pre-resolved tuples (e.g., `tuple(tasks[dep] for dep in deps)`) for dependencies when evaluating nodes. This isolates the references safely, prevents the cycle, and marginally improves hot path performance by reducing dictionary lookups.


## 2026-05-15 — Performance Optimizations in Workflow Engine

Learning:
We optimized engine execution by using generator expressions with empty fallback fast-paths (`tuple(x for x in y) if y else ()`) to avoid tuple memory allocations, and using the walrus operator (`:=`) to combine dictionary `get` lookups and validation.

Action:
In hot path execution graphs, use `tuple(tasks[d] for d in deps) if deps else ()` to bypass generator allocations for edge nodes entirely. Combine dictionary lookups with the walrus operator to avoid double-lookups or KeyError risks.

## 2026-05-17 — Safe Dependency Upgrades

Learning:
Continuous dependency upgrades are essential for security and reliability, but strict static analysis tools like `mypy` should have their major versions constrained to prevent sudden CI breakage.

Action:
Upgraded locked dependencies using `uv lock --upgrade` while explicitly constraining mypy<2.

## 2026-05-20 — Error Observability & Logging Tracebacks

Learning:
When handling failures gracefully inside a DAG execution engine (where exceptions are caught and wrapped into `TaskError` objects rather than crashing the process), logging only `logger.error("... %s", e)` discards the stack traceback. This severely limits observability and forces developers to guess where the task actually failed inside their custom logic.

Action:
Inside `except` blocks dealing with arbitrary user-code failures, always use `logger.exception(...)` instead of `logger.error(...)`. This natively appends the full traceback to the application logs while still safely swallowing the exception at runtime to prevent process crashes.
14 changes: 14 additions & 0 deletions .jules/warden.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
2026-05-12 — Assessment & Lifecycle
Observation / Pruned:
The prior agent, BOLT, successfully implemented an optimization resolving a memory leak in DAG execution by replacing application-level `asyncio.Task` dictionaries passed directly into `_run_node` with isolated task lists, breaking a circular reference loop. The tests confirm structural integrity.
Entropy Pruned: 0 lines. Codebase remains at zero-bloat state.

Alignment / Deferred:
Safe dependency bumps were verified. Explicitly locked `mypy` below version 2 within `pyproject.toml` to prevent strict analysis pipeline failure while upgrading other frameworks. Version safely bumped to `0.1.26`.

2026-05-05 — Assessment & Lifecycle
Observation / Pruned:
Verified structural soundness of the codebase. The fast-fail mechanism correctly utilizes `asyncio.wait` ensuring no unawaited coroutines leak. Scanned for dead code via `vulture`; remaining flags are confirmed as FastAPI/Pydantic false positives. Codebase zero-bloat state holds intact. Entropy Pruned: 0 lines.
Expand Down Expand Up @@ -171,3 +179,9 @@ Observation / Pruned:
Assessed micro-optimization for `functools.partial` using exact type checking. No dead code pruned today; codebase maintains structural zero-bloat state.
Alignment / Deferred:
Deferred major version bumps for strict analysis tooling (`mypy<2`) as standard procedure. Documented strict type checking exception rules for hot-path evaluation constraints.

2026-05-12 — Assessment & Lifecycle
Observation / Pruned:
No dead code observed; BOLT's _run_node optimization and fail-fast test coverage are structurally sound.
Alignment / Deferred:
Safely bumped uvicorn, ruff, and idna to latest minor/patch versions; pinned mypy to <2 to prevent breaking changes.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

All notable changes to this project will be documented in this file.

## [0.1.26] - 2026-05-12

* **[QA Status]:** Verified structural soundness of the circular reference / memory leak fix within DAG evaluation. Core tests pass seamlessly without introducing side effects.
* **[Entropy Pruned]:** 0 lines. Codebase zero-bloat state holds intact.
* **[Dependencies Bumped]:** Successfully locked `mypy<2` to preserve strict typing while allowing other dependencies to bump minor/patch versions safely via `uv lock --upgrade`.
* **[Docs Updated]:** Appended ledger record to `.jules/warden.md` validating the memory pipeline corrections.
* **[Release]:** v0.1.26 cut, tagged, and ready.

## [0.1.25] - 2026-05-07

* **[QA Status]**: Verified structural soundness of the `functools.partial` unwrapping optimization. The exact type checking (`type(...) is functools.partial`) was evaluated to safely handle the hot-path execution loop without introducing regressions or breaking fast-fail mechanisms.
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "catalyst"
version = "0.1.25"
version = "0.1.26"
description = "High-performance workflow engine for complex pipelines. Parallel DAG execution, <1s sorts, zero bloat."
authors = [
{ name = "shenald-dev", email = "bot@shenald.dev" }
Expand All @@ -17,7 +17,7 @@ dev = [
"pytest>=8.0.2",
"pytest-asyncio>=0.23.5",
"pytest-cov>=4.1.0",
"mypy>=1.8.0",
"mypy>=1.8.0,<2",
"ruff>=0.3.0",
"httpx>=0.27.0"
]
Expand Down
19 changes: 12 additions & 7 deletions src/catalyst/domain/engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import functools
import inspect
import types
import logging
import graphlib
from typing import Any, Callable, Iterable
Expand Down Expand Up @@ -81,10 +82,14 @@ def add_task(
# are not supported in task execution hot paths.
while type(base_func) is functools.partial:
base_func = base_func.func
if hasattr(base_func, "__call__") and inspect.iscoroutinefunction(
base_func.__call__
if not isinstance(
base_func,
(types.FunctionType, types.MethodType, types.BuiltinFunctionType),
):
is_async = True
if hasattr(base_func, "__call__") and inspect.iscoroutinefunction(
base_func.__call__
):
is_async = True

self._is_async[name] = is_async
self._predecessors[name] = (
Expand Down Expand Up @@ -131,8 +136,7 @@ async def _run_node(
)

try:
func = self.tasks.get(node)
if func is None:
if (func := self.tasks.get(node)) is None:
raise KeyError(f"Task {node!r} not found")
timeout = self._timeouts.get(node)
is_async = self._is_async.get(node, False)
Expand All @@ -146,7 +150,7 @@ async def _run_node(

return result
except Exception as e:
logger.error("Task %r failed: %s", node, e)
logger.exception("Task %r failed", node)
return TaskError(node, e)

async def execute(self) -> dict[str, Any]:
Expand All @@ -166,7 +170,8 @@ async def execute(self) -> dict[str, Any]:

for node in self._cached_topo_order:
deps = self._predecessors.get(node, [])
dep_tasks = tuple(tasks[dep] for dep in deps)
# Fast-path fallback to () avoids generator allocation overhead for edge nodes
dep_tasks = tuple(tasks[dep] for dep in deps) if deps else ()
tasks[node] = asyncio.create_task(self._run_node(node, dep_tasks))

if tasks:
Expand Down
2 changes: 1 addition & 1 deletion src/catalyst/presentation/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
app = FastAPI(
title="Catalyst Workflow API",
description="High-performance DAG execution engine interface",
version="0.1.25",
version="0.1.26",
)


Expand Down
4 changes: 3 additions & 1 deletion tests/test_fail_fast.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async def downstream() -> str:

from typing import Any, Tuple

async def wrapped_run_node(node: str, dep_tasks: Tuple[asyncio.Task[Any], ...]) -> Any:
async def wrapped_run_node(
node: str, dep_tasks: Tuple[asyncio.Task[Any], ...]
) -> Any:
nonlocal downstream_eval_time
res = await orig_run_node(node, dep_tasks)
if node == "downstream":
Expand Down
Loading