What happens?
When two `db.sql(query).pl(lazy=True)` LazyFrames are joined together, the joined LazyFrame is referenced twice in the same plan (a self-rejoin via a derived sub-LazyFrame), a window expression (e.g. `rolling_sum`, `rolling_std`, `cum_prod` with `.over("g")`) sits downstream, and the plan is collected via `collect(engine="streaming")`, the output silently drops ~80% of rows. The row count varies across runs of the same plan, on the same data, in the same process — i.e. the truncation is non-deterministic.
The same plan, evaluated against `pl.scan_parquet` instead of duckdb's bridge, returns the full and correct result. Removing any of these triggers individually — switching to `pl.scan_parquet`, dropping the self-rejoin, dropping the window expression, or collecting without the streaming engine — also restores correct output.
The minimal repro below produces:
A) pl.scan_parquet, in-memory engine
shape=(50597009, 3) y non-null=45,647,009
B) db.sql(...).pl(lazy=True), streaming engine (3 runs)
run 1: shape=(10301109, 3) y non-null=9,287,943 nn_match=False
run 2: shape=(10400465, 3) y non-null=9,377,597 nn_match=False
run 3: shape=(10302731, 3) y non-null=9,289,367 nn_match=False
Environment
- polars 1.40.1
- duckdb 1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce, with identical behaviour)
- pyarrow latest
- macOS arm64 (Darwin 25.3.0), Python 3.11
Notes
The bug also manifests in our real ETL pipeline at much larger scale (47k groups, ~313M source rows, 5 source joins), where runs of the same plan returned row counts of 54.4M, 10.1M, and 440K, and a `cum_prod`-based window column was dropped to 0 rows entirely. Replacing `db.sql(...).pl(lazy=True)` with a duckdb → parquet → `pl.scan_parquet` boundary fixes it deterministically.
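That boundary is just a materialize-then-scan step. A minimal sketch, assuming duckdb's `write_parquet` relation method (the helper name is ours):

import duckdb
import polars as pl

def duckdb_to_scan(
    con: duckdb.DuckDBPyConnection, query: str, path: str
) -> pl.LazyFrame:
    # Materialize the duckdb result to a parquet file, then hand polars a
    # plain file scan instead of the live .pl(lazy=True) bridge.
    con.sql(query).write_parquet(path)
    return pl.scan_parquet(path)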
We tried a workaround that opens a fresh duckdb connection per `register_io_source` generator invocation (so no relation object is shared across polars' repeated calls into the source). That fixes the minimal repro but not the larger production case, suggesting the shared-relation pattern is only one of multiple failure modes in this bridge under the streaming engine.
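For reference, the shape of that attempted workaround, sketched against polars' `register_io_source` plugin API (helper name ours; `n_rows` handling omitted for brevity):

import duckdb
import polars as pl
from polars.io.plugins import register_io_source

def fresh_connection_scan(query: str, schema: pl.Schema) -> pl.LazyFrame:
    def source(with_columns, predicate, n_rows, batch_size):
        # Every invocation opens its OWN connection, so no relation object
        # is shared across polars' repeated calls into the source.
        con = duckdb.connect(":memory:")
        try:
            reader = con.sql(query).fetch_arrow_reader(batch_size or 65_536)
            for batch in reader:
                df = pl.from_arrow(batch)
                # Apply pushed-down projection/predicate on the polars side;
                # n_rows is ignored in this sketch.
                if with_columns is not None:
                    df = df.select(with_columns)
                if predicate is not None:
                    df = df.filter(predicate)
                yield df
        finally:
            con.close()

    return register_io_source(source, schema=schema)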
To Reproduce
Self-contained script below. Variable-length groups are required to trigger the bug at this scale — with uniform-length groups the same plan returns the correct row count.
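The uniform-length control is a one-line swap in the data section of the script (the constant length is illustrative):

# Control: uniform group lengths; per the note above, this variant returns
# the correct row count.
group_lens = np.full(N_GROUPS, 1_000, dtype=np.int64)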
uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \
python duckdb_polars_streaming_repro.py
"""Minimal reproducer for a silent, non-deterministic row drop in
duckdb-python's polars bridge.
When two ``db.sql(query).pl(lazy=True)`` LazyFrames are joined together,
the joined LazyFrame is referenced twice in the same plan (a self-rejoin),
a window expression sits downstream, and the plan is collected via
``collect(engine="streaming")``, the output drops ~80% of rows and the
row count varies across runs of the same plan.
Example output on the repro below (50,597,009 rows total):
A) pl.scan_parquet, in-memory engine
shape=(50597009, 3) y non-null=45,647,009
B) db.sql(...).pl(lazy=True), streaming engine (3 runs)
run 1: shape=(10301109, 3) y non-null=9,287,943 nn_match=False
run 2: shape=(10400465, 3) y non-null=9,377,597 nn_match=False
run 3: shape=(10302731, 3) y non-null=9,289,367 nn_match=False
Removing ANY of these — using ``pl.scan_parquet`` instead of duckdb's
bridge, dropping the self-rejoin, dropping the window expression, or
collecting without the streaming engine — restores correct output.
Tested with:
polars 1.40.1
duckdb 1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce)
pyarrow latest
Run:
uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \\
python duckdb_polars_streaming_repro.py
"""
from __future__ import annotations
import shutil
import tempfile
from pathlib import Path
import duckdb
import numpy as np
import polars as pl
print(f"polars {pl.__version__} duckdb {duckdb.__version__}")
# ---------------------------------------------------------------------------
# 1) Two parquet files with variable-length groups.
# `left`: (g, t, x) — group id, index, payload
# `right`: (g, t) — same key columns, no payload
# Variable per-group length matters: uniform-length groups don't reproduce
# at this scale.
# ---------------------------------------------------------------------------
N_GROUPS = 50_000
rng = np.random.default_rng(42)
group_lens = np.clip(
rng.lognormal(mean=6.8, sigma=0.5, size=N_GROUPS).astype(int), 30, 2900
)
N = int(group_lens.sum())
print(f"rows: {N:,} groups: {N_GROUPS:,}")
g = np.repeat(np.arange(N_GROUPS, dtype=np.int32), group_lens)
t = np.concatenate([np.arange(n, dtype=np.int32) for n in group_lens])
x = rng.uniform(-1.0, 1.0, N).astype(np.float32)
tmpdir = Path(tempfile.mkdtemp(prefix="repro_"))
left_path = tmpdir / "left.parquet"
right_path = tmpdir / "right.parquet"
pl.DataFrame({"g": g, "t": t, "x": x}).write_parquet(
left_path, row_group_size=200_000
)
pl.DataFrame({"g": g, "t": t}).write_parquet(
right_path, row_group_size=200_000
)
del g, t, x
# ---------------------------------------------------------------------------
# 2) The plan:
# - `left` and `right` are joined into `joined`.
# - `joined` is referenced TWICE: once directly, once via a derived
# LazyFrame that selects the group keys uniquely.
# - A rolling-sum window over `g` is applied at the end.
# ---------------------------------------------------------------------------
def build(left_lf: pl.LazyFrame, right_lf: pl.LazyFrame) -> pl.LazyFrame:
joined = left_lf.join(right_lf, on=["g", "t"], how="inner")
keys = joined.select("g").unique() # ← reference #1
plan = joined.join(keys, on="g") # ← reference #2
return plan.select(
"g",
"t",
pl.col("x").rolling_sum(window_size=100).over("g").alias("y"),
)
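# Per the description above, other window expressions also trigger the
# truncation when substituted for the rolling_sum, e.g.:
#   pl.col("x").rolling_std(window_size=100).over("g")
#   pl.col("x").cum_prod().over("g")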
# ---------------------------------------------------------------------------
# 3) Reference: pure pl.scan_parquet, in-memory engine.
# ---------------------------------------------------------------------------
print("\nA) pl.scan_parquet, in-memory engine")
ref = build(pl.scan_parquet(left_path), pl.scan_parquet(right_path)).collect()
ref_shape = ref.shape
ref_nn = ref["y"].drop_nulls().drop_nans().len()
print(f" shape={ref_shape} y non-null={ref_nn:,}")
# ---------------------------------------------------------------------------
# 4) Bug path: db.sql(...).pl(lazy=True) + collect(engine="streaming").
# ---------------------------------------------------------------------------
print("\nB) db.sql(...).pl(lazy=True), streaming engine (3 runs)")
for i in range(3):
db_l = duckdb.connect(":memory:")
db_r = duckdb.connect(":memory:")
try:
left_lf = db_l.sql(
f"select * from read_parquet('{left_path}')"
).pl(lazy=True)
right_lf = db_r.sql(
f"select * from read_parquet('{right_path}')"
).pl(lazy=True)
out = build(left_lf, right_lf).collect(engine="streaming")
nn = out["y"].drop_nulls().drop_nans().len()
print(
f" run {i+1}: shape={out.shape} y non-null={nn:,} "
f"shape_match={out.shape == ref_shape} nn_match={nn == ref_nn}"
)
finally:
db_l.close()
db_r.close()
shutil.rmtree(tmpdir)
OS:
macOS arm64 (Darwin 25.3.0)
DuckDB Package Version:
1.5.1 / 1.5.2 / 1.6.0.dev12
Python Version:
3.11
Full Name:
Misha van Beek
Affiliation:
Bayesline
What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.
I have tested with a stable release
Did you include all relevant data sets for reproducing the issue?
Yes
Did you include all code required to reproduce the issue?
Did you include all relevant configuration to reproduce the issue?