From 166061622602c0e718367b18a7fdfab33b40a8ac Mon Sep 17 00:00:00 2001 From: maxjneto Date: Wed, 29 Apr 2026 22:02:17 -0300 Subject: [PATCH 1/2] Add Google ADK (Gemini) agent samples Demonstrates the temporalio[google-adk] integration with three examples: - basic: hello world agent + activity-backed tools - orchestration: sequential, parallel, and loop patterns - human_in_the_loop: signal-based approval before sensitive tool calls --- README.md | 1 + google_adk/README.md | 85 +++++++++++ google_adk/__init__.py | 0 .../basic/activities/get_weather_activity.py | 7 + .../basic/activities/search_web_activity.py | 7 + google_adk/basic/run_hello_world_workflow.py | 25 ++++ google_adk/basic/run_tools_workflow.py | 25 ++++ google_adk/basic/run_worker.py | 39 +++++ .../basic/workflows/hello_world_workflow.py | 46 ++++++ google_adk/basic/workflows/tools_workflow.py | 59 ++++++++ .../activities/sensitive_actions.py | 13 ++ .../human_in_the_loop/run_hitl_workflow.py | 71 +++++++++ google_adk/human_in_the_loop/run_worker.py | 42 ++++++ .../workflows/hitl_workflow.py | 140 ++++++++++++++++++ google_adk/orchestration/run_loop_workflow.py | 44 ++++++ .../orchestration/run_parallel_workflow.py | 50 +++++++ .../orchestration/run_sequential_workflow.py | 46 ++++++ google_adk/orchestration/run_worker.py | 37 +++++ .../orchestration/workflows/loop_workflow.py | 102 +++++++++++++ .../workflows/parallel_workflow.py | 98 ++++++++++++ .../workflows/sequential_workflow.py | 98 ++++++++++++ pyproject.toml | 6 + 22 files changed, 1041 insertions(+) create mode 100644 google_adk/README.md create mode 100644 google_adk/__init__.py create mode 100644 google_adk/basic/activities/get_weather_activity.py create mode 100644 google_adk/basic/activities/search_web_activity.py create mode 100644 google_adk/basic/run_hello_world_workflow.py create mode 100644 google_adk/basic/run_tools_workflow.py create mode 100644 google_adk/basic/run_worker.py create mode 100644 google_adk/basic/workflows/hello_world_workflow.py create mode 100644 google_adk/basic/workflows/tools_workflow.py create mode 100644 google_adk/human_in_the_loop/activities/sensitive_actions.py create mode 100644 google_adk/human_in_the_loop/run_hitl_workflow.py create mode 100644 google_adk/human_in_the_loop/run_worker.py create mode 100644 google_adk/human_in_the_loop/workflows/hitl_workflow.py create mode 100644 google_adk/orchestration/run_loop_workflow.py create mode 100644 google_adk/orchestration/run_parallel_workflow.py create mode 100644 google_adk/orchestration/run_sequential_workflow.py create mode 100644 google_adk/orchestration/run_worker.py create mode 100644 google_adk/orchestration/workflows/loop_workflow.py create mode 100644 google_adk/orchestration/workflows/parallel_workflow.py create mode 100644 google_adk/orchestration/workflows/sequential_workflow.py diff --git a/README.md b/README.md index d4d6a61b..607ab00c 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [env_config](env_config) - Load client configuration from TOML files with programmatic overrides. * [gevent_async](gevent_async) - Combine gevent and Temporal. +* [google_adk](google_adk) - Orchestrate durable AI agent workflows with Google ADK (Gemini) including tools, multi-agent orchestration, and human-in-the-loop approval. * [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow. * [langchain](langchain) - Orchestrate workflows for LangChain. * [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates. diff --git a/google_adk/README.md b/google_adk/README.md new file mode 100644 index 00000000..0ce94d13 --- /dev/null +++ b/google_adk/README.md @@ -0,0 +1,85 @@ +# Temporal Google ADK Integration + +This directory contains samples demonstrating how to use [Google's Agent Development Kit (ADK)](https://github.com/google/adk-python) with Temporal's durable execution engine. + +See the [module documentation](https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/google_adk_agents/README.md) for more information. + +## Overview + +The integration combines: +- **Temporal workflows** for durable orchestration, retries, and state management +- **Google ADK** for AI agent creation with Gemini models and tool interactions + +Every LLM call and tool execution is wrapped in a Temporal Activity, making agent workflows replay-safe, retryable, and observable. + +## Prerequisites + +- Temporal server [running locally](https://docs.temporal.io/cli/server#start-dev) +- Required dependencies installed via `uv sync --group google-adk` +- Google API key set as environment variable: `export GOOGLE_API_KEY=your_key_here` + +## Examples + +Each directory contains a complete example with its own instructions: + +### [Basic Examples](./basic/) + +Simple agent examples demonstrating core integration patterns: + +- **Hello World** — A minimal agent that responds in haikus +- **Tools** — An agent with activity-backed tools (weather, web search) + +```bash +# Terminal 1: Start the worker +uv run google_adk/basic/run_worker.py + +# Terminal 2: Run a workflow +uv run google_adk/basic/run_hello_world_workflow.py +uv run google_adk/basic/run_tools_workflow.py +``` + +### [Orchestration](./orchestration/) + +Multi-agent orchestration patterns — sequential pipelines, parallel fan-out, and iterative loops: + +- **Sequential** — Researcher → Writer → Editor pipeline via chained agent calls +- **Parallel** — Multiple agents answer the same question from different perspectives simultaneously +- **Loop** — Agent iterates on its own output until a termination condition is met + +```bash +# Terminal 1: Start the worker +uv run google_adk/orchestration/run_worker.py + +# Terminal 2: Run an orchestration +uv run google_adk/orchestration/run_sequential_workflow.py +uv run google_adk/orchestration/run_parallel_workflow.py +uv run google_adk/orchestration/run_loop_workflow.py +``` + +### [Human-in-the-Loop](./human_in_the_loop/) + +Demonstrates pausing agent execution for human approval before sensitive tool calls: + +- Agent attempts a sensitive action (send email, delete record) +- Workflow pauses and exposes pending approvals via `@workflow.query` +- External process approves/rejects via `@workflow.signal` +- Agent resumes or reports rejection + +```bash +# Terminal 1: Start the worker +uv run google_adk/human_in_the_loop/run_worker.py + +# Terminal 2: Run the HITL workflow (starts workflow, polls for approvals, approves) +uv run google_adk/human_in_the_loop/run_hitl_workflow.py +``` + +## Key Integration Patterns + +| Pattern | How It Works | +|---------|-------------| +| **Plugin** | `GoogleAdkPlugin` on `Client.connect()` — handles ADK/Pydantic serialization | +| **TemporalModel** | Wraps Gemini model calls as Temporal Activities for durability | +| **activity_tool** | Wraps `@activity.defn` functions as ADK-compatible tools | +| **InMemoryRunner** | ADK's runner executes the agent within the workflow | +| **Signals for HITL** | `@workflow.signal` enables external approval/rejection of tool calls | +| **Queries for observability** | `@workflow.query` exposes pending state without advancing the workflow | diff --git a/google_adk/__init__.py b/google_adk/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google_adk/basic/activities/get_weather_activity.py b/google_adk/basic/activities/get_weather_activity.py new file mode 100644 index 00000000..9b9d42c6 --- /dev/null +++ b/google_adk/basic/activities/get_weather_activity.py @@ -0,0 +1,7 @@ +from temporalio import activity + + +@activity.defn +async def get_weather(city: str) -> str: + """Get current weather for a city (mock implementation).""" + return f"72°F and sunny in {city}" diff --git a/google_adk/basic/activities/search_web_activity.py b/google_adk/basic/activities/search_web_activity.py new file mode 100644 index 00000000..1772dcca --- /dev/null +++ b/google_adk/basic/activities/search_web_activity.py @@ -0,0 +1,7 @@ +from temporalio import activity + + +@activity.defn +async def search_web(query: str) -> str: + """Search the web for information (mock implementation).""" + return f'Search results for "{query}": [Result 1: Overview] [Result 2: Details] [Result 3: Examples]' diff --git a/google_adk/basic/run_hello_world_workflow.py b/google_adk/basic/run_hello_world_workflow.py new file mode 100644 index 00000000..05eec4c0 --- /dev/null +++ b/google_adk/basic/run_hello_world_workflow.py @@ -0,0 +1,25 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin + +from google_adk.basic.workflows.hello_world_workflow import HelloWorldWorkflow + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[GoogleAdkPlugin()], + ) + + result = await client.execute_workflow( + HelloWorldWorkflow.run, + "Tell me about recursion in programming.", + id="google-adk-hello-world", + task_queue="google-adk-basic-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/basic/run_tools_workflow.py b/google_adk/basic/run_tools_workflow.py new file mode 100644 index 00000000..0de60344 --- /dev/null +++ b/google_adk/basic/run_tools_workflow.py @@ -0,0 +1,25 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin + +from google_adk.basic.workflows.tools_workflow import ToolsWorkflow + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[GoogleAdkPlugin()], + ) + + result = await client.execute_workflow( + ToolsWorkflow.run, + "What is the weather in Tokyo?", + id="google-adk-tools-workflow", + task_queue="google-adk-basic-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/basic/run_worker.py b/google_adk/basic/run_worker.py new file mode 100644 index 00000000..9ffe5974 --- /dev/null +++ b/google_adk/basic/run_worker.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin, ModelActivityParameters +from temporalio.worker import Worker + +from google_adk.basic.activities.get_weather_activity import get_weather +from google_adk.basic.activities.search_web_activity import search_web +from google_adk.basic.workflows.hello_world_workflow import HelloWorldWorkflow +from google_adk.basic.workflows.tools_workflow import ToolsWorkflow + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[ + GoogleAdkPlugin( + model_params=ModelActivityParameters( + start_to_close_timeout=timedelta(seconds=30) + ) + ), + ], + ) + + worker = Worker( + client, + task_queue="google-adk-basic-task-queue", + workflows=[HelloWorldWorkflow, ToolsWorkflow], + activities=[get_weather, search_web], + ) + print("Worker started on task queue: google-adk-basic-task-queue") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/basic/workflows/hello_world_workflow.py b/google_adk/basic/workflows/hello_world_workflow.py new file mode 100644 index 00000000..03ee51da --- /dev/null +++ b/google_adk/basic/workflows/hello_world_workflow.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from contextlib import aclosing + +from temporalio import workflow +from temporalio.contrib.google_adk_agents import TemporalModel + +with workflow.unsafe.imports_passed_through(): + from google.adk.agents import Agent + from google.adk.runners import InMemoryRunner + from google.genai import types + + +@workflow.defn +class HelloWorldWorkflow: + @workflow.run + async def run(self, prompt: str) -> str: + agent = Agent( + name="Assistant", + model=TemporalModel("gemini-2.5-flash"), + instruction="You only respond in haikus.", + ) + + runner = InMemoryRunner(agent=agent, app_name="hello_world") + session = await runner.session_service.create_session( + user_id="user", app_name="hello_world" + ) + + result = "" + async with aclosing( + runner.run_async( + user_id="user", + session_id=session.id, + new_message=types.Content( + role="user", + parts=[types.Part.from_text(text=prompt)], + ), + ) + ) as events: + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + result = part.text + + return result diff --git a/google_adk/basic/workflows/tools_workflow.py b/google_adk/basic/workflows/tools_workflow.py new file mode 100644 index 00000000..07995e99 --- /dev/null +++ b/google_adk/basic/workflows/tools_workflow.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from contextlib import aclosing +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.google_adk_agents import TemporalModel +from temporalio.contrib.google_adk_agents.workflow import activity_tool + +with workflow.unsafe.imports_passed_through(): + from google.adk.agents import Agent + from google.adk.runners import InMemoryRunner + from google.genai import types + + from google_adk.basic.activities.get_weather_activity import get_weather + from google_adk.basic.activities.search_web_activity import search_web + + +@workflow.defn +class ToolsWorkflow: + @workflow.run + async def run(self, question: str) -> str: + weather_tool = activity_tool( + get_weather, start_to_close_timeout=timedelta(seconds=10) + ) + search_tool = activity_tool( + search_web, start_to_close_timeout=timedelta(seconds=10) + ) + + agent = Agent( + name="ToolsAgent", + model=TemporalModel("gemini-2.5-flash"), + instruction="You are a helpful agent. Use tools to answer questions.", + tools=[weather_tool, search_tool], + ) + + runner = InMemoryRunner(agent=agent, app_name="tools_agent") + session = await runner.session_service.create_session( + user_id="user", app_name="tools_agent" + ) + + result = "" + async with aclosing( + runner.run_async( + user_id="user", + session_id=session.id, + new_message=types.Content( + role="user", + parts=[types.Part.from_text(text=question)], + ), + ) + ) as events: + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + result = part.text + + return result diff --git a/google_adk/human_in_the_loop/activities/sensitive_actions.py b/google_adk/human_in_the_loop/activities/sensitive_actions.py new file mode 100644 index 00000000..63cdf6e0 --- /dev/null +++ b/google_adk/human_in_the_loop/activities/sensitive_actions.py @@ -0,0 +1,13 @@ +from temporalio import activity + + +@activity.defn +async def send_email(to: str, subject: str, body: str) -> str: + """Send an email (mock implementation).""" + return f"Email sent to {to} with subject '{subject}'" + + +@activity.defn +async def delete_record(record_id: str) -> str: + """Delete a record from the database (mock implementation).""" + return f"Record {record_id} deleted successfully" diff --git a/google_adk/human_in_the_loop/run_hitl_workflow.py b/google_adk/human_in_the_loop/run_hitl_workflow.py new file mode 100644 index 00000000..0cc09488 --- /dev/null +++ b/google_adk/human_in_the_loop/run_hitl_workflow.py @@ -0,0 +1,71 @@ +"""Human-in-the-loop example — demonstrates approval workflow. + +This starter: +1. Starts the workflow with a prompt that will trigger a sensitive tool +2. Polls for pending approvals +3. Approves the tool call via signal +4. Waits for the final result +""" + +import asyncio + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin + +from google_adk.human_in_the_loop.workflows.hitl_workflow import ( + ApprovalSignal, + HumanInTheLoopWorkflow, +) + +WORKFLOW_ID = "google-adk-hitl-workflow" + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[GoogleAdkPlugin()], + ) + + # Start the workflow (don't await result yet) + handle = await client.start_workflow( + HumanInTheLoopWorkflow.run, + "Send an email to alice@example.com with subject 'Hello' and body 'How are you?'", + id=WORKFLOW_ID, + task_queue="google-adk-hitl-task-queue", + ) + print(f"Workflow started: {handle.id}") + + # Poll for pending approvals + print("Waiting for tool call to require approval...") + pending = [] + for _ in range(30): + pending = await handle.query(HumanInTheLoopWorkflow.get_pending_approvals) + if pending: + break + await asyncio.sleep(1) + + if not pending: + print("No pending approvals found (agent may not have used a sensitive tool)") + result = await handle.result() + print(f"Result: {result}") + return + + # Show pending approval and approve it + for call in pending: + print(f"\nPending approval:") + print(f" Tool: {call.tool_name}") + print(f" Arguments: {call.arguments}") + print(f" Approving call {call.call_id}...") + + await handle.signal( + HumanInTheLoopWorkflow.approve, + ApprovalSignal(call_id=call.call_id, approved=True), + ) + + # Wait for final result + result = await handle.result() + print(f"\nFinal result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/human_in_the_loop/run_worker.py b/google_adk/human_in_the_loop/run_worker.py new file mode 100644 index 00000000..e286bb06 --- /dev/null +++ b/google_adk/human_in_the_loop/run_worker.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin, ModelActivityParameters +from temporalio.worker import Worker + +from google_adk.human_in_the_loop.activities.sensitive_actions import ( + delete_record, + send_email, +) +from google_adk.human_in_the_loop.workflows.hitl_workflow import ( + HumanInTheLoopWorkflow, +) + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[ + GoogleAdkPlugin( + model_params=ModelActivityParameters( + start_to_close_timeout=timedelta(seconds=30) + ) + ), + ], + ) + + worker = Worker( + client, + task_queue="google-adk-hitl-task-queue", + workflows=[HumanInTheLoopWorkflow], + activities=[send_email, delete_record], + ) + print("Worker started on task queue: google-adk-hitl-task-queue") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/human_in_the_loop/workflows/hitl_workflow.py b/google_adk/human_in_the_loop/workflows/hitl_workflow.py new file mode 100644 index 00000000..660a2e0b --- /dev/null +++ b/google_adk/human_in_the_loop/workflows/hitl_workflow.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +from contextlib import aclosing +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any + +from temporalio import workflow +from temporalio.contrib.google_adk_agents import TemporalModel +from temporalio.contrib.google_adk_agents.workflow import activity_tool + +with workflow.unsafe.imports_passed_through(): + from google.adk.agents import Agent + from google.adk.runners import InMemoryRunner + from google.genai import types + + from google_adk.human_in_the_loop.activities.sensitive_actions import ( + delete_record, + send_email, + ) + + +@dataclass +class ApprovalSignal: + """Signal payload for approving/rejecting a tool call.""" + + call_id: str + approved: bool = True + + +@dataclass +class PendingToolCall: + """A tool call awaiting human approval.""" + + call_id: str + tool_name: str + arguments: dict[str, Any] = field(default_factory=dict) + + +@workflow.defn +class HumanInTheLoopWorkflow: + """Agent with human-in-the-loop approval for sensitive tools. + + When the agent attempts to use a tool marked as requiring confirmation, + the workflow pauses and waits for an approval signal before proceeding. + + - ``approve`` (signal): approve or reject a pending tool call + - ``get_pending_approvals`` (query): list tool calls awaiting approval + """ + + def __init__(self) -> None: + self._pending_calls: dict[str, PendingToolCall] = {} + self._approval_results: dict[str, ApprovalSignal] = {} + + @workflow.run + async def run(self, prompt: str) -> str: + # Tools that require human confirmation before execution + require_confirmation = {"send_email", "delete_record"} + + email_tool = activity_tool( + send_email, start_to_close_timeout=timedelta(seconds=10) + ) + delete_tool = activity_tool( + delete_record, start_to_close_timeout=timedelta(seconds=10) + ) + + wf_self = self + + async def before_tool_callback( + tool: Any, args: dict[str, Any], tool_context: Any + ) -> dict[str, Any] | None: + if tool.name not in require_confirmation: + return None # proceed without approval + + # Generate a unique call ID and register as pending + call_id = str(workflow.uuid4()) + wf_self._pending_calls[call_id] = PendingToolCall( + call_id=call_id, + tool_name=tool.name, + arguments=dict(args), + ) + + # Pause until approval signal arrives + await workflow.wait_condition( + lambda cid=call_id: cid in wf_self._approval_results + ) + + # Process approval + approval = wf_self._approval_results.pop(call_id) + wf_self._pending_calls.pop(call_id, None) + + if not approval.approved: + return {"error": f"Tool '{tool.name}' was rejected by reviewer."} + + return None # proceed with execution + + agent = Agent( + name="AssistantWithApproval", + model=TemporalModel("gemini-2.5-flash"), + instruction=( + "You are a helpful assistant. You can send emails and delete records. " + "Use these tools when the user asks you to." + ), + tools=[email_tool, delete_tool], + before_tool_callback=before_tool_callback, + ) + + runner = InMemoryRunner(agent=agent, app_name="hitl_agent") + session = await runner.session_service.create_session( + user_id="user", app_name="hitl_agent" + ) + + result = "" + async with aclosing( + runner.run_async( + user_id="user", + session_id=session.id, + new_message=types.Content( + role="user", + parts=[types.Part.from_text(text=prompt)], + ), + ) + ) as events: + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + result = part.text + + return result + + @workflow.signal + async def approve(self, signal: ApprovalSignal) -> None: + """Approve or reject a pending tool call.""" + self._approval_results[signal.call_id] = signal + + @workflow.query + def get_pending_approvals(self) -> list[PendingToolCall]: + """List all tool calls awaiting human approval.""" + return list(self._pending_calls.values()) diff --git a/google_adk/orchestration/run_loop_workflow.py b/google_adk/orchestration/run_loop_workflow.py new file mode 100644 index 00000000..9d5fe051 --- /dev/null +++ b/google_adk/orchestration/run_loop_workflow.py @@ -0,0 +1,44 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin + +from google_adk.orchestration.workflows.loop_workflow import ( + AgentConfig, + LoopInput, + LoopWorkflow, +) + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[GoogleAdkPlugin()], + ) + + result = await client.execute_workflow( + LoopWorkflow.run, + LoopInput( + message=( + "Write a haiku about Temporal.io. After writing it, " + "evaluate if it's good. If it is, respond with DONE. " + "If not, try again." + ), + agent_config=AgentConfig( + name="poet", + instruction="You are a poet who writes and critiques haiku. Be self-critical.", + ), + max_iterations=5, + termination_phrase="DONE", + ), + id="google-adk-loop-workflow", + task_queue="google-adk-orchestration-task-queue", + ) + print(f"Iterations: {result.iterations}") + for i, response in enumerate(result.responses): + print(f"\n--- Iteration {i + 1} ---") + print(response[:300] + "..." if len(response) > 300 else response) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/orchestration/run_parallel_workflow.py b/google_adk/orchestration/run_parallel_workflow.py new file mode 100644 index 00000000..85df22e7 --- /dev/null +++ b/google_adk/orchestration/run_parallel_workflow.py @@ -0,0 +1,50 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin + +from google_adk.orchestration.workflows.parallel_workflow import ( + AgentConfig, + ParallelInput, + ParallelWorkflow, +) + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[GoogleAdkPlugin()], + ) + + question = "What are the pros and cons of microservices?" + + result = await client.execute_workflow( + ParallelWorkflow.run, + ParallelInput( + messages=[question, question, question], + agent_configs=[ + AgentConfig( + name="backend_engineer", + instruction="Answer from the perspective of a backend engineer focused on system design.", + ), + AgentConfig( + name="devops_engineer", + instruction="Answer from the perspective of a DevOps engineer focused on deployment and operations.", + ), + AgentConfig( + name="tech_lead", + instruction="Answer from the perspective of a tech lead focused on team productivity.", + ), + ], + ), + id="google-adk-parallel-workflow", + task_queue="google-adk-orchestration-task-queue", + ) + print(f"Responses collected: {result.iterations}") + for i, response in enumerate(result.responses): + print(f"\n--- Perspective {i + 1} ---") + print(response[:300] + "..." if len(response) > 300 else response) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/orchestration/run_sequential_workflow.py b/google_adk/orchestration/run_sequential_workflow.py new file mode 100644 index 00000000..9f870acf --- /dev/null +++ b/google_adk/orchestration/run_sequential_workflow.py @@ -0,0 +1,46 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin + +from google_adk.orchestration.workflows.sequential_workflow import ( + AgentConfig, + SequentialInput, + SequentialWorkflow, +) + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[GoogleAdkPlugin()], + ) + + result = await client.execute_workflow( + SequentialWorkflow.run, + SequentialInput( + message="Explain how garbage collection works in Python", + agent_configs=[ + AgentConfig( + name="researcher", + instruction="Research the topic and provide key facts and technical details.", + ), + AgentConfig( + name="writer", + instruction="Take the research and write a clear, well-structured explanation for developers.", + ), + AgentConfig( + name="editor", + instruction="Edit the text for clarity and conciseness. Keep it under 200 words.", + ), + ], + ), + id="google-adk-sequential-workflow", + task_queue="google-adk-orchestration-task-queue", + ) + print(f"Steps completed: {result.iterations}") + print(f"Final response:\n{result.final_response}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/orchestration/run_worker.py b/google_adk/orchestration/run_worker.py new file mode 100644 index 00000000..d75b0e47 --- /dev/null +++ b/google_adk/orchestration/run_worker.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio.client import Client +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin, ModelActivityParameters +from temporalio.worker import Worker + +from google_adk.orchestration.workflows.loop_workflow import LoopWorkflow +from google_adk.orchestration.workflows.parallel_workflow import ParallelWorkflow +from google_adk.orchestration.workflows.sequential_workflow import SequentialWorkflow + + +async def main(): + client = await Client.connect( + "localhost:7233", + plugins=[ + GoogleAdkPlugin( + model_params=ModelActivityParameters( + start_to_close_timeout=timedelta(seconds=60) + ) + ), + ], + ) + + worker = Worker( + client, + task_queue="google-adk-orchestration-task-queue", + workflows=[SequentialWorkflow, ParallelWorkflow, LoopWorkflow], + ) + print("Worker started on task queue: google-adk-orchestration-task-queue") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/google_adk/orchestration/workflows/loop_workflow.py b/google_adk/orchestration/workflows/loop_workflow.py new file mode 100644 index 00000000..47a32444 --- /dev/null +++ b/google_adk/orchestration/workflows/loop_workflow.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from contextlib import aclosing +from dataclasses import dataclass + +from temporalio import workflow +from temporalio.contrib.google_adk_agents import TemporalModel + +with workflow.unsafe.imports_passed_through(): + from google.adk.agents import Agent + from google.adk.runners import InMemoryRunner + from google.genai import types + + +@dataclass +class AgentConfig: + """Configuration for the loop agent.""" + + name: str + model: str = "gemini-2.5-flash" + instruction: str = "" + + +@dataclass +class LoopInput: + """Input for loop orchestration.""" + + message: str + agent_config: AgentConfig | None = None + max_iterations: int = 5 + termination_phrase: str = "DONE" + + +@dataclass +class OrchestrationOutput: + """Output from an orchestration workflow.""" + + responses: list[str] | None = None + final_response: str = "" + iterations: int = 0 + + def __post_init__(self): + if self.responses is None: + self.responses = [] + + +@workflow.defn +class LoopWorkflow: + """Run an agent repeatedly until a termination condition is met. + + Pattern: Iterative refinement — the agent writes, critiques itself, + and stops when satisfied (output contains the termination phrase). + """ + + @workflow.run + async def run(self, input: LoopInput) -> OrchestrationOutput: + config = input.agent_config or AgentConfig(name="loop_agent") + responses: list[str] = [] + current_message = input.message + + for _ in range(input.max_iterations): + agent = Agent( + name=config.name, + model=TemporalModel(config.model), + instruction=config.instruction, + ) + + app_name = f"loop_{config.name}" + runner = InMemoryRunner(agent=agent, app_name=app_name) + session = await runner.session_service.create_session( + user_id="user", app_name=app_name + ) + + result = "" + async with aclosing( + runner.run_async( + user_id="user", + session_id=session.id, + new_message=types.Content( + role="user", + parts=[types.Part.from_text(text=current_message)], + ), + ) + ) as events: + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + result = part.text + + responses.append(result) + + if input.termination_phrase in result: + break + + current_message = result + + return OrchestrationOutput( + responses=responses, + final_response=responses[-1] if responses else "", + iterations=len(responses), + ) diff --git a/google_adk/orchestration/workflows/parallel_workflow.py b/google_adk/orchestration/workflows/parallel_workflow.py new file mode 100644 index 00000000..57738a00 --- /dev/null +++ b/google_adk/orchestration/workflows/parallel_workflow.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import asyncio +from contextlib import aclosing +from dataclasses import dataclass, field + +from temporalio import workflow +from temporalio.contrib.google_adk_agents import TemporalModel + +with workflow.unsafe.imports_passed_through(): + from google.adk.agents import Agent + from google.adk.runners import InMemoryRunner + from google.genai import types + + +@dataclass +class AgentConfig: + """Configuration for a single agent.""" + + name: str + model: str = "gemini-2.5-flash" + instruction: str = "" + + +@dataclass +class ParallelInput: + """Input for parallel orchestration.""" + + messages: list[str] = field(default_factory=list) + agent_configs: list[AgentConfig] = field(default_factory=list) + + +@dataclass +class OrchestrationOutput: + """Output from an orchestration workflow.""" + + responses: list[str] = field(default_factory=list) + final_response: str = "" + iterations: int = 0 + + +async def _run_single_agent(config: AgentConfig, message: str) -> str: + """Helper to run a single ADK agent and return its text output.""" + agent = Agent( + name=config.name, + model=TemporalModel(config.model), + instruction=config.instruction, + ) + + app_name = f"parallel_{config.name}" + runner = InMemoryRunner(agent=agent, app_name=app_name) + session = await runner.session_service.create_session( + user_id="user", app_name=app_name + ) + + result = "" + async with aclosing( + runner.run_async( + user_id="user", + session_id=session.id, + new_message=types.Content( + role="user", + parts=[types.Part.from_text(text=message)], + ), + ) + ) as events: + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + result = part.text + + return result + + +@workflow.defn +class ParallelWorkflow: + """Run agents concurrently, collect all results. + + Pattern: Multiple perspectives on the same question. + All agents run simultaneously via asyncio.gather. + """ + + @workflow.run + async def run(self, input: ParallelInput) -> OrchestrationOutput: + pairs = list(zip(input.agent_configs, input.messages, strict=False)) + + tasks = [ + asyncio.ensure_future(_run_single_agent(config, message)) + for config, message in pairs + ] + results: list[str] = await asyncio.gather(*tasks) + + return OrchestrationOutput( + responses=list(results), + final_response=results[-1] if results else "", + iterations=len(results), + ) diff --git a/google_adk/orchestration/workflows/sequential_workflow.py b/google_adk/orchestration/workflows/sequential_workflow.py new file mode 100644 index 00000000..5f256668 --- /dev/null +++ b/google_adk/orchestration/workflows/sequential_workflow.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from contextlib import aclosing +from dataclasses import dataclass, field +from typing import Any + +from temporalio import workflow +from temporalio.contrib.google_adk_agents import TemporalModel + +with workflow.unsafe.imports_passed_through(): + from google.adk.agents import Agent + from google.adk.runners import InMemoryRunner + from google.genai import types + + +@dataclass +class AgentConfig: + """Configuration for a single agent step.""" + + name: str + model: str = "gemini-2.5-flash" + instruction: str = "" + + +@dataclass +class SequentialInput: + """Input for sequential orchestration.""" + + message: str + agent_configs: list[AgentConfig] = field(default_factory=list) + + +@dataclass +class OrchestrationOutput: + """Output from an orchestration workflow.""" + + responses: list[str] = field(default_factory=list) + final_response: str = "" + iterations: int = 0 + + +async def _run_single_agent(config: AgentConfig, message: str) -> str: + """Helper to run a single ADK agent and return its text output.""" + agent = Agent( + name=config.name, + model=TemporalModel(config.model), + instruction=config.instruction, + ) + + app_name = f"orchestration_{config.name}" + runner = InMemoryRunner(agent=agent, app_name=app_name) + session = await runner.session_service.create_session( + user_id="user", app_name=app_name + ) + + result = "" + async with aclosing( + runner.run_async( + user_id="user", + session_id=session.id, + new_message=types.Content( + role="user", + parts=[types.Part.from_text(text=message)], + ), + ) + ) as events: + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + result = part.text + + return result + + +@workflow.defn +class SequentialWorkflow: + """Run agents in sequence, threading output as the next input. + + Pattern: researcher → writer → editor + Each agent's output becomes the next agent's input. + """ + + @workflow.run + async def run(self, input: SequentialInput) -> OrchestrationOutput: + responses: list[str] = [] + current_message = input.message + + for config in input.agent_configs: + result = await _run_single_agent(config, current_message) + responses.append(result) + current_message = result + + return OrchestrationOutput( + responses=responses, + final_response=responses[-1] if responses else "", + iterations=len(responses), + ) diff --git a/pyproject.toml b/pyproject.toml index 0927eea3..64be6820 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,11 @@ open-telemetry = [ "temporalio[opentelemetry]", "opentelemetry-exporter-otlp-proto-grpc", ] +google-adk = [ + "google-adk>=1.31.0,<2", + "google-genai>=1.14.0,<2", + "temporalio[google-adk]>=1.26.0,<2", +] openai-agents = [ "openai-agents[litellm] >= 0.14.1", "temporalio[openai-agents,opentelemetry] >= 1.26.0", @@ -66,6 +71,7 @@ include = ["./**/*.py"] packages = [ "activity_worker", "bedrock", + "google_adk", "cloud_export_to_parquet", "context_propagation", "custom_converter", From 2b089b8f90f10ff3eb72af86d1f258b83e3227fc Mon Sep 17 00:00:00 2001 From: maxjneto Date: Wed, 29 Apr 2026 22:24:58 -0300 Subject: [PATCH 2/2] Refactor Google ADK plugin initialization and clean up code in worker scripts Co-authored-by: Copilot --- google_adk/basic/run_worker.py | 11 ++--------- google_adk/human_in_the_loop/run_hitl_workflow.py | 2 +- google_adk/human_in_the_loop/run_worker.py | 11 ++--------- .../human_in_the_loop/workflows/hitl_workflow.py | 7 ++++--- google_adk/orchestration/run_worker.py | 11 ++--------- google_adk/orchestration/workflows/loop_workflow.py | 8 ++------ .../orchestration/workflows/sequential_workflow.py | 1 - 7 files changed, 13 insertions(+), 38 deletions(-) diff --git a/google_adk/basic/run_worker.py b/google_adk/basic/run_worker.py index 9ffe5974..8fb56122 100644 --- a/google_adk/basic/run_worker.py +++ b/google_adk/basic/run_worker.py @@ -1,10 +1,9 @@ from __future__ import annotations import asyncio -from datetime import timedelta from temporalio.client import Client -from temporalio.contrib.google_adk_agents import GoogleAdkPlugin, ModelActivityParameters +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin from temporalio.worker import Worker from google_adk.basic.activities.get_weather_activity import get_weather @@ -16,13 +15,7 @@ async def main(): client = await Client.connect( "localhost:7233", - plugins=[ - GoogleAdkPlugin( - model_params=ModelActivityParameters( - start_to_close_timeout=timedelta(seconds=30) - ) - ), - ], + plugins=[GoogleAdkPlugin()], ) worker = Worker( diff --git a/google_adk/human_in_the_loop/run_hitl_workflow.py b/google_adk/human_in_the_loop/run_hitl_workflow.py index 0cc09488..13f663f5 100644 --- a/google_adk/human_in_the_loop/run_hitl_workflow.py +++ b/google_adk/human_in_the_loop/run_hitl_workflow.py @@ -52,7 +52,7 @@ async def main(): # Show pending approval and approve it for call in pending: - print(f"\nPending approval:") + print("\nPending approval:") print(f" Tool: {call.tool_name}") print(f" Arguments: {call.arguments}") print(f" Approving call {call.call_id}...") diff --git a/google_adk/human_in_the_loop/run_worker.py b/google_adk/human_in_the_loop/run_worker.py index e286bb06..6312415a 100644 --- a/google_adk/human_in_the_loop/run_worker.py +++ b/google_adk/human_in_the_loop/run_worker.py @@ -1,10 +1,9 @@ from __future__ import annotations import asyncio -from datetime import timedelta from temporalio.client import Client -from temporalio.contrib.google_adk_agents import GoogleAdkPlugin, ModelActivityParameters +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin from temporalio.worker import Worker from google_adk.human_in_the_loop.activities.sensitive_actions import ( @@ -19,13 +18,7 @@ async def main(): client = await Client.connect( "localhost:7233", - plugins=[ - GoogleAdkPlugin( - model_params=ModelActivityParameters( - start_to_close_timeout=timedelta(seconds=30) - ) - ), - ], + plugins=[GoogleAdkPlugin()], ) worker = Worker( diff --git a/google_adk/human_in_the_loop/workflows/hitl_workflow.py b/google_adk/human_in_the_loop/workflows/hitl_workflow.py index 660a2e0b..315f8699 100644 --- a/google_adk/human_in_the_loop/workflows/hitl_workflow.py +++ b/google_adk/human_in_the_loop/workflows/hitl_workflow.py @@ -81,9 +81,10 @@ async def before_tool_callback( ) # Pause until approval signal arrives - await workflow.wait_condition( - lambda cid=call_id: cid in wf_self._approval_results - ) + def _is_approved(cid: str = call_id) -> bool: + return cid in wf_self._approval_results + + await workflow.wait_condition(_is_approved) # Process approval approval = wf_self._approval_results.pop(call_id) diff --git a/google_adk/orchestration/run_worker.py b/google_adk/orchestration/run_worker.py index d75b0e47..b5641ade 100644 --- a/google_adk/orchestration/run_worker.py +++ b/google_adk/orchestration/run_worker.py @@ -1,10 +1,9 @@ from __future__ import annotations import asyncio -from datetime import timedelta from temporalio.client import Client -from temporalio.contrib.google_adk_agents import GoogleAdkPlugin, ModelActivityParameters +from temporalio.contrib.google_adk_agents import GoogleAdkPlugin from temporalio.worker import Worker from google_adk.orchestration.workflows.loop_workflow import LoopWorkflow @@ -15,13 +14,7 @@ async def main(): client = await Client.connect( "localhost:7233", - plugins=[ - GoogleAdkPlugin( - model_params=ModelActivityParameters( - start_to_close_timeout=timedelta(seconds=60) - ) - ), - ], + plugins=[GoogleAdkPlugin()], ) worker = Worker( diff --git a/google_adk/orchestration/workflows/loop_workflow.py b/google_adk/orchestration/workflows/loop_workflow.py index 47a32444..865d40ef 100644 --- a/google_adk/orchestration/workflows/loop_workflow.py +++ b/google_adk/orchestration/workflows/loop_workflow.py @@ -1,7 +1,7 @@ from __future__ import annotations from contextlib import aclosing -from dataclasses import dataclass +from dataclasses import dataclass, field from temporalio import workflow from temporalio.contrib.google_adk_agents import TemporalModel @@ -35,14 +35,10 @@ class LoopInput: class OrchestrationOutput: """Output from an orchestration workflow.""" - responses: list[str] | None = None + responses: list[str] = field(default_factory=list) final_response: str = "" iterations: int = 0 - def __post_init__(self): - if self.responses is None: - self.responses = [] - @workflow.defn class LoopWorkflow: diff --git a/google_adk/orchestration/workflows/sequential_workflow.py b/google_adk/orchestration/workflows/sequential_workflow.py index 5f256668..0bc84cdc 100644 --- a/google_adk/orchestration/workflows/sequential_workflow.py +++ b/google_adk/orchestration/workflows/sequential_workflow.py @@ -2,7 +2,6 @@ from contextlib import aclosing from dataclasses import dataclass, field -from typing import Any from temporalio import workflow from temporalio.contrib.google_adk_agents import TemporalModel