From f8126b64ba1d5d38426b50984427e70174d0a81a Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 12:01:05 -0400 Subject: [PATCH 01/12] feat(byoc): client-side BYOC streaming + remote signer integration Adds the Python-gateway counterpart of livepeer/go-livepeer#3869: - start_byoc_job / BYOCJobRequest / BYOCJob: orchestrator selection, payment preflight, V1 BYOC job signing via /sign-byoc-job, and /process/stream/start | payment | {id}/stop lifecycle. - BYOCPaymentSession: BYOC-typed /generate-live-payment with manifestID = capability_name, timeoutSeconds, per-call RequestID, and a recurring stream-payment sender. - BasePaymentSession: shared base for LV2V (PaymentSession) and BYOC, matching the remote signer split. PaymentSession is now a slim subclass. - capabilities: add CapabilityId.BYOC (37), capability_id_from_pipeline, build_capabilities_from_queries. - errors: add PaymentRequiredError (HTTP 402 retry on stream start). - orchestrator: add _join_signer_endpoint and resolve_transcoder_http_url. - examples: byoc_text_reversal, byoc_write_text, byoc_write_frames, and a --caps PIPELINE/MODEL flag on get_orchestrator_info. Stays on the existing urllib HTTP stack; no new runtime dependencies. OIDC, the urllib->httpx refactor, and unrelated media/trickle fixes are deferred to follow-up stacked PRs. Made-with: Cursor --- examples/byoc_text_reversal.py | 164 ++++++ examples/byoc_write_frames.py | 100 ++++ examples/byoc_write_text.py | 243 +++++++++ examples/get_orchestrator_info.py | 16 + pyproject.toml | 2 +- src/livepeer_gateway/__init__.py | 14 +- src/livepeer_gateway/byoc.py | 689 ++++++++++++++++++++++++++ src/livepeer_gateway/byoc_payments.py | 128 +++++ src/livepeer_gateway/capabilities.py | 41 ++ src/livepeer_gateway/errors.py | 4 + src/livepeer_gateway/orchestrator.py | 39 ++ src/livepeer_gateway/payments_base.py | 173 +++++++ src/livepeer_gateway/remote_signer.py | 186 ++----- uv.lock | 2 +- 14 files changed, 1646 insertions(+), 155 deletions(-) create mode 100644 examples/byoc_text_reversal.py create mode 100644 examples/byoc_write_frames.py create mode 100644 examples/byoc_write_text.py create mode 100644 src/livepeer_gateway/byoc.py create mode 100644 src/livepeer_gateway/byoc_payments.py create mode 100644 src/livepeer_gateway/payments_base.py diff --git a/examples/byoc_text_reversal.py b/examples/byoc_text_reversal.py new file mode 100644 index 0000000..1d928c8 --- /dev/null +++ b/examples/byoc_text_reversal.py @@ -0,0 +1,164 @@ +import argparse +import asyncio +import json +import logging +from typing import Optional + +from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job +from livepeer_gateway.errors import LivepeerGatewayError + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run a BYOC text-reversal request through the Python gateway." + ) + parser.add_argument( + "--text", + default="hello from byoc", + help="Input text to reverse. Default: 'hello from byoc'.", + ) + parser.add_argument( + "--capability", + default="text-reversal", + help="BYOC capability name. Default: text-reversal.", + ) + parser.add_argument( + "--orchestrator", + default=None, + help="Optional orchestrator URL or comma-separated URLs.", + ) + parser.add_argument( + "--signer", + default=None, + help="Remote signer base URL.", + ) + parser.add_argument( + "--discovery", + default=None, + help="Optional discovery endpoint URL.", + ) + parser.add_argument( + "--token", + default=None, + help="Optional gateway token containing signer/discovery/orchestrator info.", + ) + parser.add_argument( + "--timeout-seconds", + type=int, + default=30, + help="Timeout encoded into the BYOC Livepeer header. Default: 30.", + ) + parser.add_argument( + "--output-grace-seconds", + type=float, + default=1.5, + help="Extra time to wait for final event/data output after stop. Default: 1.5.", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging.", + ) + return parser.parse_args() + + +def _parse_orchestrator_arg(orchestrator_arg: Optional[str]): + if orchestrator_arg is None: + return None + parts = [part.strip() for part in orchestrator_arg.split(",") if part.strip()] + if not parts: + return None + if len(parts) == 1: + return parts[0] + return parts + + +async def _amain() -> None: + args = _parse_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format="%(levelname)s %(name)s: %(message)s", + ) + + orch_url = _parse_orchestrator_arg(args.orchestrator) + + job = None + try: + job = start_byoc_job( + orch_url, + BYOCJobRequest( + capability=args.capability, + enable_video_ingress=False, + enable_video_egress=False, + # reverse_server publishes JSON to events_url only, not data_url. + enable_data_output=False, + timeout_seconds=args.timeout_seconds, + ), + token=args.token, + signer_url=args.signer, + discovery_url=args.discovery, + ) + + print("=== BYOC text-reversal ===") + print(f"job_id: {job.job_id}") + print(f"capability: {job.capability}") + print(f"control_url: {job.control_url}") + print(f"events_url: {job.events_url}") + print() + + job.start_payment_sender() + + if job.control is None: + print("ERROR: job has no control_url; cannot send text.") + return + + result_received = asyncio.Event() + received_payload: dict = {} + + async def read_results() -> None: + if job.events is None: + print("WARN: no events channel on job; results will not be captured.") + result_received.set() + return + async for msg in job.events(): + if isinstance(msg, dict) and isinstance(msg.get("reversed"), str): + received_payload.update(msg) + result_received.set() + return + + reader_task = asyncio.create_task(read_results()) + + await asyncio.sleep(0.15) + await job.control.write({"text": args.text}) + print(f"sent: {{\"text\": {args.text!r}}}") + + try: + await asyncio.wait_for(result_received.wait(), timeout=args.timeout_seconds) + except asyncio.TimeoutError: + print("ERROR: timed out waiting for reversal result.") + return + + original = received_payload.get("original") or received_payload.get("text") or args.text + reversed_text = received_payload.get("reversed") + print(f"result: {original!r} -> {reversed_text!r}") + print("payload:", json.dumps(received_payload, indent=2, sort_keys=True)) + + stop_resp = await job.stop() + print(f"stop: status={stop_resp['status_code']}") + + await asyncio.sleep(max(0.0, args.output_grace_seconds)) + reader_task.cancel() + await asyncio.gather(reader_task, return_exceptions=True) + except LivepeerGatewayError as err: + print(f"ERROR: {err}") + finally: + if job is not None: + await job.close() + + +def main() -> None: + asyncio.run(_amain()) + + +if __name__ == "__main__": + main() diff --git a/examples/byoc_write_frames.py b/examples/byoc_write_frames.py new file mode 100644 index 0000000..ffaa19e --- /dev/null +++ b/examples/byoc_write_frames.py @@ -0,0 +1,100 @@ +import argparse +import asyncio +from fractions import Fraction + +import av + +from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job +from livepeer_gateway.errors import LivepeerGatewayError +from livepeer_gateway.media_publish import MediaPublishConfig + + +DEFAULT_CAPABILITY = "text-reversal" + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Start a BYOC stream job and publish raw frames via publish_url." + ) + p.add_argument( + "orchestrator", + nargs="?", + default=None, + help="Orchestrator (host:port). If omitted, discovery is used.", + ) + p.add_argument( + "--capability", + default=DEFAULT_CAPABILITY, + help=f"BYOC capability name (default: {DEFAULT_CAPABILITY}).", + ) + p.add_argument( + "--signer", + required=True, + help="Remote signer URL (no path). Required for BYOC job signing.", + ) + p.add_argument( + "--discovery", + default=None, + help="Discovery endpoint for orchestrators.", + ) + p.add_argument( + "--stream-id", + default=None, + help="Optional stream ID to include in BYOC request details.", + ) + p.add_argument("--width", type=int, default=320, help="Frame width (default: 320).") + p.add_argument("--height", type=int, default=180, help="Frame height (default: 180).") + p.add_argument("--fps", type=float, default=30.0, help="Frames per second (default: 30).") + p.add_argument("--count", type=int, default=90, help="Number of frames to send (default: 90).") + return p.parse_args() + + +def _solid_rgb_frame(width: int, height: int, rgb: tuple[int, int, int]) -> av.VideoFrame: + frame = av.VideoFrame(width, height, "rgb24") + r, g, b = rgb + frame.planes[0].update(bytes([r, g, b]) * (width * height)) + return frame + + +async def main() -> None: + args = _parse_args() + frame_interval = 1.0 / max(1e-6, args.fps) + + job = None + try: + job = start_byoc_job( + args.orchestrator, + BYOCJobRequest( + capability=args.capability, + stream_id=args.stream_id, + ), + signer_url=args.signer, + discovery_url=args.discovery, + ) + + print("=== BYOC stream ===") + print("job_id:", job.job_id) + print("capability:", job.capability) + print("publish_url:", job.publish_url) + print() + + media = job.start_media(MediaPublishConfig(fps=args.fps)) + job.start_payment_sender() + + time_base = Fraction(1, int(round(args.fps))) + for i in range(max(0, args.count)): + color = (i * 5) % 255 + frame = _solid_rgb_frame(args.width, args.height, (color, 0, 255 - color)) + frame.pts = i + frame.time_base = time_base + await media.write_frame(frame) + await asyncio.sleep(frame_interval) + except LivepeerGatewayError as e: + print(f"ERROR: {e}") + finally: + if job is not None: + await job.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/byoc_write_text.py b/examples/byoc_write_text.py new file mode 100644 index 0000000..512cc38 --- /dev/null +++ b/examples/byoc_write_text.py @@ -0,0 +1,243 @@ +import argparse +import asyncio +import json +from typing import Optional + +from livepeer_gateway import JSONLReader +from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job +from livepeer_gateway.errors import LivepeerGatewayError + + +DEFAULT_CAPABILITY = "text-reversal" +_DEFAULT_CONTROL_TEXTS = ("hello-byoc", "livepeer", "draw") + + +def _json_dump(data: dict) -> str: + return json.dumps(data, sort_keys=True) + + +def _render_text_result(msg: dict) -> str: + original = msg.get("original") + reversed_text = msg.get("reversed") + text = msg.get("text") + if isinstance(original, str) and isinstance(reversed_text, str): + return f"{original!r} -> {reversed_text!r}" + if isinstance(text, str) and isinstance(reversed_text, str): + return f"{text!r} -> {reversed_text!r}" + if isinstance(reversed_text, str): + return reversed_text + return _json_dump(msg) + + +def _is_text_result(msg: dict) -> bool: + return isinstance(msg.get("reversed"), str) + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Start a text-only BYOC stream and drive it entirely via control messages." + ) + p.add_argument( + "orchestrator", + nargs="?", + default=None, + help="Orchestrator (host:port). If omitted, discovery is used.", + ) + p.add_argument( + "--capability", + default=DEFAULT_CAPABILITY, + help=f"BYOC capability name (default: {DEFAULT_CAPABILITY}).", + ) + p.add_argument( + "--signer", + required=True, + help="Remote signer URL (no path). Required for BYOC job signing.", + ) + p.add_argument( + "--discovery", + default=None, + help="Discovery endpoint for orchestrators.", + ) + p.add_argument( + "--stream-id", + default=None, + help="Optional stream ID to include in BYOC request details.", + ) + p.add_argument( + "--stream-start", + default=None, + metavar="PATH_OR_URL", + help=( + "HTTP path on the orchestrator transcoder (default /ai/stream/start) or a full URL. " + "Livepeer gateway BYOC uses /process/stream/start." + ), + ) + p.add_argument( + "--stream-payment", + default=None, + metavar="PATH_OR_URL", + help=( + "Payment path (default /ai/stream/payment) or full URL; must match the server " + "that handles Livepeer-Payment for this job." + ), + ) + p.add_argument( + "--text", + action="append", + dest="control_texts", + metavar="STRING", + help=( + "Send a control JSON message {\"text\": STRING}. " + "Repeat to send multiple commands." + ), + ) + p.add_argument( + "--control-interval", + type=float, + default=1.0, + help="Seconds between successive control messages (default: 1).", + ) + p.add_argument( + "--no-control", + action="store_true", + help="Do not send any control-channel messages.", + ) + p.add_argument( + "--output-grace-seconds", + type=float, + default=1.5, + help="Extra time to wait for final event/data output after stop (default: 1.5).", + ) + return p.parse_args() + + +def _control_texts_for_run(args: argparse.Namespace) -> list[str]: + if args.no_control: + return [] + if args.control_texts: + return list(args.control_texts) + if args.capability == DEFAULT_CAPABILITY: + return list(_DEFAULT_CONTROL_TEXTS) + return [] + + +async def main() -> None: + args = _parse_args() + + job = None + try: + req_opts: dict[str, str] = {} + if args.stream_start is not None: + req_opts["stream_start_endpoint"] = args.stream_start + if args.stream_payment is not None: + req_opts["stream_payment_endpoint"] = args.stream_payment + + job = start_byoc_job( + args.orchestrator, + BYOCJobRequest( + capability=args.capability, + stream_id=args.stream_id, + enable_video_ingress=False, + enable_video_egress=False, + enable_data_output=True, + **req_opts, + ), + signer_url=args.signer, + discovery_url=args.discovery, + ) + + print("=== BYOC text-only stream ===") + print("job_id:", job.job_id) + print("capability:", job.capability) + print("control_url:", job.control_url) + print("events_url:", job.events_url) + print("data_url:", job.data_url) + print() + + job.start_payment_sender() + + texts = _control_texts_for_run(args) + expected_results = len(texts) + results_received = 0 + results_complete = asyncio.Event() + + async def print_channel(name: str, stream, *, count_results: bool = False) -> None: + nonlocal results_received + async for msg in stream: + print(f"{name}: {_render_text_result(msg)}") + if count_results and _is_text_result(msg): + results_received += 1 + if results_received >= expected_results: + results_complete.set() + + async def control_messages() -> None: + if not texts: + results_complete.set() + return + if job.control is None: + print("WARN: control messages requested but job has no control_url; skipping.") + results_complete.set() + return + interval = max(0.05, float(args.control_interval)) + await asyncio.sleep(0.15) + for i, s in enumerate(texts): + if i > 0: + await asyncio.sleep(interval) + await job.control.write({"text": s}) + print(f"control[{i}]: {{\"text\": {s!r}}}") + + control_task = asyncio.create_task(control_messages()) + output_tasks: list[asyncio.Task] = [] + primary_results_task: Optional[asyncio.Task] = None + + if job.events is not None: + event_task = asyncio.create_task( + print_channel( + "event", + job.events(), + count_results=not bool(job.data_url), + ) + ) + output_tasks.append(event_task) + if primary_results_task is None and not job.data_url: + primary_results_task = event_task + + if job.data_url: + data_task = asyncio.create_task( + print_channel( + "data", + JSONLReader(job.data_url)(), + count_results=True, + ) + ) + output_tasks.append(data_task) + primary_results_task = data_task + + stop_sent = False + try: + await control_task + if expected_results > 0 and primary_results_task is not None: + await results_complete.wait() + stop_resp = await job.stop() + stop_sent = True + print(f"stop: {_json_dump(stop_resp.get('body') or {'status_code': stop_resp['status_code']})}") + finally: + if not control_task.done(): + await control_task + if not stop_sent: + stop_resp = await job.stop() + print(f"stop: {_json_dump(stop_resp.get('body') or {'status_code': stop_resp['status_code']})}") + await asyncio.sleep(max(0.0, args.output_grace_seconds)) + for task in output_tasks: + task.cancel() + if output_tasks: + await asyncio.gather(*output_tasks, return_exceptions=True) + except LivepeerGatewayError as e: + print(f"ERROR: {e}") + finally: + if job is not None: + await job.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/get_orchestrator_info.py b/examples/get_orchestrator_info.py index a1430ad..73485b7 100644 --- a/examples/get_orchestrator_info.py +++ b/examples/get_orchestrator_info.py @@ -4,6 +4,7 @@ from typing import Any from livepeer_gateway.capabilities import ( + build_capabilities_from_queries, compute_available, format_capability, get_capacity_in_use, @@ -34,6 +35,10 @@ def _parse_args() -> argparse.Namespace: " # Signer URL\n" " python examples/get_orchestrator_info.py --signer https://signer.example.com\n" "\n" + " # Request specific capabilities (e.g. BYOC)\n" + " python examples/get_orchestrator_info.py localhost:8935 --signer https://signer.example.com --caps byoc/text-reversal\n" + " python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop byoc/my-pipeline\n" + "\n" " # JSON / JSONL output\n" " python examples/get_orchestrator_info.py localhost:8935 --format json\n" " python examples/get_orchestrator_info.py localhost:8935 --format jsonl\n" @@ -65,6 +70,13 @@ def _parse_args() -> argparse.Namespace: action="store_true", help="Enable debug logging for discovery diagnostics.", ) + p.add_argument( + "--caps", + nargs="*", + default=None, + metavar="PIPELINE/MODEL", + help="Request specific capabilities in pipeline/model form (e.g. byoc/text-reversal).", + ) p.add_argument( "--format", choices=("text", "json", "jsonl"), @@ -355,6 +367,8 @@ def main() -> None: if args.debug: logging.basicConfig(level=logging.DEBUG) + capabilities = build_capabilities_from_queries(args.caps) if args.caps else None + json_results: list[dict[str, Any]] = [] def _json_info(orch_url: str, info: Any) -> None: @@ -384,6 +398,7 @@ def _json_error(orch_url: str | None, err: Exception) -> None: signer_headers=signer_headers, discovery_url=discovery, discovery_headers=discovery_headers, + capabilities=capabilities, ) for orch_url in orch_list: @@ -392,6 +407,7 @@ def _json_error(orch_url: str | None, err: Exception) -> None: orch_url, signer_url=signer, signer_headers=signer_headers, + capabilities=capabilities, ) except LivepeerGatewayError as e: print_error(orch_url, e) diff --git a/pyproject.toml b/pyproject.toml index afa0751..5728973 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "livepeer-gateway" -version = "0.1.0" +version = "0.1.1" requires-python = ">=3.10" dependencies = [ "grpcio>=1.65.0", diff --git a/src/livepeer_gateway/__init__.py b/src/livepeer_gateway/__init__.py index 93d839c..e463c7f 100644 --- a/src/livepeer_gateway/__init__.py +++ b/src/livepeer_gateway/__init__.py @@ -1,8 +1,15 @@ +from .byoc import BYOCJob, BYOCJobRequest, start_byoc_job +from .byoc_payments import BYOCPaymentSession from .capabilities import CapabilityId, build_capabilities from .channel_reader import ChannelReader, JSONLReader from .channel_writer import ChannelWriter, JSONLWriter from .control import Control, ControlConfig, ControlMode -from .errors import LivepeerGatewayError, NoOrchestratorAvailableError, PaymentError +from .errors import ( + LivepeerGatewayError, + NoOrchestratorAvailableError, + PaymentError, + PaymentRequiredError, +) from .events import Events from .media_publish import ( AudioOutputConfig, @@ -39,6 +46,9 @@ from .trickle_subscriber import TrickleSubscriber, TrickleSubscriberStats __all__ = [ + "BYOCJob", + "BYOCJobRequest", + "BYOCPaymentSession", "Control", "ControlConfig", "ControlMode", @@ -52,6 +62,7 @@ "NoOrchestratorAvailableError", "OrchestratorRejection", "PaymentError", + "PaymentRequiredError", "MediaPublish", "MediaPublishConfig", "MediaPublishTrack", @@ -73,6 +84,7 @@ "SelectionCursor", "orchestrator_selector", "StartJobRequest", + "start_byoc_job", "start_lv2v", "start_scope", "TricklePublishError", diff --git a/src/livepeer_gateway/byoc.py b/src/livepeer_gateway/byoc.py new file mode 100644 index 0000000..aeba3c7 --- /dev/null +++ b/src/livepeer_gateway/byoc.py @@ -0,0 +1,689 @@ +from __future__ import annotations + +import asyncio +import base64 +import json +import logging +import numbers +import ssl +import uuid +from dataclasses import dataclass, field +from typing import Any, Optional, Sequence +from urllib.error import HTTPError, URLError +from urllib.parse import quote, urlparse, urlunparse +from urllib.request import Request, urlopen + +from .byoc_payments import BYOCPaymentSession +from .capabilities import CapabilityId, build_capabilities +from .control import Control +from .errors import ( + LivepeerGatewayError, + NoOrchestratorAvailableError, + OrchestratorRejection, + PaymentError, + PaymentRequiredError, + SkipPaymentCycle, +) +from .events import Events +from .media_output import LagPolicy, MediaOutput +from .media_publish import MediaPublish, MediaPublishConfig +from .orch_info import get_orch_info as _get_orch_info +from .orchestrator import _extract_error_message, resolve_transcoder_http_url +from .selection import orchestrator_selector + +_LOG = logging.getLogger(__name__) + + +def _header_get(headers: dict[str, str], key: str) -> Optional[str]: + key_lower = key.lower() + for name, value in headers.items(): + if name.lower() == key_lower: + return value + return None + + +def _field_value(obj: Any, snake: str, camel: str) -> Any: + if isinstance(obj, dict): + if snake in obj: + return obj[snake] + if camel in obj: + return obj[camel] + return None + v = getattr(obj, snake, None) + if v is not None: + return v + return getattr(obj, camel, None) + + +def _nonzero_real_scalar(val: Any) -> bool: + if val is None or isinstance(val, bool): + return False + if isinstance(val, bytes): + if not val: + return False + return int.from_bytes(val, "big") > 0 + if isinstance(val, numbers.Real): + return val != 0 + return False + + +def _orch_info_ticket_params_usable(info: Any) -> bool: + """True when ticket_params has non-zero face_value and win_prob.""" + params = getattr(info, "ticket_params", None) + if params is None: + return False + face = _field_value(params, "face_value", "faceValue") + win = _field_value(params, "win_prob", "winProb") + return _nonzero_real_scalar(face) and _nonzero_real_scalar(win) + + +def _price_info_matches_byoc(price_info: Any, capability_name: str) -> bool: + if price_info is None: + return False + capability = _field_value(price_info, "capability", "capability") + constraint = _field_value(price_info, "constraint", "constraint") + price_per_unit = _field_value(price_info, "price_per_unit", "pricePerUnit") + pixels_per_unit = _field_value(price_info, "pixels_per_unit", "pixelsPerUnit") + return ( + capability == int(CapabilityId.BYOC) + and constraint == capability_name + and _nonzero_real_scalar(price_per_unit) + and _nonzero_real_scalar(pixels_per_unit) + ) + + +def _orch_info_has_byoc_price(info: Any, capability_name: str) -> bool: + top_price = _field_value(info, "price_info", "priceInfo") + if _price_info_matches_byoc(top_price, capability_name): + return True + + caps_prices = _field_value(info, "capabilities_prices", "capabilitiesPrices") + if caps_prices is None: + return False + for price_info in caps_prices: + if _price_info_matches_byoc(price_info, capability_name): + return True + return False + + +def _orch_info_supports_byoc_payment(info: Any, capability_name: str) -> bool: + return _orch_info_ticket_params_usable(info) and _orch_info_has_byoc_price(info, capability_name) + + +def _get_payment_orch_info( + orch_url: str, + *, + signer_url: Optional[str], + signer_headers: Optional[dict[str, str]], + capabilities: Any, + capability_name: str, +) -> tuple[Any, Any]: + """ + Fetch OrchestratorInfo for BYOC payment preflight. If the capability-scoped + request comes back without usable BYOC pricing, retry without capability + filtering (some legacy orchestrators only advertise BYOC pricing on the + unfiltered path). + """ + payment_info = _get_orch_info( + orch_url, + signer_url=signer_url, + signer_headers=signer_headers, + capabilities=capabilities, + ) + if _orch_info_supports_byoc_payment(payment_info, capability_name): + return payment_info, capabilities + + legacy_info = _get_orch_info( + orch_url, + signer_url=signer_url, + signer_headers=signer_headers, + ) + if _orch_info_supports_byoc_payment(legacy_info, capability_name): + _LOG.debug( + "BYOC payment preflight: using legacy orch info response for %s", + capability_name, + ) + return legacy_info, None + + return payment_info, capabilities + + +@dataclass(frozen=True) +class BYOCJobRequest: + capability: str + request_id: Optional[str] = None + stream_id: Optional[str] = None + request: Optional[dict[str, Any]] = None + parameters: Optional[dict[str, Any]] = None + body: Optional[dict[str, Any]] = None + timeout_seconds: int = 30 + enable_video_ingress: bool = True + enable_video_egress: bool = True + enable_data_output: bool = False + # Orchestrator default in go-livepeer is POST {transcoder}/ai/stream/start; + # the gateway uses /process/stream/start. Either a path on the selected + # transcoder origin or a full http(s) URL. + stream_start_endpoint: str = "/ai/stream/start" + stream_payment_endpoint: str = "/ai/stream/payment" + + def _job_id(self) -> str: + if self.request_id and self.request_id.strip(): + return self.request_id.strip() + if self.stream_id and self.stream_id.strip(): + return self.stream_id.strip() + return uuid.uuid4().hex + + def _request_json(self, job_id: str) -> str: + payload: dict[str, Any] = {} + if self.request: + payload.update(self.request) + payload.setdefault("stream_id", self.stream_id or job_id) + return json.dumps(payload, separators=(",", ":")) + + def _parameters_json(self) -> str: + payload: dict[str, Any] = {} + if self.parameters: + payload.update(self.parameters) + payload.setdefault("enable_video_ingress", self.enable_video_ingress) + payload.setdefault("enable_video_egress", self.enable_video_egress) + payload.setdefault("enable_data_output", self.enable_data_output) + return json.dumps(payload, separators=(",", ":")) + + def _body(self, job_id: str) -> dict[str, Any]: + payload: dict[str, Any] = {} + if self.body: + payload.update(self.body) + payload.setdefault("stream_id", self.stream_id or job_id) + return payload + + +@dataclass(frozen=True) +class BYOCJob: + raw: dict[str, Any] + job_id: str + capability: str + publish_url: Optional[str] = None + subscribe_url: Optional[str] = None + control_url: Optional[str] = None + events_url: Optional[str] = None + data_url: Optional[str] = None + control: Optional[Control] = None + events: Optional[Events] = None + _media: Optional[MediaPublish] = field(default=None, repr=False, compare=False) + _payment_session: Optional[BYOCPaymentSession] = field(default=None, repr=False, compare=False) + _signed_job_header: Optional[str] = field(default=None, repr=False, compare=False) + _payment_task: Optional[asyncio.Task] = field(default=None, repr=False, compare=False) + _stream_stop_url: Optional[str] = field(default=None, repr=False, compare=False) + _stop_timeout_s: float = field(default=30.0, repr=False, compare=False) + + @staticmethod + def from_start_response( + data: dict[str, Any], + *, + job_id: str, + capability: str, + payment_session: Optional[BYOCPaymentSession] = None, + signed_job_header: Optional[str] = None, + stream_stop_url: Optional[str] = None, + stop_timeout_s: float = 30.0, + ) -> "BYOCJob": + headers = data.get("headers") + if not isinstance(headers, dict): + headers = {} + + control_url = _header_get(headers, "X-Control-Url") + events_url = _header_get(headers, "X-Events-Url") + publish_url = _header_get(headers, "X-Publish-Url") + subscribe_url = _header_get(headers, "X-Subscribe-Url") + data_url = _header_get(headers, "X-Data-Url") + + return BYOCJob( + raw=data, + job_id=job_id, + capability=capability, + publish_url=publish_url, + subscribe_url=subscribe_url, + control_url=control_url, + events_url=events_url, + data_url=data_url, + control=Control(control_url) if control_url else None, + events=Events(events_url) if events_url else None, + _payment_session=payment_session, + _signed_job_header=signed_job_header, + _stream_stop_url=stream_stop_url, + _stop_timeout_s=stop_timeout_s, + ) + + def start_media(self, config: MediaPublishConfig) -> MediaPublish: + if not self.publish_url: + raise LivepeerGatewayError("No publish_url present on this BYOC job") + if self._media is None: + media = MediaPublish(self.publish_url, config=config) + object.__setattr__(self, "_media", media) + return self._media + + def media_output( + self, + *, + start_seq: int = -2, + max_retries: int = 5, + max_segment_bytes: Optional[int] = None, + connection_close: bool = False, + chunk_size: int = 64 * 1024, + max_segments: int = 5, + on_lag: LagPolicy = LagPolicy.LATEST, + ) -> MediaOutput: + if not self.subscribe_url: + raise LivepeerGatewayError("No subscribe_url present on this BYOC job") + return MediaOutput( + self.subscribe_url, + start_seq=start_seq, + max_retries=max_retries, + max_segment_bytes=max_segment_bytes, + connection_close=connection_close, + chunk_size=chunk_size, + max_segments=max_segments, + on_lag=on_lag, + ) + + @property + def payment_session(self) -> Optional[BYOCPaymentSession]: + return self._payment_session + + def start_payment_sender(self, *, interval_s: float = 5.0) -> Optional[asyncio.Task]: + if getattr(self, "_payment_task", None) is not None: + return self._payment_task + if not self._payment_session or not self._signed_job_header: + return None + try: + loop = asyncio.get_running_loop() + except RuntimeError: + _LOG.warning( + "No running event loop; BYOC payment sender not started. " + "Call job.start_payment_sender() from async code to enable." + ) + return None + + task = loop.create_task( + _byoc_payment_sender( + self._signed_job_header, + self._payment_session, + interval_s=interval_s, + ) + ) + object.__setattr__(self, "_payment_task", task) + return task + + async def stop(self) -> dict[str, Any]: + if not self._stream_stop_url: + raise LivepeerGatewayError("No stream stop URL present on this BYOC job") + if not self._signed_job_header: + raise LivepeerGatewayError("No signed job header present on this BYOC job") + return await asyncio.to_thread( + _post_byoc_stop, + self._stream_stop_url, + payload={"stream_id": self.job_id}, + headers={"Livepeer": self._signed_job_header}, + timeout=self._stop_timeout_s, + ) + + async def close(self) -> None: + tasks = [] + payment_task = getattr(self, "_payment_task", None) + if payment_task is not None and not payment_task.done(): + payment_task.cancel() + tasks.append(payment_task) + if self.control is not None: + tasks.append(self.control.close()) + if self._media is not None: + tasks.append(self._media.close()) + if not tasks: + return + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, BaseException) and not isinstance(result, asyncio.CancelledError): + raise result + + +async def _byoc_payment_sender( + signed_job_header: str, + session: BYOCPaymentSession, + *, + interval_s: float, +) -> None: + try: + await asyncio.to_thread(session.send_stream_payment, signed_job_header) + except SkipPaymentCycle as e: + _LOG.debug("BYOC payment sender: first payment skipped (%s)", e) + except Exception: + _LOG.exception("BYOC payment sender: first immediate payment failed") + + while True: + await asyncio.sleep(interval_s) + try: + await asyncio.to_thread(session.send_stream_payment, signed_job_header) + except SkipPaymentCycle as e: + _LOG.debug("BYOC payment sender: skipping payment cycle (%s)", e) + except Exception: + _LOG.exception("BYOC payment sender failed") + + +def _get_start_payment_headers( + session: BYOCPaymentSession, + *, + payment_info: Any, + capability_name: str, + allow_skip: bool, +) -> tuple[Optional[str], str]: + payment_header: Optional[str] = None + segment_header = "" + + try: + payment = session.get_payment() + payment_header = payment.payment + segment_header = payment.seg_creds or "" + except SkipPaymentCycle as pay_skip: + if not allow_skip: + raise LivepeerGatewayError( + "BYOC start endpoint returned HTTP 402 payment required, " + "but the signer skipped payment generation; cannot retry start " + "without a payment ticket." + ) from pay_skip + if not _orch_info_ticket_params_usable(payment_info): + raise LivepeerGatewayError( + "BYOC signer returned skip-payment, but OrchestratorInfo ticket_params " + "are missing or zero (face_value and win_prob are required). " + "Ticket expected value (EV) cannot be computed; refusing to start." + ) from pay_skip + _LOG.debug("BYOC signer returned skip-payment response on start (%s)", pay_skip) + except (PaymentError, LivepeerGatewayError) as pay_err: + msg = str(pay_err) + if "priceinfo" in msg.lower() or "price" in msg.lower(): + raise LivepeerGatewayError( + f"BYOC signer pricing error: the remote signer rejected payment generation " + f"(likely missing or zero priceInfo in OrchestratorInfo for capability " + f"'{capability_name}'). Ensure the orchestrator advertises " + f"capability-specific pricing for BYOC/{capability_name}. " + f"Signer detail: {msg}" + ) from pay_err + raise + + if payment_header is not None and not payment_header: + raise LivepeerGatewayError( + f"BYOC signer returned empty payment ticket for capability " + f"'{capability_name}'. Check signer configuration and " + f"orchestrator pricing for BYOC/{capability_name}." + ) + + return payment_header, segment_header + + +def _post_byoc_json( + url: str, + *, + payload: dict[str, Any], + headers: dict[str, str], + timeout: float, + op: str, +) -> dict[str, Any]: + """ + POST a JSON payload to a BYOC endpoint and return a structured response. + + On HTTP 402, raises ``PaymentRequiredError``. Other HTTP errors raise + ``LivepeerGatewayError``. The success response always includes + ``{"status_code", "headers", "body"}``; ``body`` is decoded JSON when + possible, raw text otherwise. + """ + body_bytes = json.dumps(payload).encode("utf-8") + req_headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "User-Agent": "livepeer-python-gateway/0.1", + } + req_headers.update(headers) + + req = Request(url, data=body_bytes, headers=req_headers, method="POST") + ssl_ctx = ssl._create_unverified_context() + try: + with urlopen(req, timeout=timeout, context=ssl_ctx) as resp: + raw_body = resp.read().decode("utf-8", errors="replace") + response_headers = {k: v for k, v in resp.headers.items()} + status = resp.status + except HTTPError as e: + body = _extract_error_message(e) + body_part = f"; body={body!r}" if body else "" + if e.code == 402: + raise PaymentRequiredError( + f"HTTP BYOC {op} error: HTTP 402 from endpoint (url={url}){body_part}" + ) from e + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: HTTP {e.code} from endpoint (url={url}){body_part}" + ) from e + except ConnectionRefusedError as e: + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: connection refused (is the server running? is the host/port correct?) (url={url})" + ) from e + except URLError as e: + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" + ) from e + except LivepeerGatewayError: + raise + except Exception as e: + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: unexpected error: {e.__class__.__name__}: {e} (url={url})" + ) from e + + parsed_body: Any = None + if raw_body.strip(): + try: + parsed_body = json.loads(raw_body) + except Exception: + parsed_body = raw_body + return {"status_code": status, "headers": response_headers, "body": parsed_body} + + +def _post_byoc_start( + url: str, + *, + payload: dict[str, Any], + headers: dict[str, str], + timeout: float, +) -> dict[str, Any]: + return _post_byoc_json(url, payload=payload, headers=headers, timeout=timeout, op="start") + + +def _post_byoc_stop( + url: str, + *, + payload: dict[str, Any], + headers: dict[str, str], + timeout: float, +) -> dict[str, Any]: + return _post_byoc_json(url, payload=payload, headers=headers, timeout=timeout, op="stop") + + +def _derive_stream_stop_url(start_url: str, job_id: str) -> str: + parsed = urlparse(start_url) + path = parsed.path or "" + quoted_job_id = quote(job_id, safe="") + if path.endswith("/process/stream/start"): + path = path[: -len("/process/stream/start")] + f"/process/stream/{quoted_job_id}/stop" + elif path.endswith("/start"): + path = path[: -len("/start")] + "/stop" + else: + raise ValueError(f"Cannot derive stream stop URL from start URL: {start_url!r}") + return urlunparse(parsed._replace(path=path)) + + +def start_byoc_job( + orch_url: Optional[Sequence[str] | str], + req: BYOCJobRequest, + *, + token: Optional[str] = None, + signer_url: Optional[str] = None, + signer_headers: Optional[dict[str, str]] = None, + discovery_url: Optional[str] = None, + discovery_headers: Optional[dict[str, str]] = None, +) -> BYOCJob: + """ + Start a BYOC job through the Python gateway. + + Mirrors the LV2V ``start_lv2v`` shape: an orchestrator is selected (token + orchestrators -> explicit orch_url -> token discovery -> explicit discovery + -> signer-derived discovery), payment is preflighted, the job credential is + signed via ``POST /sign-byoc-job``, and the job is started against + ``stream_start_endpoint`` (default ``/ai/stream/start``). + """ + if not isinstance(req.capability, str) or not req.capability.strip(): + raise LivepeerGatewayError("start_byoc_job requires a non-empty capability") + if not isinstance(req.stream_start_endpoint, str) or not req.stream_start_endpoint.strip(): + raise LivepeerGatewayError("BYOCJobRequest.stream_start_endpoint must be non-empty") + if not isinstance(req.stream_payment_endpoint, str) or not req.stream_payment_endpoint.strip(): + raise LivepeerGatewayError("BYOCJobRequest.stream_payment_endpoint must be non-empty") + + resolved_signer_url = signer_url + resolved_signer_headers = signer_headers + resolved_discovery_url = discovery_url + resolved_discovery_headers = discovery_headers + if token is not None: + from .token import parse_token + + token_data = parse_token(token) + if resolved_signer_url is None: + resolved_signer_url = token_data.get("signer") + if resolved_signer_headers is None: + resolved_signer_headers = token_data.get("signer_headers") + if resolved_discovery_url is None: + resolved_discovery_url = token_data.get("discovery") + if resolved_discovery_headers is None: + resolved_discovery_headers = token_data.get("discovery_headers") + + capabilities = build_capabilities(CapabilityId.BYOC, req.capability.strip()) + cursor = orchestrator_selector( + orch_url, + signer_url=resolved_signer_url, + signer_headers=resolved_signer_headers, + discovery_url=resolved_discovery_url, + discovery_headers=resolved_discovery_headers, + capabilities=capabilities, + ) + + start_rejections: list[OrchestratorRejection] = [] + while True: + try: + selected_url, info = cursor.next() + except NoOrchestratorAvailableError as e: + all_rejections = list(e.rejections) + start_rejections + if all_rejections: + raise NoOrchestratorAvailableError( + f"All orchestrators failed ({len(all_rejections)} tried)", + rejections=all_rejections, + ) from None + raise + + try: + payment_info, payment_capabilities = _get_payment_orch_info( + selected_url, + signer_url=resolved_signer_url, + signer_headers=resolved_signer_headers, + capabilities=capabilities, + capability_name=req.capability.strip(), + ) + + session = BYOCPaymentSession( + resolved_signer_url, + payment_info, + capability_name=req.capability.strip(), + signer_headers=resolved_signer_headers, + capabilities=payment_capabilities, + stream_payment_endpoint=req.stream_payment_endpoint, + ) + + job_id = req._job_id() + request_json = req._request_json(job_id) + parameters_json = req._parameters_json() + timeout_seconds = max(1, int(req.timeout_seconds)) + signed = session.sign_byoc_job( + job_id=job_id, + capability=req.capability.strip(), + request=request_json, + parameters=parameters_json, + timeout_seconds=timeout_seconds, + ) + + signed_payload = { + "id": job_id, + "request": request_json, + "parameters": parameters_json, + "capability": req.capability.strip(), + "sender": signed.sender, + "sig": signed.signature, + "timeout_seconds": timeout_seconds, + } + signed_job_header = base64.b64encode( + json.dumps(signed_payload, separators=(",", ":")).encode("utf-8") + ).decode("ascii") + + capability_name = req.capability.strip() + payment_header, segment_header = _get_start_payment_headers( + session, + payment_info=payment_info, + capability_name=capability_name, + allow_skip=True, + ) + headers = {"Livepeer": signed_job_header} + if payment_header: + headers["Livepeer-Payment"] = payment_header + headers["Livepeer-Segment"] = segment_header + start_url = resolve_transcoder_http_url(info.transcoder, req.stream_start_endpoint) + try: + stop_url = _derive_stream_stop_url(start_url, job_id) + except ValueError as e: + raise LivepeerGatewayError(str(e)) from e + start_payload = req._body(job_id) + start_timeout = float(timeout_seconds) + try: + data = _post_byoc_start( + start_url, + payload=start_payload, + headers=headers, + timeout=start_timeout, + ) + except PaymentRequiredError: + payment_header, segment_header = _get_start_payment_headers( + session, + payment_info=payment_info, + capability_name=capability_name, + allow_skip=False, + ) + headers = { + "Livepeer": signed_job_header, + "Livepeer-Payment": payment_header, + "Livepeer-Segment": segment_header, + } + _LOG.debug("BYOC start returned HTTP 402; retrying with a fresh payment ticket") + data = _post_byoc_start( + start_url, + payload=start_payload, + headers=headers, + timeout=start_timeout, + ) + job = BYOCJob.from_start_response( + data, + job_id=job_id, + capability=capability_name, + payment_session=session, + signed_job_header=signed_job_header, + stream_stop_url=stop_url, + stop_timeout_s=float(timeout_seconds), + ) + job.start_payment_sender() + return job + except LivepeerGatewayError as e: + _LOG.debug( + "start_byoc_job candidate failed, trying fallback if available: %s (%s)", + selected_url, + str(e), + ) + start_rejections.append(OrchestratorRejection(url=selected_url, reason=str(e))) diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py new file mode 100644 index 0000000..9246b0b --- /dev/null +++ b/src/livepeer_gateway/byoc_payments.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from . import lp_rpc_pb2 +from .errors import PaymentError +from .payments_base import BasePaymentSession, GetPaymentResponse + + +@dataclass(frozen=True) +class SignedBYOCJob: + sender: str + signature: str + + +class BYOCPaymentSession(BasePaymentSession): + """ + BYOC payment session. + + - Signs job credentials via the remote signer's ``POST /sign-byoc-job`` + endpoint (V1 binary signing format, server-side flatten). + - Generates time-based BYOC payments via ``/generate-live-payment`` + (``type: "byoc"``, ``manifestID = capability_name``). + - Sends recurring stream payments to the orchestrator's + ``/process/stream/payment`` endpoint (or operator override). + """ + + def __init__( + self, + signer_url: Optional[str], + info: lp_rpc_pb2.OrchestratorInfo, + *, + capability_name: str, + signer_headers: Optional[dict[str, str]] = None, + capabilities: Optional[lp_rpc_pb2.Capabilities] = None, + max_refresh_retries: int = 3, + stream_payment_endpoint: str = "/ai/stream/payment", + ) -> None: + if not isinstance(capability_name, str) or not capability_name.strip(): + raise PaymentError("capability_name must be a non-empty string") + if not isinstance(stream_payment_endpoint, str) or not stream_payment_endpoint.strip(): + raise PaymentError("stream_payment_endpoint must be a non-empty string") + + self._capability_name = capability_name.strip() + self._stream_payment_endpoint = stream_payment_endpoint.strip() + super().__init__( + signer_url, + info, + signer_headers=signer_headers, + payment_type="byoc", + capabilities=capabilities, + max_refresh_retries=max_refresh_retries, + ) + # Use capability name as manifest id so balance tracking is shared + # across requests for the same BYOC capability (matches go-livepeer). + self.set_manifest_id(self._capability_name) + + def _offchain_payment(self) -> GetPaymentResponse: + return GetPaymentResponse(payment="", seg_creds="") + + def sign_byoc_job( + self, + job_id: str, + capability: str, + request: str, + parameters: str, + timeout_seconds: int, + ) -> SignedBYOCJob: + """ + Ask the remote signer to sign a BYOC job credential using the V1 + binary signing format. The signer is authoritative for the wire layout; + we only forward the structured fields. + """ + if not self._signer_url: + raise PaymentError("sign_byoc_job requires signer_url") + if not isinstance(request, str): + raise PaymentError("request must be a JSON string") + if not isinstance(parameters, str): + raise PaymentError("parameters must be a JSON string") + + self.set_timeout_seconds(timeout_seconds) + + from .orchestrator import _join_signer_endpoint, post_json + + url = _join_signer_endpoint(self._signer_url, "/sign-byoc-job") + data = post_json( + url, + { + "id": job_id, + "capability": capability, + "request": request, + "parameters": parameters, + "timeout_seconds": timeout_seconds, + "signature_format": "v1", + }, + headers=self._signer_headers, + ) + sender = data.get("sender") + signature = data.get("signature") + if not isinstance(sender, str) or not sender: + raise PaymentError(f"Invalid signer response: missing sender (url={url})") + if not isinstance(signature, str) or not signature: + raise PaymentError(f"Invalid signer response: missing signature (url={url})") + return SignedBYOCJob(sender=sender, signature=signature) + + def send_stream_payment(self, job_header: str) -> None: + """ + Send a single recurring BYOC stream payment to the orchestrator. + + ``job_header`` is the base64-encoded ``Livepeer:`` header established + at job start; we attach a fresh payment ticket to it. + """ + if not isinstance(job_header, str) or not job_header: + raise PaymentError("job_header must be a non-empty base64 string") + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for stream payment") + + from .orchestrator import resolve_transcoder_http_url + + p = self.get_payment() + url = resolve_transcoder_http_url(self._info.transcoder, self._stream_payment_endpoint) + headers = { + "Livepeer": job_header, + "Livepeer-Payment": p.payment, + "Livepeer-Segment": p.seg_creds or "", + } + self._post_empty(url, headers, op="stream payment") diff --git a/src/livepeer_gateway/capabilities.py b/src/livepeer_gateway/capabilities.py index 27e5688..dde24c3 100644 --- a/src/livepeer_gateway/capabilities.py +++ b/src/livepeer_gateway/capabilities.py @@ -45,6 +45,7 @@ class CapabilityId(IntEnum): IMAGE_TO_TEXT = 34 LIVE_VIDEO_TO_VIDEO = 35 TEXT_TO_SPEECH = 36 + BYOC = 37 CAPABILITY_ID_TO_NAME: dict[int, str] = { -2: "Invalid", @@ -85,6 +86,7 @@ class CapabilityId(IntEnum): 34: "Image to text", 35: "Live video to video", 36: "Text to speech", + 37: "BYOC", } @@ -152,6 +154,45 @@ def capability_pipeline_id(cap_id: int) -> Optional[str]: return enum_name.lower().replace("_", "-") +def capability_id_from_pipeline(pipeline_id: str) -> Optional[CapabilityId]: + """ + Reverse of capability_pipeline_id: convert a pipeline ID back to a CapabilityId. + + Example: + live-video-to-video -> CapabilityId.LIVE_VIDEO_TO_VIDEO + byoc -> CapabilityId.BYOC + """ + enum_name = pipeline_id.upper().replace("-", "_") + try: + return CapabilityId[enum_name] + except KeyError: + return None + + +def build_capabilities_from_queries(queries: list[str]) -> Optional[lp_rpc_pb2.Capabilities]: + """ + Build a Capabilities protobuf from pipeline-id/model query strings. + + Each query has the form "pipeline-id/model" (e.g. "byoc/text-reversal", + "live-video-to-video/noop"). Returns None if no valid queries are provided. + """ + caps = lp_rpc_pb2.Capabilities() + found = False + for q in queries: + parts = q.split("/", 1) + pipeline_id = parts[0] + model = parts[1] if len(parts) > 1 else None + cap = capability_id_from_pipeline(pipeline_id) + if cap is None: + continue + cap_id = int(cap) + caps.capacities[cap_id] = 1 + if model: + caps.constraints.PerCapability[cap_id].models[model] + found = True + return caps if found else None + + def capabilities_to_query(caps: Optional[lp_rpc_pb2.Capabilities]) -> list[str]: """ Build discovery query values in `pipeline-id/model` form. diff --git a/src/livepeer_gateway/errors.py b/src/livepeer_gateway/errors.py index 14fb8b4..6bb8112 100644 --- a/src/livepeer_gateway/errors.py +++ b/src/livepeer_gateway/errors.py @@ -31,5 +31,9 @@ class SkipPaymentCycle(LivepeerGatewayError): """Raised when the signer returns HTTP 482 to skip a payment cycle.""" +class PaymentRequiredError(LivepeerGatewayError): + """Raised when an endpoint returns HTTP 402 and requires payment.""" + + class PaymentError(LivepeerGatewayError): """Raised when a PaymentSession operation fails.""" diff --git a/src/livepeer_gateway/orchestrator.py b/src/livepeer_gateway/orchestrator.py index 844cd7b..991b295 100644 --- a/src/livepeer_gateway/orchestrator.py +++ b/src/livepeer_gateway/orchestrator.py @@ -203,6 +203,45 @@ def _http_origin(url: str) -> str: return f"{parsed.scheme}://{parsed.netloc}" +def _join_signer_endpoint(signer_url: str, path: str) -> str: + """ + Join an endpoint path onto signer_url while preserving any existing base path. + + Examples: + - https://example.com/api/signer + /sign-orchestrator-info + -> https://example.com/api/signer/sign-orchestrator-info + - https://example.com/api/signer/sign-orchestrator-info + same path + -> unchanged + """ + if not isinstance(signer_url, str) or not signer_url.strip(): + raise ValueError("signer_url must be a non-empty string") + suffix = path if path.startswith("/") else f"/{path}" + base = signer_url.strip().rstrip("/") + if base.endswith(suffix): + return base + return f"{base}{suffix}" + + +def resolve_transcoder_http_url(origin: str, path_or_absolute_url: str) -> str: + """ + Build the final HTTP URL for a BYOC (or similar) call rooted at an orchestrator + transcoder origin, or pass through an absolute URL. + + When ``path_or_absolute_url`` starts with ``http://`` or ``https://``, it is + returned unchanged. Otherwise it is treated as a path on ``origin``. + """ + o = path_or_absolute_url.strip() + if not o: + raise ValueError("path_or_absolute_url must be non-empty") + low = o.lower() + if low.startswith("http://") or low.startswith("https://"): + return o + if not o.startswith("/"): + o = "/" + o + base = _http_origin(origin) + return f"{base}{o}" + + def _append_caps(url: str, capabilities: Optional[lp_rpc_pb2.Capabilities]) -> str: """ Append repeated `caps` query parameters to a URL. diff --git a/src/livepeer_gateway/payments_base.py b/src/livepeer_gateway/payments_base.py new file mode 100644 index 0000000..5cc1e89 --- /dev/null +++ b/src/livepeer_gateway/payments_base.py @@ -0,0 +1,173 @@ +from __future__ import annotations + +import base64 +import ssl +import uuid +from dataclasses import dataclass +from typing import Any, Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from . import lp_rpc_pb2 +from .errors import LivepeerGatewayError, PaymentError, SignerRefreshRequired + + +@dataclass(frozen=True) +class GetPaymentResponse: + payment: str + seg_creds: Optional[str] = None + + +class BasePaymentSession: + """ + Shared payment-session machinery for the remote signer's + ``/generate-live-payment`` endpoint. + + Concrete subclasses provide an ``_offchain_payment`` implementation for + the ``signer_url is None`` case (e.g. LV2V auth-token relay, BYOC noop). + """ + + def __init__( + self, + signer_url: Optional[str], + info: lp_rpc_pb2.OrchestratorInfo, + *, + signer_headers: Optional[dict[str, str]], + payment_type: str, + capabilities: Optional[lp_rpc_pb2.Capabilities], + max_refresh_retries: int = 3, + ) -> None: + self._signer_url = signer_url + self._signer_headers = signer_headers + self._info = info + self._payment_type = payment_type + self._manifest_id: Optional[str] = None + self._capabilities = capabilities + self._max_refresh_retries = max(0, int(max_refresh_retries)) + self._state: Optional[dict[str, Any]] = None + self._timeout_seconds: int = 0 + + def set_manifest_id(self, manifest_id: str) -> None: + if not isinstance(manifest_id, str) or not manifest_id.strip(): + raise PaymentError("manifest_id must be a non-empty string") + self._manifest_id = manifest_id.strip() + + def set_timeout_seconds(self, timeout_seconds: int) -> None: + # Hint the remote signer about expected job duration so BYOC initial + # ticket batches are sized to match the work (floored server-side). + self._timeout_seconds = max(0, int(timeout_seconds)) + + def _offchain_payment(self) -> GetPaymentResponse: + raise NotImplementedError + + def _build_payment_payload(self) -> dict[str, Any]: + pb = self._info.SerializeToString() + orch_b64 = base64.b64encode(pb).decode("ascii") + payload: dict[str, Any] = { + "orchestrator": orch_b64, + "type": self._payment_type, + } + if self._manifest_id is not None: + payload["manifestID"] = self._manifest_id + if self._timeout_seconds > 0: + payload["timeoutSeconds"] = self._timeout_seconds + if self._state is not None: + payload["state"] = self._state + if self._capabilities is not None: + caps_b64 = base64.b64encode(self._capabilities.SerializeToString()).decode("ascii") + payload["capabilities"] = caps_b64 + # One id per billing call so clearinghouse usage is not deduped across an entire manifest. + payload.setdefault("RequestID", str(uuid.uuid4())) + return payload + + def _refresh_orchestrator_info(self) -> None: + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for refresh") + + from .orch_info import get_orch_info + + self._info = get_orch_info( + self._info.transcoder, + signer_url=self._signer_url, + signer_headers=self._signer_headers, + capabilities=self._capabilities, + ) + + def _request_payment(self) -> GetPaymentResponse: + from .orchestrator import _join_signer_endpoint, post_json + + url = _join_signer_endpoint(self._signer_url, "/generate-live-payment") + data = post_json(url, self._build_payment_payload(), headers=self._signer_headers) + + payment = data.get("payment") + if not isinstance(payment, str) or not payment: + raise PaymentError(f"GetPayment error: missing/invalid 'payment' in response (url={url})") + + seg_creds = data.get("segCreds") + if seg_creds is not None and not isinstance(seg_creds, str): + raise PaymentError(f"GetPayment error: invalid 'segCreds' in response (url={url})") + + state = data.get("state") + if not isinstance(state, dict): + raise PaymentError(f"Remote signer response missing 'state' object (url={url})") + + self._state = state + return GetPaymentResponse(payment=payment, seg_creds=seg_creds) + + def get_payment(self) -> GetPaymentResponse: + if not self._signer_url: + return self._offchain_payment() + + attempts = 0 + while True: + try: + return self._request_payment() + except SignerRefreshRequired as e: + if attempts >= self._max_refresh_retries: + raise PaymentError(f"Signer refresh required after {attempts} retries: {e}") from e + self._refresh_orchestrator_info() + attempts += 1 + + def _post_empty(self, url: str, headers: dict[str, str], *, op: str, timeout: float = 5.0) -> None: + from .orchestrator import _extract_error_message + + req = Request(url, data=b"", headers=headers, method="POST") + ssl_ctx = ssl._create_unverified_context() + try: + with urlopen(req, timeout=timeout, context=ssl_ctx) as resp: + resp.read() + except HTTPError as e: + body = _extract_error_message(e) + body_part = f"; body={body!r}" if body else "" + raise PaymentError( + f"HTTP {op} error: HTTP {e.code} from endpoint (url={url}){body_part}" + ) from e + except ConnectionRefusedError as e: + raise PaymentError( + f"HTTP {op} error: connection refused (is the server running? is the host/port correct?) (url={url})" + ) from e + except URLError as e: + raise PaymentError( + f"HTTP {op} error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" + ) from e + except LivepeerGatewayError: + raise + except Exception as e: + raise PaymentError( + f"HTTP {op} error: unexpected error: {e.__class__.__name__}: {e} (url={url})" + ) from e + + def send_payment(self) -> None: + from .orchestrator import _http_origin + + p = self.get_payment() + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for payment") + + base = _http_origin(self._info.transcoder) + url = f"{base}/payment" + headers = { + "Livepeer-Payment": p.payment, + "Livepeer-Segment": p.seg_creds or "", + } + self._post_empty(url, headers, op="payment") diff --git a/src/livepeer_gateway/remote_signer.py b/src/livepeer_gateway/remote_signer.py index 8fecf3a..64a3a69 100644 --- a/src/livepeer_gateway/remote_signer.py +++ b/src/livepeer_gateway/remote_signer.py @@ -4,21 +4,16 @@ import json import logging import re -import ssl from dataclasses import dataclass from functools import lru_cache -from typing import Any, Optional +from typing import Optional from urllib.error import HTTPError, URLError -from urllib.request import Request, urlopen from . import lp_rpc_pb2 -from .errors import LivepeerGatewayError, PaymentError, SignerRefreshRequired, SkipPaymentCycle -_LOG = logging.getLogger(__name__) +from .errors import LivepeerGatewayError, PaymentError +from .payments_base import BasePaymentSession, GetPaymentResponse -@dataclass(frozen=True) -class GetPaymentResponse: - payment: str - seg_creds: Optional[str] = None +_LOG = logging.getLogger(__name__) @dataclass(frozen=True) @@ -78,15 +73,15 @@ def get_orch_info_sig( Fetch signer material exactly once per (signer_url, headers) combination for the lifetime of the process. Subsequent calls return cached data. """ - from .orchestrator import _extract_error_message, _http_origin, post_json + from .orchestrator import _extract_error_message, _join_signer_endpoint, post_json # check for offchain mode if not signer_url: return SignerMaterial(address=None, sig=None) - # Accept either a base URL or a full URL that includes /sign-orchestrator-info. - # Normalize to an https:// origin and append the expected path. - signer_url = f"{_http_origin(signer_url)}/sign-orchestrator-info" + # Accept either a signer base URL (which may itself include a base path + # like /api/signer) or a full URL ending with /sign-orchestrator-info. + signer_url = _join_signer_endpoint(signer_url, "/sign-orchestrator-info") headers = dict(_signer_headers) if _signer_headers else None try: @@ -152,153 +147,40 @@ def get_orch_info_sig( return SignerMaterial(address=address, sig=sig) -class PaymentSession: +class PaymentSession(BasePaymentSession): + """ + Live-Video-to-Video payment session backed by the remote signer's + ``/generate-live-payment`` endpoint. + + BYOC jobs use :class:`livepeer_gateway.byoc_payments.BYOCPaymentSession` + instead; both share :class:`BasePaymentSession`. + """ + def __init__( self, signer_url: Optional[str], info: lp_rpc_pb2.OrchestratorInfo, *, signer_headers: Optional[dict[str, str]] = None, - type: str, + type: str = "lv2v", capabilities: Optional[lp_rpc_pb2.Capabilities] = None, use_tofu: bool = True, max_refresh_retries: int = 3, ) -> None: - self._signer_url = signer_url - self._signer_headers = signer_headers - self._info = info - self._type = type - self._manifest_id: Optional[str] = None - self._capabilities = capabilities + super().__init__( + signer_url, + info, + signer_headers=signer_headers, + payment_type=type, + capabilities=capabilities, + max_refresh_retries=max_refresh_retries, + ) self._use_tofu = use_tofu - self._max_refresh_retries = max(0, int(max_refresh_retries)) - self._state: Optional[dict[str, str]] = None - - def set_manifest_id(self, manifest_id: str) -> None: - if not isinstance(manifest_id, str) or not manifest_id.strip(): - raise PaymentError("manifest_id must be a non-empty string") - self._manifest_id = manifest_id.strip() - - def get_payment(self) -> GetPaymentResponse: - """ - Generate a payment via the remote signer. - - Handles signer state round-tripping internally. - On HTTP 480, refreshes OrchestratorInfo and retries - (up to max_refresh_retries). - Returns payment + seg_creds for use as HTTP headers. - """ - - # Offchain mode: still send the expected headers, but with empty content. - if not self._signer_url: - seg = lp_rpc_pb2.SegData() - if not self._info.HasField("auth_token"): - raise PaymentError( - "Orchestrator did not provide an auth token." - ) - seg.auth_token.CopyFrom(self._info.auth_token) - seg = base64.b64encode(seg.SerializeToString()).decode("ascii") - return GetPaymentResponse(seg_creds=seg, payment="") - - def _payment_request() -> GetPaymentResponse: - from .orchestrator import _http_origin, post_json - - base = _http_origin(self._signer_url) - url = f"{base}/generate-live-payment" - - pb = self._info.SerializeToString() - orch_b64 = base64.b64encode(pb).decode("ascii") - payload: dict[str, Any] = { - "orchestrator": orch_b64, - "type": self._type, - } - if self._manifest_id is not None: - payload["ManifestID"] = self._manifest_id - if self._state is not None: - payload["state"] = self._state - - data = post_json(url, payload, headers=self._signer_headers) - payment = data.get("payment") - if not isinstance(payment, str) or not payment: - raise PaymentError( - f"GetPayment error: missing/invalid 'payment' in response (url={url})" - ) - - seg_creds = data.get("segCreds") - if seg_creds is not None and not isinstance(seg_creds, str): - raise PaymentError( - f"GetPayment error: invalid 'segCreds' in response (url={url})" - ) - - state = data.get("state") - if not isinstance(state, dict): - raise PaymentError( - f"Remote signer response missing 'state' object (url={url})" - ) - - self._state = state - return GetPaymentResponse(payment=payment, seg_creds=seg_creds) - - attempts = 0 - while True: - try: - return _payment_request() - except SignerRefreshRequired as e: - if attempts >= self._max_refresh_retries: - raise PaymentError( - f"Signer refresh required after {attempts} retries: {e}" - ) from e - if not self._info.transcoder: - raise PaymentError( - "OrchestratorInfo missing transcoder URL for refresh" - ) - from .orch_info import get_orch_info - - self._info = get_orch_info( - self._info.transcoder, - signer_url=self._signer_url, - signer_headers=self._signer_headers, - capabilities=self._capabilities, - use_tofu=self._use_tofu, - ) - attempts += 1 - - def send_payment(self) -> None: - """ - Generate a payment (via get_payment) and forward it - to the orchestrator via POST {orch}/payment. - """ - from .orchestrator import _extract_error_message, _http_origin - - p = self.get_payment() - if not self._info.transcoder: - raise PaymentError("OrchestratorInfo missing transcoder URL for payment") - base = _http_origin(self._info.transcoder) - url = f"{base}/payment" - headers = { - "Livepeer-Payment": p.payment, - "Livepeer-Segment": p.seg_creds or "", - } - req = Request(url, data=b"", headers=headers, method="POST") - ssl_ctx = ssl._create_unverified_context() - try: - with urlopen(req, timeout=5.0, context=ssl_ctx) as resp: - resp.read() - except HTTPError as e: - body = _extract_error_message(e) - body_part = f"; body={body!r}" if body else "" - raise PaymentError( - f"HTTP payment error: HTTP {e.code} from endpoint (url={url}){body_part}" - ) from e - except ConnectionRefusedError as e: - raise PaymentError( - f"HTTP payment error: connection refused (is the server running? is the host/port correct?) (url={url})" - ) from e - except URLError as e: - raise PaymentError( - f"HTTP payment error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" - ) from e - except Exception as e: - raise PaymentError( - f"HTTP payment error: unexpected error: {e.__class__.__name__}: {e} (url={url})" - ) from e + + def _offchain_payment(self) -> GetPaymentResponse: + seg = lp_rpc_pb2.SegData() + if not self._info.HasField("auth_token"): + raise PaymentError("Orchestrator did not provide an auth token.") + seg.auth_token.CopyFrom(self._info.auth_token) + seg_b64 = base64.b64encode(seg.SerializeToString()).decode("ascii") + return GetPaymentResponse(seg_creds=seg_b64, payment="") diff --git a/uv.lock b/uv.lock index 4003c14..b604b8e 100644 --- a/uv.lock +++ b/uv.lock @@ -479,7 +479,7 @@ wheels = [ [[package]] name = "livepeer-gateway" -version = "0.1.0" +version = "0.1.1" source = { editable = "." } dependencies = [ { name = "aiohttp" }, From b27f5ce20a1e3e9377a51faeb81a484d5cfa0532 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 13:18:06 -0400 Subject: [PATCH 02/12] chore(media,trickle): de-noise mid-stream segment-drop logging When the orchestrator's trickle endpoint stalls, the segment-writer queue.put() times out after 5s. The publisher already handles this: it sets _active_segment_drain and rolls into the next wall-clock segment. The chained CancelledError -> TimeoutError -> TrickleSegment WriteError stack trace was logged at WARNING level via exc_info=True, which made the (non-fatal) drain look like a crash. - media_publish: drop the exc_info=True; the warning now includes the underlying error message inline. - trickle_publisher.SegmentWriter.close(): re-raise CancelledError cleanly (don't suppress) and log other close-time failures without a stack trace. No behavioral change to the publish/drain pipeline. Made-with: Cursor --- src/livepeer_gateway/media_publish.py | 10 +++++++--- src/livepeer_gateway/trickle_publisher.py | 12 +++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/livepeer_gateway/media_publish.py b/src/livepeer_gateway/media_publish.py index bf11725..7d1fd85 100644 --- a/src/livepeer_gateway/media_publish.py +++ b/src/livepeer_gateway/media_publish.py @@ -986,15 +986,19 @@ async def _stream_pipe_to_trickle(self, read_file: BinaryIO) -> None: # lean on that instead of doing that here. try: await segment.write(chunk) - except TrickleSegmentWriteError: + except TrickleSegmentWriteError as e: self._active_segment_drain = True self._stats["segments_failed"] += 1 + # Single-line warning: write timeouts here are caused by the + # orchestrator not draining the trickle POST body. The full + # CancelledError -> TimeoutError -> TrickleSegmentWriteError + # chain is noise and obscures the actual cause. _LOG.warning( "MediaPublish[%s] dropped segment seq=%s mid-stream; " - "draining pipe until wall-clock segment ends", + "draining pipe until wall-clock segment ends (%s)", self._channel_name, segment_seq, - exc_info=True, + e, ) if self._should_close_segment_after_loop(): await self._close_active_segment_locked( diff --git a/src/livepeer_gateway/trickle_publisher.py b/src/livepeer_gateway/trickle_publisher.py index 1419720..a6ebff4 100644 --- a/src/livepeer_gateway/trickle_publisher.py +++ b/src/livepeer_gateway/trickle_publisher.py @@ -603,9 +603,15 @@ async def close(self) -> None: return try: await asyncio.wait_for(self.queue.put(None), timeout=_SEGMENT_QUEUE_PUT_TIMEOUT_S) - # BaseException to also capture cancellation errors, timeout errors, etc - except BaseException: - _LOG.warning("Trickle segment close suppressed seq=%s", self._seq, exc_info=True) + except asyncio.CancelledError: + # Cancellation during shutdown / segment rollover is expected; don't + # log a stack trace and don't swallow the cancel. + _LOG.debug("Trickle segment close cancelled seq=%s", self._seq) + raise + # BaseException to also capture timeout errors and any other unexpected + # close-time failures; logged at warning level without a stack trace. + except BaseException as e: + _LOG.warning("Trickle segment close suppressed seq=%s (%s)", self._seq, e) async def __aenter__(self) -> "SegmentWriter": return self From 1a56e25f67ea54eda4365a558c86f84ebc65d148 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 13:58:46 -0400 Subject: [PATCH 03/12] fix(byoc): improve error handling and result retrieval in text reversal example - Enhanced error handling for missing control URL in the text reversal job. - Updated the result retrieval logic to ensure proper handling of timeouts and cancellation of tasks. - Added a final cleanup step to ensure the job is stopped correctly, even if an error occurs during processing. This change improves the robustness of the text reversal example by ensuring that resources are managed correctly and errors are reported clearly. --- examples/byoc_text_reversal.py | 83 +++++++++++++++------------ examples/byoc_write_frames.py | 4 +- examples/byoc_write_text.py | 29 +++++++++- examples/get_orchestrator_info.py | 13 ++++- src/livepeer_gateway/byoc_payments.py | 2 +- src/livepeer_gateway/orchestrator.py | 4 +- 6 files changed, 94 insertions(+), 41 deletions(-) diff --git a/examples/byoc_text_reversal.py b/examples/byoc_text_reversal.py index 1d928c8..f590de9 100644 --- a/examples/byoc_text_reversal.py +++ b/examples/byoc_text_reversal.py @@ -108,47 +108,58 @@ async def _amain() -> None: job.start_payment_sender() - if job.control is None: - print("ERROR: job has no control_url; cannot send text.") - return + reader_task: Optional[asyncio.Task] = None + stop_sent = False + try: + if job.control is None: + print("ERROR: job has no control_url; cannot send text.") + return - result_received = asyncio.Event() - received_payload: dict = {} + result_received = asyncio.Event() + received_payload: dict = {} - async def read_results() -> None: - if job.events is None: - print("WARN: no events channel on job; results will not be captured.") - result_received.set() - return - async for msg in job.events(): - if isinstance(msg, dict) and isinstance(msg.get("reversed"), str): - received_payload.update(msg) + async def read_results() -> None: + if job.events is None: + print("WARN: no events channel on job; results will not be captured.") result_received.set() return + async for msg in job.events(): + if isinstance(msg, dict) and isinstance(msg.get("reversed"), str): + received_payload.update(msg) + result_received.set() + return + + reader_task = asyncio.create_task(read_results()) + + await asyncio.sleep(0.15) + await job.control.write({"text": args.text}) + print(f"sent: {{\"text\": {args.text!r}}}") + + try: + await asyncio.wait_for(result_received.wait(), timeout=args.timeout_seconds) + except asyncio.TimeoutError: + print("ERROR: timed out waiting for reversal result.") + return - reader_task = asyncio.create_task(read_results()) - - await asyncio.sleep(0.15) - await job.control.write({"text": args.text}) - print(f"sent: {{\"text\": {args.text!r}}}") - - try: - await asyncio.wait_for(result_received.wait(), timeout=args.timeout_seconds) - except asyncio.TimeoutError: - print("ERROR: timed out waiting for reversal result.") - return - - original = received_payload.get("original") or received_payload.get("text") or args.text - reversed_text = received_payload.get("reversed") - print(f"result: {original!r} -> {reversed_text!r}") - print("payload:", json.dumps(received_payload, indent=2, sort_keys=True)) - - stop_resp = await job.stop() - print(f"stop: status={stop_resp['status_code']}") - - await asyncio.sleep(max(0.0, args.output_grace_seconds)) - reader_task.cancel() - await asyncio.gather(reader_task, return_exceptions=True) + original = received_payload.get("original") or received_payload.get("text") or args.text + reversed_text = received_payload.get("reversed") + print(f"result: {original!r} -> {reversed_text!r}") + print("payload:", json.dumps(received_payload, indent=2, sort_keys=True)) + + stop_resp = await job.stop() + stop_sent = True + print(f"stop: status={stop_resp['status_code']}") + finally: + if not stop_sent: + try: + stop_resp = await job.stop() + print(f"stop: status={stop_resp['status_code']}") + except Exception as stop_err: + print(f"WARN: failed to stop BYOC job: {stop_err}") + await asyncio.sleep(max(0.0, args.output_grace_seconds)) + if reader_task is not None: + reader_task.cancel() + await asyncio.gather(reader_task, return_exceptions=True) except LivepeerGatewayError as err: print(f"ERROR: {err}") finally: diff --git a/examples/byoc_write_frames.py b/examples/byoc_write_frames.py index ffaa19e..e8ae686 100644 --- a/examples/byoc_write_frames.py +++ b/examples/byoc_write_frames.py @@ -58,7 +58,9 @@ def _solid_rgb_frame(width: int, height: int, rgb: tuple[int, int, int]) -> av.V async def main() -> None: args = _parse_args() - frame_interval = 1.0 / max(1e-6, args.fps) + if args.fps <= 0: + raise SystemExit("--fps must be > 0") + frame_interval = 1.0 / args.fps job = None try: diff --git a/examples/byoc_write_text.py b/examples/byoc_write_text.py index 512cc38..073ae92 100644 --- a/examples/byoc_write_text.py +++ b/examples/byoc_write_text.py @@ -108,6 +108,12 @@ def _parse_args() -> argparse.Namespace: default=1.5, help="Extra time to wait for final event/data output after stop (default: 1.5).", ) + p.add_argument( + "--results-timeout", + type=float, + default=30.0, + help="Max seconds to wait for expected results after control messages finish (default: 30).", + ) return p.parse_args() @@ -217,7 +223,28 @@ async def control_messages() -> None: try: await control_task if expected_results > 0 and primary_results_task is not None: - await results_complete.wait() + wait_task = asyncio.create_task(results_complete.wait()) + try: + done, _pending = await asyncio.wait( + {wait_task, primary_results_task}, + timeout=max(0.05, float(args.results_timeout)), + return_when=asyncio.FIRST_COMPLETED, + ) + finally: + if not wait_task.done(): + wait_task.cancel() + await asyncio.gather(wait_task, return_exceptions=True) + if not results_complete.is_set(): + if primary_results_task in done: + print( + f"WARN: results channel ended before {expected_results} " + f"result(s) received; stopping job." + ) + else: + print( + f"ERROR: timed out after {args.results_timeout}s waiting for " + f"{expected_results} result(s); stopping job." + ) stop_resp = await job.stop() stop_sent = True print(f"stop: {_json_dump(stop_resp.get('body') or {'status_code': stop_resp['status_code']})}") diff --git a/examples/get_orchestrator_info.py b/examples/get_orchestrator_info.py index 73485b7..be7fca1 100644 --- a/examples/get_orchestrator_info.py +++ b/examples/get_orchestrator_info.py @@ -1,6 +1,7 @@ import argparse import json import logging +import sys from typing import Any from livepeer_gateway.capabilities import ( @@ -367,7 +368,17 @@ def main() -> None: if args.debug: logging.basicConfig(level=logging.DEBUG) - capabilities = build_capabilities_from_queries(args.caps) if args.caps else None + if args.caps: + capabilities = build_capabilities_from_queries(args.caps) + if not capabilities: + print( + f"ERROR: --caps {args.caps!r} did not parse into any valid capabilities " + f"(expected pipeline-id/model entries, e.g. 'byoc/text-reversal').", + file=sys.stderr, + ) + sys.exit(1) + else: + capabilities = None json_results: list[dict[str, Any]] = [] diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py index 9246b0b..109f3b6 100644 --- a/src/livepeer_gateway/byoc_payments.py +++ b/src/livepeer_gateway/byoc_payments.py @@ -23,7 +23,7 @@ class BYOCPaymentSession(BasePaymentSession): - Generates time-based BYOC payments via ``/generate-live-payment`` (``type: "byoc"``, ``manifestID = capability_name``). - Sends recurring stream payments to the orchestrator's - ``/process/stream/payment`` endpoint (or operator override). + ``/ai/stream/payment`` endpoint (or operator override). """ def __init__( diff --git a/src/livepeer_gateway/orchestrator.py b/src/livepeer_gateway/orchestrator.py index 991b295..f51e094 100644 --- a/src/livepeer_gateway/orchestrator.py +++ b/src/livepeer_gateway/orchestrator.py @@ -215,8 +215,10 @@ def _join_signer_endpoint(signer_url: str, path: str) -> str: """ if not isinstance(signer_url, str) or not signer_url.strip(): raise ValueError("signer_url must be a non-empty string") + parsed = _parse_http_url(signer_url, context="signer_url") + base_path = (parsed.path or "").rstrip("/") + base = urlunparse(parsed._replace(path=base_path, params="", query="", fragment="")) suffix = path if path.startswith("/") else f"/{path}" - base = signer_url.strip().rstrip("/") if base.endswith(suffix): return base return f"{base}{suffix}" From 91e0ee45a687e010ecb752506fc2396844a06ebe Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 14:09:21 -0400 Subject: [PATCH 04/12] remove example streaming text --- examples/byoc_write_text.py | 270 ------------------------------------ 1 file changed, 270 deletions(-) delete mode 100644 examples/byoc_write_text.py diff --git a/examples/byoc_write_text.py b/examples/byoc_write_text.py deleted file mode 100644 index 073ae92..0000000 --- a/examples/byoc_write_text.py +++ /dev/null @@ -1,270 +0,0 @@ -import argparse -import asyncio -import json -from typing import Optional - -from livepeer_gateway import JSONLReader -from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job -from livepeer_gateway.errors import LivepeerGatewayError - - -DEFAULT_CAPABILITY = "text-reversal" -_DEFAULT_CONTROL_TEXTS = ("hello-byoc", "livepeer", "draw") - - -def _json_dump(data: dict) -> str: - return json.dumps(data, sort_keys=True) - - -def _render_text_result(msg: dict) -> str: - original = msg.get("original") - reversed_text = msg.get("reversed") - text = msg.get("text") - if isinstance(original, str) and isinstance(reversed_text, str): - return f"{original!r} -> {reversed_text!r}" - if isinstance(text, str) and isinstance(reversed_text, str): - return f"{text!r} -> {reversed_text!r}" - if isinstance(reversed_text, str): - return reversed_text - return _json_dump(msg) - - -def _is_text_result(msg: dict) -> bool: - return isinstance(msg.get("reversed"), str) - - -def _parse_args() -> argparse.Namespace: - p = argparse.ArgumentParser( - description="Start a text-only BYOC stream and drive it entirely via control messages." - ) - p.add_argument( - "orchestrator", - nargs="?", - default=None, - help="Orchestrator (host:port). If omitted, discovery is used.", - ) - p.add_argument( - "--capability", - default=DEFAULT_CAPABILITY, - help=f"BYOC capability name (default: {DEFAULT_CAPABILITY}).", - ) - p.add_argument( - "--signer", - required=True, - help="Remote signer URL (no path). Required for BYOC job signing.", - ) - p.add_argument( - "--discovery", - default=None, - help="Discovery endpoint for orchestrators.", - ) - p.add_argument( - "--stream-id", - default=None, - help="Optional stream ID to include in BYOC request details.", - ) - p.add_argument( - "--stream-start", - default=None, - metavar="PATH_OR_URL", - help=( - "HTTP path on the orchestrator transcoder (default /ai/stream/start) or a full URL. " - "Livepeer gateway BYOC uses /process/stream/start." - ), - ) - p.add_argument( - "--stream-payment", - default=None, - metavar="PATH_OR_URL", - help=( - "Payment path (default /ai/stream/payment) or full URL; must match the server " - "that handles Livepeer-Payment for this job." - ), - ) - p.add_argument( - "--text", - action="append", - dest="control_texts", - metavar="STRING", - help=( - "Send a control JSON message {\"text\": STRING}. " - "Repeat to send multiple commands." - ), - ) - p.add_argument( - "--control-interval", - type=float, - default=1.0, - help="Seconds between successive control messages (default: 1).", - ) - p.add_argument( - "--no-control", - action="store_true", - help="Do not send any control-channel messages.", - ) - p.add_argument( - "--output-grace-seconds", - type=float, - default=1.5, - help="Extra time to wait for final event/data output after stop (default: 1.5).", - ) - p.add_argument( - "--results-timeout", - type=float, - default=30.0, - help="Max seconds to wait for expected results after control messages finish (default: 30).", - ) - return p.parse_args() - - -def _control_texts_for_run(args: argparse.Namespace) -> list[str]: - if args.no_control: - return [] - if args.control_texts: - return list(args.control_texts) - if args.capability == DEFAULT_CAPABILITY: - return list(_DEFAULT_CONTROL_TEXTS) - return [] - - -async def main() -> None: - args = _parse_args() - - job = None - try: - req_opts: dict[str, str] = {} - if args.stream_start is not None: - req_opts["stream_start_endpoint"] = args.stream_start - if args.stream_payment is not None: - req_opts["stream_payment_endpoint"] = args.stream_payment - - job = start_byoc_job( - args.orchestrator, - BYOCJobRequest( - capability=args.capability, - stream_id=args.stream_id, - enable_video_ingress=False, - enable_video_egress=False, - enable_data_output=True, - **req_opts, - ), - signer_url=args.signer, - discovery_url=args.discovery, - ) - - print("=== BYOC text-only stream ===") - print("job_id:", job.job_id) - print("capability:", job.capability) - print("control_url:", job.control_url) - print("events_url:", job.events_url) - print("data_url:", job.data_url) - print() - - job.start_payment_sender() - - texts = _control_texts_for_run(args) - expected_results = len(texts) - results_received = 0 - results_complete = asyncio.Event() - - async def print_channel(name: str, stream, *, count_results: bool = False) -> None: - nonlocal results_received - async for msg in stream: - print(f"{name}: {_render_text_result(msg)}") - if count_results and _is_text_result(msg): - results_received += 1 - if results_received >= expected_results: - results_complete.set() - - async def control_messages() -> None: - if not texts: - results_complete.set() - return - if job.control is None: - print("WARN: control messages requested but job has no control_url; skipping.") - results_complete.set() - return - interval = max(0.05, float(args.control_interval)) - await asyncio.sleep(0.15) - for i, s in enumerate(texts): - if i > 0: - await asyncio.sleep(interval) - await job.control.write({"text": s}) - print(f"control[{i}]: {{\"text\": {s!r}}}") - - control_task = asyncio.create_task(control_messages()) - output_tasks: list[asyncio.Task] = [] - primary_results_task: Optional[asyncio.Task] = None - - if job.events is not None: - event_task = asyncio.create_task( - print_channel( - "event", - job.events(), - count_results=not bool(job.data_url), - ) - ) - output_tasks.append(event_task) - if primary_results_task is None and not job.data_url: - primary_results_task = event_task - - if job.data_url: - data_task = asyncio.create_task( - print_channel( - "data", - JSONLReader(job.data_url)(), - count_results=True, - ) - ) - output_tasks.append(data_task) - primary_results_task = data_task - - stop_sent = False - try: - await control_task - if expected_results > 0 and primary_results_task is not None: - wait_task = asyncio.create_task(results_complete.wait()) - try: - done, _pending = await asyncio.wait( - {wait_task, primary_results_task}, - timeout=max(0.05, float(args.results_timeout)), - return_when=asyncio.FIRST_COMPLETED, - ) - finally: - if not wait_task.done(): - wait_task.cancel() - await asyncio.gather(wait_task, return_exceptions=True) - if not results_complete.is_set(): - if primary_results_task in done: - print( - f"WARN: results channel ended before {expected_results} " - f"result(s) received; stopping job." - ) - else: - print( - f"ERROR: timed out after {args.results_timeout}s waiting for " - f"{expected_results} result(s); stopping job." - ) - stop_resp = await job.stop() - stop_sent = True - print(f"stop: {_json_dump(stop_resp.get('body') or {'status_code': stop_resp['status_code']})}") - finally: - if not control_task.done(): - await control_task - if not stop_sent: - stop_resp = await job.stop() - print(f"stop: {_json_dump(stop_resp.get('body') or {'status_code': stop_resp['status_code']})}") - await asyncio.sleep(max(0.0, args.output_grace_seconds)) - for task in output_tasks: - task.cancel() - if output_tasks: - await asyncio.gather(*output_tasks, return_exceptions=True) - except LivepeerGatewayError as e: - print(f"ERROR: {e}") - finally: - if job is not None: - await job.close() - - -if __name__ == "__main__": - asyncio.run(main()) From 634107859d3624c254c85b30367ebe2f59c2278c Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 14:20:04 -0400 Subject: [PATCH 05/12] fix(byoc): update media publishing configuration in frame writing example - Modified the media publishing configuration to include VideoOutputConfig for better handling of frame rates. - Ensured that the job starts with the correct media settings, improving the robustness of the example. This change enhances the functionality of the byoc_write_frames example by aligning it with the latest media publishing standards. --- examples/byoc_write_frames.py | 6 ++++-- src/livepeer_gateway/trickle_publisher.py | 8 +++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/examples/byoc_write_frames.py b/examples/byoc_write_frames.py index e8ae686..86f035d 100644 --- a/examples/byoc_write_frames.py +++ b/examples/byoc_write_frames.py @@ -6,7 +6,7 @@ from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job from livepeer_gateway.errors import LivepeerGatewayError -from livepeer_gateway.media_publish import MediaPublishConfig +from livepeer_gateway.media_publish import MediaPublishConfig, VideoOutputConfig DEFAULT_CAPABILITY = "text-reversal" @@ -80,7 +80,9 @@ async def main() -> None: print("publish_url:", job.publish_url) print() - media = job.start_media(MediaPublishConfig(fps=args.fps)) + media = job.start_media( + MediaPublishConfig(tracks=[VideoOutputConfig(fps=args.fps)]) + ) job.start_payment_sender() time_base = Fraction(1, int(round(args.fps))) diff --git a/src/livepeer_gateway/trickle_publisher.py b/src/livepeer_gateway/trickle_publisher.py index a6ebff4..423df47 100644 --- a/src/livepeer_gateway/trickle_publisher.py +++ b/src/livepeer_gateway/trickle_publisher.py @@ -608,9 +608,11 @@ async def close(self) -> None: # log a stack trace and don't swallow the cancel. _LOG.debug("Trickle segment close cancelled seq=%s", self._seq) raise - # BaseException to also capture timeout errors and any other unexpected - # close-time failures; logged at warning level without a stack trace. - except BaseException as e: + # Capture timeout errors and any other unexpected close-time failures; + # logged at warning level without a stack trace. KeyboardInterrupt / + # SystemExit are intentionally not caught so process-control exceptions + # still propagate. + except Exception as e: _LOG.warning("Trickle segment close suppressed seq=%s (%s)", self._seq, e) async def __aenter__(self) -> "SegmentWriter": From ee9bf5f1091a23d6a555bae58400a73740abf19a Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 14:32:44 -0400 Subject: [PATCH 06/12] refactor(payments): standardize payment type parameter naming - Changed parameter name from `payment_type` to `type` in BYOCPaymentSession and BasePaymentSession classes for consistency. - Updated corresponding references in the remote_signer module. This change improves code clarity and aligns parameter naming conventions across the payment session classes. --- src/livepeer_gateway/byoc_payments.py | 2 +- src/livepeer_gateway/payments_base.py | 4 ++-- src/livepeer_gateway/remote_signer.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py index 109f3b6..0707727 100644 --- a/src/livepeer_gateway/byoc_payments.py +++ b/src/livepeer_gateway/byoc_payments.py @@ -48,7 +48,7 @@ def __init__( signer_url, info, signer_headers=signer_headers, - payment_type="byoc", + type="byoc", capabilities=capabilities, max_refresh_retries=max_refresh_retries, ) diff --git a/src/livepeer_gateway/payments_base.py b/src/livepeer_gateway/payments_base.py index 5cc1e89..f2ac498 100644 --- a/src/livepeer_gateway/payments_base.py +++ b/src/livepeer_gateway/payments_base.py @@ -33,14 +33,14 @@ def __init__( info: lp_rpc_pb2.OrchestratorInfo, *, signer_headers: Optional[dict[str, str]], - payment_type: str, + type: str, capabilities: Optional[lp_rpc_pb2.Capabilities], max_refresh_retries: int = 3, ) -> None: self._signer_url = signer_url self._signer_headers = signer_headers self._info = info - self._payment_type = payment_type + self._payment_type = type self._manifest_id: Optional[str] = None self._capabilities = capabilities self._max_refresh_retries = max(0, int(max_refresh_retries)) diff --git a/src/livepeer_gateway/remote_signer.py b/src/livepeer_gateway/remote_signer.py index 64a3a69..514bcee 100644 --- a/src/livepeer_gateway/remote_signer.py +++ b/src/livepeer_gateway/remote_signer.py @@ -171,7 +171,7 @@ def __init__( signer_url, info, signer_headers=signer_headers, - payment_type=type, + type=type, capabilities=capabilities, max_refresh_retries=max_refresh_retries, ) From 24d32fc726e0b18e047f6c0290963fd1cd001921 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 14:51:11 -0400 Subject: [PATCH 07/12] fix(byoc): ensure minimum frame rate in byoc_write_frames example - Updated the calculation of `time_base` to prevent division by zero by using `max(1, int(round(args.fps)))`. - This change ensures that the frame rate is always set to a valid minimum value, improving the stability of the example. This fix enhances the robustness of the byoc_write_frames example by safeguarding against invalid frame rate inputs. --- examples/byoc_write_frames.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/byoc_write_frames.py b/examples/byoc_write_frames.py index 86f035d..c781bf7 100644 --- a/examples/byoc_write_frames.py +++ b/examples/byoc_write_frames.py @@ -85,7 +85,7 @@ async def main() -> None: ) job.start_payment_sender() - time_base = Fraction(1, int(round(args.fps))) + time_base = Fraction(1, max(1, int(round(args.fps)))) for i in range(max(0, args.count)): color = (i * 5) % 255 frame = _solid_rgb_frame(args.width, args.height, (color, 0, 255 - color)) From 766ee55d9c9d7dff888aa3667a1f8719ba31d273 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 28 Apr 2026 15:05:44 -0400 Subject: [PATCH 08/12] feat(payments): introduce use_tofu parameter for payment sessions - Added a new boolean parameter `use_tofu` to the BYOCPaymentSession, BasePaymentSession, and related functions to enhance payment session configuration. - Updated relevant methods to utilize the `use_tofu` parameter, ensuring consistent behavior across payment processing. This change improves flexibility in payment session management by allowing the option to enable or disable tofu usage. --- src/livepeer_gateway/byoc.py | 7 +++++++ src/livepeer_gateway/byoc_payments.py | 4 +++- src/livepeer_gateway/media_publish.py | 14 ++++++++++++-- src/livepeer_gateway/payments_base.py | 3 +++ src/livepeer_gateway/remote_signer.py | 2 +- src/livepeer_gateway/trickle_publisher.py | 6 ++++-- 6 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/livepeer_gateway/byoc.py b/src/livepeer_gateway/byoc.py index aeba3c7..b712bfe 100644 --- a/src/livepeer_gateway/byoc.py +++ b/src/livepeer_gateway/byoc.py @@ -117,6 +117,7 @@ def _get_payment_orch_info( signer_headers: Optional[dict[str, str]], capabilities: Any, capability_name: str, + use_tofu: bool = True, ) -> tuple[Any, Any]: """ Fetch OrchestratorInfo for BYOC payment preflight. If the capability-scoped @@ -129,6 +130,7 @@ def _get_payment_orch_info( signer_url=signer_url, signer_headers=signer_headers, capabilities=capabilities, + use_tofu=use_tofu, ) if _orch_info_supports_byoc_payment(payment_info, capability_name): return payment_info, capabilities @@ -137,6 +139,7 @@ def _get_payment_orch_info( orch_url, signer_url=signer_url, signer_headers=signer_headers, + use_tofu=use_tofu, ) if _orch_info_supports_byoc_payment(legacy_info, capability_name): _LOG.debug( @@ -525,6 +528,7 @@ def start_byoc_job( signer_headers: Optional[dict[str, str]] = None, discovery_url: Optional[str] = None, discovery_headers: Optional[dict[str, str]] = None, + use_tofu: bool = True, ) -> BYOCJob: """ Start a BYOC job through the Python gateway. @@ -567,6 +571,7 @@ def start_byoc_job( discovery_url=resolved_discovery_url, discovery_headers=resolved_discovery_headers, capabilities=capabilities, + use_tofu=use_tofu, ) start_rejections: list[OrchestratorRejection] = [] @@ -589,6 +594,7 @@ def start_byoc_job( signer_headers=resolved_signer_headers, capabilities=capabilities, capability_name=req.capability.strip(), + use_tofu=use_tofu, ) session = BYOCPaymentSession( @@ -598,6 +604,7 @@ def start_byoc_job( signer_headers=resolved_signer_headers, capabilities=payment_capabilities, stream_payment_endpoint=req.stream_payment_endpoint, + use_tofu=use_tofu, ) job_id = req._job_id() diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py index 0707727..36fde6c 100644 --- a/src/livepeer_gateway/byoc_payments.py +++ b/src/livepeer_gateway/byoc_payments.py @@ -36,6 +36,7 @@ def __init__( capabilities: Optional[lp_rpc_pb2.Capabilities] = None, max_refresh_retries: int = 3, stream_payment_endpoint: str = "/ai/stream/payment", + use_tofu: bool = True, ) -> None: if not isinstance(capability_name, str) or not capability_name.strip(): raise PaymentError("capability_name must be a non-empty string") @@ -51,6 +52,7 @@ def __init__( type="byoc", capabilities=capabilities, max_refresh_retries=max_refresh_retries, + use_tofu=use_tofu, ) # Use capability name as manifest id so balance tracking is shared # across requests for the same BYOC capability (matches go-livepeer). @@ -91,7 +93,7 @@ def sign_byoc_job( "capability": capability, "request": request, "parameters": parameters, - "timeout_seconds": timeout_seconds, + "timeout_seconds": self._timeout_seconds, "signature_format": "v1", }, headers=self._signer_headers, diff --git a/src/livepeer_gateway/media_publish.py b/src/livepeer_gateway/media_publish.py index 7d1fd85..7d0ca0f 100644 --- a/src/livepeer_gateway/media_publish.py +++ b/src/livepeer_gateway/media_publish.py @@ -950,7 +950,12 @@ async def _stream_pipe_to_trickle(self, read_file: BinaryIO) -> None: # NB: This intentionally keeps trickle rolling # accommodate long stalls / inactivity from the # PyAV end, eg during model loading - _LOG.warning( + log_fn = ( + _LOG.debug + if segment_seq == 0 + else _LOG.warning + ) + log_fn( "MediaPublish[%s] trickle segment seq=%s idle for " "%.1fs; rolling over to a fresh empty segment", self._channel_name, @@ -993,7 +998,12 @@ async def _stream_pipe_to_trickle(self, read_file: BinaryIO) -> None: # orchestrator not draining the trickle POST body. The full # CancelledError -> TimeoutError -> TrickleSegmentWriteError # chain is noise and obscures the actual cause. - _LOG.warning( + log_fn = ( + _LOG.debug + if segment_seq == 0 + else _LOG.warning + ) + log_fn( "MediaPublish[%s] dropped segment seq=%s mid-stream; " "draining pipe until wall-clock segment ends (%s)", self._channel_name, diff --git a/src/livepeer_gateway/payments_base.py b/src/livepeer_gateway/payments_base.py index f2ac498..1070e23 100644 --- a/src/livepeer_gateway/payments_base.py +++ b/src/livepeer_gateway/payments_base.py @@ -36,6 +36,7 @@ def __init__( type: str, capabilities: Optional[lp_rpc_pb2.Capabilities], max_refresh_retries: int = 3, + use_tofu: bool = True, ) -> None: self._signer_url = signer_url self._signer_headers = signer_headers @@ -46,6 +47,7 @@ def __init__( self._max_refresh_retries = max(0, int(max_refresh_retries)) self._state: Optional[dict[str, Any]] = None self._timeout_seconds: int = 0 + self._use_tofu = use_tofu def set_manifest_id(self, manifest_id: str) -> None: if not isinstance(manifest_id, str) or not manifest_id.strip(): @@ -91,6 +93,7 @@ def _refresh_orchestrator_info(self) -> None: signer_url=self._signer_url, signer_headers=self._signer_headers, capabilities=self._capabilities, + use_tofu=self._use_tofu, ) def _request_payment(self) -> GetPaymentResponse: diff --git a/src/livepeer_gateway/remote_signer.py b/src/livepeer_gateway/remote_signer.py index 514bcee..198bc4a 100644 --- a/src/livepeer_gateway/remote_signer.py +++ b/src/livepeer_gateway/remote_signer.py @@ -174,8 +174,8 @@ def __init__( type=type, capabilities=capabilities, max_refresh_retries=max_refresh_retries, + use_tofu=use_tofu, ) - self._use_tofu = use_tofu def _offchain_payment(self) -> GetPaymentResponse: seg = lp_rpc_pb2.SegData() diff --git a/src/livepeer_gateway/trickle_publisher.py b/src/livepeer_gateway/trickle_publisher.py index 423df47..8936a7c 100644 --- a/src/livepeer_gateway/trickle_publisher.py +++ b/src/livepeer_gateway/trickle_publisher.py @@ -356,7 +356,8 @@ async def _resolve_next_seq(self) -> int: _LOG.debug("Trickle resolved seq from %s: %s", url, resolved_seq) return resolved_seq else: - _LOG.warning("Trickle /next missing Lp-Trickle-Latest header") + # Common on first /next before orchestrator sets the header. + _LOG.debug("Trickle /next missing Lp-Trickle-Latest header") except Exception: _LOG.warning("Trickle /next request failed", exc_info=True) return -1 @@ -613,7 +614,8 @@ async def close(self) -> None: # SystemExit are intentionally not caught so process-control exceptions # still propagate. except Exception as e: - _LOG.warning("Trickle segment close suppressed seq=%s (%s)", self._seq, e) + log_fn = _LOG.debug if self._seq == 0 else _LOG.warning + log_fn("Trickle segment close suppressed seq=%s (%s)", self._seq, e) async def __aenter__(self) -> "SegmentWriter": return self From 2e7f2678dd9d11aac11e67e38f329ee19b1a8385 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 30 Apr 2026 14:03:46 -0400 Subject: [PATCH 09/12] feat(payments): refactor BYOCPaymentSession and remove BasePaymentSession - Refactored BYOCPaymentSession to remove inheritance from BasePaymentSession, consolidating payment session logic directly within BYOCPaymentSession. - Introduced new methods for building payment payloads and refreshing orchestrator information. - Enhanced error handling and state management during payment requests. - Deleted the now-redundant BasePaymentSession class, streamlining the payment processing architecture. This change improves the clarity and maintainability of the payment session implementation by reducing complexity and focusing on the BYOC-specific logic. --- src/livepeer_gateway/byoc_payments.py | 132 +++++++++++++++--- src/livepeer_gateway/payments_base.py | 176 ------------------------ src/livepeer_gateway/remote_signer.py | 188 +++++++++++++++++++++----- 3 files changed, 267 insertions(+), 229 deletions(-) delete mode 100644 src/livepeer_gateway/payments_base.py diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py index 36fde6c..989d363 100644 --- a/src/livepeer_gateway/byoc_payments.py +++ b/src/livepeer_gateway/byoc_payments.py @@ -1,11 +1,16 @@ from __future__ import annotations +import base64 +import ssl +import uuid from dataclasses import dataclass -from typing import Optional +from typing import Any, Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen from . import lp_rpc_pb2 -from .errors import PaymentError -from .payments_base import BasePaymentSession, GetPaymentResponse +from .errors import LivepeerGatewayError, PaymentError, SignerRefreshRequired +from .remote_signer import GetPaymentResponse @dataclass(frozen=True) @@ -14,14 +19,14 @@ class SignedBYOCJob: signature: str -class BYOCPaymentSession(BasePaymentSession): +class BYOCPaymentSession: """ BYOC payment session. - Signs job credentials via the remote signer's ``POST /sign-byoc-job`` endpoint (V1 binary signing format, server-side flatten). - Generates time-based BYOC payments via ``/generate-live-payment`` - (``type: "byoc"``, ``manifestID = capability_name``). + using the BYOC capability name as the payment ``type``. - Sends recurring stream payments to the orchestrator's ``/ai/stream/payment`` endpoint (or operator override). """ @@ -43,23 +48,114 @@ def __init__( if not isinstance(stream_payment_endpoint, str) or not stream_payment_endpoint.strip(): raise PaymentError("stream_payment_endpoint must be a non-empty string") + self._signer_url = signer_url + self._signer_headers = signer_headers + self._info = info self._capability_name = capability_name.strip() + self._capabilities = capabilities + self._max_refresh_retries = max(0, int(max_refresh_retries)) + self._state: Optional[dict[str, Any]] = None self._stream_payment_endpoint = stream_payment_endpoint.strip() - super().__init__( - signer_url, - info, - signer_headers=signer_headers, - type="byoc", - capabilities=capabilities, - max_refresh_retries=max_refresh_retries, - use_tofu=use_tofu, + self._timeout_seconds: int = 0 + self._use_tofu = use_tofu + + def set_timeout_seconds(self, timeout_seconds: int) -> None: + self._timeout_seconds = max(0, int(timeout_seconds)) + + def _build_payment_payload(self) -> dict[str, Any]: + pb = self._info.SerializeToString() + payload: dict[str, Any] = { + "orchestrator": base64.b64encode(pb).decode("ascii"), + "type": self._capability_name, + "RequestID": str(uuid.uuid4()), + } + if self._timeout_seconds > 0: + payload["timeoutSeconds"] = self._timeout_seconds + if self._state is not None: + payload["state"] = self._state + if self._capabilities is not None: + payload["capabilities"] = base64.b64encode( + self._capabilities.SerializeToString() + ).decode("ascii") + return payload + + def _refresh_orchestrator_info(self) -> None: + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for refresh") + + from .orch_info import get_orch_info + + self._info = get_orch_info( + self._info.transcoder, + signer_url=self._signer_url, + signer_headers=self._signer_headers, + capabilities=self._capabilities, + use_tofu=self._use_tofu, ) - # Use capability name as manifest id so balance tracking is shared - # across requests for the same BYOC capability (matches go-livepeer). - self.set_manifest_id(self._capability_name) - def _offchain_payment(self) -> GetPaymentResponse: - return GetPaymentResponse(payment="", seg_creds="") + def _request_payment(self) -> GetPaymentResponse: + from .orchestrator import _join_signer_endpoint, post_json + + url = _join_signer_endpoint(self._signer_url, "/generate-live-payment") + data = post_json(url, self._build_payment_payload(), headers=self._signer_headers) + + payment = data.get("payment") + if not isinstance(payment, str) or not payment: + raise PaymentError(f"GetPayment error: missing/invalid 'payment' in response (url={url})") + + seg_creds = data.get("segCreds") + if seg_creds is not None and not isinstance(seg_creds, str): + raise PaymentError(f"GetPayment error: invalid 'segCreds' in response (url={url})") + + state = data.get("state") + if not isinstance(state, dict): + raise PaymentError(f"Remote signer response missing 'state' object (url={url})") + + self._state = state + return GetPaymentResponse(payment=payment, seg_creds=seg_creds) + + def get_payment(self) -> GetPaymentResponse: + if not self._signer_url: + return GetPaymentResponse(payment="", seg_creds="") + + attempts = 0 + while True: + try: + return self._request_payment() + except SignerRefreshRequired as e: + if attempts >= self._max_refresh_retries: + raise PaymentError(f"Signer refresh required after {attempts} retries: {e}") from e + self._refresh_orchestrator_info() + attempts += 1 + + def _post_empty(self, url: str, headers: dict[str, str], *, op: str, timeout: float = 5.0) -> None: + from .orchestrator import _extract_error_message + + req = Request(url, data=b"", headers=headers, method="POST") + ssl_ctx = ssl._create_unverified_context() + try: + with urlopen(req, timeout=timeout, context=ssl_ctx) as resp: + resp.read() + except HTTPError as e: + body = _extract_error_message(e) + body_part = f"; body={body!r}" if body else "" + raise PaymentError( + f"HTTP {op} error: HTTP {e.code} from endpoint (url={url}){body_part}" + ) from e + except ConnectionRefusedError as e: + raise PaymentError( + f"HTTP {op} error: connection refused (is the server running? is the host/port correct?) (url={url})" + ) from e + except URLError as e: + raise PaymentError( + f"HTTP {op} error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" + ) from e + except LivepeerGatewayError: + raise + except Exception as e: + raise PaymentError( + f"HTTP {op} error: unexpected error: {e.__class__.__name__}: {e} (url={url})" + ) from e def sign_byoc_job( self, diff --git a/src/livepeer_gateway/payments_base.py b/src/livepeer_gateway/payments_base.py deleted file mode 100644 index 1070e23..0000000 --- a/src/livepeer_gateway/payments_base.py +++ /dev/null @@ -1,176 +0,0 @@ -from __future__ import annotations - -import base64 -import ssl -import uuid -from dataclasses import dataclass -from typing import Any, Optional -from urllib.error import HTTPError, URLError -from urllib.request import Request, urlopen - -from . import lp_rpc_pb2 -from .errors import LivepeerGatewayError, PaymentError, SignerRefreshRequired - - -@dataclass(frozen=True) -class GetPaymentResponse: - payment: str - seg_creds: Optional[str] = None - - -class BasePaymentSession: - """ - Shared payment-session machinery for the remote signer's - ``/generate-live-payment`` endpoint. - - Concrete subclasses provide an ``_offchain_payment`` implementation for - the ``signer_url is None`` case (e.g. LV2V auth-token relay, BYOC noop). - """ - - def __init__( - self, - signer_url: Optional[str], - info: lp_rpc_pb2.OrchestratorInfo, - *, - signer_headers: Optional[dict[str, str]], - type: str, - capabilities: Optional[lp_rpc_pb2.Capabilities], - max_refresh_retries: int = 3, - use_tofu: bool = True, - ) -> None: - self._signer_url = signer_url - self._signer_headers = signer_headers - self._info = info - self._payment_type = type - self._manifest_id: Optional[str] = None - self._capabilities = capabilities - self._max_refresh_retries = max(0, int(max_refresh_retries)) - self._state: Optional[dict[str, Any]] = None - self._timeout_seconds: int = 0 - self._use_tofu = use_tofu - - def set_manifest_id(self, manifest_id: str) -> None: - if not isinstance(manifest_id, str) or not manifest_id.strip(): - raise PaymentError("manifest_id must be a non-empty string") - self._manifest_id = manifest_id.strip() - - def set_timeout_seconds(self, timeout_seconds: int) -> None: - # Hint the remote signer about expected job duration so BYOC initial - # ticket batches are sized to match the work (floored server-side). - self._timeout_seconds = max(0, int(timeout_seconds)) - - def _offchain_payment(self) -> GetPaymentResponse: - raise NotImplementedError - - def _build_payment_payload(self) -> dict[str, Any]: - pb = self._info.SerializeToString() - orch_b64 = base64.b64encode(pb).decode("ascii") - payload: dict[str, Any] = { - "orchestrator": orch_b64, - "type": self._payment_type, - } - if self._manifest_id is not None: - payload["manifestID"] = self._manifest_id - if self._timeout_seconds > 0: - payload["timeoutSeconds"] = self._timeout_seconds - if self._state is not None: - payload["state"] = self._state - if self._capabilities is not None: - caps_b64 = base64.b64encode(self._capabilities.SerializeToString()).decode("ascii") - payload["capabilities"] = caps_b64 - # One id per billing call so clearinghouse usage is not deduped across an entire manifest. - payload.setdefault("RequestID", str(uuid.uuid4())) - return payload - - def _refresh_orchestrator_info(self) -> None: - if not self._info.transcoder: - raise PaymentError("OrchestratorInfo missing transcoder URL for refresh") - - from .orch_info import get_orch_info - - self._info = get_orch_info( - self._info.transcoder, - signer_url=self._signer_url, - signer_headers=self._signer_headers, - capabilities=self._capabilities, - use_tofu=self._use_tofu, - ) - - def _request_payment(self) -> GetPaymentResponse: - from .orchestrator import _join_signer_endpoint, post_json - - url = _join_signer_endpoint(self._signer_url, "/generate-live-payment") - data = post_json(url, self._build_payment_payload(), headers=self._signer_headers) - - payment = data.get("payment") - if not isinstance(payment, str) or not payment: - raise PaymentError(f"GetPayment error: missing/invalid 'payment' in response (url={url})") - - seg_creds = data.get("segCreds") - if seg_creds is not None and not isinstance(seg_creds, str): - raise PaymentError(f"GetPayment error: invalid 'segCreds' in response (url={url})") - - state = data.get("state") - if not isinstance(state, dict): - raise PaymentError(f"Remote signer response missing 'state' object (url={url})") - - self._state = state - return GetPaymentResponse(payment=payment, seg_creds=seg_creds) - - def get_payment(self) -> GetPaymentResponse: - if not self._signer_url: - return self._offchain_payment() - - attempts = 0 - while True: - try: - return self._request_payment() - except SignerRefreshRequired as e: - if attempts >= self._max_refresh_retries: - raise PaymentError(f"Signer refresh required after {attempts} retries: {e}") from e - self._refresh_orchestrator_info() - attempts += 1 - - def _post_empty(self, url: str, headers: dict[str, str], *, op: str, timeout: float = 5.0) -> None: - from .orchestrator import _extract_error_message - - req = Request(url, data=b"", headers=headers, method="POST") - ssl_ctx = ssl._create_unverified_context() - try: - with urlopen(req, timeout=timeout, context=ssl_ctx) as resp: - resp.read() - except HTTPError as e: - body = _extract_error_message(e) - body_part = f"; body={body!r}" if body else "" - raise PaymentError( - f"HTTP {op} error: HTTP {e.code} from endpoint (url={url}){body_part}" - ) from e - except ConnectionRefusedError as e: - raise PaymentError( - f"HTTP {op} error: connection refused (is the server running? is the host/port correct?) (url={url})" - ) from e - except URLError as e: - raise PaymentError( - f"HTTP {op} error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" - ) from e - except LivepeerGatewayError: - raise - except Exception as e: - raise PaymentError( - f"HTTP {op} error: unexpected error: {e.__class__.__name__}: {e} (url={url})" - ) from e - - def send_payment(self) -> None: - from .orchestrator import _http_origin - - p = self.get_payment() - if not self._info.transcoder: - raise PaymentError("OrchestratorInfo missing transcoder URL for payment") - - base = _http_origin(self._info.transcoder) - url = f"{base}/payment" - headers = { - "Livepeer-Payment": p.payment, - "Livepeer-Segment": p.seg_creds or "", - } - self._post_empty(url, headers, op="payment") diff --git a/src/livepeer_gateway/remote_signer.py b/src/livepeer_gateway/remote_signer.py index 198bc4a..8fecf3a 100644 --- a/src/livepeer_gateway/remote_signer.py +++ b/src/livepeer_gateway/remote_signer.py @@ -4,17 +4,22 @@ import json import logging import re +import ssl from dataclasses import dataclass from functools import lru_cache -from typing import Optional +from typing import Any, Optional from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen from . import lp_rpc_pb2 -from .errors import LivepeerGatewayError, PaymentError -from .payments_base import BasePaymentSession, GetPaymentResponse - +from .errors import LivepeerGatewayError, PaymentError, SignerRefreshRequired, SkipPaymentCycle _LOG = logging.getLogger(__name__) +@dataclass(frozen=True) +class GetPaymentResponse: + payment: str + seg_creds: Optional[str] = None + @dataclass(frozen=True) class SignerMaterial: @@ -73,15 +78,15 @@ def get_orch_info_sig( Fetch signer material exactly once per (signer_url, headers) combination for the lifetime of the process. Subsequent calls return cached data. """ - from .orchestrator import _extract_error_message, _join_signer_endpoint, post_json + from .orchestrator import _extract_error_message, _http_origin, post_json # check for offchain mode if not signer_url: return SignerMaterial(address=None, sig=None) - # Accept either a signer base URL (which may itself include a base path - # like /api/signer) or a full URL ending with /sign-orchestrator-info. - signer_url = _join_signer_endpoint(signer_url, "/sign-orchestrator-info") + # Accept either a base URL or a full URL that includes /sign-orchestrator-info. + # Normalize to an https:// origin and append the expected path. + signer_url = f"{_http_origin(signer_url)}/sign-orchestrator-info" headers = dict(_signer_headers) if _signer_headers else None try: @@ -147,40 +152,153 @@ def get_orch_info_sig( return SignerMaterial(address=address, sig=sig) -class PaymentSession(BasePaymentSession): - """ - Live-Video-to-Video payment session backed by the remote signer's - ``/generate-live-payment`` endpoint. - - BYOC jobs use :class:`livepeer_gateway.byoc_payments.BYOCPaymentSession` - instead; both share :class:`BasePaymentSession`. - """ - +class PaymentSession: def __init__( self, signer_url: Optional[str], info: lp_rpc_pb2.OrchestratorInfo, *, signer_headers: Optional[dict[str, str]] = None, - type: str = "lv2v", + type: str, capabilities: Optional[lp_rpc_pb2.Capabilities] = None, use_tofu: bool = True, max_refresh_retries: int = 3, ) -> None: - super().__init__( - signer_url, - info, - signer_headers=signer_headers, - type=type, - capabilities=capabilities, - max_refresh_retries=max_refresh_retries, - use_tofu=use_tofu, - ) - - def _offchain_payment(self) -> GetPaymentResponse: - seg = lp_rpc_pb2.SegData() - if not self._info.HasField("auth_token"): - raise PaymentError("Orchestrator did not provide an auth token.") - seg.auth_token.CopyFrom(self._info.auth_token) - seg_b64 = base64.b64encode(seg.SerializeToString()).decode("ascii") - return GetPaymentResponse(seg_creds=seg_b64, payment="") + self._signer_url = signer_url + self._signer_headers = signer_headers + self._info = info + self._type = type + self._manifest_id: Optional[str] = None + self._capabilities = capabilities + self._use_tofu = use_tofu + self._max_refresh_retries = max(0, int(max_refresh_retries)) + self._state: Optional[dict[str, str]] = None + + def set_manifest_id(self, manifest_id: str) -> None: + if not isinstance(manifest_id, str) or not manifest_id.strip(): + raise PaymentError("manifest_id must be a non-empty string") + self._manifest_id = manifest_id.strip() + + def get_payment(self) -> GetPaymentResponse: + """ + Generate a payment via the remote signer. + + Handles signer state round-tripping internally. + On HTTP 480, refreshes OrchestratorInfo and retries + (up to max_refresh_retries). + Returns payment + seg_creds for use as HTTP headers. + """ + + # Offchain mode: still send the expected headers, but with empty content. + if not self._signer_url: + seg = lp_rpc_pb2.SegData() + if not self._info.HasField("auth_token"): + raise PaymentError( + "Orchestrator did not provide an auth token." + ) + seg.auth_token.CopyFrom(self._info.auth_token) + seg = base64.b64encode(seg.SerializeToString()).decode("ascii") + return GetPaymentResponse(seg_creds=seg, payment="") + + def _payment_request() -> GetPaymentResponse: + from .orchestrator import _http_origin, post_json + + base = _http_origin(self._signer_url) + url = f"{base}/generate-live-payment" + + pb = self._info.SerializeToString() + orch_b64 = base64.b64encode(pb).decode("ascii") + payload: dict[str, Any] = { + "orchestrator": orch_b64, + "type": self._type, + } + if self._manifest_id is not None: + payload["ManifestID"] = self._manifest_id + if self._state is not None: + payload["state"] = self._state + + data = post_json(url, payload, headers=self._signer_headers) + payment = data.get("payment") + if not isinstance(payment, str) or not payment: + raise PaymentError( + f"GetPayment error: missing/invalid 'payment' in response (url={url})" + ) + + seg_creds = data.get("segCreds") + if seg_creds is not None and not isinstance(seg_creds, str): + raise PaymentError( + f"GetPayment error: invalid 'segCreds' in response (url={url})" + ) + + state = data.get("state") + if not isinstance(state, dict): + raise PaymentError( + f"Remote signer response missing 'state' object (url={url})" + ) + + self._state = state + return GetPaymentResponse(payment=payment, seg_creds=seg_creds) + + attempts = 0 + while True: + try: + return _payment_request() + except SignerRefreshRequired as e: + if attempts >= self._max_refresh_retries: + raise PaymentError( + f"Signer refresh required after {attempts} retries: {e}" + ) from e + if not self._info.transcoder: + raise PaymentError( + "OrchestratorInfo missing transcoder URL for refresh" + ) + from .orch_info import get_orch_info + + self._info = get_orch_info( + self._info.transcoder, + signer_url=self._signer_url, + signer_headers=self._signer_headers, + capabilities=self._capabilities, + use_tofu=self._use_tofu, + ) + attempts += 1 + + def send_payment(self) -> None: + """ + Generate a payment (via get_payment) and forward it + to the orchestrator via POST {orch}/payment. + """ + from .orchestrator import _extract_error_message, _http_origin + + p = self.get_payment() + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for payment") + base = _http_origin(self._info.transcoder) + url = f"{base}/payment" + headers = { + "Livepeer-Payment": p.payment, + "Livepeer-Segment": p.seg_creds or "", + } + req = Request(url, data=b"", headers=headers, method="POST") + ssl_ctx = ssl._create_unverified_context() + try: + with urlopen(req, timeout=5.0, context=ssl_ctx) as resp: + resp.read() + except HTTPError as e: + body = _extract_error_message(e) + body_part = f"; body={body!r}" if body else "" + raise PaymentError( + f"HTTP payment error: HTTP {e.code} from endpoint (url={url}){body_part}" + ) from e + except ConnectionRefusedError as e: + raise PaymentError( + f"HTTP payment error: connection refused (is the server running? is the host/port correct?) (url={url})" + ) from e + except URLError as e: + raise PaymentError( + f"HTTP payment error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" + ) from e + except Exception as e: + raise PaymentError( + f"HTTP payment error: unexpected error: {e.__class__.__name__}: {e} (url={url})" + ) from e From 015067061188ef1b6bc3b6c70b273f95cef3e682 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 30 Apr 2026 14:05:44 -0400 Subject: [PATCH 10/12] feat(examples): enhance capability argument parsing in get_orchestrator_info - Updated the argument parsing for the `--caps` option to support both comma-delimited and repeated entries, improving flexibility in capability requests. - Introduced a new helper function `_split_capability_queries` to streamline the processing of capability inputs. - Enhanced help documentation for the `--caps` argument to clarify usage. --- examples/get_orchestrator_info.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/examples/get_orchestrator_info.py b/examples/get_orchestrator_info.py index be7fca1..21c2dd1 100644 --- a/examples/get_orchestrator_info.py +++ b/examples/get_orchestrator_info.py @@ -38,7 +38,8 @@ def _parse_args() -> argparse.Namespace: "\n" " # Request specific capabilities (e.g. BYOC)\n" " python examples/get_orchestrator_info.py localhost:8935 --signer https://signer.example.com --caps byoc/text-reversal\n" - " python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop byoc/my-pipeline\n" + " python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop,byoc/my-pipeline\n" + " python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop --caps byoc/my-pipeline\n" "\n" " # JSON / JSONL output\n" " python examples/get_orchestrator_info.py localhost:8935 --format json\n" @@ -73,10 +74,13 @@ def _parse_args() -> argparse.Namespace: ) p.add_argument( "--caps", - nargs="*", + action="append", default=None, metavar="PIPELINE/MODEL", - help="Request specific capabilities in pipeline/model form (e.g. byoc/text-reversal).", + help=( + "Request specific capabilities in pipeline/model form. " + "Can be comma-delimited or repeated (e.g. --caps byoc/text-reversal --caps live-video-to-video/noop)." + ), ) p.add_argument( "--format", @@ -362,17 +366,31 @@ def _resolve_discovery_args(args: argparse.Namespace) -> tuple[Any, str | None, return orchestrators, signer, signer_headers, discovery, discovery_headers +def _split_capability_queries(raw_caps: list[str] | None) -> list[str]: + if not raw_caps: + return [] + queries: list[str] = [] + for raw_cap in raw_caps: + queries.extend( + part.strip() + for part in raw_cap.split(",") + if part.strip() + ) + return queries + + def main() -> None: args = _parse_args() if args.debug: logging.basicConfig(level=logging.DEBUG) - if args.caps: - capabilities = build_capabilities_from_queries(args.caps) + cap_queries = _split_capability_queries(args.caps) + if cap_queries: + capabilities = build_capabilities_from_queries(cap_queries) if not capabilities: print( - f"ERROR: --caps {args.caps!r} did not parse into any valid capabilities " + f"ERROR: --caps {cap_queries!r} did not parse into any valid capabilities " f"(expected pipeline-id/model entries, e.g. 'byoc/text-reversal').", file=sys.stderr, ) From 07c45b4c5c7f8c4654b993a8894b9d09d90abe7b Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 30 Apr 2026 15:09:23 -0400 Subject: [PATCH 11/12] fix(payments): update BYOCPaymentSession payment type handling - Changed the payment type in BYOCPaymentSession from using the BYOC capability name to a fixed value of "byoc" for clarity and consistency. - Updated the return value in the `_get_payment_orch_info` function to include capabilities alongside legacy information, enhancing the payment orchestration process. This change improves the clarity of payment type usage and ensures that the orchestration information is more comprehensive. --- src/livepeer_gateway/byoc.py | 2 +- src/livepeer_gateway/byoc_payments.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/livepeer_gateway/byoc.py b/src/livepeer_gateway/byoc.py index b712bfe..8bd0c5f 100644 --- a/src/livepeer_gateway/byoc.py +++ b/src/livepeer_gateway/byoc.py @@ -146,7 +146,7 @@ def _get_payment_orch_info( "BYOC payment preflight: using legacy orch info response for %s", capability_name, ) - return legacy_info, None + return legacy_info, capabilities return payment_info, capabilities diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py index 989d363..93689ea 100644 --- a/src/livepeer_gateway/byoc_payments.py +++ b/src/livepeer_gateway/byoc_payments.py @@ -26,7 +26,7 @@ class BYOCPaymentSession: - Signs job credentials via the remote signer's ``POST /sign-byoc-job`` endpoint (V1 binary signing format, server-side flatten). - Generates time-based BYOC payments via ``/generate-live-payment`` - using the BYOC capability name as the payment ``type``. + using ``type=byoc`` plus the BYOC capability constraint. - Sends recurring stream payments to the orchestrator's ``/ai/stream/payment`` endpoint (or operator override). """ @@ -66,7 +66,7 @@ def _build_payment_payload(self) -> dict[str, Any]: pb = self._info.SerializeToString() payload: dict[str, Any] = { "orchestrator": base64.b64encode(pb).decode("ascii"), - "type": self._capability_name, + "type": "byoc", "RequestID": str(uuid.uuid4()), } if self._timeout_seconds > 0: From c92e879342f42944dab435f9a7ca460295b4f806 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 30 Apr 2026 20:43:27 -0400 Subject: [PATCH 12/12] feat(payments): enhance BYOCPaymentSession with manifest ID handling - Introduced a new `_manifest_id` attribute in BYOCPaymentSession to store the job ID required for payment requests. - Updated the `_build_payment_payload` method to include the manifest ID in the payment payload and added error handling for missing job IDs. - Refined the timeout handling by renaming `timeoutSeconds` to `preloadSeconds` for clarity. These changes improve the robustness of payment processing by ensuring that the necessary job ID is provided and enhancing the clarity of timeout parameters. --- src/livepeer_gateway/byoc.py | 40 +++++++++++++++++++++++---- src/livepeer_gateway/byoc_payments.py | 9 +++++- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/livepeer_gateway/byoc.py b/src/livepeer_gateway/byoc.py index 8bd0c5f..67268af 100644 --- a/src/livepeer_gateway/byoc.py +++ b/src/livepeer_gateway/byoc.py @@ -106,8 +106,32 @@ def _orch_info_has_byoc_price(info: Any, capability_name: str) -> bool: return False +def _orch_info_aggregate_price_usable(info: Any) -> bool: + """ + True when top-level price_info has a non-zero per-pixel quote. + + go-livepeer omits ``capabilities_prices`` on GetOrchestrator responses built + via ``orchestratorInfoWithCaps`` (non-nil request capabilities). The effective + job price is only the aggregate ``price_info`` without BYOC capability / + constraint fields — see ``PriceInfoForCaps`` vs ``GetCapabilitiesPrices``. + """ + top = _field_value(info, "price_info", "priceInfo") + if top is None: + return False + return _nonzero_real_scalar( + _field_value(top, "price_per_unit", "pricePerUnit") + ) and _nonzero_real_scalar( + _field_value(top, "pixels_per_unit", "pixelsPerUnit") + ) + + def _orch_info_supports_byoc_payment(info: Any, capability_name: str) -> bool: - return _orch_info_ticket_params_usable(info) and _orch_info_has_byoc_price(info, capability_name) + if not _orch_info_ticket_params_usable(info): + return False + if _orch_info_has_byoc_price(info, capability_name): + return True + # Capped-path OrchestratorInfo: aggregate price only, no BYOC-tagged rows. + return _orch_info_aggregate_price_usable(info) def _get_payment_orch_info( @@ -120,10 +144,16 @@ def _get_payment_orch_info( use_tofu: bool = True, ) -> tuple[Any, Any]: """ - Fetch OrchestratorInfo for BYOC payment preflight. If the capability-scoped - request comes back without usable BYOC pricing, retry without capability - filtering (some legacy orchestrators only advertise BYOC pricing on the - unfiltered path). + Fetch OrchestratorInfo for BYOC payment preflight. + + First calls ``GetOrchestrator`` with BYOC capability constraints. go-livepeer + then returns aggregate ``price_info`` and often omits ``capabilities_prices`` + (see ``orchestratorInfoWithCaps``); that shape is accepted without a second + RPC. + + If that response still fails preflight, retries once **without** capability + constraints for legacy orchestrators that only surface BYOC pricing on the + unfiltered path. """ payment_info = _get_orch_info( orch_url, diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py index 93689ea..a3955f3 100644 --- a/src/livepeer_gateway/byoc_payments.py +++ b/src/livepeer_gateway/byoc_payments.py @@ -57,20 +57,24 @@ def __init__( self._state: Optional[dict[str, Any]] = None self._stream_payment_endpoint = stream_payment_endpoint.strip() self._timeout_seconds: int = 0 + self._manifest_id: Optional[str] = None self._use_tofu = use_tofu def set_timeout_seconds(self, timeout_seconds: int) -> None: self._timeout_seconds = max(0, int(timeout_seconds)) def _build_payment_payload(self) -> dict[str, Any]: + if not self._manifest_id: + raise PaymentError("BYOC payment requires a signed job_id before requesting payment") pb = self._info.SerializeToString() payload: dict[str, Any] = { "orchestrator": base64.b64encode(pb).decode("ascii"), "type": "byoc", "RequestID": str(uuid.uuid4()), + "manifestID": self._manifest_id, } if self._timeout_seconds > 0: - payload["timeoutSeconds"] = self._timeout_seconds + payload["preloadSeconds"] = self._timeout_seconds if self._state is not None: payload["state"] = self._state if self._capabilities is not None: @@ -176,8 +180,11 @@ def sign_byoc_job( raise PaymentError("request must be a JSON string") if not isinstance(parameters, str): raise PaymentError("parameters must be a JSON string") + if not isinstance(job_id, str) or not job_id.strip(): + raise PaymentError("job_id must be a non-empty string") self.set_timeout_seconds(timeout_seconds) + self._manifest_id = job_id.strip() from .orchestrator import _join_signer_endpoint, post_json