Skip to content

Add async-safe sync wrappers for FastAPI/Jupyter embedding #2

@rogue-socket

Description

@rogue-socket

Summary

The sync entry points (run_workflow(), Runtime.run(), Executor.run()) call asyncio.run() which fails inside already-running event loops. Currently, all three detect the running loop and raise RuntimeError with a message to use the async variant — but provide no fallback mechanism. This blocks SDK adoption in FastAPI, Jupyter, and any ASGI context.

Current Architecture

Three parallel sync entry points, all identical pattern:

Entry Point File Lines Detection
run_workflow() __init__.py 80-88 asyncio.get_running_loop() → raise
Runtime.run() builder.py 96-103 Same pattern
Executor.run() core.py 631-632 Delegates to _ensure_no_running_loop() (lines 1240-1251)

Sync→Async Call Chain

run_workflow()  →  asyncio.run()  →  run_workflow_async()
  → builds config, storage, memory, registries, LLM client
  → Executor(steps, ...).run_async()
    → __execute_steps_loop()
      → _dispatch_agent() [async]
      → _dispatch_function() [sync]
      → _dispatch_tool() [async]

The Failure Mode

# In FastAPI route handler or Jupyter cell:
from agent_runtime import run_workflow
result = run_workflow("workflow.yaml", inputs={...})
# → RuntimeError: "Detected a running event loop. Use `run_workflow_async()` instead."

Existing TODO

core.py:1244-1245:

# TODO(eng): Provide an opt-in helper to run sync APIs in async contexts by
#   dispatching to a dedicated worker thread if we ever need that behavior.

Implementation Options (Zero New Dependencies)

Option A: Worker Thread (Recommended)

import threading, asyncio

def _run_sync_in_thread(coro):
    result, exc = [None], [None]
    def worker():
        try:
            result[0] = asyncio.run(coro)
        except Exception as e:
            exc[0] = e
    t = threading.Thread(target=worker)
    t.start()
    t.join()
    if exc[0]:
        raise exc[0]
    return result[0]
  • Pros: Zero deps, deterministic, clean loop isolation
  • Cons: Thread overhead, state sharing needs care

Option B: Optional nest_asyncio

  • Detect loop → try import nest_asyncioapply()asyncio.run()
  • Cons: Violates zero-dep principle, safety concerns with nested loops

Option C: Status Quo + Better Docs

  • Keep raising, document explicitly
  • Least effort, most user friction

Test Coverage

  • Single test: test_runtime.py:236-255 — verifies error is raised
  • No FastAPI/Jupyter integration tests
  • No thread-pool fallback tests

Key Code Locations

Concern File Lines
Public sync API __init__.py 51-94
Public async API __init__.py 97-179
Executor sync wrapper core.py 617-644
Loop detection core.py 1240-1251
TODO comment core.py 1244-1245
Builder sync wrapper builder.py 76-103
Builder async wrapper builder.py 105-147
Dependencies pyproject.toml 29-32 (PyYAML + typing-extensions only)

Priority

P0 — Blocks SDK adoption. FastAPI and Jupyter are the two most common embedding contexts. Low effort with worker thread approach (~1 day).


🤖 Generated with Claude Code

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions