Commit f2ca9905 authored by Linus Heckemann's avatar Linus Heckemann
Browse files

nixos/systemd-boot-builder: use pathlib.Path where possible

`switch-to-configuration boot` was taking suspiciously long on a machine
of mine where the boot partition is on a slow SD card. Some tracing led
me to discover that it was in fact deleting all the kernels and initrds
every time, only to rewrite them.

This turned out to be because of the naive (non-path-normalising) string
concatenation used to construct paths in `known_paths`, so all the files
were recognised as obsolete and deleted:


known_paths=['/EFI/nixos/5jz3m9df1cbxn4hzjjs3aaz8lb9vvimc-linux-6.15.7-Image.efi', '/EFI/nixos/xri8qzfvzclf89x7nfwgq248miw7jbp0-initrd-linux-6.15.7-initrd.efi', '/EFI/nixos/b18llskzrcdgw2nbib58qqcaabiik6yc-linux-6.16-Image.efi', '/EFI/nixos/mdj53j746bii1vw227dfhkyd8ajwab2w-initrd-linux-6.16-initrd.efi', '/EFI/nixos/b18llskzrcdgw2nbib58qqcaabiik6yc-linux-6.16-Image.efi', '/EFI/nixos/mdj53j746bii1vw227dfhkyd8ajwab2w-initrd-linux-6.16-initrd.efi', '/EFI/nixos/b18llskzrcdgw2nbib58qqcaabiik6yc-linux-6.16-Image.efi', '/EFI/nixos/mdj53j746bii1vw227dfhkyd8ajwab2w-initrd-linux-6.16-initrd.efi', '/EFI/nixos/5jz3m9df1cbxn4hzjjs3aaz8lb9vvimc-linux-6.15.7-Image.efi', '/EFI/nixos/1ihk03c1i5518hlgm5mnhrig2hy3hq24-initrd-linux-6.15.7-initrd.efi', '/EFI/nixos/5jz3m9df1cbxn4hzjjs3aaz8lb9vvimc-linux-6.15.7-Image.efi', '/EFI/nixos/1ihk03c1i5518hlgm5mnhrig2hy3hq24-initrd-linux-6.15.7-initrd.efi', '/EFI/nixos/5jz3m9df1cbxn4hzjjs3aaz8lb9vvimc-linux-6.15.7-Image.efi', '/EFI/nixos/1ihk03c1i5518hlgm5mnhrig2hy3hq24-initrd-linux-6.15.7-initrd.efi']
path='/boot//EFI/nixos/5jz3m9df1cbxn4hzjjs3aaz8lb9vvimc-linux-6.15.7-Image.efi'
path='/boot//EFI/nixos/xri8qzfvzclf89x7nfwgq248miw7jbp0-initrd-linux-6.15.7-initrd.efi'
path='/boot//EFI/nixos/b18llskzrcdgw2nbib58qqcaabiik6yc-linux-6.16-Image.efi'
path='/boot//EFI/nixos/mdj53j746bii1vw227dfhkyd8ajwab2w-initrd-linux-6.16-initrd.efi'
path='/boot//EFI/nixos/1ihk03c1i5518hlgm5mnhrig2hy3hq24-initrd-linux-6.15.7-initrd.efi'


This can be avoided by using pathlib.Path, which normalises paths and
generally provides a more consistent and convenient API. I therefore
went ahead and replaced all use of `str` for path handling with `Path`
in the builder. This may fix some other, similar bugs, as well, but I
haven't checked in detail.
parent fbcf476f
Loading
Loading
Loading
Loading
+89 −79
Original line number Diff line number Diff line
@@ -3,23 +3,22 @@ import argparse
import ctypes
import datetime
import errno
import glob
import os
import os.path
import re
import shutil
import subprocess
import sys
import warnings
import json
from typing import NamedTuple, Any
from typing import NamedTuple, Any, Sequence
from dataclasses import dataclass
from pathlib import Path

