Commit 7b5f924a authored by John Davis's avatar John Davis
Browse files

Do not reuse username generation logic in migration

parent b3ca95a2
Loading
Loading
Loading
Loading
+6 −14
Original line number Diff line number Diff line
@@ -888,17 +888,10 @@ def get_user_by_username(session, username: str, model_class=User):

def username_from_email(session, email, model_class=User):
    """Get next available username generated based on email"""
    engine = session.bind
    with engine.connect() as connection:
        return username_from_email_with_connection(connection, email, model_class)


def username_from_email_with_connection(connection, email, model_class=User):
    # This function is also called from database revision scripts, which do not provide a session.
    username = email.split("@", 1)[0].lower()
    username = filter_out_invalid_username_characters(username)
    if username_exists(connection, username, model_class):
        username = generate_next_available_username(connection, username, model_class)
    if username_exists(session, username, model_class):
        username = generate_next_available_username(session, username, model_class)
    return username


@@ -909,14 +902,13 @@ def filter_out_invalid_username_characters(username):
    return username


def username_exists(connection, username: str, model_class=User):
    stmt = select(model_class).filter(model_class.username == username).limit(1)
    return bool(connection.execute(stmt).first())
def username_exists(session, username: str, model_class=User):
    return bool(get_user_by_username(session, username, model_class))


def generate_next_available_username(connection, username, model_class=User):
def generate_next_available_username(session, username, model_class=User):
    """Generate unique username; user can change it later"""
    i = 1
    while connection.execute(select(model_class).where(model_class.username == f"{username}-{i}")).first():
    while session.execute(select(model_class).where(model_class.username == f"{username}-{i}")).first():
        i += 1
    return f"{username}-{i}"
+38 −2
Original line number Diff line number Diff line
@@ -6,6 +6,8 @@ Create Date: 2024-06-11 13:41:36.411803

"""

import string

from alembic import op
from sqlalchemy import (
    or_,
@@ -13,7 +15,6 @@ from sqlalchemy import (
    update,
)

from galaxy.managers.users import username_from_email_with_connection
from galaxy.model import User
from galaxy.model.migrations.util import (
    alter_column,
@@ -48,6 +49,41 @@ def _generate_missing_usernames():
    connection = op.get_bind()
    users = connection.execute(stmt).all()
    for id, email in users:
        new_username = username_from_email_with_connection(connection, email)
        new_username = username_from_email(connection, email)
        update_stmt = update(User).where(User.id == id).values(username=new_username)
        connection.execute(update_stmt)


# The code below is a near-duplicate of similar code in managers.users. The duplication is
# intentional: we want to preserve this logic in the migration script. The only differences are:
# (1) this code uses a Connection instead of a Session;
# (2) the username_exists function inlines the Select statement from managers.users::get_user_by_username.


def username_from_email(connection, email, model_class=User):
    # This function is also called from database revision scripts, which do not provide a session.
    username = email.split("@", 1)[0].lower()
    username = filter_out_invalid_username_characters(username)
    if username_exists(connection, username, model_class):
        username = generate_next_available_username(connection, username, model_class)
    return username


def filter_out_invalid_username_characters(username):
    """Replace invalid characters in username"""
    for char in [x for x in username if x not in f"{string.ascii_lowercase + string.digits}-."]:
        username = username.replace(char, "-")
    return username


def username_exists(connection, username: str, model_class=User):
    stmt = select(model_class).filter(model_class.username == username).limit(1)
    return bool(connection.execute(stmt).first())


def generate_next_available_username(connection, username, model_class=User):
    """Generate unique username; user can change it later"""
    i = 1
    while connection.execute(select(model_class).where(model_class.username == f"{username}-{i}")).first():
        i += 1
    return f"{username}-{i}"