Skip to content

feat: add ability to use existing connections and connection pools#54

Open
danfimov wants to merge 5 commits intomainfrom
add-db-pool-support
Open

feat: add ability to use existing connections and connection pools#54
danfimov wants to merge 5 commits intomainfrom
add-db-pool-support

Conversation

@danfimov
Copy link
Copy Markdown
Owner

@danfimov danfimov commented May 5, 2026

Connected to #41

@danfimov danfimov self-assigned this May 5, 2026
Copilot AI review requested due to automatic review settings May 5, 2026 19:30
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for injecting existing database connections and connection pools into the TaskIQ Postgres brokers/result backends/schedule sources, while ensuring that externally-provided resources are not closed during component shutdown.

Changes:

  • Added optional parameters for reusing externally-managed connections/pools across asyncpg, psycopg, psqlpy, and aiopg implementations, including ownership tracking to control shutdown behavior.
  • Introduced new integration tests validating that shared pools/connections remain usable after shutdown.
  • Updated tooling configuration/dependencies (dev dependency additions and build-system version bump).

Reviewed changes

Copilot reviewed 16 out of 17 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
uv.lock Locks the newly added dev dependency (ty) and its artifacts.
pyproject.toml Adds ty to dev deps, bumps uv_build, and updates Ruff configuration.
tests/integration/test_shared_pool.py New integration tests to ensure externally-provided pools/connections aren’t closed by shutdown.
src/taskiq_pg/asyncpg/broker.py Adds support for injected write pool/read connection with ownership tracking.
src/taskiq_pg/asyncpg/result_backend.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/asyncpg/schedule_source.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/aiopg/result_backend.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/aiopg/schedule_source.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/psqlpy/broker.py Adds support for injected write pool/read connection with ownership tracking.
src/taskiq_pg/psqlpy/result_backend.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/psqlpy/schedule_source.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/psycopg/broker.py Adds support for injected write pool/read connection with ownership tracking.
src/taskiq_pg/psycopg/result_backend.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/psycopg/schedule_source.py Adds optional external pool support + ownership-based shutdown.
src/taskiq_pg/_internal/broker.py Minor refactor/typing-related updates for the shared broker base.
src/taskiq_pg/_internal/result_backend.py Minor docstring formatting adjustments in the shared result backend base.
src/taskiq_pg/_internal/schedule_source.py Minor typing/import cleanup for the shared schedule source base.
Comments suppressed due to low confidence (1)

src/taskiq_pg/psqlpy/broker.py:202

  • shutdown() checks if self._listener is not None:, but _listener is not initialized in the class or __init__. If startup() fails before _listener is assigned (or shutdown() is called without a successful startup), this will raise AttributeError. Consider initializing _listener to None (and checking it) or using getattr(self, "_listener", None) here.
    async def shutdown(self) -> None:
        """Close all connections on shutdown."""
        await super().shutdown()
        if self._read_conn is not None and self._owns_read_conn:
            self._read_conn.close()
        if self._write_pool is not None and self._owns_write_pool:
            self._write_pool.close()
        if self._listener is not None:
            self._listener.abort_listen()
            await self._listener.shutdown()


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/taskiq_pg/psqlpy/schedule_source.py
Comment thread src/taskiq_pg/psqlpy/result_backend.py
Comment thread src/taskiq_pg/psqlpy/result_backend.py Outdated
Comment thread src/taskiq_pg/psqlpy/result_backend.py Outdated
Comment thread tests/integration/test_shared_pool.py
Comment thread tests/integration/test_shared_pool.py
Comment thread src/taskiq_pg/asyncpg/broker.py Outdated
Comment thread src/taskiq_pg/asyncpg/result_backend.py Outdated
Comment thread src/taskiq_pg/aiopg/result_backend.py Outdated
Comment thread src/taskiq_pg/psycopg/result_backend.py Outdated
Copilot AI review requested due to automatic review settings May 5, 2026 19:39
@danfimov danfimov force-pushed the add-db-pool-support branch from 5ba71bc to b3b5af2 Compare May 5, 2026 19:40
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/taskiq_pg/asyncpg/broker.py
Comment thread src/taskiq_pg/psqlpy/result_backend.py
Comment thread tests/integration/test_shared_pool.py
Comment thread src/taskiq_pg/psycopg/broker.py Outdated
Comment thread pyproject.toml
Comment thread tests/integration/test_shared_pool.py
Comment thread tests/integration/test_shared_pool.py
Comment thread tests/integration/test_shared_pool.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants