Skip to content

fix: batch accumulator doesn't flush when source iter raises #49

Merged
quettabit merged 2 commits into
mainfrom
qb/iss-43
May 11, 2026
Merged

fix: batch accumulator doesn't flush when source iter raises #49
quettabit merged 2 commits into
mainfrom
qb/iss-43

Conversation

@quettabit
Copy link
Copy Markdown
Member

closes #43

@quettabit
Copy link
Copy Markdown
Member Author

@greptileai review pls

@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented May 11, 2026

Greptile Summary

This PR fixes a bug where partially-accumulated records were silently dropped when the source async iterator raised an exception before the batch was full. An except Exception block is inserted before the existing finally to flush any non-empty accumulator and re-raise, ensuring in-flight records reach the caller.

  • _batching.py: Adds except Exception around the main accumulation loop; flushes the accumulator (yield acc.take()) before re-raising, guarded by is_empty() to avoid a double-yield if the exception occurred after a normal yield acc.take().
  • tests/test_batching.py: Adds a parametrised test covering both zero-linger (synchronous anext path) and non-zero-linger (asyncio.create_task path), confirming the partial batch is surfaced and the original exception propagates.

Confidence Score: 5/5

Safe to merge — the fix is a minimal, well-targeted addition that closes a real data-loss scenario without touching unrelated paths.

The change inserts a single except Exception block that flushes in-flight records before re-raising; the is_empty() guard correctly prevents a double-yield after a normal flush, BaseException subclasses like CancelledError are intentionally excluded, and the finally cleanup runs as before. The new parametrised test covers both the synchronous anext path (zero linger) and the create_task/result() path (non-zero linger), giving direct confidence that both branches behave as intended.

No files require special attention.

Important Files Changed

Filename Overview
src/s2_sdk/_batching.py Adds except Exception flush-before-reraise block; logic is correct — is_empty() guard prevents double-yield and BaseException (e.g. CancelledError) is intentionally excluded.
tests/test_batching.py New parametrised test exercises both the zero-linger and non-zero-linger code paths, verifying flush-on-exception and exception propagation.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A([Start]) --> B[await next record]
    B --> C{record is None?}
    C -- yes --> D[break outer loop]
    C -- no --> E[acc.add record]
    E --> F{linger > 0?}
    F -- yes --> G[create_task anext\nawait with timeout]
    F -- no --> H[await anext directly]
    G --> I{task done\nin time?}
    I -- no --> J[pending_next = task\nbreak inner loop]
    I -- yes --> K[next_task.result]
    K -- raises --> L((Exception))
    H -- raises --> L
    K -- returns None --> M[break inner loop]
    H -- returns None --> M
    K -- returns record --> E
    H -- returns record --> E
    J --> N[yield acc.take]
    M --> N
    N --> B
    D --> O([Generator ends normally])
    L --> P{acc.is_empty?}
    P -- no --> Q[yield acc.take\nflush partial batch]
    P -- yes --> R[raise]
    Q --> R
    R --> S([Exception propagates to caller])
    N --> T{Exception thrown\ninto generator?}
    T -- yes --> L
    T -- no --> B
Loading

Reviews (3): Last reviewed commit: "cover both zero and non-zero linger in t..." | Re-trigger Greptile

Comment thread tests/test_batching.py
@quettabit
Copy link
Copy Markdown
Member Author

@greptileai addressed your comments. review again and update your score pls.

@quettabit quettabit marked this pull request as ready for review May 11, 2026 21:46
@quettabit quettabit requested a review from a team as a code owner May 11, 2026 21:46
@quettabit quettabit merged commit cd20814 into main May 11, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Detail Bug] Batching API drops already-received records when source iterator raises an exception

1 participant