Commit def88293 authored by John Davis's avatar John Davis
Browse files

Add verify_db_is_initialized function; tests

parent 1e75d85f
Loading
Loading
Loading
Loading
+52 −2
Original line number Diff line number Diff line
@@ -14,7 +14,10 @@ from alembic.script import ScriptDirectory
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from galaxy.model.database_utils import is_one_database
from galaxy.model.database_utils import (
    database_exists,
    is_one_database,
)
from galaxy.model.migrations import (
    AlembicManager,
    DatabaseConfig,
@@ -38,6 +41,51 @@ GXY_CONFIG_PREFIX = "GALAXY_CONFIG_"
TSI_CONFIG_PREFIX = "GALAXY_INSTALL_CONFIG_"


class DatabaseDoesNotExistError(Exception):
    def __init__(self, db_url: str) -> None:
        super().__init__(
            f"""The database at {db_url} does not exist. You must
            create and initialize the database before running this script. You
            can do so by (a) running `create_db.sh`; or by (b) starting Galaxy,
            in which case Galaxy will create and initialize the database
            automatically."""
        )


class DatabaseNotInitializedError(Exception):
    def __init__(self, db_url: str) -> None:
        super().__init__(
            f"""The database at {db_url} is empty. You must
            initialize the database before running this script. You can do so by
            (a) running `create_db.sh`; or by (b) starting Galaxy, in which case
            Galaxy will initialize the database automatically."""
        )


def verify_database_is_initialized(db_url: str) -> None:
    """
    Intended for use by scripts that run database migrations (manage_db.sh,
    run_alembic.sh). Those scripts are meant to run on a database that has been
    initialized with the appropriate metadata (e.g. galaxy or install model).

    This function will raise an error if the database does not exist or has not
    been initialized*.

    *NOTE: this function cannot determine whether a database has been properly
    initialized; it can only tell when a database has *not* been initialized.
    """
    if not database_exists(db_url):
        raise DatabaseDoesNotExistError(db_url)

    engine = create_engine(db_url)
    try:
        db_state = DatabaseStateCache(engine=engine)
        if db_state.is_database_empty() or db_state.contains_only_kombu_tables():
            raise DatabaseNotInitializedError(db_url)
    finally:
        engine.dispose()


def get_configuration(argv: List[str], cwd: str) -> Tuple[DatabaseConfig, DatabaseConfig, bool]:
    """
    Return a 3-item-tuple with configuration values used for managing databases.
@@ -125,7 +173,9 @@ class LegacyScripts:
    @property
    def database(self):
        if self._database is None:
            raise LegacyScriptsException("Attempt to access identifier of database before processing the script arguments")
            raise LegacyScriptsException(
                "Attempt to access identifier of database before processing the script arguments"
            )
        return self._database

    def run(self) -> None:
+15 −0
Original line number Diff line number Diff line
import random

import pytest

from galaxy.model.migrations.scripts import (
    DatabaseDoesNotExistError,
    DatabaseNotInitializedError,
    LegacyScripts,
    LegacyScriptsException,
    verify_database_is_initialized,
)


@@ -157,3 +162,13 @@ class TestLegacyScripts:
        argv = ["caller", "--alembic-config", "path-to-alembic", "upgrade"]
        with pytest.raises(LegacyScriptsException):
            LegacyScripts(argv).database

    def test_verify_database_is_init_raises_error_if_no_database(self):
        nonexistant_path = str(random.random())[2:]
        db_url = f"sqlite:////{nonexistant_path}"
        with pytest.raises(DatabaseDoesNotExistError):
            verify_database_is_initialized(db_url)

    def test_verify_database_is_init_raises_error_if_database_not_initialized(self, sqlite_memory_url):
        with pytest.raises(DatabaseNotInitializedError):
            verify_database_is_initialized(sqlite_memory_url)