# These values will be replaced with actual values during the package build
EFI_SYS_MOUNT_POINT = "@efiSysMountPoint@"
BOOT_MOUNT_POINT = "@bootMountPoint@"
LOADER_CONF = f"{EFI_SYS_MOUNT_POINT}/loader/loader.conf"  # Always stored on the ESP
NIXOS_DIR = "@nixosDir@"
EFI_SYS_MOUNT_POINT = Path("@efiSysMountPoint@")
BOOT_MOUNT_POINT = Path("@bootMountPoint@")
LOADER_CONF = EFI_SYS_MOUNT_POINT / "loader/loader.conf"  # Always stored on the ESP
NIXOS_DIR = Path("@nixosDir@".strip("/")) # Path relative to the XBOOTLDR or ESP mount point
TIMEOUT = "@timeout@"
EDITOR = "@editor@" == "1" # noqa: PLR0133
CONSOLE_MODE = "@consoleMode@"
@@ -37,16 +36,16 @@ STORE_DIR = "@storeDir@"

@dataclass
class BootSpec:
    init: str
    initrd: str
    kernel: str
    init: Path
    initrd: Path
    kernel: Path
    kernelParams: list[str]  # noqa: N815
    label: str
    system: str
    toplevel: str
    toplevel: Path
    specialisations: dict[str, "BootSpec"]
    sortKey: str  # noqa: N815
    devicetree: str | None = None  # noqa: N815
    devicetree: Path | None = None  # noqa: N815
    initrdSecrets: str | None = None  # noqa: N815


@@ -54,7 +53,7 @@ libc = ctypes.CDLL("libc.so.6")

FILE = None | int

def run(cmd: list[str], stdout: FILE = None) -> subprocess.CompletedProcess[str]:
def run(cmd: Sequence[str | Path], stdout: FILE = None) -> subprocess.CompletedProcess[str]:
    return subprocess.run(cmd, check=True, text=True, stdout=stdout)

class SystemIdentifier(NamedTuple):
@@ -63,21 +62,21 @@ class SystemIdentifier(NamedTuple):
    specialisation: str | None


def copy_if_not_exists(source: str, dest: str) -> None:
    if not os.path.exists(dest):
def copy_if_not_exists(source: Path, dest: Path) -> None:
    if not dest.exists():
        shutil.copyfile(source, dest)


def generation_dir(profile: str | None, generation: int) -> str:
def generation_dir(profile: str | None, generation: int) -> Path:
    if profile:
        return "/nix/var/nix/profiles/system-profiles/%s-%d-link" % (profile, generation)
        return Path(f"/nix/var/nix/profiles/system-profiles/{profile}-{generation}-link")
    else:
        return "/nix/var/nix/profiles/system-%d-link" % (generation)
        return Path(f"/nix/var/nix/profiles/system-{generation}-link")

def system_dir(profile: str | None, generation: int, specialisation: str | None) -> str:
def system_dir(profile: str | None, generation: int, specialisation: str | None) -> Path:
    d = generation_dir(profile, generation)
    if specialisation:
        return os.path.join(d, "specialisation", specialisation)
        return d / "specialisation" / specialisation
    else:
        return d

