
Add SkyPilot Kubernetes launcher backend for Tangle #223

Open
Michaelvll wants to merge 19 commits into TangleML:master from Michaelvll:add-skypilot-launcher

Conversation

@Michaelvll

A new ContainerTaskLauncher that submits Tangle tasks as SkyPilot managed
jobs. SkyPilot then handles container scheduling, multi-cluster placement,
multi-node coordination, log mirroring, and cancellation.

Why

Adding SkyPilot as a launcher backend brings these capabilities to Tangle
pipelines without changing the ComponentSpec authoring model:

  • Multi-cluster / multi-cloud placement. One launcher instance can dispatch to any context in kubernetes.allowed_contexts; SkyPilot's optimizer picks a context per task based on accelerator/region requirements, letting a single Tangle pipeline fan out across clusters and clouds.
  • Multi-node without a cap. SkyPilot supports large-scale jobs with no upper limit on the number of nodes.
  • High-performance networking. SkyPilot automatically sets up high-performance InfiniBand for jobs, with no extra setup burden on MLEs.
  • Kueue integration (quota + priority). First-class integration with Kueue -- pipelines can target fair-share / preemption-aware queues directly.
  • Warm-pool reuse. Submitting to a SkyPilot Pool reuses pre-warmed pods instead of creating a fresh one per task, cutting cold start for batch-inference jobs from minutes to seconds.
  • Native S3 / R2 / Azure / HTTPS storage mounts. SkyPilot's Storage mounts any of these in addition to GCS -- no custom storage-provider code needed in Tangle.
  • Unified logs and cancellation via sky.jobs.tail_logs and sky.jobs.cancel, mirrored back into the Tangle UI's /container_log endpoint.

Opt in:

pip install tangle[skypilot]

TANGLE_LAUNCHER=skypilot \
  SKYPILOT_API_SERVER_ENDPOINT=http://<sky-api-server> \
  DATABASE_URI=postgresql://... \
  python orchestrator_main.py

Existing installs are unaffected; skypilot is a new optional dependency group in pyproject.toml. Headless / orchestrator_main.py users can opt in with TANGLE_LAUNCHER=skypilot.

Examples

23 unit tests pass with sky stubbed (pytest tests/test_skypilot_launchers.py).

Live runs targeted Nebius Kubernetes contexts plus GCP access through the SkyPilot API server. After starting start_local_skypilot.py, each run below is one `python examples/<name>.py` away.

1. Multi-node PyTorch DDP

examples/multinode_pipeline_e2e.py. One ComponentSpec annotated with number_of_nodes: "2" + H100:1 runs as a 2-pod NCCL job; rank 0 saves a checkpoint to GCS. SUCCEEDED on 2× NVIDIA H100; loss converged from 3.46 →
0.026 over 5 epochs with identical all-reduced losses across ranks.

Walkthrough:

tangle-skypilot-multi-node-training.mp4

2. Inference across multiple clusters

examples/multicluster_inference_e2e.py. Four-task graph; one inference
task asks for H100, the other for H200. Sky's optimizer dispatches them to
different K8s contexts in the same pipeline. Same Qwen2.5-0.5B-Instruct
script runs on both, identical greedy completions, a final compare task
fans them together.

Walkthrough:

tangle-skypilot-cross-cluster-inference.mp4

Michaelvll and others added 19 commits April 26, 2026 19:20
Adds SkyPilotKubernetesLauncher, a new ContainerTaskLauncher implementation
that submits Tangle pipeline tasks as SkyPilot managed jobs. SkyPilot then
handles container scheduling, multi-cluster / multi-cloud placement,
multi-node coordination, preemption recovery, log streaming, and cancellation.

Capabilities exercised that the existing kubernetes_launchers cannot do:
- num_nodes > 16 (Tangle K8s launcher caps at 16)
- Cross-cloud spot with auto-recovery (Tangle has GKE-only spot, no recovery)
- Multi-cloud / multi-cluster placement (infra=None for any-cloud)
- Warm-pool reuse via SkyPilot Pool (no Tangle equivalent)
- s3://, https://, abfs:// file mounts (Tangle: HostPath + gcsfuse only)
- First-class priority_class for Kueue (no annotation API in Tangle today)

The new launcher is selected via TANGLE_LAUNCHER=skypilot env var; existing
Kubernetes deployments are unaffected. skypilot is added as an optional
dependency group ([skypilot]) so the install footprint is opt-in.

Resource annotations are kept identical to kubernetes_launchers (cpu, memory,
accelerators including JSON dict and SkyPilot string forms, ephemeral_storage,
multi_node/number_of_nodes) so the same ComponentSpec runs against either
backend. Multi-node dynamic-data inputs are bridged to bash env vars set
from SKYPILOT_NUM_NODES / SKYPILOT_NODE_RANK / SKYPILOT_NODE_IPS.
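
A minimal sketch of that bridging (the TANGLE_MULTI_NODE_* names here are illustrative; the SKYPILOT_* variables are the ones SkyPilot exports inside managed-job pods, with SKYPILOT_NODE_IPS being a newline-separated list):

```python
def multi_node_env_prelude() -> str:
    """Build a bash prelude that bridges SkyPilot's per-pod runtime
    variables into the names a Tangle container script would expect.

    Sketch only: the TANGLE_MULTI_NODE_* names are hypothetical.
    """
    lines = [
        'export TANGLE_MULTI_NODE_COUNT="${SKYPILOT_NUM_NODES}"',
        'export TANGLE_MULTI_NODE_RANK="${SKYPILOT_NODE_RANK}"',
        # SKYPILOT_NODE_IPS is newline-separated; the first entry is the
        # rank-0 (head) address.
        'export TANGLE_MULTI_NODE_HEAD_IP="$(echo "${SKYPILOT_NODE_IPS}" | head -n1)"',
    ]
    return "\n".join(lines) + "\n"
```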

20 tests pass against real cloud_pipelines_backend imports (sky.jobs SDK
stubbed to keep tests offline). End-to-end pipeline dry-run example at
examples/skypilot_launcher_dryrun.py exercises the full launch -> refresh ->
log -> persist -> terminate lifecycle on a deliberately Tangle-impossible
component spec (32 nodes, JSON-format accelerators, spot, Pool, S3 + GCS
mixed file_mounts).
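
The accelerator annotation handling mentioned above (accepting both a JSON dict and SkyPilot's NAME:COUNT string form) could be normalized with a small helper along these lines; this is a sketch, not the PR's actual code:

```python
import json

def parse_accelerators(value: str):
    """Normalize an accelerator annotation to a (name, count) pair.

    Accepts both forms described above: a JSON dict like '{"H100": 2}'
    or SkyPilot's string form 'H100:2' (count defaults to 1).
    """
    value = value.strip()
    if value.startswith("{"):
        d = json.loads(value)
        (name, count), = d.items()  # expect a single-entry dict
        return name, int(count)
    name, _, count = value.partition(":")
    return name, int(count or 1)
```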

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The launcher relies on sky.jobs.queue records returning start_at / end_at
fields and on Resources(priority_class=...), both stable in 0.12.1+.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reverts incidental whitespace/comment cleanup that crept into the previous
commit. Keeps the launcher-selection change (the new _build_launcher()
function and its single-line invocation in main()) and nothing else.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
E2E testing surfaced two cases where the launcher passed local-filesystem
paths (Tangle's LocalStorageProvider) directly to sky.Task.file_mounts,
which then fails sky's "must exist locally" validation with a generic error.

Fix:
  * Inputs: raise LauncherError with an actionable message pointing the
    user at configuring a cloud StorageProvider, instead of letting sky's
    validation fire from inside file_mounts.
  * Outputs: log a warning and skip the mount entirely when the URI is
    a relative local path. The container can still write to /tmp/outputs/
    inside the pod; the writes won't be persisted to Tangle's local
    storage, but the job still runs and emits logs through sky.jobs.
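
A sketch of the two behaviors, with illustrative helper names (the actual launcher code differs):

```python
import warnings

CLOUD_SCHEMES = ("gs://", "s3://", "abfs://", "r2://", "https://")

class LauncherError(Exception):
    pass

def mount_source_for_input(uri: str) -> str:
    """Validate an input artifact URI before handing it to file_mounts.

    Local paths would fail SkyPilot's "must exist locally" validation with
    a generic error, so fail early with an actionable message instead.
    """
    if not uri.startswith(CLOUD_SCHEMES):
        raise LauncherError(
            f"Input artifact {uri!r} is on local storage; configure a cloud "
            "StorageProvider (e.g. gs:// or s3://) so SkyPilot pods can mount it."
        )
    return uri

def mount_source_for_output(uri: str):
    """Outputs are best-effort: skip the mount (return None) for local URIs
    rather than failing the task; the job still runs and emits logs."""
    if not uri.startswith(CLOUD_SCHEMES):
        warnings.warn(f"Skipping non-cloud output mount: {uri!r}")
        return None
    return uri
```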

Tested against a kind cluster with sky.jobs.launch dispatching a Hello
World pipeline — full lifecycle (submit -> RUNNING -> SUCCEEDED) verified
end-to-end. See examples/skypilot_launcher_dryrun.py for the offline
counterpart.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two new tests covering the behavior introduced by the previous fix commit
(58b4983), surfaced during end-to-end testing on a real Kubernetes cluster
with Tangle's LocalStorageProvider:

  * test_input_local_uri_raises_actionable_error — non-cloud input URI
    raises LauncherError with a message pointing at cloud StorageProvider
    config, instead of letting sky.Task validation fire a generic error.

  * test_output_local_uri_skipped_no_mount — non-cloud output URI is
    skipped with a warning. The container still runs (writes to its own
    /tmp/outputs/), the launcher just doesn't sync writes back to Tangle's
    local storage.

22/22 tests pass against real cloud_pipelines_backend imports.

End-to-end runs verified on kind cluster:
  Hello (smoke):       SUCCEEDED
  Failing (exit 7):    FAILED
  Annotated:           SUCCEEDED   (priority_class, JSON accelerators)
  Cancellation:        CANCELLED   (sky.jobs.cancel triggered, status flowed back)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
End-to-end testing on a kind cluster surfaced an architectural mismatch
between Tangle's LocalStorageProvider and SkyPilot's file_mounts model:
Tangle's local provider uses HostPath volumes (bidirectional), but
file_mounts is one-way upload + cloud-bucket mount, so a SkyPilot pod
cannot write back to the orchestrator's local filesystem.

Practical effect: single-component pipelines run fine on any provider
(stdout is captured by SkyPilot regardless), but multi-component graph
pipelines need a cloud StorageProvider so output->input artifact URIs
are gs://, s3://, abfs:// etc. that SkyPilot can mount on both sides.

Adds:
  * Module docstring section explicitly documenting the requirement and
    showing the GoogleCloudStorageProvider configuration recipe.
  * test_multistep_with_cloud_uris_passes_through — verifies the launcher
    correctly mounts both upstream output and downstream input as cloud
    URIs through SkyPilot file_mounts.

23/23 tests pass.

Verified end-to-end on kind cluster with single-step pipelines:
  Hello (smoke):     SUCCEEDED
  Failing (exit 7):  FAILED   (status flowed back via SkyPilotLaunchedJob)
  Annotated:         SUCCEEDED  (priority_class, JSON accelerators)
  Cancellation:      CANCELLED  (sky.jobs.cancel triggered)

Multi-step on LocalStorageProvider fails as expected (step 1 SUCCEEDED in
sky but the local output URI is skipped, so step 2 has no upstream input).
Live verification of multi-step with GCS pending bucket access.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Surfaced while running Tangle's two-step pipeline against a real GKE
cluster with the API server deployed via the SkyPilot helm chart in
consolidation mode:

1. sky.jobs.launch return shape: nightly returns
   tuple[Optional[int], Handle], older versions tuple[List[int], Handle].
   Handle both shapes when extracting the job id.

2. Cloud-URI file_mounts under consolidation mode: constructing
   sky.Task() directly with cloud-URI file_mounts goes through
   translate_local_file_mounts_to_two_hop, which rejects them with
   NotSupportedError when no managed-jobs bucket is configured. Build a
   YAML-shaped dict and use sky.Task.from_yaml_config(); the YAML parser
   auto-promotes cloud-URI entries to sky.Storage MOUNT mounts, which is
   the path that works in consolidation mode.

3. SkyPilot MOUNT mode rejects sub-paths: gcsfuse mounts a bucket root,
   not an individual object. Mount each unique bucket once at
   /mnt/skypilot/<scheme>/<bucket> and reference sub-paths within. This
   also dedupes when several inputs/outputs live in the same bucket.

4. sky.jobs.queue return shape and filtering: nightly returns
   tuple[list[dict], ...] and ignores job_ids=. Unwrap the tuple and
   filter records by job_id locally.
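
Point 3's bucket-root mounting and dedup can be sketched as follows; the /mnt/skypilot/<scheme>/<bucket> layout follows the description above, and the helper name is illustrative:

```python
from urllib.parse import urlsplit

def plan_bucket_mounts(uris):
    """Group cloud URIs by bucket and map each bucket to one MOUNT point.

    gcsfuse-style MOUNT mode mounts a bucket root, not an individual
    object, so each unique <scheme>://<bucket> gets a single mount and
    every URI is rewritten to a path inside that mount. This also dedupes
    when several inputs/outputs live in the same bucket.
    """
    mounts = {}           # mount point -> bucket-root URI
    container_paths = {}  # original URI -> path inside the pod
    for uri in uris:
        parts = urlsplit(uri)
        mount_point = f"/mnt/skypilot/{parts.scheme}/{parts.netloc}"
        mounts[mount_point] = f"{parts.scheme}://{parts.netloc}"
        container_paths[uri] = mount_point + parts.path
    return mounts, container_paths
```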

Tests updated to mirror the new internal shape.
Verified end-to-end: jobs 11 (generate) and 12 (shout) SUCCEEDED on
GKE with output flowing through gs://tangle-skypilot-test-zhwu.
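
The return-shape normalization from point 1 might look like this sketch (function name is illustrative):

```python
def extract_job_id(launch_result):
    """Normalize sky.jobs.launch return values across SDK versions.

    Newer SDKs return (Optional[int], handle); older ones returned
    ([int, ...], handle). Accept both and yield a single job id or None.
    """
    job_id, _handle = launch_result
    if isinstance(job_id, (list, tuple)):
        job_id = job_id[0] if job_id else None
    return job_id
```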
A live test that exercises the launcher's multi-node path:
  - One Tangle ComponentSpec, TaskSpec annotated with
    tangleml.com/launchers/kubernetes/multi_node/number_of_nodes: 2.
  - The launcher submits as a 2-pod SkyPilot managed job; the
    TANGLE_MULTI_NODE_* env-var prelude bridges SkyPilot's runtime
    values (SKYPILOT_NODE_RANK / NODE_IPS / NUM_NODES) into the
    container.
  - Rank 1 opens a TCP connection to rank 0 (using the peer-0 address
    from the prelude) to prove peer addressing is reachable, not just
    exported.
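
The rank-1 to rank-0 reachability check can be illustrated locally, with threads standing in for pods (port 12321 matches the run described in this commit; on a real cluster each rank is a separate pod and the head address comes from the env-var prelude):

```python
import socket
import threading

def peer_check(head_addr: str, port: int, rank: int, ready: threading.Event):
    """Minimal peer-addressing check: rank 0 accepts one connection,
    rank 1 connects to the head address and sends a greeting."""
    if rank == 0:
        srv = socket.socket()
        srv.bind((head_addr, port))
        srv.listen(1)
        ready.set()  # signal that rank 0 is listening
        conn, _ = srv.accept()
        msg = conn.recv(64).decode()
        conn.close()
        srv.close()
        return f"rank0 got: {msg}"
    ready.wait(5)  # wait until rank 0's socket is up
    with socket.create_connection((head_addr, port), timeout=5) as s:
        s.sendall(f"hello from rank {rank}".encode())
    return "rank1 connected"

# Simulate both ranks in one process.
ready = threading.Event()
results = {}
t0 = threading.Thread(target=lambda: results.update(r0=peer_check("127.0.0.1", 12321, 0, ready)))
t1 = threading.Thread(target=lambda: results.update(r1=peer_check("127.0.0.1", 12321, 1, ready)))
t0.start(); t1.start(); t0.join(); t1.join()
```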

Verified on GKE in consolidation mode (sky job 16, 17s job duration):
both pods came up, rank 1 connected to rank 0:12321, and rank 0 wrote
the output report to GCS.

Annotations live on TaskSpec (not ComponentSpec.metadata) because
orchestrator_sql.py forwards task_spec.annotations only.
… show them

Without this, Tangle's /api/executions/{id}/container_log endpoint
reads from container_execution.log_uri (a cloud-storage URI written
by upload_log()) and gets nothing, because the previous upload_log()
was a no-op. The UI's "Logs" tab is therefore empty for SkyPilot-
launched containers — even though `sky jobs logs <id>` works fine
out-of-band.

Two pieces:

1. Wire a `storage_provider` arg through the launcher and
   SkyPilotLaunchedJob (mirrors how kubernetes_launchers takes one).
   Defaults to None — without it, upload_log() stays a no-op (sky
   logs still work via the CLI/SDK; only the UI mirror is skipped).

2. get_log() retries with exponential backoff (6 attempts, 2-8s
   spacing) when sky.jobs.tail_logs returns just the
   "Job N is already in terminal state ..." hint. Sky returns this
   message when called immediately after job termination, before
   the controller has finalized the user-job log file. Once the file
   is visible (~5-30s later), retry succeeds and the actual
   stdout/stderr is uploaded to log_uri.
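
The backoff loop might look like this sketch (the fetch callable stands in for a sky.jobs.tail_logs wrapper; names and the exact hint text matching are illustrative):

```python
import time

TERMINAL_HINT = "is already in terminal state"

def get_log_with_retry(fetch, attempts=6, base_delay=2.0, max_delay=8.0):
    """Retry a log fetch while sky returns only the terminal-state hint.

    Uses capped exponential backoff; gives up after `attempts` tries and
    returns whatever text was last seen.
    """
    delay = base_delay
    text = ""
    for _ in range(attempts):
        text = fetch()
        if text and TERMINAL_HINT not in text:
            return text  # real stdout/stderr, not the one-line hint
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
    return text
```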

Verified end-to-end (sky job 21, multinode test): the Tangle UI's
container_log endpoint now returns the full sky log — both ranks'
output, peer IPs, and the rank-1 -> rank-0 socket exchange — instead
of a one-line terminal-state hint.
…ot-"

Encodes both the orchestrator (Tangle) and the launcher (SkyPilot) in
the managed-job name. Surfaces in:
  - SkyPilot dashboard / `sky jobs queue` (NAME column)
  - Tangle UI's container_state debug pane (`skypilot.job_name`)

Makes it obvious at a glance which launcher ran a given task without
having to inspect debug_info keys.
…-skypilot-"

Keeping launcher code minimal — naming surfaces are demonstrated via
the example pipeline name itself, not by changing the default prefix.

This reverts commit e094684e87123ed5c3c5d4cd02c0ab9a3afd2128.
…ot-'

Makes 'skypilot' visible in the Tangle UI's task name and the SkyPilot
dashboard's job name without touching launcher defaults.
…lot-' too

Surfaces 'skypilot' in the Tangle Pipelines list (run names) in
addition to the inner task name. Makes which launcher ran the
pipeline obvious from the index page without drilling into the
run detail.
…angle library

Run once after starting Tangle to make 'SkyPilot: GPU Sanity Check' and
'SkyPilot: Multi-node Peer Check' visible in the UI's component picker
when building a pipeline. The 'SkyPilot:' name prefix surfaces the
launcher in the component browser without changing any Tangle defaults.
…orch DDP

Both the example pipeline and the published-component swap the
synthetic socket roundtrip for a real multi-node PyTorch
DistributedDataParallel training run on a small MLP with synthetic
regression data.

Two pods × 1 H100 each (annotation: H100:1, num_nodes=2). NCCL
backend; gradients all-reduced across ranks every step. Drops the
storage_mount/output declarations so worker pods don't need
storage-side credentials — the launcher's log_uri pipeline already
captures stdout and serves it via Tangle's /container_log endpoint.

Verified on CoreWeave's sky-dev cluster (sky job 29):
  loss 3.46 -> 0.08 -> 0.057 -> 0.025 -> 0.026 over 5 epochs
  total ~14s after image pull
  device=cuda gpu="NVIDIA H100 80GB HBM3" on both ranks, identical
    all-reduced losses (DDP sync confirmed)

Falls back to gloo (CPU) automatically when no GPU is present, so the
same component runs against either accelerator profile.
… to GCS

Restores the checkpoint + training_log outputs on the multinode example
now that worker pods have GCS auth via SkyPilot's gcpCredentials helm
option (mounts a GCP service-account key + GOOGLE_APPLICATION_CREDENTIALS
into every job pod). gcsfuse on workers can now write to gs:// paths.

Verified on CoreWeave sky-dev (sky job 30):
  - 2x[H100:1], NCCL, loss 3.46 -> 0.026 over 5 epochs, ~18s
  - rank 0 wrote 1.51 MiB checkpoint + training log to
    gs://tangle-skypilot-test-zhwu/artifacts/by_execution/<exec_id>/outputs/

The published "SkyPilot: Multi-node PyTorch DDP" component spec also
declares the outputs and writes them, so any pipeline built around it
in the Tangle UI persists artifacts the same way.
Pipeline graph:
  prepare (CPU) -> infer_h100 (H100:1) + infer_h200 (H200:1) -> compare (CPU)

The two inference tasks declare different accelerator constraints, so
SkyPilot's optimizer dispatches them to *different* K8s contexts in the
same allowed_contexts list — proving cross-cluster placement under one
Tangle pipeline. Both run an identical gpt2 generation script over the
same 3 prompts; the final task fans both result JSON files together
into a side-by-side report.

Verified on CoreWeave (allowed_contexts=[sky-dev, h200cluster],
launcher infra=None so the optimizer picks per task):
  - sky job 32 (infer_h100)    -> NVIDIA H100 80GB HBM3 on sky-dev
  - sky job 33 (infer_h200)    -> NVIDIA H200          on h200cluster
  - identical gpt2 completions on both, ~10-15% latency advantage on H200
  - artifact handoff via gs://tangle-skypilot-test-zhwu (storage_mounts
    work on both clusters via the GCP SA key mounted by the helm chart's
    gcpCredentials option)

Tangle's stock kubernetes_launchers can only target a single api_client
context at a time, so this graph is not expressible there without
running two separate orchestrators.
…ebius-h100 and switch model to Qwen2.5-0.5B-Instruct

The pipeline now reads as a multi-cloud demo: each inference task name
encodes the target cloud + accelerator (gke-l4, nebius-h100) so the
placement is obvious in the Tangle UI and SkyPilot dashboard. The
underlying GPU asks (H100, H200) keep the example runnable in the
current allowed_contexts (sky-dev + h200cluster); comments mark the
spots to swap to L4:1 / H100:1 once a real GKE-L4 and Nebius-H100
context are added.

Switched the model from gpt2 to Qwen/Qwen2.5-0.5B-Instruct — same small
footprint (~1GB), but uses the chat template so completions read as
real assistant responses instead of the gpt2 loop-y output. Output
schema also includes the model id per result.

Verified on CoreWeave (sky jobs 41-43, run 019ddade7817bada7422):
  prompt: 'The capital of France is'
    gke-l4       (NVIDIA H100, 436ms): 'Paris.'
    nebius-h100  (NVIDIA H200, 401ms): 'Paris.'
  prompt: 'A haiku about distributed computing:'
    both: 'In parallel threads of thought,\\nTogether we solve problems
           vast and small,\\nEfficiency in every task achieved.'

Same prompt, identical greedy completions on both clusters; ~7-15%
latency advantage on H200. Cross-cluster placement is sky's optimizer
working off the per-task accelerator constraint, not any per-task
infra pinning in the launcher.
…lot launcher + UI

Mirrors upstream start_local.py but swaps DockerContainerLauncher for
SkyPilotKubernetesLauncher and uses GoogleCloudStorageProvider when
TANGLE_STORAGE_BUCKET is set (LocalStorageProvider otherwise). Serves
the same Tangle frontend on /, so users get the full UI + the
SkyPilot launcher in one entry point.

Run:
  TANGLE_STORAGE_BUCKET=gs://my-bucket python -m uvicorn \
    start_local_skypilot:app --host 0.0.0.0 --port 9091
@Michaelvll Michaelvll requested a review from Ark-kun as a code owner May 4, 2026 16:39