Commit eea2bd9c authored by Grant's avatar Grant
Browse files

add in asyncpg support

parent 1465a51a
Loading
Loading
Loading
Loading
+59 −0
Original line number Diff line number Diff line
import asyncpg
from common.env import check_environment as ce


class AsyncPostgresMixin:
    """
    Serve common connection method for postgres, but now with async capabilities.

    The default `search_path` variable can be set with the following
    operating system variable:
        - DATABASE_SEARCH_PATH
    """

    DEFAULT_DB = ce("DATABASE_DB_PG", ce("DATABASE_DB", ce("PG_DATABASE", "postgres")))
    DEFAULT_USER = ce("DATABASE_USER_PG", ce("DATABASE_USER", ce("PG_USER", "postgres")))
    DEFAULT_PW = ce("DATABASE_PW_PG", ce("DATABASE_PW", ce("PG_PASSWORD", "postgres")))
    DEFAULT_HOST = ce("DATABASE_HOST_PG", ce("DATABASE_HOST", ce("PG_HOST", "localhost")))
    DEFAULT_PORT = int(ce("DATABASE_PORT_PG", ce("DATABASE_PORT", ce("PG_PORT", 5432))))
    DEFAULT_TIMEOUT = ce("DATABASE_TIMEOUT", ce("PG_TIMEOUT", 60))
    DEFAULT_SCHEMA = ce("DATABASE_SCHEMA_PG", ce("DATABASE_SCHEMA", ce("PG_SCHEMA", "public")))
    DEFAULT_ENGINE = ce("DATABASE_ENGINE_PG", ce("DATABASE_ENGINE", "postgresql"))
    DEFAULT_SEARCH_PATH = ce("DATABASE_SEARCH_PATH", "public")

    async def open(self, search_path=DEFAULT_SEARCH_PATH):
        """
        Explicitly open the database connection asynchronously.

        :param search_path: the search path to default to
        :return: connection object if successful, else None
        """
        try:
            self.connection = await asyncpg.connect(
                database=self.DEFAULT_DB,
                user=self.DEFAULT_USER,
                password=self.DEFAULT_PW,
                host=self.DEFAULT_HOST,
                port=self.DEFAULT_PORT,
                timeout=self.DEFAULT_TIMEOUT,
            )
            await self.connection.set_search_path(search_path)
            return self.connection
        except Exception as error:
            self.logger.error(f"Failed to connect to database: {error}")
            return None

    async def query(self, query):
        """
        Query the database asynchronously.

        :param query: A valid SQL statement to send to the database.
        :return: Results of the query if any, otherwise None
        """
        if not self.connection or self.connection.is_closed():
            await self.open()
        try:
            return await self.connection.fetch(query)
        except Exception as error:
            self.logger.error(f"Error during query execution: {error}")
            return None
+2 −1
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ dependencies = [
]

[project.optional-dependencies]
postgres = ["psycopg2-binary==2.9.9"]
postgres = ["psycopg2-binary==2.9.9", "asyncpg"]
mssql = ["pymssql==2.2.11"]
influx = ["influxdb==5.3.1"]
auth = [
@@ -54,6 +54,7 @@ scrapers = [
]
all = [
    "psycopg2-binary==2.9.9",
    "asyncpg",
    "pymssql==2.2.11",
    "influxdb==5.3.1",
    "urllib3~=1.26.9",