Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions examples/byoc_text_reversal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import argparse
import asyncio
import json
import logging
from typing import Optional

from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job
from livepeer_gateway.errors import LivepeerGatewayError


def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run a BYOC text-reversal request through the Python gateway."
)
parser.add_argument(
"--text",
default="hello from byoc",
help="Input text to reverse. Default: 'hello from byoc'.",
)
parser.add_argument(
"--capability",
default="text-reversal",
help="BYOC capability name. Default: text-reversal.",
)
parser.add_argument(
"--orchestrator",
default=None,
help="Optional orchestrator URL or comma-separated URLs.",
)
parser.add_argument(
"--signer",
default=None,
help="Remote signer base URL.",
)
parser.add_argument(
"--discovery",
default=None,
help="Optional discovery endpoint URL.",
)
parser.add_argument(
"--token",
default=None,
help="Optional gateway token containing signer/discovery/orchestrator info.",
)
parser.add_argument(
"--timeout-seconds",
type=int,
default=30,
help="Timeout encoded into the BYOC Livepeer header. Default: 30.",
)
parser.add_argument(
"--output-grace-seconds",
type=float,
default=1.5,
help="Extra time to wait for final event/data output after stop. Default: 1.5.",
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug logging.",
)
return parser.parse_args()


def _parse_orchestrator_arg(orchestrator_arg: Optional[str]):
if orchestrator_arg is None:
return None
parts = [part.strip() for part in orchestrator_arg.split(",") if part.strip()]
if not parts:
return None
if len(parts) == 1:
return parts[0]
return parts


async def _amain() -> None:
args = _parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(levelname)s %(name)s: %(message)s",
)

orch_url = _parse_orchestrator_arg(args.orchestrator)

job = None
try:
job = start_byoc_job(
orch_url,
BYOCJobRequest(
capability=args.capability,
enable_video_ingress=False,
enable_video_egress=False,
# reverse_server publishes JSON to events_url only, not data_url.
enable_data_output=False,
timeout_seconds=args.timeout_seconds,
),
token=args.token,
signer_url=args.signer,
discovery_url=args.discovery,
)

print("=== BYOC text-reversal ===")
print(f"job_id: {job.job_id}")
print(f"capability: {job.capability}")
print(f"control_url: {job.control_url}")
print(f"events_url: {job.events_url}")
print()

job.start_payment_sender()

reader_task: Optional[asyncio.Task] = None
stop_sent = False
try:
if job.control is None:
print("ERROR: job has no control_url; cannot send text.")
return

result_received = asyncio.Event()
received_payload: dict = {}

async def read_results() -> None:
if job.events is None:
print("WARN: no events channel on job; results will not be captured.")
result_received.set()
return
async for msg in job.events():
if isinstance(msg, dict) and isinstance(msg.get("reversed"), str):
received_payload.update(msg)
result_received.set()
return

reader_task = asyncio.create_task(read_results())

await asyncio.sleep(0.15)
await job.control.write({"text": args.text})
print(f"sent: {{\"text\": {args.text!r}}}")

try:
await asyncio.wait_for(result_received.wait(), timeout=args.timeout_seconds)
except asyncio.TimeoutError:
print("ERROR: timed out waiting for reversal result.")
return

original = received_payload.get("original") or received_payload.get("text") or args.text
reversed_text = received_payload.get("reversed")
print(f"result: {original!r} -> {reversed_text!r}")
print("payload:", json.dumps(received_payload, indent=2, sort_keys=True))

stop_resp = await job.stop()
stop_sent = True
print(f"stop: status={stop_resp['status_code']}")
finally:
if not stop_sent:
try:
stop_resp = await job.stop()
print(f"stop: status={stop_resp['status_code']}")
except Exception as stop_err:
print(f"WARN: failed to stop BYOC job: {stop_err}")
await asyncio.sleep(max(0.0, args.output_grace_seconds))
if reader_task is not None:
reader_task.cancel()
await asyncio.gather(reader_task, return_exceptions=True)
except LivepeerGatewayError as err:
print(f"ERROR: {err}")
finally:
if job is not None:
await job.close()


def main() -> None:
asyncio.run(_amain())


if __name__ == "__main__":
main()
104 changes: 104 additions & 0 deletions examples/byoc_write_frames.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import argparse
import asyncio
from fractions import Fraction

import av

from livepeer_gateway.byoc import BYOCJobRequest, start_byoc_job
from livepeer_gateway.errors import LivepeerGatewayError
from livepeer_gateway.media_publish import MediaPublishConfig, VideoOutputConfig


DEFAULT_CAPABILITY = "text-reversal"


def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Start a BYOC stream job and publish raw frames via publish_url."
)
p.add_argument(
"orchestrator",
nargs="?",
default=None,
help="Orchestrator (host:port). If omitted, discovery is used.",
)
p.add_argument(
"--capability",
default=DEFAULT_CAPABILITY,
help=f"BYOC capability name (default: {DEFAULT_CAPABILITY}).",
)
p.add_argument(
"--signer",
required=True,
help="Remote signer URL (no path). Required for BYOC job signing.",
)
p.add_argument(
"--discovery",
default=None,
help="Discovery endpoint for orchestrators.",
)
p.add_argument(
"--stream-id",
default=None,
help="Optional stream ID to include in BYOC request details.",
)
p.add_argument("--width", type=int, default=320, help="Frame width (default: 320).")
p.add_argument("--height", type=int, default=180, help="Frame height (default: 180).")
p.add_argument("--fps", type=float, default=30.0, help="Frames per second (default: 30).")
p.add_argument("--count", type=int, default=90, help="Number of frames to send (default: 90).")
return p.parse_args()


