feat(byoc): BYOC streaming and payments with examples#6
feat(byoc): BYOC streaming and payments with examples#6eliteprox wants to merge 12 commits intolivepeer:mainfrom
Conversation
Adds the Python-gateway counterpart of livepeer/go-livepeer#3869: - start_byoc_job / BYOCJobRequest / BYOCJob: orchestrator selection, payment preflight, V1 BYOC job signing via /sign-byoc-job, and /process/stream/start | payment | {id}/stop lifecycle. - BYOCPaymentSession: BYOC-typed /generate-live-payment with manifestID = capability_name, timeoutSeconds, per-call RequestID, and a recurring stream-payment sender. - BasePaymentSession: shared base for LV2V (PaymentSession) and BYOC, matching the remote signer split. PaymentSession is now a slim subclass. - capabilities: add CapabilityId.BYOC (37), capability_id_from_pipeline, build_capabilities_from_queries. - errors: add PaymentRequiredError (HTTP 402 retry on stream start). - orchestrator: add _join_signer_endpoint and resolve_transcoder_http_url. - examples: byoc_text_reversal, byoc_write_text, byoc_write_frames, and a --caps PIPELINE/MODEL flag on get_orchestrator_info. Stays on the existing urllib HTTP stack; no new runtime dependencies. OIDC, the urllib->httpx refactor, and unrelated media/trickle fixes are deferred to follow-up stacked PRs. Made-with: Cursor
When the orchestrator's trickle endpoint stalls, the segment-writer queue.put() times out after 5s. The publisher already handles this: it sets _active_segment_drain and rolls into the next wall-clock segment. The chained CancelledError -> TimeoutError -> TrickleSegment WriteError stack trace was logged at WARNING level via exc_info=True, which made the (non-fatal) drain look like a crash. - media_publish: drop the exc_info=True; the warning now includes the underlying error message inline. - trickle_publisher.SegmentWriter.close(): re-raise CancelledError cleanly (don't suppress) and log other close-time failures without a stack trace. No behavioral change to the publish/drain pipeline. Made-with: Cursor
…al example - Enhanced error handling for missing control URL in the text reversal job. - Updated the result retrieval logic to ensure proper handling of timeouts and cancellation of tasks. - Added a final cleanup step to ensure the job is stopped correctly, even if an error occurs during processing. This change improves the robustness of the text reversal example by ensuring that resources are managed correctly and errors are reported clearly.
…mple - Modified the media publishing configuration to include VideoOutputConfig for better handling of frame rates. - Ensured that the job starts with the correct media settings, improving the robustness of the example. This change enhances the functionality of the byoc_write_frames example by aligning it with the latest media publishing standards.
- Changed parameter name from `payment_type` to `type` in BYOCPaymentSession and BasePaymentSession classes for consistency. - Updated corresponding references in the remote_signer module. This change improves code clarity and aligns parameter naming conventions across the payment session classes.
- Updated the calculation of `time_base` to prevent division by zero by using `max(1, int(round(args.fps)))`. - This change ensures that the frame rate is always set to a valid minimum value, improving the stability of the example. This fix enhances the robustness of the byoc_write_frames example by safeguarding against invalid frame rate inputs.
- Added a new boolean parameter `use_tofu` to the BYOCPaymentSession, BasePaymentSession, and related functions to enhance payment session configuration. - Updated relevant methods to utilize the `use_tofu` parameter, ensuring consistent behavior across payment processing. This change improves flexibility in payment session management by allowing the option to enable or disable tofu usage.
j0sh
left a comment
There was a problem hiding this comment.
Partially reviewed for now
|
|
||
|
|
||
| class PaymentSession: | ||
| class PaymentSession(BasePaymentSession): |
There was a problem hiding this comment.
Curious, why does there needs to be a sub-class here? For reference, Scope uses this as-is and only calls send_payment on a timer.
There was a problem hiding this comment.
Good question. The subclass isn’t required for Scope specifically. The refactor was mainly to share the /generate-live-payment state/refresh logic between LV2V and BYOC while letting BYOC set type="byoc" to use the capability as manifestID, include BYOC timeout hints, and send recurring payments to /ai/stream/payment instead of the LV2V /payment endpoint.
I think the type field itself is necessary for the remote signer to generate the right payment flavor, since BYOC bills by capability/time instead of the normal LV2V manifest path. But we can avoid changing the public shape of PaymentSession if preferred. I can revise this to keep the existing PaymentSession unchanged and dedicated to LV2V, then create a BYOC-specific payment type BYOCPaymentSession.
There was a problem hiding this comment.
Refactored in 2e7f267 to use a new BYOCPaymentSession class instead of modifying PaymentSession.
I also refactored the behavior of byoc job signing in livepeer/go-livepeer@cacec0d to switch from using the manifestId to the Type parameter for byoc capability name. This aligns with lv2v and is more intuitive.
| # SystemExit are intentionally not caught so process-control exceptions | ||
| # still propagate. | ||
| except Exception as e: | ||
| log_fn = _LOG.debug if self._seq == 0 else _LOG.warning |
There was a problem hiding this comment.
Why all the seq ==0 special-casing around logs? Not sure I've seen those lines be noisy in practice.
There was a problem hiding this comment.
I was seeing frequent cancelled/timeout errors from trickle_publish and media_publish. these were introduced long ago when the payments loop was moved into the media publish routine.
The errors occur consistently on the first segment, but appear at different times during streaming:
livepeer_gateway.trickle_publisher.TrickleSegmentWriteError: Trickle segment writer timed out after 5.0s
Trickle segment close suppressed seq=0
Traceback (most recent call last):
File "/home/elite/.local/share/uv/python/cpython-3.12-linux-x86_64-gnu/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
return await fut
^^^^^^^^^
File "/home/elite/.local/share/uv/python/cpython-3.12-linux-x86_64-gnu/lib/python3.12/asyncio/queues.py", line 120, in put
await putter
asyncio.exceptions.CancelledError
| ) | ||
| p.add_argument( | ||
| "--caps", | ||
| nargs="*", |
There was a problem hiding this comment.
instead of making this consume the rest of the arguments, maybe make it comma delimited or allow the arguments to be repeated multiple times
There was a problem hiding this comment.
Updated examples/get_orchestrator_info.py in 0150670 so --caps no longer consumes the rest of the args.
It now supports:
--caps byoc/text-reversal
--caps live-video-to-video/noop,byoc/my-pipeline
--caps live-video-to-video/noop
…sion - Refactored BYOCPaymentSession to remove inheritance from BasePaymentSession, consolidating payment session logic directly within BYOCPaymentSession. - Introduced new methods for building payment payloads and refreshing orchestrator information. - Enhanced error handling and state management during payment requests. - Deleted the now-redundant BasePaymentSession class, streamlining the payment processing architecture. This change improves the clarity and maintainability of the payment session implementation by reducing complexity and focusing on the BYOC-specific logic.
…or_info - Updated the argument parsing for the `--caps` option to support both comma-delimited and repeated entries, improving flexibility in capability requests. - Introduced a new helper function `_split_capability_queries` to streamline the processing of capability inputs. - Enhanced help documentation for the `--caps` argument to clarify usage.
- Changed the payment type in BYOCPaymentSession from using the BYOC capability name to a fixed value of "byoc" for clarity and consistency. - Updated the return value in the `_get_payment_orch_info` function to include capabilities alongside legacy information, enhancing the payment orchestration process. This change improves the clarity of payment type usage and ensures that the orchestration information is more comprehensive.
- Introduced a new `_manifest_id` attribute in BYOCPaymentSession to store the job ID required for payment requests. - Updated the `_build_payment_payload` method to include the manifest ID in the payment payload and added error handling for missing job IDs. - Refined the timeout handling by renaming `timeoutSeconds` to `preloadSeconds` for clarity. These changes improve the robustness of payment processing by ensuring that the necessary job ID is provided and enhancing the clarity of timeout parameters.
…moke Replaces the synthetic-fixture/direct-runner smoke from 8beabc1 with a single E2E test that pushes a colored stream through the full BYOC stack (ffmpeg → mediamtx → gateway → orch → runner → orch → mediamtx → ffmpeg) and asserts non-empty bytes come back. Adds demo.sh: same path with webcam input + ffplay output, so users can visually verify the GrayscaleFilter works on their own video. Drops _fixtures/, _smoke_server.py, extra_hosts. Adds mediamtx service + LIVE_AI_PLAYBACK_HOST env on gateway. Comment marks both scripts for post-PR-#6 migration to start_byoc_job (the customer-flow SDK). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This pull request introduces new BYOC (Bring Your Own Container) support and expands the Python examples for the Livepeer Gateway SDK. It adds two new example scripts for BYOC text reversal and raw video frame publishing, enhances orchestrator discovery with capability filtering, and exposes BYOC-related classes and functions at the package level. Additionally, it introduces BYOC payment session logic and improves capability handling in the codebase.
BYOC Example Scripts:
examples/byoc_text_reversal.pyto demonstrate running a BYOC text-reversal job, sending text to be reversed, and handling responses and job control.examples/byoc_write_frames.pyto show how to start a BYOC stream job and publish raw video frames via the publish URL.Orchestrator Discovery and Capability Filtering:
examples/get_orchestrator_info.pyto accept a--capsargument, allowing users to request specific capabilities inpipeline/modelform (e.g.,byoc/text-reversal). This includes parsing, validation, and passing of capability filters to orchestrator discovery. [1] [2] [3] [4] [5] [6]build_capabilities_from_queriesandcapability_id_from_pipelineutility functions to parse and build capability filters from user queries.BYOC Payment and Capability Support:
BYOCPaymentSessionclass to handle BYOC-specific payment flows, including job signing and recurring stream payments.CapabilityIdenum and related mappings to include BYOC as a recognized capability. [1] [2]Public API and Packaging:
BYOCJob,BYOCJobRequest,BYOCPaymentSession,start_byoc_job) at the package level in__init__.py. [1] [2] [3] [4]pyproject.toml.These changes collectively enable users to run BYOC jobs, filter orchestrators by capability, and interact with BYOC payment flows using the Python SDK.