Silent non-deterministic row drop in db.sql(query).pl(lazy=True) when the LazyFrame is self-rejoined and collected via engine="streaming" with a window expression #452

@mishavanbeek

Description

What happens?

The output silently drops ~80% of rows when all of the following hold:

  • two db.sql(query).pl(lazy=True) LazyFrames are joined together;
  • the joined LazyFrame is referenced twice in the same plan (a self-rejoin via a derived sub-LazyFrame);
  • a window expression (e.g. rolling_sum, rolling_std, cum_prod with .over("g")) sits downstream; and
  • the plan is collected via collect(engine="streaming").

The row count varies across runs of the same plan, on the same data, in the same process; the truncation is non-deterministic.

The same plan, evaluated against pl.scan_parquet instead of duckdb's bridge, returns the full and correct result. Removing any of these triggers individually — switching to pl.scan_parquet, dropping the self-rejoin, dropping the window expression, or collecting without the streaming engine — also restores correct output.

The minimal repro below produces:

A) pl.scan_parquet, in-memory engine
   shape=(50597009, 3)  y non-null=45,647,009

B) db.sql(...).pl(lazy=True), streaming engine (3 runs)
   run 1: shape=(10301109, 3)  y non-null=9,287,943   nn_match=False
   run 2: shape=(10400465, 3)  y non-null=9,377,597   nn_match=False
   run 3: shape=(10302731, 3)  y non-null=9,289,367   nn_match=False

Environment

  • polars 1.40.1
  • duckdb 1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce, with identical behaviour)
  • pyarrow latest
  • macOS arm64 (Darwin 25.3.0), Python 3.11

Notes

The bug also manifests in our real ETL pipeline at much larger scale (47k groups, ~313M source rows, 5 source joins), where three runs of the same plan produced row counts of 54.4M, 10.1M, and 440K, and a cum_prod-based window column was dropped to 0 rows entirely. Replacing db.sql(...).pl(lazy=True) with a duckdb→parquet→pl.scan_parquet boundary fixes it deterministically.

We tried a workaround that opens a fresh duckdb connection per register_io_source generator invocation (so no relation object is shared across polars' repeated calls into the source). That fixes the minimal repro but not the larger production case, suggesting the shared-relation pattern is only one of several failure modes in this bridge under the streaming engine.

To Reproduce

Self-contained script below. Variable-length groups are required to trigger the bug at this scale — with uniform-length groups the same plan returns the correct row count.
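
To run the uniform-length control, it is enough to swap the lognormal draw in step 1 of the script for a constant per-group length (the constant 1,000 here is an illustrative choice):

```python
import numpy as np

N_GROUPS = 50_000
# Uniform per-group length: every group gets the same number of rows.
# Substituting this for the clipped lognormal draw leaves the total row
# count comparable, and the same plan then returns the correct result.
group_lens = np.full(N_GROUPS, 1_000, dtype=np.int64)
```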

uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \
    python duckdb_polars_streaming_repro.py
"""Minimal reproducer for a silent, non-deterministic row drop in
duckdb-python's polars bridge.

When two ``db.sql(query).pl(lazy=True)`` LazyFrames are joined together,
the joined LazyFrame is referenced twice in the same plan (a self-rejoin),
a window expression sits downstream, and the plan is collected via
``collect(engine="streaming")``, the output drops ~80% of rows and the
row count varies across runs of the same plan.

Example output on the repro below (50,597,009 rows total):

    A) pl.scan_parquet, in-memory engine
       shape=(50597009, 3)  y non-null=45,647,009

    B) db.sql(...).pl(lazy=True), streaming engine (3 runs)
       run 1: shape=(10301109, 3)  y non-null=9,287,943   nn_match=False
       run 2: shape=(10400465, 3)  y non-null=9,377,597   nn_match=False
       run 3: shape=(10302731, 3)  y non-null=9,289,367   nn_match=False

Removing ANY of these — using ``pl.scan_parquet`` instead of duckdb's
bridge, dropping the self-rejoin, dropping the window expression, or
collecting without the streaming engine — restores correct output.

Tested with:
    polars  1.40.1
    duckdb  1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce)
    pyarrow latest

Run:
    uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \\
        python duckdb_polars_streaming_repro.py
"""
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path

import duckdb
import numpy as np
import polars as pl

print(f"polars {pl.__version__}  duckdb {duckdb.__version__}")

# ---------------------------------------------------------------------------
# 1) Two parquet files with variable-length groups.
#    `left`:  (g, t, x)  — group id, index, payload
#    `right`: (g, t)     — same key columns, no payload
# Variable per-group length matters: uniform-length groups don't reproduce
# at this scale.
# ---------------------------------------------------------------------------
N_GROUPS = 50_000
rng = np.random.default_rng(42)
group_lens = np.clip(
    rng.lognormal(mean=6.8, sigma=0.5, size=N_GROUPS).astype(int), 30, 2900
)
N = int(group_lens.sum())
print(f"rows: {N:,}  groups: {N_GROUPS:,}")

g = np.repeat(np.arange(N_GROUPS, dtype=np.int32), group_lens)
t = np.concatenate([np.arange(n, dtype=np.int32) for n in group_lens])
x = rng.uniform(-1.0, 1.0, N).astype(np.float32)

tmpdir = Path(tempfile.mkdtemp(prefix="repro_"))
left_path = tmpdir / "left.parquet"
right_path = tmpdir / "right.parquet"
pl.DataFrame({"g": g, "t": t, "x": x}).write_parquet(
    left_path, row_group_size=200_000
)
pl.DataFrame({"g": g, "t": t}).write_parquet(
    right_path, row_group_size=200_000
)
del g, t, x


# ---------------------------------------------------------------------------
# 2) The plan:
#    - `left` and `right` are joined into `joined`.
#    - `joined` is referenced TWICE: once directly, once via a derived
#      LazyFrame that selects the group keys uniquely.
#    - A rolling-sum window over `g` is applied at the end.
# ---------------------------------------------------------------------------
def build(left_lf: pl.LazyFrame, right_lf: pl.LazyFrame) -> pl.LazyFrame:
    joined = left_lf.join(right_lf, on=["g", "t"], how="inner")
    keys = joined.select("g").unique()                # ← reference #1
    plan = joined.join(keys, on="g")                  # ← reference #2
    return plan.select(
        "g",
        "t",
        pl.col("x").rolling_sum(window_size=100).over("g").alias("y"),
    )


# ---------------------------------------------------------------------------
# 3) Reference: pure pl.scan_parquet, in-memory engine.
# ---------------------------------------------------------------------------
print("\nA) pl.scan_parquet, in-memory engine")
ref = build(pl.scan_parquet(left_path), pl.scan_parquet(right_path)).collect()
ref_shape = ref.shape
ref_nn = ref["y"].drop_nulls().drop_nans().len()
print(f"   shape={ref_shape}  y non-null={ref_nn:,}")

# ---------------------------------------------------------------------------
# 4) Bug path: db.sql(...).pl(lazy=True) + collect(engine="streaming").
# ---------------------------------------------------------------------------
print("\nB) db.sql(...).pl(lazy=True), streaming engine (3 runs)")
for i in range(3):
    db_l = duckdb.connect(":memory:")
    db_r = duckdb.connect(":memory:")
    try:
        left_lf = db_l.sql(
            f"select * from read_parquet('{left_path}')"
        ).pl(lazy=True)
        right_lf = db_r.sql(
            f"select * from read_parquet('{right_path}')"
        ).pl(lazy=True)
        out = build(left_lf, right_lf).collect(engine="streaming")
        nn = out["y"].drop_nulls().drop_nans().len()
        print(
            f"   run {i+1}: shape={out.shape}  y non-null={nn:,}  "
            f"shape_match={out.shape == ref_shape}  nn_match={nn == ref_nn}"
        )
    finally:
        db_l.close()
        db_r.close()

shutil.rmtree(tmpdir)

OS:

macOS arm64 (Darwin 25.3.0)

DuckDB Package Version:

1.5.1 / 1.5.2 / 1.6.0.dev12

Python Version:

3.11

Full Name:

Misha van Beek

Affiliation:

Bayesline

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have tested with a stable release

Did you include all relevant data sets for reproducing the issue?

Yes

Did you include all code required to reproduce the issue?

  • Yes, I have

Did you include all relevant configuration to reproduce the issue?

  • Yes, I have
