feat: add ability to use existing connections and connection pools#54
Open
feat: add ability to use existing connections and connection pools#54
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds support for injecting existing database connections and connection pools into the TaskIQ Postgres brokers/result backends/schedule sources, while ensuring that externally-provided resources are not closed during component shutdown.
Changes:
- Added optional parameters for reusing externally-managed connections/pools across asyncpg, psycopg, psqlpy, and aiopg implementations, including ownership tracking to control shutdown behavior.
- Introduced new integration tests validating that shared pools/connections remain usable after shutdown.
- Updated tooling configuration/dependencies (dev dependency additions and build-system version bump).
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
uv.lock |
Locks the newly added dev dependency (ty) and its artifacts. |
pyproject.toml |
Adds ty to dev deps, bumps uv_build, and updates Ruff configuration. |
tests/integration/test_shared_pool.py |
New integration tests to ensure externally-provided pools/connections aren’t closed by shutdown. |
src/taskiq_pg/asyncpg/broker.py |
Adds support for injected write pool/read connection with ownership tracking. |
src/taskiq_pg/asyncpg/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/asyncpg/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/aiopg/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/aiopg/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psqlpy/broker.py |
Adds support for injected write pool/read connection with ownership tracking. |
src/taskiq_pg/psqlpy/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psqlpy/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psycopg/broker.py |
Adds support for injected write pool/read connection with ownership tracking. |
src/taskiq_pg/psycopg/result_backend.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/psycopg/schedule_source.py |
Adds optional external pool support + ownership-based shutdown. |
src/taskiq_pg/_internal/broker.py |
Minor refactor/typing-related updates for the shared broker base. |
src/taskiq_pg/_internal/result_backend.py |
Minor docstring formatting adjustments in the shared result backend base. |
src/taskiq_pg/_internal/schedule_source.py |
Minor typing/import cleanup for the shared schedule source base. |
Comments suppressed due to low confidence (1)
src/taskiq_pg/psqlpy/broker.py:202
shutdown()checksif self._listener is not None:, but_listeneris not initialized in the class or__init__. Ifstartup()fails before_listeneris assigned (orshutdown()is called without a successful startup), this will raiseAttributeError. Consider initializing_listenertoNone(and checking it) or usinggetattr(self, "_listener", None)here.
async def shutdown(self) -> None:
"""Close all connections on shutdown."""
await super().shutdown()
if self._read_conn is not None and self._owns_read_conn:
self._read_conn.close()
if self._write_pool is not None and self._owns_write_pool:
self._write_pool.close()
if self._listener is not None:
self._listener.abort_listen()
await self._listener.shutdown()
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
5ba71bc to
b3b5af2
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Connected to #41