Unverified Commit 100612b3 authored by Marius van den Beek's avatar Marius van den Beek Committed by GitHub
Browse files

Merge pull request #20425 from jdavcs/25.0_trigger

[25.0] Update triggers
parents db865963 b24d8ae6
Loading
Loading
Loading
Loading
+77 −0
Original line number Diff line number Diff line
"""Update postgresql trigger to use clock_timestamp function

NOTE: This migration will not be applied on SQLIte.

Revision ID: a91ea1d97111
Revises: f070559879f1
Create Date: 2025-06-09 12:21:53.427419

"""

from alembic import op

from galaxy.model.migrations.util import _is_sqlite

# revision identifiers, used by Alembic.
revision = "a91ea1d97111"
down_revision = "f070559879f1"
branch_labels = None
depends_on = None


def upgrade():
    if not _is_sqlite():
        create_functions_and_triggers("clock_timestamp()")


def downgrade():
    if not _is_sqlite():
        create_functions_and_triggers("CURRENT_TIMESTAMP")


def create_functions_and_triggers(timestamp):
    version_info = op.get_bind().engine.dialect.server_version_info
    # For offline mode (version_info is None), we assume that version > 10
    if version_info and version_info[0] > 10 or not version_info:
        trigger_fn = statement_trigger_fn
    else:
        trigger_fn = row_trigger_fn

    for id_field in ["history_id", "id"]:
        function_name = f"fn_audit_history_by_{id_field}"
        stmt = trigger_fn(function_name, id_field, timestamp)
        op.execute(stmt)


def statement_trigger_fn(function_name, id_field, timestamp):
    return f"""
        CREATE OR REPLACE FUNCTION {function_name}()
            RETURNS TRIGGER
            LANGUAGE 'plpgsql'
        AS $BODY$
            BEGIN
                INSERT INTO history_audit (history_id, update_time)
                SELECT DISTINCT {id_field}, {timestamp} AT TIME ZONE 'UTC'
                FROM new_table
                WHERE {id_field} IS NOT NULL
                ON CONFLICT DO NOTHING;
                RETURN NULL;
            END;
        $BODY$
    """


def row_trigger_fn(function_name, id_field, timestamp):
    return f"""
        CREATE OR REPLACE FUNCTION {function_name}()
            RETURNS TRIGGER
            LANGUAGE 'plpgsql'
        AS $BODY$
            BEGIN
                INSERT INTO history_audit (history_id, update_time)
                VALUES (NEW.{id_field}, {timestamp} AT TIME ZONE 'UTC')
                ON CONFLICT DO NOTHING;
                RETURN NULL;
            END;
        $BODY$
    """
+2 −2
Original line number Diff line number Diff line
@@ -46,8 +46,8 @@ REVISION_TAGS = {
    "24.1": "04288b6a5b25",
    "release_24.2": "a4c3ef999ab5",
    "24.2": "a4c3ef999ab5",
    "release_25.0": "f070559879f1",
    "25.0": "f070559879f1",
    "release_25.0": "a91ea1d97111",
    "25.0": "a91ea1d97111",
}


+0 −160
Original line number Diff line number Diff line
"""
Database trigger installation and removal
"""

from galaxy.model.triggers.utils import execute_statements


def install_timestamp_triggers(engine):
    """
    Install update_time propagation triggers for history table
    """
    statements = get_timestamp_install_sql(engine.name)
    execute_statements(engine, statements)


def drop_timestamp_triggers(engine):
    """
    Remove update_time propagation triggers for history table
    """
    statements = get_timestamp_drop_sql(engine.name)
    execute_statements(engine, statements)


def get_timestamp_install_sql(variant):
    """
    Generate a list of SQL statements for installation of timestamp triggers
    """

    sql = get_timestamp_drop_sql(variant)

    if "postgres" in variant:
        # PostgreSQL has a separate function definition and a trigger
        # assignment. The first two statements the functions, and
        # the later assign those functions to triggers on tables

        fn_name = "update_history_update_time"
        sql.append(build_pg_timestamp_fn(fn_name, "history", source_key="history_id"))
        sql.append(build_pg_trigger("history_dataset_association", fn_name))
        sql.append(build_pg_trigger("history_dataset_collection_association", fn_name))

    else:
        # Other database variants are more granular. Requiring separate
        # statements for INSERT/UPDATE/DELETE, and the body of the trigger
        # is not necessarily reusable with a function

        for operation in ["INSERT", "UPDATE", "DELETE"]:
            # change hda -> update history
            sql.append(
                build_timestamp_trigger(operation, "history_dataset_association", "history", source_key="history_id")
            )

            # change hdca -> update history
            sql.append(
                build_timestamp_trigger(
                    operation, "history_dataset_collection_association", "history", source_key="history_id"
                )
            )

    return sql


