Loading nixos/lib/test-driver/test_driver/driver.py +10 −9 Original line number Diff line number Diff line Loading @@ -187,23 +187,24 @@ class Driver: # to swallow them and prevent itself from terminating. os.kill(os.getpid(), signal.SIGTERM) def create_machine(self, args: Dict[str, Any]) -> Machine: def create_machine( self, start_command: str, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: tmp_dir = get_tmp_dir() if args.get("startCommand"): start_command: str = args.get("startCommand", "") cmd = NixStartScript(start_command) name = args.get("name", cmd.machine_name) else: cmd = Machine.create_startcommand(args) # type: ignore name = args.get("name", "machine") name = name or cmd.machine_name return Machine( tmp_dir=tmp_dir, out_dir=self.out_dir, start_command=cmd, name=name, keep_vm_state=args.get("keep_vm_state", False), keep_vm_state=keep_vm_state, ) def serial_stdout_on(self) -> None: Loading nixos/lib/test-driver/test_driver/machine.py +0 −94 Original line number Diff line number Diff line Loading @@ -234,77 +234,6 @@ class NixStartScript(StartCommand): return name class LegacyStartCommand(StartCommand): """Used in some places to create an ad-hoc machine instead of using nix test instrumentation + module system for that purpose. Legacy. """ def __init__( self, netBackendArgs: Optional[str] = None, # noqa: N803 netFrontendArgs: Optional[str] = None, # noqa: N803 hda: Optional[Tuple[Path, str]] = None, cdrom: Optional[str] = None, usb: Optional[str] = None, bios: Optional[str] = None, qemuBinary: Optional[str] = None, # noqa: N803 qemuFlags: Optional[str] = None, # noqa: N803 ): if qemuBinary is not None: self._cmd = qemuBinary else: self._cmd = "qemu-kvm" self._cmd += " -m 384" # networking net_backend = "-netdev user,id=net0" net_frontend = "-device virtio-net-pci,netdev=net0" if netBackendArgs is not None: net_backend += "," + netBackendArgs if netFrontendArgs is not None: net_frontend += "," + netFrontendArgs self._cmd += f" {net_backend} {net_frontend}" # hda hda_cmd = "" if hda is not None: hda_path = hda[0].resolve() hda_interface = hda[1] if hda_interface == "scsi": hda_cmd += ( f" -drive id=hda,file={hda_path},werror=report,if=none" " -device scsi-hd,drive=hda" ) else: hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" self._cmd += hda_cmd # cdrom if cdrom is not None: self._cmd += f" -cdrom {cdrom}" # usb usb_cmd = "" if usb is not None: # https://github.com/qemu/qemu/blob/master/docs/usb2.txt usb_cmd += ( " -device usb-ehci" f" -drive id=usbdisk,file={usb},if=none,readonly" " -device usb-storage,drive=usbdisk " ) self._cmd += usb_cmd # bios if bios is not None: self._cmd += f" -bios {bios}" # qemu flags if qemuFlags is not None: self._cmd += f" {qemuFlags}" class Machine: """A handle to the machine with this name, that also knows how to manage the machine lifecycle with the help of a start script / command.""" Loading Loading @@ -376,29 +305,6 @@ class Machine: self.booted = False self.connected = False @staticmethod def create_startcommand(args: Dict[str, str]) -> StartCommand: rootlog.warning( "Using legacy create_startcommand(), " "please use proper nix test vm instrumentation, instead " "to generate the appropriate nixos test vm qemu startup script" ) hda = None if args.get("hda"): hda_arg: str = args.get("hda", "") hda_arg_path: Path = Path(hda_arg) hda = (hda_arg_path, args.get("hdaInterface", "")) return LegacyStartCommand( netBackendArgs=args.get("netBackendArgs"), netFrontendArgs=args.get("netFrontendArgs"), hda=hda, cdrom=args.get("cdrom"), usb=args.get("usb"), bios=args.get("bios"), qemuBinary=args.get("qemuBinary"), qemuFlags=args.get("qemuFlags"), ) def is_up(self) -> bool: return self.booted and self.connected Loading nixos/lib/test-script-prepend.py +12 −1 Original line number Diff line number Diff line Loading @@ -26,6 +26,17 @@ class PollingConditionProtocol(Protocol): raise Exception("This is just type information for the Nix test driver") class CreateMachineProtocol(Protocol): def __call__( self, start_command: str, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: raise Exception("This is just type information for the Nix test driver") start_all: Callable[[], None] subtest: Callable[[str], ContextManager[None]] retry: RetryProtocol Loading @@ -34,7 +45,7 @@ machines: List[Machine] vlans: List[VLan] driver: Driver log: Logger create_machine: Callable[[Dict[str, Any]], Machine] create_machine: CreateMachineProtocol run_tests: Callable[[], None] join_all: Callable[[], None] serial_stdout_off: Callable[[], None] Loading nixos/tests/boot.nix +3 −3 Original line number Diff line number Diff line Loading @@ -68,7 +68,7 @@ let nodes = { }; testScript = '' machine = create_machine({"startCommand": "${startCommand}"}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify --no-trust -r --option experimental-features nix-command /run/current-system") Loading Loading @@ -107,7 +107,7 @@ let name = "boot-netboot-" + name; nodes = { }; testScript = '' machine = create_machine({"startCommand": "${startCommand}"}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.shutdown() Loading Loading @@ -159,7 +159,7 @@ in { if os.system("qemu-img create -f qcow2 -F raw -b ${sdImage} ${mutableImage}") != 0: raise RuntimeError("Could not create mutable linked image") machine = create_machine({"startCommand": "${startCommand}"}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify -r --no-trust --option experimental-features nix-command /run/current-system") Loading nixos/tests/common/ec2.nix +1 −1 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ with pkgs.lib; + " $QEMU_OPTS" ) machine = create_machine({"startCommand": start_command}) machine = create_machine(start_command) try: '' + indentLines script + '' finally: Loading Loading
nixos/lib/test-driver/test_driver/driver.py +10 −9 Original line number Diff line number Diff line Loading @@ -187,23 +187,24 @@ class Driver: # to swallow them and prevent itself from terminating. os.kill(os.getpid(), signal.SIGTERM) def create_machine(self, args: Dict[str, Any]) -> Machine: def create_machine( self, start_command: str, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: tmp_dir = get_tmp_dir() if args.get("startCommand"): start_command: str = args.get("startCommand", "") cmd = NixStartScript(start_command) name = args.get("name", cmd.machine_name) else: cmd = Machine.create_startcommand(args) # type: ignore name = args.get("name", "machine") name = name or cmd.machine_name return Machine( tmp_dir=tmp_dir, out_dir=self.out_dir, start_command=cmd, name=name, keep_vm_state=args.get("keep_vm_state", False), keep_vm_state=keep_vm_state, ) def serial_stdout_on(self) -> None: Loading
nixos/lib/test-driver/test_driver/machine.py +0 −94 Original line number Diff line number Diff line Loading @@ -234,77 +234,6 @@ class NixStartScript(StartCommand): return name class LegacyStartCommand(StartCommand): """Used in some places to create an ad-hoc machine instead of using nix test instrumentation + module system for that purpose. Legacy. """ def __init__( self, netBackendArgs: Optional[str] = None, # noqa: N803 netFrontendArgs: Optional[str] = None, # noqa: N803 hda: Optional[Tuple[Path, str]] = None, cdrom: Optional[str] = None, usb: Optional[str] = None, bios: Optional[str] = None, qemuBinary: Optional[str] = None, # noqa: N803 qemuFlags: Optional[str] = None, # noqa: N803 ): if qemuBinary is not None: self._cmd = qemuBinary else: self._cmd = "qemu-kvm" self._cmd += " -m 384" # networking net_backend = "-netdev user,id=net0" net_frontend = "-device virtio-net-pci,netdev=net0" if netBackendArgs is not None: net_backend += "," + netBackendArgs if netFrontendArgs is not None: net_frontend += "," + netFrontendArgs self._cmd += f" {net_backend} {net_frontend}" # hda hda_cmd = "" if hda is not None: hda_path = hda[0].resolve() hda_interface = hda[1] if hda_interface == "scsi": hda_cmd += ( f" -drive id=hda,file={hda_path},werror=report,if=none" " -device scsi-hd,drive=hda" ) else: hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" self._cmd += hda_cmd # cdrom if cdrom is not None: self._cmd += f" -cdrom {cdrom}" # usb usb_cmd = "" if usb is not None: # https://github.com/qemu/qemu/blob/master/docs/usb2.txt usb_cmd += ( " -device usb-ehci" f" -drive id=usbdisk,file={usb},if=none,readonly" " -device usb-storage,drive=usbdisk " ) self._cmd += usb_cmd # bios if bios is not None: self._cmd += f" -bios {bios}" # qemu flags if qemuFlags is not None: self._cmd += f" {qemuFlags}" class Machine: """A handle to the machine with this name, that also knows how to manage the machine lifecycle with the help of a start script / command.""" Loading Loading @@ -376,29 +305,6 @@ class Machine: self.booted = False self.connected = False @staticmethod def create_startcommand(args: Dict[str, str]) -> StartCommand: rootlog.warning( "Using legacy create_startcommand(), " "please use proper nix test vm instrumentation, instead " "to generate the appropriate nixos test vm qemu startup script" ) hda = None if args.get("hda"): hda_arg: str = args.get("hda", "") hda_arg_path: Path = Path(hda_arg) hda = (hda_arg_path, args.get("hdaInterface", "")) return LegacyStartCommand( netBackendArgs=args.get("netBackendArgs"), netFrontendArgs=args.get("netFrontendArgs"), hda=hda, cdrom=args.get("cdrom"), usb=args.get("usb"), bios=args.get("bios"), qemuBinary=args.get("qemuBinary"), qemuFlags=args.get("qemuFlags"), ) def is_up(self) -> bool: return self.booted and self.connected Loading
nixos/lib/test-script-prepend.py +12 −1 Original line number Diff line number Diff line Loading @@ -26,6 +26,17 @@ class PollingConditionProtocol(Protocol): raise Exception("This is just type information for the Nix test driver") class CreateMachineProtocol(Protocol): def __call__( self, start_command: str, *, name: Optional[str] = None, keep_vm_state: bool = False, ) -> Machine: raise Exception("This is just type information for the Nix test driver") start_all: Callable[[], None] subtest: Callable[[str], ContextManager[None]] retry: RetryProtocol Loading @@ -34,7 +45,7 @@ machines: List[Machine] vlans: List[VLan] driver: Driver log: Logger create_machine: Callable[[Dict[str, Any]], Machine] create_machine: CreateMachineProtocol run_tests: Callable[[], None] join_all: Callable[[], None] serial_stdout_off: Callable[[], None] Loading
nixos/tests/boot.nix +3 −3 Original line number Diff line number Diff line Loading @@ -68,7 +68,7 @@ let nodes = { }; testScript = '' machine = create_machine({"startCommand": "${startCommand}"}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify --no-trust -r --option experimental-features nix-command /run/current-system") Loading Loading @@ -107,7 +107,7 @@ let name = "boot-netboot-" + name; nodes = { }; testScript = '' machine = create_machine({"startCommand": "${startCommand}"}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.shutdown() Loading Loading @@ -159,7 +159,7 @@ in { if os.system("qemu-img create -f qcow2 -F raw -b ${sdImage} ${mutableImage}") != 0: raise RuntimeError("Could not create mutable linked image") machine = create_machine({"startCommand": "${startCommand}"}) machine = create_machine("${startCommand}") machine.start() machine.wait_for_unit("multi-user.target") machine.succeed("nix store verify -r --no-trust --option experimental-features nix-command /run/current-system") Loading
nixos/tests/common/ec2.nix +1 −1 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ with pkgs.lib; + " $QEMU_OPTS" ) machine = create_machine({"startCommand": start_command}) machine = create_machine(start_command) try: '' + indentLines script + '' finally: Loading