Loading nixos/lib/test-driver/test_driver/driver.py +46 −9 Original line number Diff line number Diff line Loading @@ -12,6 +12,8 @@ from test_driver.machine import Machine, NixStartScript, retry from test_driver.polling_condition import PollingCondition from test_driver.vlan import VLan SENTINEL = object() def get_tmp_dir() -> Path: """Returns a temporary directory that is defined by TMPDIR, TEMP, TMP or CWD Loading Loading @@ -187,23 +189,58 @@ class Driver: # to swallow them and prevent itself from terminating. os.kill(os.getpid(), signal.SIGTERM) def create_machine(self, args: Dict[str, Any]) -> Machine: def create_machine( self, start_command: str | dict, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: # Legacy args handling # FIXME: remove after 24.05 if isinstance(start_command, dict): if name is not None or keep_vm_state: raise TypeError( "Dictionary passed to create_machine must be the only argument" ) args = start_command start_command = args.pop("startCommand", SENTINEL) if start_command is SENTINEL: raise TypeError( "Dictionary passed to create_machine must contain startCommand" ) if not isinstance(start_command, str): raise TypeError( f"startCommand must be a string, got: {repr(start_command)}" ) name = args.pop("name", None) keep_vm_state = args.pop("keep_vm_state", False) if args: raise TypeError( f"Unsupported arguments passed to create_machine: {args}" ) rootlog.warning( "Using create_machine with a single dictionary argument is deprecated, and will be removed in NixOS 24.11" ) # End legacy args handling tmp_dir = get_tmp_dir() if args.get("startCommand"): start_command: str = args.get("startCommand", "") cmd = NixStartScript(start_command) name = args.get("name", cmd.machine_name) else: cmd = Machine.create_startcommand(args) # type: ignore name = args.get("name", "machine") name = name or cmd.machine_name return Machine( tmp_dir=tmp_dir, out_dir=self.out_dir, start_command=cmd, name=name, keep_vm_state=args.get("keep_vm_state", False), keep_vm_state=keep_vm_state, ) def serial_stdout_on(self) -> None: Loading nixos/lib/test-driver/test_driver/machine.py +0 −95 Original line number Diff line number Diff line Loading @@ -208,7 +208,6 @@ class StartCommand: ), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, cwd=state_dir, env=self.build_environment(state_dir, shared_dir), Loading @@ -235,77 +234,6 @@ class NixStartScript(StartCommand): return name class LegacyStartCommand(StartCommand): """Used in some places to create an ad-hoc machine instead of using nix test instrumentation + module system for that purpose. Legacy. """ def __init__( self, netBackendArgs: Optional[str] = None, # noqa: N803 netFrontendArgs: Optional[str] = None, # noqa: N803 hda: Optional[Tuple[Path, str]] = None, cdrom: Optional[str] = None, usb: Optional[str] = None, bios: Optional[str] = None, qemuBinary: Optional[str] = None, # noqa: N803 qemuFlags: Optional[str] = None, # noqa: N803 ): if qemuBinary is not None: self._cmd = qemuBinary else: self._cmd = "qemu-kvm" self._cmd += " -m 384" # networking net_backend = "-netdev user,id=net0" net_frontend = "-device virtio-net-pci,netdev=net0" if netBackendArgs is not None: net_backend += "," + netBackendArgs if netFrontendArgs is not None: net_frontend += "," + netFrontendArgs self._cmd += f" {net_backend} {net_frontend}" # hda hda_cmd = "" if hda is not None: hda_path = hda[0].resolve() hda_interface = hda[1] if hda_interface == "scsi": hda_cmd += ( f" -drive id=hda,file={hda_path},werror=report,if=none" " -device scsi-hd,drive=hda" ) else: hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" self._cmd += hda_cmd # cdrom if cdrom is not None: self._cmd += f" -cdrom {cdrom}" # usb usb_cmd = "" if usb is not None: # https://github.com/qemu/qemu/blob/master/docs/usb2.txt usb_cmd += ( " -device usb-ehci" f" -drive id=usbdisk,file={usb},if=none,readonly" " -device usb-storage,drive=usbdisk " ) self._cmd += usb_cmd # bios if bios is not None: self._cmd += f" -bios {bios}" # qemu flags if qemuFlags is not None: self._cmd += f" {qemuFlags}" class Machine: """A handle to the machine with this name, that also knows how to manage the machine lifecycle with the help of a start script / command.""" Loading Loading @@ -377,29 +305,6 @@ class Machine: self.booted = False self.connected = False @staticmethod def create_startcommand(args: Dict[str, str]) -> StartCommand: rootlog.warning( "Using legacy create_startcommand(), " "please use proper nix test vm instrumentation, instead " "to generate the appropriate nixos test vm qemu startup script" ) hda = None if args.get("hda"): hda_arg: str = args.get("hda", "") hda_arg_path: Path = Path(hda_arg) hda = (hda_arg_path, args.get("hdaInterface", "")) return LegacyStartCommand( netBackendArgs=args.get("netBackendArgs"), netFrontendArgs=args.get("netFrontendArgs"), hda=hda, cdrom=args.get("cdrom"), usb=args.get("usb"), bios=args.get("bios"), qemuBinary=args.get("qemuBinary"), qemuFlags=args.get("qemuFlags"), ) def is_up(self) -> bool: return self.booted and self.connected Loading nixos/lib/test-script-prepend.py +12 −1 Original line number Diff line number Diff line Loading @@ -26,6 +26,17 @@ class PollingConditionProtocol(Protocol): raise Exception("This is just type information for the Nix test driver") class CreateMachineProtocol(Protocol): def __call__( self, start_command: str | dict, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: raise Exception("This is just type information for the Nix test driver") start_all: Callable[[], None] subtest: Callable[[str], ContextManager[None]] retry: RetryProtocol Loading @@ -34,7 +45,7 @@ machines: List[Machine] vlans: List[VLan] driver: Driver log: Logger create_machine: Callable[[Dict[str, Any]], Machine] create_machine: CreateMachineProtocol run_tests: Callable[[], None] join_all: Callable[[], None] serial_stdout_off: Callable[[], None] Loading nixos/tests/boot.nix +53 −29 Original line number Diff line number Diff line Loading @@ -4,10 +4,41 @@ }: with import ../lib/testing-python.nix { inherit system pkgs; }; with pkgs.lib; let qemu-common = import ../lib/qemu-common.nix { inherit (pkgs) lib pkgs; }; lib = pkgs.lib; qemu-common = import ../lib/qemu-common.nix { inherit lib pkgs; }; mkStartCommand = { memory ? 2048, cdrom ? null, usb ? null, pxe ? null, uboot ? false, uefi ? false, extraFlags ? [], }: let qemu = qemu-common.qemuBinary pkgs.qemu_test; flags = [ "-m" (toString memory) "-netdev" ("user,id=net0" + (lib.optionalString (pxe != null) ",tftp=${pxe},bootfile=netboot.ipxe")) "-device" ("virtio-net-pci,netdev=net0" + (lib.optionalString (pxe != null && uefi) ",romfile=${pkgs.ipxe}/ipxe.efirom")) ] ++ lib.optionals (cdrom != null) [ "-cdrom" cdrom ] ++ lib.optionals (usb != null) [ "-device" "usb-ehci" "-drive" "id=usbdisk,file=${usb},if=none,readonly" "-device" "usb-storage,drive=usbdisk" ] ++ lib.optionals (pxe != null) [ "-boot" "order=n" ] ++ lib.optionals uefi [ "-drive" "if=pflash,format=raw,unit=0,readonly=on,file=${pkgs.OVMF.firmware}" "-drive" "if=pflash,format=raw,unit=1,readonly=on,file=${pkgs.OVMF.variables}" ] ++ extraFlags; flagsStr = lib.concatStringsSep " " flags; in "${qemu} ${flagsStr}"; iso = (import ../lib/eval-config.nix { Loading @@ -28,21 +59,16 @@ let ]; }).config.system.build.sdImage; pythonDict = params: "\n {\n ${concatStringsSep ",\n " (mapAttrsToList (name: param: "\"${name}\": \"${param}\"") params)},\n }\n"; makeBootTest = name: extraConfig: makeBootTest = name: config: let machineConfig = pythonDict ({ qemuBinary = qemu-common.qemuBinary pkgs.qemu_test; qemuFlags = "-m 768"; } // extraConfig); startCommand = mkStartCommand config; in makeTest { name = "boot-" + name; nodes = { }; testScript = '' machine = create_machine(${machineConfig}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify --no-trust -r --option experimental-features nix-command /run/current-system") Loading Loading @@ -73,43 +99,35 @@ let config.system.build.netbootIpxeScript ]; }; machineConfig = pythonDict ({ qemuBinary = qemu-common.qemuBinary pkgs.qemu_test; qemuFlags = "-boot order=n -m 2000"; netBackendArgs = "tftp=${ipxeBootDir},bootfile=netboot.ipxe"; startCommand = mkStartCommand ({ pxe = ipxeBootDir; } // extraConfig); in makeTest { name = "boot-netboot-" + name; nodes = { }; testScript = '' machine = create_machine(${machineConfig}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.shutdown() ''; }; uefiBinary = { x86_64-linux = "${pkgs.OVMF.fd}/FV/OVMF.fd"; aarch64-linux = "${pkgs.OVMF.fd}/FV/QEMU_EFI.fd"; }.${pkgs.stdenv.hostPlatform.system}; in { uefiCdrom = makeBootTest "uefi-cdrom" { uefi = true; cdrom = "${iso}/iso/${iso.isoName}"; bios = uefiBinary; }; uefiUsb = makeBootTest "uefi-usb" { uefi = true; usb = "${iso}/iso/${iso.isoName}"; bios = uefiBinary; }; uefiNetboot = makeNetbootTest "uefi" { bios = uefiBinary; # Custom ROM is needed for EFI PXE boot. I failed to understand exactly why, because QEMU should still use iPXE for EFI. netFrontendArgs = "romfile=${pkgs.ipxe}/ipxe.efirom"; uefi = true; }; } // optionalAttrs (pkgs.stdenv.hostPlatform.system == "x86_64-linux") { } // lib.optionalAttrs (pkgs.stdenv.hostPlatform.system == "x86_64-linux") { biosCdrom = makeBootTest "bios-cdrom" { cdrom = "${iso}/iso/${iso.isoName}"; }; Loading @@ -124,9 +142,12 @@ in { sdImage = "${sd}/sd-image/${sd.imageName}"; mutableImage = "/tmp/linked-image.qcow2"; machineConfig = pythonDict { bios = "${pkgs.ubootQemuX86}/u-boot.rom"; qemuFlags = "-m 768 -machine type=pc,accel=tcg -drive file=${mutableImage},if=ide,format=qcow2"; startCommand = mkStartCommand { extraFlags = [ "-bios" "${pkgs.ubootQemuX86}/u-boot.rom" "-machine" "type=pc,accel=tcg" "-drive" "file=${mutableImage},if=virtio" ]; }; in makeTest { name = "boot-uboot-extlinux"; Loading @@ -138,11 +159,14 @@ in { if os.system("qemu-img create -f qcow2 -F raw -b ${sdImage} ${mutableImage}") != 0: raise RuntimeError("Could not create mutable linked image") machine = create_machine(${machineConfig}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify -r --no-trust --option experimental-features nix-command /run/current-system") machine.shutdown() ''; # kernel can't find rootfs after boot - investigate? meta.broken = true; }; } nixos/tests/common/ec2.nix +1 −1 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ with pkgs.lib; + " $QEMU_OPTS" ) machine = create_machine({"startCommand": start_command}) machine = create_machine(start_command) try: '' + indentLines script + '' finally: Loading Loading
nixos/lib/test-driver/test_driver/driver.py +46 −9 Original line number Diff line number Diff line Loading @@ -12,6 +12,8 @@ from test_driver.machine import Machine, NixStartScript, retry from test_driver.polling_condition import PollingCondition from test_driver.vlan import VLan SENTINEL = object() def get_tmp_dir() -> Path: """Returns a temporary directory that is defined by TMPDIR, TEMP, TMP or CWD Loading Loading @@ -187,23 +189,58 @@ class Driver: # to swallow them and prevent itself from terminating. os.kill(os.getpid(), signal.SIGTERM) def create_machine(self, args: Dict[str, Any]) -> Machine: def create_machine( self, start_command: str | dict, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: # Legacy args handling # FIXME: remove after 24.05 if isinstance(start_command, dict): if name is not None or keep_vm_state: raise TypeError( "Dictionary passed to create_machine must be the only argument" ) args = start_command start_command = args.pop("startCommand", SENTINEL) if start_command is SENTINEL: raise TypeError( "Dictionary passed to create_machine must contain startCommand" ) if not isinstance(start_command, str): raise TypeError( f"startCommand must be a string, got: {repr(start_command)}" ) name = args.pop("name", None) keep_vm_state = args.pop("keep_vm_state", False) if args: raise TypeError( f"Unsupported arguments passed to create_machine: {args}" ) rootlog.warning( "Using create_machine with a single dictionary argument is deprecated, and will be removed in NixOS 24.11" ) # End legacy args handling tmp_dir = get_tmp_dir() if args.get("startCommand"): start_command: str = args.get("startCommand", "") cmd = NixStartScript(start_command) name = args.get("name", cmd.machine_name) else: cmd = Machine.create_startcommand(args) # type: ignore name = args.get("name", "machine") name = name or cmd.machine_name return Machine( tmp_dir=tmp_dir, out_dir=self.out_dir, start_command=cmd, name=name, keep_vm_state=args.get("keep_vm_state", False), keep_vm_state=keep_vm_state, ) def serial_stdout_on(self) -> None: Loading
nixos/lib/test-driver/test_driver/machine.py +0 −95 Original line number Diff line number Diff line Loading @@ -208,7 +208,6 @@ class StartCommand: ), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, cwd=state_dir, env=self.build_environment(state_dir, shared_dir), Loading @@ -235,77 +234,6 @@ class NixStartScript(StartCommand): return name class LegacyStartCommand(StartCommand): """Used in some places to create an ad-hoc machine instead of using nix test instrumentation + module system for that purpose. Legacy. """ def __init__( self, netBackendArgs: Optional[str] = None, # noqa: N803 netFrontendArgs: Optional[str] = None, # noqa: N803 hda: Optional[Tuple[Path, str]] = None, cdrom: Optional[str] = None, usb: Optional[str] = None, bios: Optional[str] = None, qemuBinary: Optional[str] = None, # noqa: N803 qemuFlags: Optional[str] = None, # noqa: N803 ): if qemuBinary is not None: self._cmd = qemuBinary else: self._cmd = "qemu-kvm" self._cmd += " -m 384" # networking net_backend = "-netdev user,id=net0" net_frontend = "-device virtio-net-pci,netdev=net0" if netBackendArgs is not None: net_backend += "," + netBackendArgs if netFrontendArgs is not None: net_frontend += "," + netFrontendArgs self._cmd += f" {net_backend} {net_frontend}" # hda hda_cmd = "" if hda is not None: hda_path = hda[0].resolve() hda_interface = hda[1] if hda_interface == "scsi": hda_cmd += ( f" -drive id=hda,file={hda_path},werror=report,if=none" " -device scsi-hd,drive=hda" ) else: hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" self._cmd += hda_cmd # cdrom if cdrom is not None: self._cmd += f" -cdrom {cdrom}" # usb usb_cmd = "" if usb is not None: # https://github.com/qemu/qemu/blob/master/docs/usb2.txt usb_cmd += ( " -device usb-ehci" f" -drive id=usbdisk,file={usb},if=none,readonly" " -device usb-storage,drive=usbdisk " ) self._cmd += usb_cmd # bios if bios is not None: self._cmd += f" -bios {bios}" # qemu flags if qemuFlags is not None: self._cmd += f" {qemuFlags}" class Machine: """A handle to the machine with this name, that also knows how to manage the machine lifecycle with the help of a start script / command.""" Loading Loading @@ -377,29 +305,6 @@ class Machine: self.booted = False self.connected = False @staticmethod def create_startcommand(args: Dict[str, str]) -> StartCommand: rootlog.warning( "Using legacy create_startcommand(), " "please use proper nix test vm instrumentation, instead " "to generate the appropriate nixos test vm qemu startup script" ) hda = None if args.get("hda"): hda_arg: str = args.get("hda", "") hda_arg_path: Path = Path(hda_arg) hda = (hda_arg_path, args.get("hdaInterface", "")) return LegacyStartCommand( netBackendArgs=args.get("netBackendArgs"), netFrontendArgs=args.get("netFrontendArgs"), hda=hda, cdrom=args.get("cdrom"), usb=args.get("usb"), bios=args.get("bios"), qemuBinary=args.get("qemuBinary"), qemuFlags=args.get("qemuFlags"), ) def is_up(self) -> bool: return self.booted and self.connected Loading
nixos/lib/test-script-prepend.py +12 −1 Original line number Diff line number Diff line Loading @@ -26,6 +26,17 @@ class PollingConditionProtocol(Protocol): raise Exception("This is just type information for the Nix test driver") class CreateMachineProtocol(Protocol): def __call__( self, start_command: str | dict, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: raise Exception("This is just type information for the Nix test driver") start_all: Callable[[], None] subtest: Callable[[str], ContextManager[None]] retry: RetryProtocol Loading @@ -34,7 +45,7 @@ machines: List[Machine] vlans: List[VLan] driver: Driver log: Logger create_machine: Callable[[Dict[str, Any]], Machine] create_machine: CreateMachineProtocol run_tests: Callable[[], None] join_all: Callable[[], None] serial_stdout_off: Callable[[], None] Loading
nixos/tests/boot.nix +53 −29 Original line number Diff line number Diff line Loading @@ -4,10 +4,41 @@ }: with import ../lib/testing-python.nix { inherit system pkgs; }; with pkgs.lib; let qemu-common = import ../lib/qemu-common.nix { inherit (pkgs) lib pkgs; }; lib = pkgs.lib; qemu-common = import ../lib/qemu-common.nix { inherit lib pkgs; }; mkStartCommand = { memory ? 2048, cdrom ? null, usb ? null, pxe ? null, uboot ? false, uefi ? false, extraFlags ? [], }: let qemu = qemu-common.qemuBinary pkgs.qemu_test; flags = [ "-m" (toString memory) "-netdev" ("user,id=net0" + (lib.optionalString (pxe != null) ",tftp=${pxe},bootfile=netboot.ipxe")) "-device" ("virtio-net-pci,netdev=net0" + (lib.optionalString (pxe != null && uefi) ",romfile=${pkgs.ipxe}/ipxe.efirom")) ] ++ lib.optionals (cdrom != null) [ "-cdrom" cdrom ] ++ lib.optionals (usb != null) [ "-device" "usb-ehci" "-drive" "id=usbdisk,file=${usb},if=none,readonly" "-device" "usb-storage,drive=usbdisk" ] ++ lib.optionals (pxe != null) [ "-boot" "order=n" ] ++ lib.optionals uefi [ "-drive" "if=pflash,format=raw,unit=0,readonly=on,file=${pkgs.OVMF.firmware}" "-drive" "if=pflash,format=raw,unit=1,readonly=on,file=${pkgs.OVMF.variables}" ] ++ extraFlags; flagsStr = lib.concatStringsSep " " flags; in "${qemu} ${flagsStr}"; iso = (import ../lib/eval-config.nix { Loading @@ -28,21 +59,16 @@ let ]; }).config.system.build.sdImage; pythonDict = params: "\n {\n ${concatStringsSep ",\n " (mapAttrsToList (name: param: "\"${name}\": \"${param}\"") params)},\n }\n"; makeBootTest = name: extraConfig: makeBootTest = name: config: let machineConfig = pythonDict ({ qemuBinary = qemu-common.qemuBinary pkgs.qemu_test; qemuFlags = "-m 768"; } // extraConfig); startCommand = mkStartCommand config; in makeTest { name = "boot-" + name; nodes = { }; testScript = '' machine = create_machine(${machineConfig}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify --no-trust -r --option experimental-features nix-command /run/current-system") Loading Loading @@ -73,43 +99,35 @@ let config.system.build.netbootIpxeScript ]; }; machineConfig = pythonDict ({ qemuBinary = qemu-common.qemuBinary pkgs.qemu_test; qemuFlags = "-boot order=n -m 2000"; netBackendArgs = "tftp=${ipxeBootDir},bootfile=netboot.ipxe"; startCommand = mkStartCommand ({ pxe = ipxeBootDir; } // extraConfig); in makeTest { name = "boot-netboot-" + name; nodes = { }; testScript = '' machine = create_machine(${machineConfig}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.shutdown() ''; }; uefiBinary = { x86_64-linux = "${pkgs.OVMF.fd}/FV/OVMF.fd"; aarch64-linux = "${pkgs.OVMF.fd}/FV/QEMU_EFI.fd"; }.${pkgs.stdenv.hostPlatform.system}; in { uefiCdrom = makeBootTest "uefi-cdrom" { uefi = true; cdrom = "${iso}/iso/${iso.isoName}"; bios = uefiBinary; }; uefiUsb = makeBootTest "uefi-usb" { uefi = true; usb = "${iso}/iso/${iso.isoName}"; bios = uefiBinary; }; uefiNetboot = makeNetbootTest "uefi" { bios = uefiBinary; # Custom ROM is needed for EFI PXE boot. I failed to understand exactly why, because QEMU should still use iPXE for EFI. netFrontendArgs = "romfile=${pkgs.ipxe}/ipxe.efirom"; uefi = true; }; } // optionalAttrs (pkgs.stdenv.hostPlatform.system == "x86_64-linux") { } // lib.optionalAttrs (pkgs.stdenv.hostPlatform.system == "x86_64-linux") { biosCdrom = makeBootTest "bios-cdrom" { cdrom = "${iso}/iso/${iso.isoName}"; }; Loading @@ -124,9 +142,12 @@ in { sdImage = "${sd}/sd-image/${sd.imageName}"; mutableImage = "/tmp/linked-image.qcow2"; machineConfig = pythonDict { bios = "${pkgs.ubootQemuX86}/u-boot.rom"; qemuFlags = "-m 768 -machine type=pc,accel=tcg -drive file=${mutableImage},if=ide,format=qcow2"; startCommand = mkStartCommand { extraFlags = [ "-bios" "${pkgs.ubootQemuX86}/u-boot.rom" "-machine" "type=pc,accel=tcg" "-drive" "file=${mutableImage},if=virtio" ]; }; in makeTest { name = "boot-uboot-extlinux"; Loading @@ -138,11 +159,14 @@ in { if os.system("qemu-img create -f qcow2 -F raw -b ${sdImage} ${mutableImage}") != 0: raise RuntimeError("Could not create mutable linked image") machine = create_machine(${machineConfig}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify -r --no-trust --option experimental-features nix-command /run/current-system") machine.shutdown() ''; # kernel can't find rootfs after boot - investigate? meta.broken = true; }; }
nixos/tests/common/ec2.nix +1 −1 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ with pkgs.lib; + " $QEMU_OPTS" ) machine = create_machine({"startCommand": start_command}) machine = create_machine(start_command) try: '' + indentLines script + '' finally: Loading