diff --git a/examples/byoc_text_reversal.py b/examples/byoc_text_reversal.py new file mode 100644 index 0000000..f590de9 --- /dev/null +++ b/examples/byoc_text_reversal.py @@ -0,0 +1,175 @@ +import argparse +import asyncio +import json +import logging +from typing import Optional + +from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job +from livepeer_gateway.errors import LivepeerGatewayError + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run a BYOC text-reversal request through the Python gateway." + ) + parser.add_argument( + "--text", + default="hello from byoc", + help="Input text to reverse. Default: 'hello from byoc'.", + ) + parser.add_argument( + "--capability", + default="text-reversal", + help="BYOC capability name. Default: text-reversal.", + ) + parser.add_argument( + "--orchestrator", + default=None, + help="Optional orchestrator URL or comma-separated URLs.", + ) + parser.add_argument( + "--signer", + default=None, + help="Remote signer base URL.", + ) + parser.add_argument( + "--discovery", + default=None, + help="Optional discovery endpoint URL.", + ) + parser.add_argument( + "--token", + default=None, + help="Optional gateway token containing signer/discovery/orchestrator info.", + ) + parser.add_argument( + "--timeout-seconds", + type=int, + default=30, + help="Timeout encoded into the BYOC Livepeer header. Default: 30.", + ) + parser.add_argument( + "--output-grace-seconds", + type=float, + default=1.5, + help="Extra time to wait for final event/data output after stop. Default: 1.5.", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging.", + ) + return parser.parse_args() + + +def _parse_orchestrator_arg(orchestrator_arg: Optional[str]): + if orchestrator_arg is None: + return None + parts = [part.strip() for part in orchestrator_arg.split(",") if part.strip()] + if not parts: + return None + if len(parts) == 1: + return parts[0] + return parts + + +async def _amain() -> None: + args = _parse_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format="%(levelname)s %(name)s: %(message)s", + ) + + orch_url = _parse_orchestrator_arg(args.orchestrator) + + job = None + try: + job = start_byoc_job( + orch_url, + BYOCJobRequest( + capability=args.capability, + enable_video_ingress=False, + enable_video_egress=False, + # reverse_server publishes JSON to events_url only, not data_url. + enable_data_output=False, + timeout_seconds=args.timeout_seconds, + ), + token=args.token, + signer_url=args.signer, + discovery_url=args.discovery, + ) + + print("=== BYOC text-reversal ===") + print(f"job_id: {job.job_id}") + print(f"capability: {job.capability}") + print(f"control_url: {job.control_url}") + print(f"events_url: {job.events_url}") + print() + + job.start_payment_sender() + + reader_task: Optional[asyncio.Task] = None + stop_sent = False + try: + if job.control is None: + print("ERROR: job has no control_url; cannot send text.") + return + + result_received = asyncio.Event() + received_payload: dict = {} + + async def read_results() -> None: + if job.events is None: + print("WARN: no events channel on job; results will not be captured.") + result_received.set() + return + async for msg in job.events(): + if isinstance(msg, dict) and isinstance(msg.get("reversed"), str): + received_payload.update(msg) + result_received.set() + return + + reader_task = asyncio.create_task(read_results()) + + await asyncio.sleep(0.15) + await job.control.write({"text": args.text}) + print(f"sent: {{\"text\": {args.text!r}}}") + + try: + await asyncio.wait_for(result_received.wait(), timeout=args.timeout_seconds) + except asyncio.TimeoutError: + print("ERROR: timed out waiting for reversal result.") + return + + original = received_payload.get("original") or received_payload.get("text") or args.text + reversed_text = received_payload.get("reversed") + print(f"result: {original!r} -> {reversed_text!r}") + print("payload:", json.dumps(received_payload, indent=2, sort_keys=True)) + + stop_resp = await job.stop() + stop_sent = True + print(f"stop: status={stop_resp['status_code']}") + finally: + if not stop_sent: + try: + stop_resp = await job.stop() + print(f"stop: status={stop_resp['status_code']}") + except Exception as stop_err: + print(f"WARN: failed to stop BYOC job: {stop_err}") + await asyncio.sleep(max(0.0, args.output_grace_seconds)) + if reader_task is not None: + reader_task.cancel() + await asyncio.gather(reader_task, return_exceptions=True) + except LivepeerGatewayError as err: + print(f"ERROR: {err}") + finally: + if job is not None: + await job.close() + + +def main() -> None: + asyncio.run(_amain()) + + +if __name__ == "__main__": + main() diff --git a/examples/byoc_write_frames.py b/examples/byoc_write_frames.py new file mode 100644 index 0000000..c781bf7 --- /dev/null +++ b/examples/byoc_write_frames.py @@ -0,0 +1,104 @@ +import argparse +import asyncio +from fractions import Fraction + +import av + +from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job +from livepeer_gateway.errors import LivepeerGatewayError +from livepeer_gateway.media_publish import MediaPublishConfig, VideoOutputConfig + + +DEFAULT_CAPABILITY = "text-reversal" + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Start a BYOC stream job and publish raw frames via publish_url." + ) + p.add_argument( + "orchestrator", + nargs="?", + default=None, + help="Orchestrator (host:port). If omitted, discovery is used.", + ) + p.add_argument( + "--capability", + default=DEFAULT_CAPABILITY, + help=f"BYOC capability name (default: {DEFAULT_CAPABILITY}).", + ) + p.add_argument( + "--signer", + required=True, + help="Remote signer URL (no path). Required for BYOC job signing.", + ) + p.add_argument( + "--discovery", + default=None, + help="Discovery endpoint for orchestrators.", + ) + p.add_argument( + "--stream-id", + default=None, + help="Optional stream ID to include in BYOC request details.", + ) + p.add_argument("--width", type=int, default=320, help="Frame width (default: 320).") + p.add_argument("--height", type=int, default=180, help="Frame height (default: 180).") + p.add_argument("--fps", type=float, default=30.0, help="Frames per second (default: 30).") + p.add_argument("--count", type=int, default=90, help="Number of frames to send (default: 90).") + return p.parse_args() + + +def _solid_rgb_frame(width: int, height: int, rgb: tuple[int, int, int]) -> av.VideoFrame: + frame = av.VideoFrame(width, height, "rgb24") + r, g, b = rgb + frame.planes[0].update(bytes([r, g, b]) * (width * height)) + return frame + + +async def main() -> None: + args = _parse_args() + if args.fps <= 0: + raise SystemExit("--fps must be > 0") + frame_interval = 1.0 / args.fps + + job = None + try: + job = start_byoc_job( + args.orchestrator, + BYOCJobRequest( + capability=args.capability, + stream_id=args.stream_id, + ), + signer_url=args.signer, + discovery_url=args.discovery, + ) + + print("=== BYOC stream ===") + print("job_id:", job.job_id) + print("capability:", job.capability) + print("publish_url:", job.publish_url) + print() + + media = job.start_media( + MediaPublishConfig(tracks=[VideoOutputConfig(fps=args.fps)]) + ) + job.start_payment_sender() + + time_base = Fraction(1, max(1, int(round(args.fps)))) + for i in range(max(0, args.count)): + color = (i * 5) % 255 + frame = _solid_rgb_frame(args.width, args.height, (color, 0, 255 - color)) + frame.pts = i + frame.time_base = time_base + await media.write_frame(frame) + await asyncio.sleep(frame_interval) + except LivepeerGatewayError as e: + print(f"ERROR: {e}") + finally: + if job is not None: + await job.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/get_orchestrator_info.py b/examples/get_orchestrator_info.py index a1430ad..21c2dd1 100644 --- a/examples/get_orchestrator_info.py +++ b/examples/get_orchestrator_info.py @@ -1,9 +1,11 @@ import argparse import json import logging +import sys from typing import Any from livepeer_gateway.capabilities import ( + build_capabilities_from_queries, compute_available, format_capability, get_capacity_in_use, @@ -34,6 +36,11 @@ def _parse_args() -> argparse.Namespace: " # Signer URL\n" " python examples/get_orchestrator_info.py --signer https://signer.example.com\n" "\n" + " # Request specific capabilities (e.g. BYOC)\n" + " python examples/get_orchestrator_info.py localhost:8935 --signer https://signer.example.com --caps byoc/text-reversal\n" + " python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop,byoc/my-pipeline\n" + " python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop --caps byoc/my-pipeline\n" + "\n" " # JSON / JSONL output\n" " python examples/get_orchestrator_info.py localhost:8935 --format json\n" " python examples/get_orchestrator_info.py localhost:8935 --format jsonl\n" @@ -65,6 +72,16 @@ def _parse_args() -> argparse.Namespace: action="store_true", help="Enable debug logging for discovery diagnostics.", ) + p.add_argument( + "--caps", + action="append", + default=None, + metavar="PIPELINE/MODEL", + help=( + "Request specific capabilities in pipeline/model form. " + "Can be comma-delimited or repeated (e.g. --caps byoc/text-reversal --caps live-video-to-video/noop)." + ), + ) p.add_argument( "--format", choices=("text", "json", "jsonl"), @@ -349,12 +366,38 @@ def _resolve_discovery_args(args: argparse.Namespace) -> tuple[Any, str | None, return orchestrators, signer, signer_headers, discovery, discovery_headers +def _split_capability_queries(raw_caps: list[str] | None) -> list[str]: + if not raw_caps: + return [] + queries: list[str] = [] + for raw_cap in raw_caps: + queries.extend( + part.strip() + for part in raw_cap.split(",") + if part.strip() + ) + return queries + + def main() -> None: args = _parse_args() if args.debug: logging.basicConfig(level=logging.DEBUG) + cap_queries = _split_capability_queries(args.caps) + if cap_queries: + capabilities = build_capabilities_from_queries(cap_queries) + if not capabilities: + print( + f"ERROR: --caps {cap_queries!r} did not parse into any valid capabilities " + f"(expected pipeline-id/model entries, e.g. 'byoc/text-reversal').", + file=sys.stderr, + ) + sys.exit(1) + else: + capabilities = None + json_results: list[dict[str, Any]] = [] def _json_info(orch_url: str, info: Any) -> None: @@ -384,6 +427,7 @@ def _json_error(orch_url: str | None, err: Exception) -> None: signer_headers=signer_headers, discovery_url=discovery, discovery_headers=discovery_headers, + capabilities=capabilities, ) for orch_url in orch_list: @@ -392,6 +436,7 @@ def _json_error(orch_url: str | None, err: Exception) -> None: orch_url, signer_url=signer, signer_headers=signer_headers, + capabilities=capabilities, ) except LivepeerGatewayError as e: print_error(orch_url, e) diff --git a/pyproject.toml b/pyproject.toml index afa0751..5728973 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "livepeer-gateway" -version = "0.1.0" +version = "0.1.1" requires-python = ">=3.10" dependencies = [ "grpcio>=1.65.0", diff --git a/src/livepeer_gateway/__init__.py b/src/livepeer_gateway/__init__.py index 93d839c..e463c7f 100644 --- a/src/livepeer_gateway/__init__.py +++ b/src/livepeer_gateway/__init__.py @@ -1,8 +1,15 @@ +from .byoc import BYOCJob, BYOCJobRequest, start_byoc_job +from .byoc_payments import BYOCPaymentSession from .capabilities import CapabilityId, build_capabilities from .channel_reader import ChannelReader, JSONLReader from .channel_writer import ChannelWriter, JSONLWriter from .control import Control, ControlConfig, ControlMode -from .errors import LivepeerGatewayError, NoOrchestratorAvailableError, PaymentError +from .errors import ( + LivepeerGatewayError, + NoOrchestratorAvailableError, + PaymentError, + PaymentRequiredError, +) from .events import Events from .media_publish import ( AudioOutputConfig, @@ -39,6 +46,9 @@ from .trickle_subscriber import TrickleSubscriber, TrickleSubscriberStats __all__ = [ + "BYOCJob", + "BYOCJobRequest", + "BYOCPaymentSession", "Control", "ControlConfig", "ControlMode", @@ -52,6 +62,7 @@ "NoOrchestratorAvailableError", "OrchestratorRejection", "PaymentError", + "PaymentRequiredError", "MediaPublish", "MediaPublishConfig", "MediaPublishTrack", @@ -73,6 +84,7 @@ "SelectionCursor", "orchestrator_selector", "StartJobRequest", + "start_byoc_job", "start_lv2v", "start_scope", "TricklePublishError", diff --git a/src/livepeer_gateway/byoc.py b/src/livepeer_gateway/byoc.py new file mode 100644 index 0000000..67268af --- /dev/null +++ b/src/livepeer_gateway/byoc.py @@ -0,0 +1,726 @@ +from __future__ import annotations + +import asyncio +import base64 +import json +import logging +import numbers +import ssl +import uuid +from dataclasses import dataclass, field +from typing import Any, Optional, Sequence +from urllib.error import HTTPError, URLError +from urllib.parse import quote, urlparse, urlunparse +from urllib.request import Request, urlopen + +from .byoc_payments import BYOCPaymentSession +from .capabilities import CapabilityId, build_capabilities +from .control import Control +from .errors import ( + LivepeerGatewayError, + NoOrchestratorAvailableError, + OrchestratorRejection, + PaymentError, + PaymentRequiredError, + SkipPaymentCycle, +) +from .events import Events +from .media_output import LagPolicy, MediaOutput +from .media_publish import MediaPublish, MediaPublishConfig +from .orch_info import get_orch_info as _get_orch_info +from .orchestrator import _extract_error_message, resolve_transcoder_http_url +from .selection import orchestrator_selector + +_LOG = logging.getLogger(__name__) + + +def _header_get(headers: dict[str, str], key: str) -> Optional[str]: + key_lower = key.lower() + for name, value in headers.items(): + if name.lower() == key_lower: + return value + return None + + +def _field_value(obj: Any, snake: str, camel: str) -> Any: + if isinstance(obj, dict): + if snake in obj: + return obj[snake] + if camel in obj: + return obj[camel] + return None + v = getattr(obj, snake, None) + if v is not None: + return v + return getattr(obj, camel, None) + + +def _nonzero_real_scalar(val: Any) -> bool: + if val is None or isinstance(val, bool): + return False + if isinstance(val, bytes): + if not val: + return False + return int.from_bytes(val, "big") > 0 + if isinstance(val, numbers.Real): + return val != 0 + return False + + +def _orch_info_ticket_params_usable(info: Any) -> bool: + """True when ticket_params has non-zero face_value and win_prob.""" + params = getattr(info, "ticket_params", None) + if params is None: + return False + face = _field_value(params, "face_value", "faceValue") + win = _field_value(params, "win_prob", "winProb") + return _nonzero_real_scalar(face) and _nonzero_real_scalar(win) + + +def _price_info_matches_byoc(price_info: Any, capability_name: str) -> bool: + if price_info is None: + return False + capability = _field_value(price_info, "capability", "capability") + constraint = _field_value(price_info, "constraint", "constraint") + price_per_unit = _field_value(price_info, "price_per_unit", "pricePerUnit") + pixels_per_unit = _field_value(price_info, "pixels_per_unit", "pixelsPerUnit") + return ( + capability == int(CapabilityId.BYOC) + and constraint == capability_name + and _nonzero_real_scalar(price_per_unit) + and _nonzero_real_scalar(pixels_per_unit) + ) + + +def _orch_info_has_byoc_price(info: Any, capability_name: str) -> bool: + top_price = _field_value(info, "price_info", "priceInfo") + if _price_info_matches_byoc(top_price, capability_name): + return True + + caps_prices = _field_value(info, "capabilities_prices", "capabilitiesPrices") + if caps_prices is None: + return False + for price_info in caps_prices: + if _price_info_matches_byoc(price_info, capability_name): + return True + return False + + +def _orch_info_aggregate_price_usable(info: Any) -> bool: + """ + True when top-level price_info has a non-zero per-pixel quote. + + go-livepeer omits ``capabilities_prices`` on GetOrchestrator responses built + via ``orchestratorInfoWithCaps`` (non-nil request capabilities). The effective + job price is only the aggregate ``price_info`` without BYOC capability / + constraint fields — see ``PriceInfoForCaps`` vs ``GetCapabilitiesPrices``. + """ + top = _field_value(info, "price_info", "priceInfo") + if top is None: + return False + return _nonzero_real_scalar( + _field_value(top, "price_per_unit", "pricePerUnit") + ) and _nonzero_real_scalar( + _field_value(top, "pixels_per_unit", "pixelsPerUnit") + ) + + +def _orch_info_supports_byoc_payment(info: Any, capability_name: str) -> bool: + if not _orch_info_ticket_params_usable(info): + return False + if _orch_info_has_byoc_price(info, capability_name): + return True + # Capped-path OrchestratorInfo: aggregate price only, no BYOC-tagged rows. + return _orch_info_aggregate_price_usable(info) + + +def _get_payment_orch_info( + orch_url: str, + *, + signer_url: Optional[str], + signer_headers: Optional[dict[str, str]], + capabilities: Any, + capability_name: str, + use_tofu: bool = True, +) -> tuple[Any, Any]: + """ + Fetch OrchestratorInfo for BYOC payment preflight. + + First calls ``GetOrchestrator`` with BYOC capability constraints. go-livepeer + then returns aggregate ``price_info`` and often omits ``capabilities_prices`` + (see ``orchestratorInfoWithCaps``); that shape is accepted without a second + RPC. + + If that response still fails preflight, retries once **without** capability + constraints for legacy orchestrators that only surface BYOC pricing on the + unfiltered path. + """ + payment_info = _get_orch_info( + orch_url, + signer_url=signer_url, + signer_headers=signer_headers, + capabilities=capabilities, + use_tofu=use_tofu, + ) + if _orch_info_supports_byoc_payment(payment_info, capability_name): + return payment_info, capabilities + + legacy_info = _get_orch_info( + orch_url, + signer_url=signer_url, + signer_headers=signer_headers, + use_tofu=use_tofu, + ) + if _orch_info_supports_byoc_payment(legacy_info, capability_name): + _LOG.debug( + "BYOC payment preflight: using legacy orch info response for %s", + capability_name, + ) + return legacy_info, capabilities + + return payment_info, capabilities + + +@dataclass(frozen=True) +class BYOCJobRequest: + capability: str + request_id: Optional[str] = None + stream_id: Optional[str] = None + request: Optional[dict[str, Any]] = None + parameters: Optional[dict[str, Any]] = None + body: Optional[dict[str, Any]] = None + timeout_seconds: int = 30 + enable_video_ingress: bool = True + enable_video_egress: bool = True + enable_data_output: bool = False + # Orchestrator default in go-livepeer is POST {transcoder}/ai/stream/start; + # the gateway uses /process/stream/start. Either a path on the selected + # transcoder origin or a full http(s) URL. + stream_start_endpoint: str = "/ai/stream/start" + stream_payment_endpoint: str = "/ai/stream/payment" + + def _job_id(self) -> str: + if self.request_id and self.request_id.strip(): + return self.request_id.strip() + if self.stream_id and self.stream_id.strip(): + return self.stream_id.strip() + return uuid.uuid4().hex + + def _request_json(self, job_id: str) -> str: + payload: dict[str, Any] = {} + if self.request: + payload.update(self.request) + payload.setdefault("stream_id", self.stream_id or job_id) + return json.dumps(payload, separators=(",", ":")) + + def _parameters_json(self) -> str: + payload: dict[str, Any] = {} + if self.parameters: + payload.update(self.parameters) + payload.setdefault("enable_video_ingress", self.enable_video_ingress) + payload.setdefault("enable_video_egress", self.enable_video_egress) + payload.setdefault("enable_data_output", self.enable_data_output) + return json.dumps(payload, separators=(",", ":")) + + def _body(self, job_id: str) -> dict[str, Any]: + payload: dict[str, Any] = {} + if self.body: + payload.update(self.body) + payload.setdefault("stream_id", self.stream_id or job_id) + return payload + + +@dataclass(frozen=True) +class BYOCJob: + raw: dict[str, Any] + job_id: str + capability: str + publish_url: Optional[str] = None + subscribe_url: Optional[str] = None + control_url: Optional[str] = None + events_url: Optional[str] = None + data_url: Optional[str] = None + control: Optional[Control] = None + events: Optional[Events] = None + _media: Optional[MediaPublish] = field(default=None, repr=False, compare=False) + _payment_session: Optional[BYOCPaymentSession] = field(default=None, repr=False, compare=False) + _signed_job_header: Optional[str] = field(default=None, repr=False, compare=False) + _payment_task: Optional[asyncio.Task] = field(default=None, repr=False, compare=False) + _stream_stop_url: Optional[str] = field(default=None, repr=False, compare=False) + _stop_timeout_s: float = field(default=30.0, repr=False, compare=False) + + @staticmethod + def from_start_response( + data: dict[str, Any], + *, + job_id: str, + capability: str, + payment_session: Optional[BYOCPaymentSession] = None, + signed_job_header: Optional[str] = None, + stream_stop_url: Optional[str] = None, + stop_timeout_s: float = 30.0, + ) -> "BYOCJob": + headers = data.get("headers") + if not isinstance(headers, dict): + headers = {} + + control_url = _header_get(headers, "X-Control-Url") + events_url = _header_get(headers, "X-Events-Url") + publish_url = _header_get(headers, "X-Publish-Url") + subscribe_url = _header_get(headers, "X-Subscribe-Url") + data_url = _header_get(headers, "X-Data-Url") + + return BYOCJob( + raw=data, + job_id=job_id, + capability=capability, + publish_url=publish_url, + subscribe_url=subscribe_url, + control_url=control_url, + events_url=events_url, + data_url=data_url, + control=Control(control_url) if control_url else None, + events=Events(events_url) if events_url else None, + _payment_session=payment_session, + _signed_job_header=signed_job_header, + _stream_stop_url=stream_stop_url, + _stop_timeout_s=stop_timeout_s, + ) + + def start_media(self, config: MediaPublishConfig) -> MediaPublish: + if not self.publish_url: + raise LivepeerGatewayError("No publish_url present on this BYOC job") + if self._media is None: + media = MediaPublish(self.publish_url, config=config) + object.__setattr__(self, "_media", media) + return self._media + + def media_output( + self, + *, + start_seq: int = -2, + max_retries: int = 5, + max_segment_bytes: Optional[int] = None, + connection_close: bool = False, + chunk_size: int = 64 * 1024, + max_segments: int = 5, + on_lag: LagPolicy = LagPolicy.LATEST, + ) -> MediaOutput: + if not self.subscribe_url: + raise LivepeerGatewayError("No subscribe_url present on this BYOC job") + return MediaOutput( + self.subscribe_url, + start_seq=start_seq, + max_retries=max_retries, + max_segment_bytes=max_segment_bytes, + connection_close=connection_close, + chunk_size=chunk_size, + max_segments=max_segments, + on_lag=on_lag, + ) + + @property + def payment_session(self) -> Optional[BYOCPaymentSession]: + return self._payment_session + + def start_payment_sender(self, *, interval_s: float = 5.0) -> Optional[asyncio.Task]: + if getattr(self, "_payment_task", None) is not None: + return self._payment_task + if not self._payment_session or not self._signed_job_header: + return None + try: + loop = asyncio.get_running_loop() + except RuntimeError: + _LOG.warning( + "No running event loop; BYOC payment sender not started. " + "Call job.start_payment_sender() from async code to enable." + ) + return None + + task = loop.create_task( + _byoc_payment_sender( + self._signed_job_header, + self._payment_session, + interval_s=interval_s, + ) + ) + object.__setattr__(self, "_payment_task", task) + return task + + async def stop(self) -> dict[str, Any]: + if not self._stream_stop_url: + raise LivepeerGatewayError("No stream stop URL present on this BYOC job") + if not self._signed_job_header: + raise LivepeerGatewayError("No signed job header present on this BYOC job") + return await asyncio.to_thread( + _post_byoc_stop, + self._stream_stop_url, + payload={"stream_id": self.job_id}, + headers={"Livepeer": self._signed_job_header}, + timeout=self._stop_timeout_s, + ) + + async def close(self) -> None: + tasks = [] + payment_task = getattr(self, "_payment_task", None) + if payment_task is not None and not payment_task.done(): + payment_task.cancel() + tasks.append(payment_task) + if self.control is not None: + tasks.append(self.control.close()) + if self._media is not None: + tasks.append(self._media.close()) + if not tasks: + return + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, BaseException) and not isinstance(result, asyncio.CancelledError): + raise result + + +async def _byoc_payment_sender( + signed_job_header: str, + session: BYOCPaymentSession, + *, + interval_s: float, +) -> None: + try: + await asyncio.to_thread(session.send_stream_payment, signed_job_header) + except SkipPaymentCycle as e: + _LOG.debug("BYOC payment sender: first payment skipped (%s)", e) + except Exception: + _LOG.exception("BYOC payment sender: first immediate payment failed") + + while True: + await asyncio.sleep(interval_s) + try: + await asyncio.to_thread(session.send_stream_payment, signed_job_header) + except SkipPaymentCycle as e: + _LOG.debug("BYOC payment sender: skipping payment cycle (%s)", e) + except Exception: + _LOG.exception("BYOC payment sender failed") + + +def _get_start_payment_headers( + session: BYOCPaymentSession, + *, + payment_info: Any, + capability_name: str, + allow_skip: bool, +) -> tuple[Optional[str], str]: + payment_header: Optional[str] = None + segment_header = "" + + try: + payment = session.get_payment() + payment_header = payment.payment + segment_header = payment.seg_creds or "" + except SkipPaymentCycle as pay_skip: + if not allow_skip: + raise LivepeerGatewayError( + "BYOC start endpoint returned HTTP 402 payment required, " + "but the signer skipped payment generation; cannot retry start " + "without a payment ticket." + ) from pay_skip + if not _orch_info_ticket_params_usable(payment_info): + raise LivepeerGatewayError( + "BYOC signer returned skip-payment, but OrchestratorInfo ticket_params " + "are missing or zero (face_value and win_prob are required). " + "Ticket expected value (EV) cannot be computed; refusing to start." + ) from pay_skip + _LOG.debug("BYOC signer returned skip-payment response on start (%s)", pay_skip) + except (PaymentError, LivepeerGatewayError) as pay_err: + msg = str(pay_err) + if "priceinfo" in msg.lower() or "price" in msg.lower(): + raise LivepeerGatewayError( + f"BYOC signer pricing error: the remote signer rejected payment generation " + f"(likely missing or zero priceInfo in OrchestratorInfo for capability " + f"'{capability_name}'). Ensure the orchestrator advertises " + f"capability-specific pricing for BYOC/{capability_name}. " + f"Signer detail: {msg}" + ) from pay_err + raise + + if payment_header is not None and not payment_header: + raise LivepeerGatewayError( + f"BYOC signer returned empty payment ticket for capability " + f"'{capability_name}'. Check signer configuration and " + f"orchestrator pricing for BYOC/{capability_name}." + ) + + return payment_header, segment_header + + +def _post_byoc_json( + url: str, + *, + payload: dict[str, Any], + headers: dict[str, str], + timeout: float, + op: str, +) -> dict[str, Any]: + """ + POST a JSON payload to a BYOC endpoint and return a structured response. + + On HTTP 402, raises ``PaymentRequiredError``. Other HTTP errors raise + ``LivepeerGatewayError``. The success response always includes + ``{"status_code", "headers", "body"}``; ``body`` is decoded JSON when + possible, raw text otherwise. + """ + body_bytes = json.dumps(payload).encode("utf-8") + req_headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "User-Agent": "livepeer-python-gateway/0.1", + } + req_headers.update(headers) + + req = Request(url, data=body_bytes, headers=req_headers, method="POST") + ssl_ctx = ssl._create_unverified_context() + try: + with urlopen(req, timeout=timeout, context=ssl_ctx) as resp: + raw_body = resp.read().decode("utf-8", errors="replace") + response_headers = {k: v for k, v in resp.headers.items()} + status = resp.status + except HTTPError as e: + body = _extract_error_message(e) + body_part = f"; body={body!r}" if body else "" + if e.code == 402: + raise PaymentRequiredError( + f"HTTP BYOC {op} error: HTTP 402 from endpoint (url={url}){body_part}" + ) from e + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: HTTP {e.code} from endpoint (url={url}){body_part}" + ) from e + except ConnectionRefusedError as e: + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: connection refused (is the server running? is the host/port correct?) (url={url})" + ) from e + except URLError as e: + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" + ) from e + except LivepeerGatewayError: + raise + except Exception as e: + raise LivepeerGatewayError( + f"HTTP BYOC {op} error: unexpected error: {e.__class__.__name__}: {e} (url={url})" + ) from e + + parsed_body: Any = None + if raw_body.strip(): + try: + parsed_body = json.loads(raw_body) + except Exception: + parsed_body = raw_body + return {"status_code": status, "headers": response_headers, "body": parsed_body} + + +def _post_byoc_start( + url: str, + *, + payload: dict[str, Any], + headers: dict[str, str], + timeout: float, +) -> dict[str, Any]: + return _post_byoc_json(url, payload=payload, headers=headers, timeout=timeout, op="start") + + +def _post_byoc_stop( + url: str, + *, + payload: dict[str, Any], + headers: dict[str, str], + timeout: float, +) -> dict[str, Any]: + return _post_byoc_json(url, payload=payload, headers=headers, timeout=timeout, op="stop") + + +def _derive_stream_stop_url(start_url: str, job_id: str) -> str: + parsed = urlparse(start_url) + path = parsed.path or "" + quoted_job_id = quote(job_id, safe="") + if path.endswith("/process/stream/start"): + path = path[: -len("/process/stream/start")] + f"/process/stream/{quoted_job_id}/stop" + elif path.endswith("/start"): + path = path[: -len("/start")] + "/stop" + else: + raise ValueError(f"Cannot derive stream stop URL from start URL: {start_url!r}") + return urlunparse(parsed._replace(path=path)) + + +def start_byoc_job( + orch_url: Optional[Sequence[str] | str], + req: BYOCJobRequest, + *, + token: Optional[str] = None, + signer_url: Optional[str] = None, + signer_headers: Optional[dict[str, str]] = None, + discovery_url: Optional[str] = None, + discovery_headers: Optional[dict[str, str]] = None, + use_tofu: bool = True, +) -> BYOCJob: + """ + Start a BYOC job through the Python gateway. + + Mirrors the LV2V ``start_lv2v`` shape: an orchestrator is selected (token + orchestrators -> explicit orch_url -> token discovery -> explicit discovery + -> signer-derived discovery), payment is preflighted, the job credential is + signed via ``POST /sign-byoc-job``, and the job is started against + ``stream_start_endpoint`` (default ``/ai/stream/start``). + """ + if not isinstance(req.capability, str) or not req.capability.strip(): + raise LivepeerGatewayError("start_byoc_job requires a non-empty capability") + if not isinstance(req.stream_start_endpoint, str) or not req.stream_start_endpoint.strip(): + raise LivepeerGatewayError("BYOCJobRequest.stream_start_endpoint must be non-empty") + if not isinstance(req.stream_payment_endpoint, str) or not req.stream_payment_endpoint.strip(): + raise LivepeerGatewayError("BYOCJobRequest.stream_payment_endpoint must be non-empty") + + resolved_signer_url = signer_url + resolved_signer_headers = signer_headers + resolved_discovery_url = discovery_url + resolved_discovery_headers = discovery_headers + if token is not None: + from .token import parse_token + + token_data = parse_token(token) + if resolved_signer_url is None: + resolved_signer_url = token_data.get("signer") + if resolved_signer_headers is None: + resolved_signer_headers = token_data.get("signer_headers") + if resolved_discovery_url is None: + resolved_discovery_url = token_data.get("discovery") + if resolved_discovery_headers is None: + resolved_discovery_headers = token_data.get("discovery_headers") + + capabilities = build_capabilities(CapabilityId.BYOC, req.capability.strip()) + cursor = orchestrator_selector( + orch_url, + signer_url=resolved_signer_url, + signer_headers=resolved_signer_headers, + discovery_url=resolved_discovery_url, + discovery_headers=resolved_discovery_headers, + capabilities=capabilities, + use_tofu=use_tofu, + ) + + start_rejections: list[OrchestratorRejection] = [] + while True: + try: + selected_url, info = cursor.next() + except NoOrchestratorAvailableError as e: + all_rejections = list(e.rejections) + start_rejections + if all_rejections: + raise NoOrchestratorAvailableError( + f"All orchestrators failed ({len(all_rejections)} tried)", + rejections=all_rejections, + ) from None + raise + + try: + payment_info, payment_capabilities = _get_payment_orch_info( + selected_url, + signer_url=resolved_signer_url, + signer_headers=resolved_signer_headers, + capabilities=capabilities, + capability_name=req.capability.strip(), + use_tofu=use_tofu, + ) + + session = BYOCPaymentSession( + resolved_signer_url, + payment_info, + capability_name=req.capability.strip(), + signer_headers=resolved_signer_headers, + capabilities=payment_capabilities, + stream_payment_endpoint=req.stream_payment_endpoint, + use_tofu=use_tofu, + ) + + job_id = req._job_id() + request_json = req._request_json(job_id) + parameters_json = req._parameters_json() + timeout_seconds = max(1, int(req.timeout_seconds)) + signed = session.sign_byoc_job( + job_id=job_id, + capability=req.capability.strip(), + request=request_json, + parameters=parameters_json, + timeout_seconds=timeout_seconds, + ) + + signed_payload = { + "id": job_id, + "request": request_json, + "parameters": parameters_json, + "capability": req.capability.strip(), + "sender": signed.sender, + "sig": signed.signature, + "timeout_seconds": timeout_seconds, + } + signed_job_header = base64.b64encode( + json.dumps(signed_payload, separators=(",", ":")).encode("utf-8") + ).decode("ascii") + + capability_name = req.capability.strip() + payment_header, segment_header = _get_start_payment_headers( + session, + payment_info=payment_info, + capability_name=capability_name, + allow_skip=True, + ) + headers = {"Livepeer": signed_job_header} + if payment_header: + headers["Livepeer-Payment"] = payment_header + headers["Livepeer-Segment"] = segment_header + start_url = resolve_transcoder_http_url(info.transcoder, req.stream_start_endpoint) + try: + stop_url = _derive_stream_stop_url(start_url, job_id) + except ValueError as e: + raise LivepeerGatewayError(str(e)) from e + start_payload = req._body(job_id) + start_timeout = float(timeout_seconds) + try: + data = _post_byoc_start( + start_url, + payload=start_payload, + headers=headers, + timeout=start_timeout, + ) + except PaymentRequiredError: + payment_header, segment_header = _get_start_payment_headers( + session, + payment_info=payment_info, + capability_name=capability_name, + allow_skip=False, + ) + headers = { + "Livepeer": signed_job_header, + "Livepeer-Payment": payment_header, + "Livepeer-Segment": segment_header, + } + _LOG.debug("BYOC start returned HTTP 402; retrying with a fresh payment ticket") + data = _post_byoc_start( + start_url, + payload=start_payload, + headers=headers, + timeout=start_timeout, + ) + job = BYOCJob.from_start_response( + data, + job_id=job_id, + capability=capability_name, + payment_session=session, + signed_job_header=signed_job_header, + stream_stop_url=stop_url, + stop_timeout_s=float(timeout_seconds), + ) + job.start_payment_sender() + return job + except LivepeerGatewayError as e: + _LOG.debug( + "start_byoc_job candidate failed, trying fallback if available: %s (%s)", + selected_url, + str(e), + ) + start_rejections.append(OrchestratorRejection(url=selected_url, reason=str(e))) diff --git a/src/livepeer_gateway/byoc_payments.py b/src/livepeer_gateway/byoc_payments.py new file mode 100644 index 0000000..a3955f3 --- /dev/null +++ b/src/livepeer_gateway/byoc_payments.py @@ -0,0 +1,233 @@ +from __future__ import annotations + +import base64 +import ssl +import uuid +from dataclasses import dataclass +from typing import Any, Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from . import lp_rpc_pb2 +from .errors import LivepeerGatewayError, PaymentError, SignerRefreshRequired +from .remote_signer import GetPaymentResponse + + +@dataclass(frozen=True) +class SignedBYOCJob: + sender: str + signature: str + + +class BYOCPaymentSession: + """ + BYOC payment session. + + - Signs job credentials via the remote signer's ``POST /sign-byoc-job`` + endpoint (V1 binary signing format, server-side flatten). + - Generates time-based BYOC payments via ``/generate-live-payment`` + using ``type=byoc`` plus the BYOC capability constraint. + - Sends recurring stream payments to the orchestrator's + ``/ai/stream/payment`` endpoint (or operator override). + """ + + def __init__( + self, + signer_url: Optional[str], + info: lp_rpc_pb2.OrchestratorInfo, + *, + capability_name: str, + signer_headers: Optional[dict[str, str]] = None, + capabilities: Optional[lp_rpc_pb2.Capabilities] = None, + max_refresh_retries: int = 3, + stream_payment_endpoint: str = "/ai/stream/payment", + use_tofu: bool = True, + ) -> None: + if not isinstance(capability_name, str) or not capability_name.strip(): + raise PaymentError("capability_name must be a non-empty string") + if not isinstance(stream_payment_endpoint, str) or not stream_payment_endpoint.strip(): + raise PaymentError("stream_payment_endpoint must be a non-empty string") + + self._signer_url = signer_url + self._signer_headers = signer_headers + self._info = info + self._capability_name = capability_name.strip() + self._capabilities = capabilities + self._max_refresh_retries = max(0, int(max_refresh_retries)) + self._state: Optional[dict[str, Any]] = None + self._stream_payment_endpoint = stream_payment_endpoint.strip() + self._timeout_seconds: int = 0 + self._manifest_id: Optional[str] = None + self._use_tofu = use_tofu + + def set_timeout_seconds(self, timeout_seconds: int) -> None: + self._timeout_seconds = max(0, int(timeout_seconds)) + + def _build_payment_payload(self) -> dict[str, Any]: + if not self._manifest_id: + raise PaymentError("BYOC payment requires a signed job_id before requesting payment") + pb = self._info.SerializeToString() + payload: dict[str, Any] = { + "orchestrator": base64.b64encode(pb).decode("ascii"), + "type": "byoc", + "RequestID": str(uuid.uuid4()), + "manifestID": self._manifest_id, + } + if self._timeout_seconds > 0: + payload["preloadSeconds"] = self._timeout_seconds + if self._state is not None: + payload["state"] = self._state + if self._capabilities is not None: + payload["capabilities"] = base64.b64encode( + self._capabilities.SerializeToString() + ).decode("ascii") + return payload + + def _refresh_orchestrator_info(self) -> None: + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for refresh") + + from .orch_info import get_orch_info + + self._info = get_orch_info( + self._info.transcoder, + signer_url=self._signer_url, + signer_headers=self._signer_headers, + capabilities=self._capabilities, + use_tofu=self._use_tofu, + ) + + def _request_payment(self) -> GetPaymentResponse: + from .orchestrator import _join_signer_endpoint, post_json + + url = _join_signer_endpoint(self._signer_url, "/generate-live-payment") + data = post_json(url, self._build_payment_payload(), headers=self._signer_headers) + + payment = data.get("payment") + if not isinstance(payment, str) or not payment: + raise PaymentError(f"GetPayment error: missing/invalid 'payment' in response (url={url})") + + seg_creds = data.get("segCreds") + if seg_creds is not None and not isinstance(seg_creds, str): + raise PaymentError(f"GetPayment error: invalid 'segCreds' in response (url={url})") + + state = data.get("state") + if not isinstance(state, dict): + raise PaymentError(f"Remote signer response missing 'state' object (url={url})") + + self._state = state + return GetPaymentResponse(payment=payment, seg_creds=seg_creds) + + def get_payment(self) -> GetPaymentResponse: + if not self._signer_url: + return GetPaymentResponse(payment="", seg_creds="") + + attempts = 0 + while True: + try: + return self._request_payment() + except SignerRefreshRequired as e: + if attempts >= self._max_refresh_retries: + raise PaymentError(f"Signer refresh required after {attempts} retries: {e}") from e + self._refresh_orchestrator_info() + attempts += 1 + + def _post_empty(self, url: str, headers: dict[str, str], *, op: str, timeout: float = 5.0) -> None: + from .orchestrator import _extract_error_message + + req = Request(url, data=b"", headers=headers, method="POST") + ssl_ctx = ssl._create_unverified_context() + try: + with urlopen(req, timeout=timeout, context=ssl_ctx) as resp: + resp.read() + except HTTPError as e: + body = _extract_error_message(e) + body_part = f"; body={body!r}" if body else "" + raise PaymentError( + f"HTTP {op} error: HTTP {e.code} from endpoint (url={url}){body_part}" + ) from e + except ConnectionRefusedError as e: + raise PaymentError( + f"HTTP {op} error: connection refused (is the server running? is the host/port correct?) (url={url})" + ) from e + except URLError as e: + raise PaymentError( + f"HTTP {op} error: failed to reach endpoint: {getattr(e, 'reason', e)} (url={url})" + ) from e + except LivepeerGatewayError: + raise + except Exception as e: + raise PaymentError( + f"HTTP {op} error: unexpected error: {e.__class__.__name__}: {e} (url={url})" + ) from e + + def sign_byoc_job( + self, + job_id: str, + capability: str, + request: str, + parameters: str, + timeout_seconds: int, + ) -> SignedBYOCJob: + """ + Ask the remote signer to sign a BYOC job credential using the V1 + binary signing format. The signer is authoritative for the wire layout; + we only forward the structured fields. + """ + if not self._signer_url: + raise PaymentError("sign_byoc_job requires signer_url") + if not isinstance(request, str): + raise PaymentError("request must be a JSON string") + if not isinstance(parameters, str): + raise PaymentError("parameters must be a JSON string") + if not isinstance(job_id, str) or not job_id.strip(): + raise PaymentError("job_id must be a non-empty string") + + self.set_timeout_seconds(timeout_seconds) + self._manifest_id = job_id.strip() + + from .orchestrator import _join_signer_endpoint, post_json + + url = _join_signer_endpoint(self._signer_url, "/sign-byoc-job") + data = post_json( + url, + { + "id": job_id, + "capability": capability, + "request": request, + "parameters": parameters, + "timeout_seconds": self._timeout_seconds, + "signature_format": "v1", + }, + headers=self._signer_headers, + ) + sender = data.get("sender") + signature = data.get("signature") + if not isinstance(sender, str) or not sender: + raise PaymentError(f"Invalid signer response: missing sender (url={url})") + if not isinstance(signature, str) or not signature: + raise PaymentError(f"Invalid signer response: missing signature (url={url})") + return SignedBYOCJob(sender=sender, signature=signature) + + def send_stream_payment(self, job_header: str) -> None: + """ + Send a single recurring BYOC stream payment to the orchestrator. + + ``job_header`` is the base64-encoded ``Livepeer:`` header established + at job start; we attach a fresh payment ticket to it. + """ + if not isinstance(job_header, str) or not job_header: + raise PaymentError("job_header must be a non-empty base64 string") + if not self._info.transcoder: + raise PaymentError("OrchestratorInfo missing transcoder URL for stream payment") + + from .orchestrator import resolve_transcoder_http_url + + p = self.get_payment() + url = resolve_transcoder_http_url(self._info.transcoder, self._stream_payment_endpoint) + headers = { + "Livepeer": job_header, + "Livepeer-Payment": p.payment, + "Livepeer-Segment": p.seg_creds or "", + } + self._post_empty(url, headers, op="stream payment") diff --git a/src/livepeer_gateway/capabilities.py b/src/livepeer_gateway/capabilities.py index 27e5688..dde24c3 100644 --- a/src/livepeer_gateway/capabilities.py +++ b/src/livepeer_gateway/capabilities.py @@ -45,6 +45,7 @@ class CapabilityId(IntEnum): IMAGE_TO_TEXT = 34 LIVE_VIDEO_TO_VIDEO = 35 TEXT_TO_SPEECH = 36 + BYOC = 37 CAPABILITY_ID_TO_NAME: dict[int, str] = { -2: "Invalid", @@ -85,6 +86,7 @@ class CapabilityId(IntEnum): 34: "Image to text", 35: "Live video to video", 36: "Text to speech", + 37: "BYOC", } @@ -152,6 +154,45 @@ def capability_pipeline_id(cap_id: int) -> Optional[str]: return enum_name.lower().replace("_", "-") +def capability_id_from_pipeline(pipeline_id: str) -> Optional[CapabilityId]: + """ + Reverse of capability_pipeline_id: convert a pipeline ID back to a CapabilityId. + + Example: + live-video-to-video -> CapabilityId.LIVE_VIDEO_TO_VIDEO + byoc -> CapabilityId.BYOC + """ + enum_name = pipeline_id.upper().replace("-", "_") + try: + return CapabilityId[enum_name] + except KeyError: + return None + + +def build_capabilities_from_queries(queries: list[str]) -> Optional[lp_rpc_pb2.Capabilities]: + """ + Build a Capabilities protobuf from pipeline-id/model query strings. + + Each query has the form "pipeline-id/model" (e.g. "byoc/text-reversal", + "live-video-to-video/noop"). Returns None if no valid queries are provided. + """ + caps = lp_rpc_pb2.Capabilities() + found = False + for q in queries: + parts = q.split("/", 1) + pipeline_id = parts[0] + model = parts[1] if len(parts) > 1 else None + cap = capability_id_from_pipeline(pipeline_id) + if cap is None: + continue + cap_id = int(cap) + caps.capacities[cap_id] = 1 + if model: + caps.constraints.PerCapability[cap_id].models[model] + found = True + return caps if found else None + + def capabilities_to_query(caps: Optional[lp_rpc_pb2.Capabilities]) -> list[str]: """ Build discovery query values in `pipeline-id/model` form. diff --git a/src/livepeer_gateway/errors.py b/src/livepeer_gateway/errors.py index 14fb8b4..6bb8112 100644 --- a/src/livepeer_gateway/errors.py +++ b/src/livepeer_gateway/errors.py @@ -31,5 +31,9 @@ class SkipPaymentCycle(LivepeerGatewayError): """Raised when the signer returns HTTP 482 to skip a payment cycle.""" +class PaymentRequiredError(LivepeerGatewayError): + """Raised when an endpoint returns HTTP 402 and requires payment.""" + + class PaymentError(LivepeerGatewayError): """Raised when a PaymentSession operation fails.""" diff --git a/src/livepeer_gateway/media_publish.py b/src/livepeer_gateway/media_publish.py index bf11725..7d0ca0f 100644 --- a/src/livepeer_gateway/media_publish.py +++ b/src/livepeer_gateway/media_publish.py @@ -950,7 +950,12 @@ async def _stream_pipe_to_trickle(self, read_file: BinaryIO) -> None: # NB: This intentionally keeps trickle rolling # accommodate long stalls / inactivity from the # PyAV end, eg during model loading - _LOG.warning( + log_fn = ( + _LOG.debug + if segment_seq == 0 + else _LOG.warning + ) + log_fn( "MediaPublish[%s] trickle segment seq=%s idle for " "%.1fs; rolling over to a fresh empty segment", self._channel_name, @@ -986,15 +991,24 @@ async def _stream_pipe_to_trickle(self, read_file: BinaryIO) -> None: # lean on that instead of doing that here. try: await segment.write(chunk) - except TrickleSegmentWriteError: + except TrickleSegmentWriteError as e: self._active_segment_drain = True self._stats["segments_failed"] += 1 - _LOG.warning( + # Single-line warning: write timeouts here are caused by the + # orchestrator not draining the trickle POST body. The full + # CancelledError -> TimeoutError -> TrickleSegmentWriteError + # chain is noise and obscures the actual cause. + log_fn = ( + _LOG.debug + if segment_seq == 0 + else _LOG.warning + ) + log_fn( "MediaPublish[%s] dropped segment seq=%s mid-stream; " - "draining pipe until wall-clock segment ends", + "draining pipe until wall-clock segment ends (%s)", self._channel_name, segment_seq, - exc_info=True, + e, ) if self._should_close_segment_after_loop(): await self._close_active_segment_locked( diff --git a/src/livepeer_gateway/orchestrator.py b/src/livepeer_gateway/orchestrator.py index 844cd7b..f51e094 100644 --- a/src/livepeer_gateway/orchestrator.py +++ b/src/livepeer_gateway/orchestrator.py @@ -203,6 +203,47 @@ def _http_origin(url: str) -> str: return f"{parsed.scheme}://{parsed.netloc}" +def _join_signer_endpoint(signer_url: str, path: str) -> str: + """ + Join an endpoint path onto signer_url while preserving any existing base path. + + Examples: + - https://example.com/api/signer + /sign-orchestrator-info + -> https://example.com/api/signer/sign-orchestrator-info + - https://example.com/api/signer/sign-orchestrator-info + same path + -> unchanged + """ + if not isinstance(signer_url, str) or not signer_url.strip(): + raise ValueError("signer_url must be a non-empty string") + parsed = _parse_http_url(signer_url, context="signer_url") + base_path = (parsed.path or "").rstrip("/") + base = urlunparse(parsed._replace(path=base_path, params="", query="", fragment="")) + suffix = path if path.startswith("/") else f"/{path}" + if base.endswith(suffix): + return base + return f"{base}{suffix}" + + +def resolve_transcoder_http_url(origin: str, path_or_absolute_url: str) -> str: + """ + Build the final HTTP URL for a BYOC (or similar) call rooted at an orchestrator + transcoder origin, or pass through an absolute URL. + + When ``path_or_absolute_url`` starts with ``http://`` or ``https://``, it is + returned unchanged. Otherwise it is treated as a path on ``origin``. + """ + o = path_or_absolute_url.strip() + if not o: + raise ValueError("path_or_absolute_url must be non-empty") + low = o.lower() + if low.startswith("http://") or low.startswith("https://"): + return o + if not o.startswith("/"): + o = "/" + o + base = _http_origin(origin) + return f"{base}{o}" + + def _append_caps(url: str, capabilities: Optional[lp_rpc_pb2.Capabilities]) -> str: """ Append repeated `caps` query parameters to a URL. diff --git a/src/livepeer_gateway/trickle_publisher.py b/src/livepeer_gateway/trickle_publisher.py index 1419720..8936a7c 100644 --- a/src/livepeer_gateway/trickle_publisher.py +++ b/src/livepeer_gateway/trickle_publisher.py @@ -356,7 +356,8 @@ async def _resolve_next_seq(self) -> int: _LOG.debug("Trickle resolved seq from %s: %s", url, resolved_seq) return resolved_seq else: - _LOG.warning("Trickle /next missing Lp-Trickle-Latest header") + # Common on first /next before orchestrator sets the header. + _LOG.debug("Trickle /next missing Lp-Trickle-Latest header") except Exception: _LOG.warning("Trickle /next request failed", exc_info=True) return -1 @@ -603,9 +604,18 @@ async def close(self) -> None: return try: await asyncio.wait_for(self.queue.put(None), timeout=_SEGMENT_QUEUE_PUT_TIMEOUT_S) - # BaseException to also capture cancellation errors, timeout errors, etc - except BaseException: - _LOG.warning("Trickle segment close suppressed seq=%s", self._seq, exc_info=True) + except asyncio.CancelledError: + # Cancellation during shutdown / segment rollover is expected; don't + # log a stack trace and don't swallow the cancel. + _LOG.debug("Trickle segment close cancelled seq=%s", self._seq) + raise + # Capture timeout errors and any other unexpected close-time failures; + # logged at warning level without a stack trace. KeyboardInterrupt / + # SystemExit are intentionally not caught so process-control exceptions + # still propagate. + except Exception as e: + log_fn = _LOG.debug if self._seq == 0 else _LOG.warning + log_fn("Trickle segment close suppressed seq=%s (%s)", self._seq, e) async def __aenter__(self) -> "SegmentWriter": return self diff --git a/uv.lock b/uv.lock index 4003c14..b604b8e 100644 --- a/uv.lock +++ b/uv.lock @@ -479,7 +479,7 @@ wheels = [ [[package]] name = "livepeer-gateway" -version = "0.1.0" +version = "0.1.1" source = { editable = "." } dependencies = [ { name = "aiohttp" },