Summary
The sync entry points (run_workflow(), Runtime.run(), Executor.run()) call asyncio.run() which fails inside already-running event loops. Currently, all three detect the running loop and raise RuntimeError with a message to use the async variant — but provide no fallback mechanism. This blocks SDK adoption in FastAPI, Jupyter, and any ASGI context.
Current Architecture
Three parallel sync entry points, all identical pattern:
| Entry Point |
File |
Lines |
Detection |
run_workflow() |
__init__.py |
80-88 |
asyncio.get_running_loop() → raise |
Runtime.run() |
builder.py |
96-103 |
Same pattern |
Executor.run() |
core.py |
631-632 |
Delegates to _ensure_no_running_loop() (lines 1240-1251) |
Sync→Async Call Chain
run_workflow() → asyncio.run() → run_workflow_async()
→ builds config, storage, memory, registries, LLM client
→ Executor(steps, ...).run_async()
→ __execute_steps_loop()
→ _dispatch_agent() [async]
→ _dispatch_function() [sync]
→ _dispatch_tool() [async]
The Failure Mode
# In FastAPI route handler or Jupyter cell:
from agent_runtime import run_workflow
result = run_workflow("workflow.yaml", inputs={...})
# → RuntimeError: "Detected a running event loop. Use `run_workflow_async()` instead."
Existing TODO
core.py:1244-1245:
# TODO(eng): Provide an opt-in helper to run sync APIs in async contexts by
# dispatching to a dedicated worker thread if we ever need that behavior.
Implementation Options (Zero New Dependencies)
Option A: Worker Thread (Recommended)
import threading, asyncio
def _run_sync_in_thread(coro):
result, exc = [None], [None]
def worker():
try:
result[0] = asyncio.run(coro)
except Exception as e:
exc[0] = e
t = threading.Thread(target=worker)
t.start()
t.join()
if exc[0]:
raise exc[0]
return result[0]
- Pros: Zero deps, deterministic, clean loop isolation
- Cons: Thread overhead, state sharing needs care
Option B: Optional nest_asyncio
- Detect loop → try
import nest_asyncio → apply() → asyncio.run()
- Cons: Violates zero-dep principle, safety concerns with nested loops
Option C: Status Quo + Better Docs
- Keep raising, document explicitly
- Least effort, most user friction
Test Coverage
- Single test:
test_runtime.py:236-255 — verifies error is raised
- No FastAPI/Jupyter integration tests
- No thread-pool fallback tests
Key Code Locations
| Concern |
File |
Lines |
| Public sync API |
__init__.py |
51-94 |
| Public async API |
__init__.py |
97-179 |
| Executor sync wrapper |
core.py |
617-644 |
| Loop detection |
core.py |
1240-1251 |
| TODO comment |
core.py |
1244-1245 |
| Builder sync wrapper |
builder.py |
76-103 |
| Builder async wrapper |
builder.py |
105-147 |
| Dependencies |
pyproject.toml |
29-32 (PyYAML + typing-extensions only) |
Priority
P0 — Blocks SDK adoption. FastAPI and Jupyter are the two most common embedding contexts. Low effort with worker thread approach (~1 day).
🤖 Generated with Claude Code
Summary
The sync entry points (
run_workflow(),Runtime.run(),Executor.run()) callasyncio.run()which fails inside already-running event loops. Currently, all three detect the running loop and raiseRuntimeErrorwith a message to use the async variant — but provide no fallback mechanism. This blocks SDK adoption in FastAPI, Jupyter, and any ASGI context.Current Architecture
Three parallel sync entry points, all identical pattern:
run_workflow()__init__.pyasyncio.get_running_loop()→ raiseRuntime.run()builder.pyExecutor.run()core.py_ensure_no_running_loop()(lines 1240-1251)Sync→Async Call Chain
The Failure Mode
Existing TODO
core.py:1244-1245:Implementation Options (Zero New Dependencies)
Option A: Worker Thread (Recommended)
Option B: Optional nest_asyncio
import nest_asyncio→apply()→asyncio.run()Option C: Status Quo + Better Docs
Test Coverage
test_runtime.py:236-255— verifies error is raisedKey Code Locations
__init__.py__init__.pycore.pycore.pycore.pybuilder.pybuilder.pypyproject.tomlPriority
P0 — Blocks SDK adoption. FastAPI and Jupyter are the two most common embedding contexts. Low effort with worker thread approach (~1 day).
🤖 Generated with Claude Code