@@ -101,7 +100,8 @@ def generation_conf_filename(profile: str | None, generation: int, specialisatio


def write_loader_conf(profile: str | None, generation: int, specialisation: str | None) -> None:
    with open(f"{LOADER_CONF}.tmp", 'w') as f:
    tmp = LOADER_CONF.with_suffix(".tmp")
    with tmp.open('x') as f:
        f.write(f"timeout {TIMEOUT}\n")
        f.write("default %s\n" % generation_conf_filename(profile, generation, specialisation))
        if not EDITOR:
@@ -111,17 +111,17 @@ def write_loader_conf(profile: str | None, generation: int, specialisation: str
        f.write(f"console-mode {CONSOLE_MODE}\n")
        f.flush()
        os.fsync(f.fileno())
    os.rename(f"{LOADER_CONF}.tmp", LOADER_CONF)
    os.rename(tmp, LOADER_CONF)


def get_bootspec(profile: str | None, generation: int) -> BootSpec:
    system_directory = system_dir(profile, generation, None)
    boot_json_path = os.path.join(system_directory, "boot.json")
    if os.path.isfile(boot_json_path):
        with open(boot_json_path, 'r') as boot_json_f:
    boot_json_path = (system_directory / "boot.json").resolve()
    if boot_json_path.is_file():
        with boot_json_path.open("r") as f:
            # check if json is well-formed, else throw error with filepath
            try:
                bootspec_json = json.load(boot_json_f)
                bootspec_json = json.load(f)
            except ValueError as e:
                print(f"error: Malformed Json: {e}, in {boot_json_path}", file=sys.stderr)
                sys.exit(1)
@@ -145,21 +145,32 @@ def bootspec_from_json(bootspec_json: dict[str, Any]) -> BootSpec:
    systemdBootExtension = bootspec_json.get('org.nixos.systemd-boot', {})
    sortKey = systemdBootExtension.get('sortKey', 'nixos')
    devicetree = systemdBootExtension.get('devicetree')

    if devicetree:
        devicetree = Path(devicetree)

    main_json = bootspec_json['org.nixos.bootspec.v1']
    for attr in ("kernel", "initrd", "toplevel"):
        if attr in main_json:
            main_json[attr] = Path(main_json[attr])
    return BootSpec(
        **bootspec_json['org.nixos.bootspec.v1'],
        **main_json,
        specialisations=specialisations,
        sortKey=sortKey,
        devicetree=devicetree,
    )


def copy_from_file(file: str, dry_run: bool = False) -> str:
    store_file_path = os.path.realpath(file)
    suffix = os.path.basename(store_file_path)
    store_subdir = os.path.relpath(store_file_path, start=STORE_DIR).split(os.path.sep)[0]
    efi_file_path = f"{NIXOS_DIR}/{suffix}.efi" if suffix == store_subdir else f"{NIXOS_DIR}/{store_subdir}-{suffix}.efi"
def copy_from_file(file: Path, dry_run: bool = False) -> Path:
    """
    Copy a file to the boot filesystem (XBOOTLDR if in use, otherwise ESP), basing the destination filename on the store path that's being copied from. Return the destination path, relative to the boot filesystem mountpoint.
    """
    store_file_path = file.resolve()
    suffix = store_file_path.name
    store_subdir = store_file_path.relative_to(STORE_DIR).parts[0]
    efi_file_path = NIXOS_DIR / (f"{suffix}.efi" if suffix == store_subdir else f"{store_subdir}-{suffix}.efi")
    if not dry_run:
        copy_if_not_exists(store_file_path, f"{BOOT_MOUNT_POINT}{efi_file_path}")
        copy_if_not_exists(store_file_path, BOOT_MOUNT_POINT / efi_file_path)
    return efi_file_path


@@ -178,7 +189,7 @@ def write_entry(profile: str | None, generation: int, specialisation: str | None

    try:
        if bootspec.initrdSecrets is not None:
            run([bootspec.initrdSecrets, f"{BOOT_MOUNT_POINT}%s" % (initrd)])
            run([bootspec.initrdSecrets, BOOT_MOUNT_POINT / initrd])
    except subprocess.CalledProcessError:
        if current:
            print("failed to create initrd secrets!", file=sys.stderr)
@@ -188,21 +199,20 @@ def write_entry(profile: str | None, generation: int, specialisation: str | None
                  f'for "{title} - Configuration {generation}", an older generation', file=sys.stderr)
            print("note: this is normal after having removed "
                  "or renamed a file in `boot.initrd.secrets`", file=sys.stderr)
    entry_file = f"{BOOT_MOUNT_POINT}/loader/entries/%s" % (
        generation_conf_filename(profile, generation, specialisation))
    tmp_path = "%s.tmp" % (entry_file)
    entry_file = BOOT_MOUNT_POINT / "loader/entries" / generation_conf_filename(profile, generation, specialisation)
    tmp_path = entry_file.with_suffix(".tmp")
    kernel_params = "init=%s " % bootspec.init

    kernel_params = kernel_params + " ".join(bootspec.kernelParams)
    build_time = int(os.path.getctime(system_dir(profile, generation, specialisation)))
    build_time = int(system_dir(profile, generation, specialisation).stat().st_ctime)
    build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F')

    with open(tmp_path, 'w') as f:
    with tmp_path.open("w") as f:
        f.write(BOOT_ENTRY.format(title=title,
                    sort_key=bootspec.sortKey,
                    generation=generation,
                    kernel=kernel,
                    initrd=initrd,
                    kernel=f"/{kernel}",
                    initrd=f"/{initrd}",
                    kernel_params=kernel_params,
                    description=f"{bootspec.label}, built on {build_date}"))
        if machine_id is not None:
@@ -211,7 +221,7 @@ def write_entry(profile: str | None, generation: int, specialisation: str | None
            f.write("devicetree %s\n" % devicetree)
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp_path, entry_file)
    tmp_path.rename(entry_file)


def get_generations(profile: str | None = None) -> list[SystemIdentifier]:
@@ -241,41 +251,43 @@ def get_generations(profile: str | None = None) -> list[SystemIdentifier]:


def remove_old_entries(gens: list[SystemIdentifier]) -> None:
    rex_profile = re.compile(r"^" + re.escape(BOOT_MOUNT_POINT) + r"/loader/entries/nixos-(.*)-generation-.*\.conf$")
    rex_generation = re.compile(r"^" + re.escape(BOOT_MOUNT_POINT) + r"/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
    rex_profile = re.compile(r"^nixos-(.*)-generation-.*\.conf$")
    rex_generation = re.compile(r"^nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
    known_paths = []
    for gen in gens:
        bootspec = get_bootspec(gen.profile, gen.generation)
        known_paths.append(copy_from_file(bootspec.kernel, True))
        known_paths.append(copy_from_file(bootspec.initrd, True))
    for path in glob.iglob(f"{BOOT_MOUNT_POINT}/loader/entries/nixos*-generation-[1-9]*.conf"):
        if rex_profile.match(path):
            prof = rex_profile.sub(r"\1", path)
        known_paths.append(copy_from_file(bootspec.kernel, True).name)
        known_paths.append(copy_from_file(bootspec.initrd, True).name)
    for path in (BOOT_MOUNT_POINT / "loader/entries").glob("nixos*-generation-[1-9]*.conf", case_sensitive=False):
        if rex_profile.match(path.name):
            prof = rex_profile.sub(r"\1", path.name)
        else:
            prof = None
        try:
            gen_number = int(rex_generation.sub(r"\1", path))
            gen_number = int(rex_generation.sub(r"\1", path.name))
        except ValueError:
            continue
        if (prof, gen_number, None) not in gens:
            os.unlink(path)
    for path in glob.iglob(f"{BOOT_MOUNT_POINT}/{NIXOS_DIR}/*"):
        if path not in known_paths and not os.path.isdir(path):
            os.unlink(path)
            path.unlink()
    for path in (BOOT_MOUNT_POINT / NIXOS_DIR).iterdir():
        if path.name not in known_paths and not path.is_dir():
            path.unlink()


def cleanup_esp() -> None:
    for path in glob.iglob(f"{EFI_SYS_MOUNT_POINT}/loader/entries/nixos*"):
        os.unlink(path)
    if os.path.isdir(f"{EFI_SYS_MOUNT_POINT}/{NIXOS_DIR}"):
        shutil.rmtree(f"{EFI_SYS_MOUNT_POINT}/{NIXOS_DIR}")
    for path in (EFI_SYS_MOUNT_POINT / "loader/entries").glob("nixos*"):
        path.unlink()
    nixos_dir = EFI_SYS_MOUNT_POINT / NIXOS_DIR
    if nixos_dir.is_dir():
        shutil.rmtree(nixos_dir)


def get_profiles() -> list[str]:
    if os.path.isdir("/nix/var/nix/profiles/system-profiles/"):
        return [x
            for x in os.listdir("/nix/var/nix/profiles/system-profiles/")
            if not x.endswith("-link")]
    system_profiles = Path("/nix/var/nix/profiles/system-profiles/")
    if system_profiles.is_dir():
        return [x.name
            for x in system_profiles.iterdir()
            if not x.name.endswith("-link")]
    else:
        return []

@@ -306,8 +318,7 @@ def install_bootloader(args: argparse.Namespace) -> None:

    if os.getenv("NIXOS_INSTALL_BOOTLOADER") == "1":
        # bootctl uses fopen() with modes "wxe" and fails if the file exists.
        if os.path.exists(LOADER_CONF):
            os.unlink(LOADER_CONF)
        LOADER_CONF.unlink(missing_ok=True)

        run(
            [f"{SYSTEMD}/bin/bootctl", f"--esp-path={EFI_SYS_MOUNT_POINT}"]
@@ -356,8 +367,8 @@ def install_bootloader(args: argparse.Namespace) -> None:
                + ["update"]
            )

    os.makedirs(f"{BOOT_MOUNT_POINT}/{NIXOS_DIR}", exist_ok=True)
    os.makedirs(f"{BOOT_MOUNT_POINT}/loader/entries", exist_ok=True)
    (BOOT_MOUNT_POINT / NIXOS_DIR).mkdir(parents=True, exist_ok=True)
    (BOOT_MOUNT_POINT / "loader/entries").mkdir(parents=True, exist_ok=True)

    gens = get_generations()
    for profile in get_profiles():
@@ -368,7 +379,7 @@ def install_bootloader(args: argparse.Namespace) -> None:
    for gen in gens:
        try:
            bootspec = get_bootspec(gen.profile, gen.generation)
            is_default = os.path.dirname(bootspec.init) == args.default_config
            is_default = Path(bootspec.init).parent == Path(args.default_config)
            write_entry(*gen, machine_id, bootspec, current=is_default)
            for specialisation in bootspec.specialisations.keys():
                write_entry(gen.profile, gen.generation, specialisation, machine_id, bootspec, current=is_default)
@@ -388,22 +399,21 @@ def install_bootloader(args: argparse.Namespace) -> None:
        # automatically, as we don't have information about the mount point anymore.
        cleanup_esp()

    for root, _, files in os.walk(f"{BOOT_MOUNT_POINT}/{NIXOS_DIR}/.extra-files", topdown=False):
        relative_root = root.removeprefix(f"{BOOT_MOUNT_POINT}/{NIXOS_DIR}/.extra-files").removeprefix("/")
        actual_root = os.path.join(f"{BOOT_MOUNT_POINT}", relative_root)
    extra_files_dir = BOOT_MOUNT_POINT / NIXOS_DIR / ".extra-files"
    for root, _, files in extra_files_dir.walk(top_down=False):
        relative_root = root.relative_to(extra_files_dir)
        actual_root = BOOT_MOUNT_POINT / relative_root

        for file in files:
            actual_file = os.path.join(actual_root, file)

            if os.path.exists(actual_file):
                os.unlink(actual_file)
            os.unlink(os.path.join(root, file))
            actual_file = actual_root / file
            actual_file.unlink(missing_ok=True)
            (root / file).unlink()

        if not len(os.listdir(actual_root)):
            os.rmdir(actual_root)
        os.rmdir(root)
        if not list(actual_root.iterdir()):
            actual_root.rmdir()
        root.rmdir()

    os.makedirs(f"{BOOT_MOUNT_POINT}/{NIXOS_DIR}/.extra-files", exist_ok=True)
    extra_files_dir.mkdir(parents=True, exist_ok=True)

    run([COPY_EXTRA_FILES])