From 150b632c95d6c7281a7012ed6597f689d6920d65 Mon Sep 17 00:00:00 2001 From: Delega Bot Date: Tue, 14 Apr 2026 12:26:03 -0500 Subject: [PATCH] =?UTF-8?q?feat:=201.2.0=20multi-agent=20coordination=20me?= =?UTF-8?q?thods=20+=20usage()=20fix=20=E2=86=92=200.2.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brings the Python SDK in line with @delega-dev/mcp 1.2.0. Agents using this SDK can now delegate, inspect chains, pass shared context, and dedup — the surfaces that turn Delega from a task tracker into an actual coordination layer. New methods (sync + async): - tasks.assign(task_id, agent_id) — PUT /tasks/:id with assigned_to_agent_id (None to unassign). For multi-agent handoffs use delegate() instead — assign() doesn't record a chain. - tasks.chain(task_id) — GET /tasks/:id/chain. Returns DelegationChain dataclass. Client normalizes hosted {root_id} vs self-hosted {root: Task} shape divergence so root_id is always populated. - tasks.update_context(task_id, context) — PATCH /tasks/:id/context. Deep-merges keys (not replace). Normalizes hosted bare-dict vs self-hosted full-task response; always returns the merged dict. - tasks.find_duplicates(content, threshold=None) — POST /tasks/dedup. Returns DedupResult with DuplicateMatch entries. Existing method updates: - tasks.delegate() gains project_id / labels / due_date / assigned_to_agent_id kwargs matching the delega-mcp tool signature. Existing callers unaffected (all additions are optional kwargs). - Delega.usage() BUG FIX: was hitting /stats, now correctly /usage. Raises DelegaError before any HTTP call on self-hosted backends (which don't expose /usage — mirrors the delega-mcp client gate). Async usage() also gets the gate. Model updates: - Task now surfaces parent_task_id, root_task_id, delegation_depth, status, assigned_to_agent_id, created_by_agent_id, completed_by_agent_id, context. from_dict() auto-parses the JSON-encoded context string that hosted returns (D1/SQLite text column) vs the dict self-hosted returns (SQLAlchemy JSON). - New: DelegationChain, DedupResult, DuplicateMatch dataclasses. All exposed from the package root. Test updates: - 10 new unit tests covering all new methods and both shape branches of chain() and update_context(). Existing test_usage split into _hosted + _self_hosted_raises_before_fetch (asserts mock never called). - HTTPClient.path_prefix property added to both sync and async transports so the usage gate can inspect the namespace. Version: 0.1.3 → 0.2.0 (minor — new methods, usage() endpoint change is a semantic fix not a signature change). 63/63 pytest tests passing. Out of scope (follow-up): async test coverage for the new methods. The async paths are line-by-line mirrors of sync; a later PR can add async-specific tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- pyproject.toml | 2 +- src/delega/__init__.py | 13 +++- src/delega/_http.py | 5 ++ src/delega/_version.py | 2 +- src/delega/async_client.py | 80 +++++++++++++++++++- src/delega/client.py | 131 ++++++++++++++++++++++++++++++-- src/delega/models.py | 97 ++++++++++++++++++++++++ tests/test_client.py | 149 ++++++++++++++++++++++++++++++++++++- 8 files changed, 463 insertions(+), 16 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a1e71dd..07a19f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "delega" -version = "0.1.3" +version = "0.2.0" description = "Official Python SDK for the Delega API" readme = "README.md" license = "MIT" diff --git a/src/delega/__init__.py b/src/delega/__init__.py index dc0f569..8bd1a49 100644 --- a/src/delega/__init__.py +++ b/src/delega/__init__.py @@ -9,19 +9,30 @@ DelegaNotFoundError, DelegaRateLimitError, ) -from .models import Agent, Comment, Project, Task +from .models import ( + Agent, + Comment, + DedupResult, + DelegationChain, + DuplicateMatch, + Project, + Task, +) from .webhooks import verify_webhook __all__ = [ "Agent", "AsyncDelega", "Comment", + "DedupResult", "Delega", "DelegaAPIError", "DelegaAuthError", "DelegaError", "DelegaNotFoundError", "DelegaRateLimitError", + "DelegationChain", + "DuplicateMatch", "Project", "Task", "verify_webhook", diff --git a/src/delega/_http.py b/src/delega/_http.py index 71ddfbd..57a6c7d 100644 --- a/src/delega/_http.py +++ b/src/delega/_http.py @@ -60,6 +60,11 @@ def __init__(self, base_url: str, api_key: str, timeout: int = _DEFAULT_TIMEOUT) self._api_key = api_key self._timeout = timeout + @property + def path_prefix(self) -> str: + """Return the API namespace path ("/v1" for hosted, "/api" for self-hosted).""" + return urllib.parse.urlparse(self._base_url).path or "" + def _headers(self) -> dict[str, str]: return { "X-Agent-Key": self._api_key, diff --git a/src/delega/_version.py b/src/delega/_version.py index 01a5f44..b835363 100644 --- a/src/delega/_version.py +++ b/src/delega/_version.py @@ -1,4 +1,4 @@ """Package version metadata.""" -__version__ = "0.1.3" +__version__ = "0.2.0" USER_AGENT = f"delega-python/{__version__}" diff --git a/src/delega/async_client.py b/src/delega/async_client.py index 1c25b5a..e07035b 100644 --- a/src/delega/async_client.py +++ b/src/delega/async_client.py @@ -13,7 +13,7 @@ DelegaNotFoundError, DelegaRateLimitError, ) -from .models import Agent, Comment, Project, Task +from .models import Agent, Comment, DedupResult, DelegationChain, Project, Task from ._version import USER_AGENT _DEFAULT_BASE_URL = "https://api.delega.dev" @@ -49,6 +49,12 @@ def __init__(self, base_url: str, api_key: str, timeout: int = 30) -> None: timeout=timeout, ) + @property + def path_prefix(self) -> str: + """Return the API namespace path ("/v1" for hosted, "/api" for self-hosted).""" + import urllib.parse + return urllib.parse.urlparse(self._base_url).path or "" + async def request( self, method: str, @@ -201,16 +207,74 @@ async def delegate( *, description: Optional[str] = None, priority: Optional[int] = None, + project_id: Optional[str] = None, + labels: Optional[list[str]] = None, + due_date: Optional[str] = None, + assigned_to_agent_id: Optional[str] = None, ) -> Task: - """Create a delegated sub-task under a parent task.""" + """Create a delegated child task under a parent. + + The parent's ``status`` flips to ``"delegated"``. Use this — not + ``assign()`` — for multi-agent handoffs so the parent/child + accountability chain is recorded. + """ body: dict[str, Any] = {"content": content} if description is not None: body["description"] = description if priority is not None: body["priority"] = priority + if project_id is not None: + body["project_id"] = project_id + if labels is not None: + body["labels"] = labels + if due_date is not None: + body["due_date"] = due_date + if assigned_to_agent_id is not None: + body["assigned_to_agent_id"] = assigned_to_agent_id data = await self._http.post(f"/tasks/{parent_task_id}/delegate", body=body) return Task.from_dict(data) + async def assign(self, task_id: str, agent_id: Optional[str]) -> Task: + """Assign a task to an agent (or ``None`` to unassign).""" + data = await self._http.put( + f"/tasks/{task_id}", body={"assigned_to_agent_id": agent_id} + ) + return Task.from_dict(data) + + async def chain(self, task_id: str) -> DelegationChain: + """Get the full parent/child delegation chain for a task.""" + data = await self._http.get(f"/tasks/{task_id}/chain") + return DelegationChain.from_dict(data) + + async def update_context( + self, task_id: str, context: dict[str, Any] + ) -> dict[str, Any]: + """Deep-merge keys into a task's persistent context blob. + + Existing keys are preserved; supplied keys are added or overwritten. + """ + data = await self._http.patch(f"/tasks/{task_id}/context", body=context) + if isinstance(data, dict) and "content" in data and "id" in data: + raw_ctx = data.get("context") or {} + if isinstance(raw_ctx, str): + import json as _json + try: + raw_ctx = _json.loads(raw_ctx) if raw_ctx.strip() else {} + except Exception: + raw_ctx = {} + return raw_ctx if isinstance(raw_ctx, dict) else {} + return data if isinstance(data, dict) else {} + + async def find_duplicates( + self, content: str, *, threshold: Optional[float] = None + ) -> DedupResult: + """Check whether content is similar to existing open tasks.""" + body: dict[str, Any] = {"content": content} + if threshold is not None: + body["threshold"] = threshold + data = await self._http.post("/tasks/dedup", body=body) + return DedupResult.from_dict(data) + async def add_comment(self, task_id: str, content: str) -> Comment: """Add a comment to a task.""" data = await self._http.post(f"/tasks/{task_id}/comments", body={"content": content}) @@ -368,7 +432,17 @@ async def me(self) -> dict[str, Any]: return await self._http.get("/agent/me") # type: ignore[no-any-return] async def usage(self) -> dict[str, Any]: - """Get API usage information.""" + """Get quota and rate-limit information for the current plan. + + Hosted API only (``api.delega.dev``). Self-hosted deployments + will raise :class:`DelegaError` before making a request. + """ + if self._http.path_prefix != "/v1": + raise DelegaError( + "usage() is only available on the hosted Delega API " + "(api.delega.dev). Self-hosted deployments do not expose " + "a usage endpoint." + ) return await self._http.get("/usage") # type: ignore[no-any-return] async def aclose(self) -> None: diff --git a/src/delega/client.py b/src/delega/client.py index 0ba254e..92420dd 100644 --- a/src/delega/client.py +++ b/src/delega/client.py @@ -7,7 +7,14 @@ from ._http import HTTPClient from .exceptions import DelegaError -from .models import Agent, Comment, Project, Task +from .models import ( + Agent, + Comment, + DedupResult, + DelegationChain, + Project, + Task, +) _DEFAULT_BASE_URL = "https://api.delega.dev" @@ -153,23 +160,121 @@ def delegate( *, description: Optional[str] = None, priority: Optional[int] = None, + project_id: Optional[str] = None, + labels: Optional[list[str]] = None, + due_date: Optional[str] = None, + assigned_to_agent_id: Optional[str] = None, ) -> Task: - """Create a delegated sub-task under a parent task. + """Create a delegated child task under a parent. + + The parent's ``status`` flips to ``"delegated"``. Use this — not + ``assign()`` — for multi-agent handoffs so the parent/child + accountability chain is recorded (inspectable via ``chain()``). Args: parent_task_id: The parent task identifier. - content: The sub-task content/title. + content: The child task content/title. description: Optional longer description. - priority: Optional priority level. + priority: Optional priority level (1-4). + project_id: Optional project to attach the child to. + labels: Optional labels for the child. + due_date: Optional due date (YYYY-MM-DD). + assigned_to_agent_id: Optional agent to assign the child to. """ body: dict[str, Any] = {"content": content} if description is not None: body["description"] = description if priority is not None: body["priority"] = priority + if project_id is not None: + body["project_id"] = project_id + if labels is not None: + body["labels"] = labels + if due_date is not None: + body["due_date"] = due_date + if assigned_to_agent_id is not None: + body["assigned_to_agent_id"] = assigned_to_agent_id data = self._http.post(f"/tasks/{parent_task_id}/delegate", body=body) return Task.from_dict(data) + def assign(self, task_id: str, agent_id: Optional[str]) -> Task: + """Assign a task to an agent (or ``None`` to unassign). + + For multi-agent handoffs where you want the parent/child chain + recorded, use ``delegate()`` instead — ``assign()`` only updates + the assignee on an existing task. + + Args: + task_id: The task identifier. + agent_id: The agent identifier, or ``None`` to unassign. + """ + data = self._http.put( + f"/tasks/{task_id}", body={"assigned_to_agent_id": agent_id} + ) + return Task.from_dict(data) + + def chain(self, task_id: str) -> DelegationChain: + """Get the full parent/child delegation chain for a task. + + Normalizes hosted (``{root_id}``) vs self-hosted (``{root: Task}``) + response shapes so ``DelegationChain.root_id`` is always populated. + + Args: + task_id: Any task identifier in the chain. + """ + data = self._http.get(f"/tasks/{task_id}/chain") + return DelegationChain.from_dict(data) + + def update_context( + self, task_id: str, context: dict[str, Any] + ) -> dict[str, Any]: + """Deep-merge keys into a task's persistent context blob. + + Existing keys are preserved; supplied keys are added or overwritten. + Use this to pass shared state between delegated agents instead of + re-describing context in task descriptions. + + Args: + task_id: The task identifier. + context: Keys to merge into the existing context. + + Returns: + The merged context dict. + """ + # Hosted returns the bare merged context dict; self-hosted returns + # the full Task with a ``context`` field. Normalize to always + # return the merged context. + data = self._http.patch(f"/tasks/{task_id}/context", body=context) + if isinstance(data, dict) and "content" in data and "id" in data: + # Looks like a full Task. + raw_ctx = data.get("context") or {} + if isinstance(raw_ctx, str): + import json as _json + try: + raw_ctx = _json.loads(raw_ctx) if raw_ctx.strip() else {} + except Exception: + raw_ctx = {} + return raw_ctx if isinstance(raw_ctx, dict) else {} + return data if isinstance(data, dict) else {} + + def find_duplicates( + self, content: str, *, threshold: Optional[float] = None + ) -> DedupResult: + """Check whether content is similar to existing open tasks. + + Call before ``create()`` to avoid redundant work. Uses Jaccard + similarity against open tasks. + + Args: + content: The proposed task content to check. + threshold: Similarity threshold 0-1 (default 0.6 server-side). + """ + body: dict[str, Any] = {"content": content} + if threshold is not None: + body["threshold"] = threshold + data = self._http.post("/tasks/dedup", body=body) + return DedupResult.from_dict(data) + def add_comment(self, task_id: str, content: str) -> Comment: """Add a comment to a task. @@ -386,9 +491,21 @@ def me(self) -> dict[str, Any]: return self._http.get("/agent/me") # type: ignore[no-any-return] def usage(self) -> dict[str, Any]: - """Get API usage information. + """Get quota and rate-limit information for the current plan. + + Hosted API only (``api.delega.dev``). Self-hosted deployments + will raise :class:`DelegaError` before making a request. Returns: - Dictionary with usage statistics. + Dict with ``plan``, ``task_count_month``, ``task_limit``, + ``reset_date``, ``agent_count``, ``agent_limit``, + ``webhook_count``, ``webhook_limit``, ``project_count``, + ``project_limit``, ``rate_limit_rpm``, ``max_content_chars``. """ - return self._http.get("/stats") # type: ignore[no-any-return] + if self._http.path_prefix != "/v1": + raise DelegaError( + "usage() is only available on the hosted Delega API " + "(api.delega.dev). Self-hosted deployments do not expose " + "a usage endpoint." + ) + return self._http.get("/usage") # type: ignore[no-any-return] diff --git a/src/delega/models.py b/src/delega/models.py index 9e509d4..f65107b 100644 --- a/src/delega/models.py +++ b/src/delega/models.py @@ -19,12 +19,29 @@ class Task: completed: bool = False project_id: Optional[str] = None parent_id: Optional[str] = None + parent_task_id: Optional[str] = None + root_task_id: Optional[str] = None + delegation_depth: int = 0 + status: Optional[str] = None + assigned_to_agent_id: Optional[str] = None + created_by_agent_id: Optional[str] = None + completed_by_agent_id: Optional[str] = None + context: Optional[dict[str, Any]] = None created_at: Optional[str] = None updated_at: Optional[str] = None @classmethod def from_dict(cls, data: dict[str, Any]) -> Task: """Create a Task from an API response dictionary.""" + # `context` ships as a JSON-encoded string on hosted (D1/SQLite) + # and a dict on self-hosted (SQLAlchemy JSON). Normalize to dict. + import json as _json + raw_ctx = data.get("context") + if isinstance(raw_ctx, str): + try: + raw_ctx = _json.loads(raw_ctx) if raw_ctx.strip() else None + except Exception: + raw_ctx = None return cls( id=data["id"], content=data["content"], @@ -35,6 +52,14 @@ def from_dict(cls, data: dict[str, Any]) -> Task: completed=data.get("completed", False), project_id=data.get("project_id"), parent_id=data.get("parent_id"), + parent_task_id=data.get("parent_task_id"), + root_task_id=data.get("root_task_id"), + delegation_depth=data.get("delegation_depth", 0) or 0, + status=data.get("status"), + assigned_to_agent_id=data.get("assigned_to_agent_id"), + created_by_agent_id=data.get("created_by_agent_id"), + completed_by_agent_id=data.get("completed_by_agent_id"), + context=raw_ctx if isinstance(raw_ctx, dict) else None, created_at=data.get("created_at"), updated_at=data.get("updated_at"), ) @@ -108,3 +133,75 @@ def from_dict(cls, data: dict[str, Any]) -> Project: created_at=data.get("created_at"), updated_at=data.get("updated_at"), ) + + +@dataclass +class DelegationChain: + """The full parent/child delegation chain for a task. + + Normalized across hosted (returns ``root_id: str``) and self-hosted + (returns ``root: Task``) response shapes — the client layer ensures + ``root_id`` is always populated. + """ + + root_id: str + chain: list[Task] = field(default_factory=list) + depth: int = 0 + completed_count: int = 0 + total_count: int = 0 + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> DelegationChain: + """Create a DelegationChain from an API response dictionary.""" + # Hosted: {root_id, chain, ...}. Self-hosted: {root: Task, chain, ...}. + root_id = data.get("root_id") + if root_id is None: + root = data.get("root") + if isinstance(root, dict): + root_id = root.get("id") + chain_raw = data.get("chain") or [] + return cls( + root_id=str(root_id) if root_id is not None else "", + chain=[Task.from_dict(t) for t in chain_raw if isinstance(t, dict)], + depth=data.get("depth", 0) or 0, + completed_count=data.get("completed_count", 0) or 0, + total_count=data.get("total_count", 0) or 0, + ) + + +@dataclass +class DuplicateMatch: + """A single duplicate-match result from ``find_duplicates``.""" + + task_id: str + content: str + score: float + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> DuplicateMatch: + """Create a DuplicateMatch from an API response dictionary.""" + return cls( + task_id=str(data["task_id"]), + content=data["content"], + score=float(data.get("score", 0.0)), + ) + + +@dataclass +class DedupResult: + """Result of calling ``tasks.find_duplicates``.""" + + has_duplicates: bool + matches: list[DuplicateMatch] = field(default_factory=list) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> DedupResult: + """Create a DedupResult from an API response dictionary.""" + return cls( + has_duplicates=bool(data.get("has_duplicates", False)), + matches=[ + DuplicateMatch.from_dict(m) + for m in (data.get("matches") or []) + if isinstance(m, dict) + ], + ) diff --git a/tests/test_client.py b/tests/test_client.py index c03696e..a2ec1e5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -11,12 +11,14 @@ from delega import ( Agent, Comment, + DedupResult, Delega, DelegaAPIError, DelegaAuthError, DelegaError, DelegaNotFoundError, DelegaRateLimitError, + DelegationChain, Project, Task, ) @@ -233,6 +235,128 @@ def test_list_comments(self, mock_urlopen: MagicMock) -> None: self.assertEqual(len(comments), 2) self.assertIsInstance(comments[0], Comment) + # ── 1.2.0 coordination methods ── + + @patch("urllib.request.urlopen") + def test_delegate_task_with_assignee(self, mock_urlopen: MagicMock) -> None: + mock_urlopen.return_value = _mock_response({ + "id": "t_child", + "content": "Child", + "parent_task_id": "t1", + "root_task_id": "t1", + "delegation_depth": 1, + "status": "open", + "assigned_to_agent_id": "a2", + }) + task = self.client.tasks.delegate( + "t1", "Child", assigned_to_agent_id="a2", priority=2 + ) + self.assertEqual(task.parent_task_id, "t1") + self.assertEqual(task.delegation_depth, 1) + self.assertEqual(task.assigned_to_agent_id, "a2") + request = mock_urlopen.call_args[0][0] + self.assertTrue(request.full_url.endswith("/v1/tasks/t1/delegate")) + body = json.loads(request.data.decode("utf-8")) + self.assertEqual(body["assigned_to_agent_id"], "a2") + self.assertEqual(body["priority"], 2) + + @patch("urllib.request.urlopen") + def test_assign_task(self, mock_urlopen: MagicMock) -> None: + mock_urlopen.return_value = _mock_response({ + "id": "t1", + "content": "x", + "assigned_to_agent_id": "a5", + }) + task = self.client.tasks.assign("t1", "a5") + self.assertEqual(task.assigned_to_agent_id, "a5") + request = mock_urlopen.call_args[0][0] + self.assertTrue(request.full_url.endswith("/v1/tasks/t1")) + self.assertEqual(request.get_method(), "PUT") + body = json.loads(request.data.decode("utf-8")) + self.assertEqual(body, {"assigned_to_agent_id": "a5"}) + + @patch("urllib.request.urlopen") + def test_assign_task_unassign(self, mock_urlopen: MagicMock) -> None: + mock_urlopen.return_value = _mock_response({"id": "t1", "content": "x"}) + self.client.tasks.assign("t1", None) + body = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + self.assertIsNone(body["assigned_to_agent_id"]) + + @patch("urllib.request.urlopen") + def test_chain_hosted_shape(self, mock_urlopen: MagicMock) -> None: + # Hosted returns {root_id, chain, ...}. + mock_urlopen.return_value = _mock_response({ + "root_id": "abc", + "chain": [{"id": "abc", "content": "root", "delegation_depth": 0}], + "depth": 0, + "completed_count": 0, + "total_count": 1, + }) + chain = self.client.tasks.chain("abc") + self.assertIsInstance(chain, DelegationChain) + self.assertEqual(chain.root_id, "abc") + self.assertEqual(len(chain.chain), 1) + self.assertEqual(chain.chain[0].id, "abc") + + @patch("urllib.request.urlopen") + def test_chain_self_hosted_shape(self, mock_urlopen: MagicMock) -> None: + # Self-hosted returns {root: Task, chain, ...} with no root_id. + mock_urlopen.return_value = _mock_response({ + "root": {"id": 42, "content": "root"}, + "chain": [{"id": 42, "content": "root", "delegation_depth": 0}], + "depth": 0, + "completed_count": 0, + "total_count": 1, + }) + chain = self.client.tasks.chain("42") + # Client layer normalizes to root_id. + self.assertEqual(chain.root_id, "42") + + @patch("urllib.request.urlopen") + def test_update_context_hosted_bare_dict(self, mock_urlopen: MagicMock) -> None: + # Hosted returns the merged context dict. + mock_urlopen.return_value = _mock_response({"step": "done", "count": 2}) + merged = self.client.tasks.update_context("t1", {"count": 2}) + self.assertEqual(merged, {"step": "done", "count": 2}) + request = mock_urlopen.call_args[0][0] + self.assertTrue(request.full_url.endswith("/v1/tasks/t1/context")) + self.assertEqual(request.get_method(), "PATCH") + + @patch("urllib.request.urlopen") + def test_update_context_self_hosted_full_task(self, mock_urlopen: MagicMock) -> None: + # Self-hosted returns the full Task; we extract just the context. + mock_urlopen.return_value = _mock_response({ + "id": 42, + "content": "x", + "completed": False, + "context": {"step": "done", "count": 2}, + }) + merged = self.client.tasks.update_context("42", {"count": 2}) + self.assertEqual(merged, {"step": "done", "count": 2}) + + @patch("urllib.request.urlopen") + def test_find_duplicates(self, mock_urlopen: MagicMock) -> None: + mock_urlopen.return_value = _mock_response({ + "has_duplicates": True, + "matches": [ + {"task_id": "abc", "content": "research pricing", "score": 0.85}, + ], + }) + result = self.client.tasks.find_duplicates("Research pricing", threshold=0.7) + self.assertIsInstance(result, DedupResult) + self.assertTrue(result.has_duplicates) + self.assertEqual(len(result.matches), 1) + self.assertEqual(result.matches[0].score, 0.85) + body = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + self.assertEqual(body, {"content": "Research pricing", "threshold": 0.7}) + + @patch("urllib.request.urlopen") + def test_find_duplicates_no_matches(self, mock_urlopen: MagicMock) -> None: + mock_urlopen.return_value = _mock_response({"has_duplicates": False, "matches": []}) + result = self.client.tasks.find_duplicates("unique content") + self.assertFalse(result.has_duplicates) + self.assertEqual(result.matches, []) + class TestAgentsMethods(unittest.TestCase): def setUp(self) -> None: @@ -330,10 +454,29 @@ def test_me(self, mock_urlopen: MagicMock) -> None: self.assertTrue(request.full_url.endswith("/v1/agent/me")) @patch("urllib.request.urlopen") - def test_usage(self, mock_urlopen: MagicMock) -> None: - mock_urlopen.return_value = _mock_response({"requests": 42}) + def test_usage_hosted(self, mock_urlopen: MagicMock) -> None: + mock_urlopen.return_value = _mock_response({ + "plan": "free", + "task_count_month": 42, + "task_limit": 1000, + "rate_limit_rpm": 60, + }) result = self.client.usage() - self.assertEqual(result["requests"], 42) + self.assertEqual(result["plan"], "free") + request = mock_urlopen.call_args[0][0] + # Post-0.2.0 bug fix: was hitting /stats, now correctly /usage. + self.assertTrue(request.full_url.endswith("/v1/usage")) + + @patch("urllib.request.urlopen") + def test_usage_self_hosted_raises_before_fetch( + self, mock_urlopen: MagicMock + ) -> None: + client = Delega(base_url="http://127.0.0.1:18890", api_key="dlg_test") + with self.assertRaises(DelegaError) as ctx: + client.usage() + self.assertIn("only available on the hosted", str(ctx.exception)) + # Critical: no HTTP call should have been made. + mock_urlopen.assert_not_called() class TestErrorHandling(unittest.TestCase):