Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added src/mcp/__init__.py
Empty file.
95 changes: 95 additions & 0 deletions src/mcp/mcp_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import json


class MCPToolError(Exception):
"""Raised when MCP server returns a JSON-RPC error response."""
pass


class MCPClient:
"""
Minimal JSON-RPC client for communicating with the Reactome MCP server
over stdin/stdout.
"""

def __init__(self, process: asyncio.subprocess.Process, timeout: float = 30.0):
self.process = process
self.timeout = timeout
self.request_id = 0

async def call(self, method: str, params: dict | None = None) -> dict:
"""
Send a JSON-RPC request and return the result.

Raises
------
MCPToolError
If the server returns a JSON-RPC error response.
asyncio.TimeoutError
If the server does not respond within timeout seconds.
RuntimeError
If the server closes the connection unexpectedly.
"""
if params is None:
params = {}

self.request_id += 1

request = {
"jsonrpc": "2.0",
"id": self.request_id,
"method": method,
"params": params,
}

message = json.dumps(request) + "\n"
self.process.stdin.write(message.encode("utf-8"))
await self.process.stdin.drain()

# Wait for response with timeout so chatbot never hangs indefinitely
response_line = await asyncio.wait_for(
self.process.stdout.readline(),
timeout=self.timeout,
)

if not response_line:
raise RuntimeError("MCP server closed the connection.")

try:
response = json.loads(response_line.decode("utf-8").strip())
except json.JSONDecodeError as e:
raise RuntimeError(f"MCP server returned invalid JSON: {e}")

# JSON-RPC error response — server understood request but returned an error
if "error" in response:
error = response["error"]
raise MCPToolError(
f"MCP error {error.get('code')}: {error.get('message')}"
)

return response.get("result", {})

async def call_tool(self, tool_name: str, arguments: dict | None = None) -> str:
"""
Call a specific MCP tool and return the text result.

Parameters
----------
tool_name : str
Name of the tool (e.g. "reactome_search").
arguments : dict | None
Tool arguments.
"""
if arguments is None:
arguments = {}

result = await self.call(
"tools/call",
{"name": tool_name, "arguments": arguments},
)

# MCP returns content as list of typed blocks — extract text blocks
content = result.get("content", [])
text_parts = [block["text"] for block in content if block.get("type") == "text"]
return "\n".join(text_parts)
66 changes: 66 additions & 0 deletions src/mcp/mcp_process_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio
from pathlib import Path


class MCPConnectionError(Exception):
"""Raised when MCP server fails to start or crashes."""
pass


class MCPProcessManager:
"""Manages lifecycle of the Reactome MCP server process."""

def __init__(self, mcp_server_path: str):
self.mcp_server_path = Path(mcp_server_path)
if not self.mcp_server_path.exists():
raise FileNotFoundError(
f"MCP server not found at: {self.mcp_server_path}\n"
f"Make sure reactome-mcp is cloned and built with 'npm run build'"
)
self.process = None

async def start(self) -> asyncio.subprocess.Process:
"""Start the MCP server process."""
self.process = await asyncio.create_subprocess_exec(
"node",
str(self.mcp_server_path),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

# Allow server time to initialize before checking if it survived
await asyncio.sleep(1)

if self.process.returncode is not None:
# Process already exited — read stderr to find out why
stderr_output = await self.process.stderr.read()
raise MCPConnectionError(
f"MCP server failed to start:\n{stderr_output.decode('utf-8')}"
)

return self.process

async def stop(self) -> None:
"""Stop the MCP server — graceful terminate, falls back to kill."""
if not self.process:
return

try:
self.process.terminate()
await asyncio.wait_for(self.process.wait(), timeout=5.0)

except asyncio.TimeoutError:
self.process.kill()
await self.process.wait()

finally:
self.process = None

async def __aenter__(self):
await self.start()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
return False