def get_timestamp_drop_sql(variant):
    """
    Generate a list of statements to drop the timestamp update triggers
    """

    sql = []

    if "postgres" in variant:
        sql.append("DROP FUNCTION IF EXISTS update_history_update_time() CASCADE;")
    else:
        for operation in ["INSERT", "UPDATE", "DELETE"]:
            for when in ["BEFORE", "AFTER"]:
                sql.append(build_drop_trigger(operation, "history_dataset_association", when))
                sql.append(build_drop_trigger(operation, "history_dataset_collection_association", when))

    return sql


def build_pg_timestamp_fn(fn_name, target_table, source_key, target_key="id"):
    """Generates a PostgreSQL history update timestamp function"""

    return f"""
        CREATE OR REPLACE FUNCTION {fn_name}()
            RETURNS trigger
            LANGUAGE 'plpgsql'
        AS $BODY$
            BEGIN
                IF (TG_OP = 'DELETE') THEN
                    UPDATE {target_table}
                    SET update_time = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
                    WHERE {target_key} = OLD.{source_key};
                    RETURN OLD;
                ELSEIF (TG_OP = 'UPDATE') THEN
                    UPDATE {target_table}
                    SET update_time = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
                    WHERE {target_key} = NEW.{source_key} OR {target_key} = OLD.{source_key};
                    RETURN NEW;
                ELSIF (TG_OP = 'INSERT') THEN
                    UPDATE {target_table}
                    SET update_time = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
                    WHERE {target_key} = NEW.{source_key};
                    RETURN NEW;
                END IF;
            END;
        $BODY$;
    """


def build_pg_trigger(source_table, fn_name, when="AFTER"):
    """Assigns a PostgreSQL trigger to indicated table, calling user-defined function"""
    when_initial = when.lower()[0]
    trigger_name = f"trigger_{source_table}_{when_initial}iudr"
    return f"""
        CREATE TRIGGER {trigger_name}
            {when} INSERT OR DELETE OR UPDATE
            ON {source_table}
            FOR EACH ROW
            EXECUTE PROCEDURE {fn_name}();
    """


def build_timestamp_trigger(operation, source_table, target_table, source_key, target_key="id", when="AFTER"):
    """Creates a non-PostgreSQL update_time trigger"""

    trigger_name = get_trigger_name(operation, source_table, when)

    # three different update clauses depending on update/insert/delete
    clause = ""
    if operation == "DELETE":
        clause = f"{target_key} = OLD.{source_key}"
    elif operation == "UPDATE":
        clause = f"{target_key} = NEW.{source_key} OR {target_key} = OLD.{source_key}"
    else:
        clause = f"{target_key} = NEW.{source_key}"

    return f"""
        CREATE TRIGGER {trigger_name}
            {when} {operation}
            ON {source_table}
            FOR EACH ROW
            BEGIN
                UPDATE {target_table}
                SET update_time = CURRENT_TIMESTAMP
                WHERE {clause};
            END;
    """


def build_drop_trigger(operation, source_table, when="AFTER"):
    """Drops a non-PostgreSQL trigger by name"""
    trigger_name = get_trigger_name(operation, source_table, when)
    return f"DROP TRIGGER IF EXISTS {trigger_name}"


def get_trigger_name(operation, source_table, when):
    """Non-PostgreSQL trigger name"""
    op_initial = operation.lower()[0]
    when_initial = when.lower()[0]
    return f"trigger_{source_table}_{when_initial}{op_initial}r"
+11 −3
Original line number Diff line number Diff line
from galaxy.model.triggers.utils import execute_statements
from sqlalchemy import DDL

# function name prefix
fn_prefix = "fn_audit_history_by"
@@ -56,7 +56,7 @@ def _postgres_install(engine):
            AS $BODY$
                BEGIN
                    INSERT INTO history_audit (history_id, update_time)
                    SELECT DISTINCT {id_field}, CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
                    SELECT DISTINCT {id_field}, clock_timestamp() AT TIME ZONE 'UTC'
                    FROM new_table
                    WHERE {id_field} IS NOT NULL
                    ON CONFLICT DO NOTHING;
@@ -75,7 +75,7 @@ def _postgres_install(engine):
            AS $BODY$
                BEGIN
                    INSERT INTO history_audit (history_id, update_time)
                    VALUES (NEW.{id_field}, CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
                    VALUES (NEW.{id_field}, clock_timestamp() AT TIME ZONE 'UTC')
                    ON CONFLICT DO NOTHING;
                    RETURN NULL;
                END;
@@ -174,3 +174,11 @@ def get_trigger_name(label, operation, when, statement=False):
    when_initial = when.lower()[0]
    rs = "s" if statement else "r"
    return f"trigger_{label}_{when_initial}{op_initial}{rs}"


def execute_statements(engine, raw_sql):
    statements = raw_sql if isinstance(raw_sql, list) else [raw_sql]
    with engine.begin() as connection:
        for sql in statements:
            cmd = DDL(sql)
            connection.execute(cmd)
+0 −9
Original line number Diff line number Diff line
from sqlalchemy import DDL


def execute_statements(engine, raw_sql):
    statements = raw_sql if isinstance(raw_sql, list) else [raw_sql]
    with engine.begin() as connection:
        for sql in statements:
            cmd = DDL(sql)
            connection.execute(cmd)