def _solid_rgb_frame(width: int, height: int, rgb: tuple[int, int, int]) -> av.VideoFrame:
frame = av.VideoFrame(width, height, "rgb24")
r, g, b = rgb
frame.planes[0].update(bytes([r, g, b]) * (width * height))
return frame


async def main() -> None:
args = _parse_args()
if args.fps <= 0:
raise SystemExit("--fps must be > 0")
frame_interval = 1.0 / args.fps

job = None
try:
job = start_byoc_job(
args.orchestrator,
BYOCJobRequest(
capability=args.capability,
stream_id=args.stream_id,
),
signer_url=args.signer,
discovery_url=args.discovery,
)

print("=== BYOC stream ===")
print("job_id:", job.job_id)
print("capability:", job.capability)
print("publish_url:", job.publish_url)
print()

media = job.start_media(
MediaPublishConfig(tracks=[VideoOutputConfig(fps=args.fps)])
)
job.start_payment_sender()

time_base = Fraction(1, max(1, int(round(args.fps))))
for i in range(max(0, args.count)):
color = (i * 5) % 255
frame = _solid_rgb_frame(args.width, args.height, (color, 0, 255 - color))
frame.pts = i
frame.time_base = time_base
await media.write_frame(frame)
await asyncio.sleep(frame_interval)
except LivepeerGatewayError as e:
print(f"ERROR: {e}")
finally:
if job is not None:
await job.close()


if __name__ == "__main__":
asyncio.run(main())
45 changes: 45 additions & 0 deletions examples/get_orchestrator_info.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import json
import logging
import sys
from typing import Any

from livepeer_gateway.capabilities import (
build_capabilities_from_queries,
compute_available,
format_capability,
get_capacity_in_use,
Expand Down Expand Up @@ -34,6 +36,11 @@ def _parse_args() -> argparse.Namespace:
" # Signer URL\n"
" python examples/get_orchestrator_info.py --signer https://signer.example.com\n"
"\n"
" # Request specific capabilities (e.g. BYOC)\n"
" python examples/get_orchestrator_info.py localhost:8935 --signer https://signer.example.com --caps byoc/text-reversal\n"
" python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop,byoc/my-pipeline\n"
" python examples/get_orchestrator_info.py localhost:8935 --caps live-video-to-video/noop --caps byoc/my-pipeline\n"
"\n"
" # JSON / JSONL output\n"
" python examples/get_orchestrator_info.py localhost:8935 --format json\n"
" python examples/get_orchestrator_info.py localhost:8935 --format jsonl\n"
Expand Down Expand Up @@ -65,6 +72,16 @@ def _parse_args() -> argparse.Namespace:
action="store_true",
help="Enable debug logging for discovery diagnostics.",
)
p.add_argument(
"--caps",
action="append",
default=None,
metavar="PIPELINE/MODEL",
help=(
"Request specific capabilities in pipeline/model form. "
"Can be comma-delimited or repeated (e.g. --caps byoc/text-reversal --caps live-video-to-video/noop)."
),
)
p.add_argument(
"--format",
choices=("text", "json", "jsonl"),
Expand Down Expand Up @@ -349,12 +366,38 @@ def _resolve_discovery_args(args: argparse.Namespace) -> tuple[Any, str | None,
return orchestrators, signer, signer_headers, discovery, discovery_headers


def _split_capability_queries(raw_caps: list[str] | None) -> list[str]:
if not raw_caps:
return []
queries: list[str] = []
for raw_cap in raw_caps:
queries.extend(
part.strip()
for part in raw_cap.split(",")
if part.strip()
)
return queries


def main() -> None:
args = _parse_args()

if args.debug:
logging.basicConfig(level=logging.DEBUG)

cap_queries = _split_capability_queries(args.caps)
if cap_queries:
capabilities = build_capabilities_from_queries(cap_queries)
if not capabilities:
print(
f"ERROR: --caps {cap_queries!r} did not parse into any valid capabilities "
f"(expected pipeline-id/model entries, e.g. 'byoc/text-reversal').",
file=sys.stderr,
)
sys.exit(1)
else:
capabilities = None

json_results: list[dict[str, Any]] = []

def _json_info(orch_url: str, info: Any) -> None:
Expand Down Expand Up @@ -384,6 +427,7 @@ def _json_error(orch_url: str | None, err: Exception) -> None:
signer_headers=signer_headers,
discovery_url=discovery,
discovery_headers=discovery_headers,
capabilities=capabilities,
)

for orch_url in orch_list:
Expand All @@ -392,6 +436,7 @@ def _json_error(orch_url: str | None, err: Exception) -> None:
orch_url,
signer_url=signer,
signer_headers=signer_headers,
capabilities=capabilities,
)
except LivepeerGatewayError as e:
print_error(orch_url, e)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "livepeer-gateway"
version = "0.1.0"
version = "0.1.1"
requires-python = ">=3.10"
dependencies = [
"grpcio>=1.65.0",
Expand Down
Loading