Loading nixos/tests/acme.nix +41 −30 Original line number Diff line number Diff line Loading @@ -407,7 +407,6 @@ in { '' import time TOTAL_RETRIES = 20 Loading @@ -428,6 +427,16 @@ in { return retries + 1 def protect(self, func): def wrapper(*args, retries: int = 0, **kwargs): try: return func(*args, **kwargs) except Exception as err: retries = self.handle_fail(retries, err.args) return wrapper(*args, retries=retries, **kwargs) return wrapper backoff = BackoffTracker() Loading @@ -441,7 +450,9 @@ in { f" || ln -s $(readlink /run/current-system)/specialisation {root_specs}" ) switcher_path = f"/run/current-system/specialisation/{name}/bin/switch-to-configuration" switcher_path = ( f"/run/current-system/specialisation/{name}/bin/switch-to-configuration" ) rc, _ = node.execute(f"test -e '{switcher_path}'") if rc > 0: switcher_path = f"/tmp/specialisation/{name}/bin/switch-to-configuration" Loading @@ -465,8 +476,9 @@ in { actual_issuer = node.succeed( f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}" ).partition("=")[2] print(f"{fname} issuer: {actual_issuer}") assert issuer.lower() in actual_issuer.lower() assert ( issuer.lower() in actual_issuer.lower() ), f"{fname} issuer mismatch. Expected {issuer} got {actual_issuer}" # Ensure cert comes before chain in fullchain.pem Loading @@ -488,19 +500,21 @@ in { assert False def check_connection(node, domain, retries=0): @backoff.protect def check_connection(node, domain): result = node.succeed( "openssl s_client -brief -verify 2 -CAfile /tmp/ca.crt" f" -servername {domain} -connect {domain}:443 < /dev/null 2>&1" ) for line in result.lower().split("\n"): if "verification" in line and "error" in line: retries = backoff.handle_fail(retries, f"Failed to connect to https://{domain}") return check_connection(node, domain, retries) assert not ( "verification" in line and "error" in line ), f"Failed to connect to https://{domain}" def check_connection_key_bits(node, domain, bits, retries=0): @backoff.protect def check_connection_key_bits(node, domain, bits): result = node.succeed( "openssl s_client -CAfile /tmp/ca.crt" f" -servername {domain} -connect {domain}:443 < /dev/null" Loading @@ -508,12 +522,11 @@ in { ) print("Key type:", result) if bits not in result: retries = backoff.handle_fail(retries, f"Did not find expected number of bits ({bits}) in key") return check_connection_key_bits(node, domain, bits, retries) assert bits in result, f"Did not find expected number of bits ({bits}) in key" def check_stapling(node, domain, retries=0): @backoff.protect def check_stapling(node, domain): # Pebble doesn't provide a full OCSP responder, so just check the URL result = node.succeed( "openssl s_client -CAfile /tmp/ca.crt" Loading @@ -522,20 +535,20 @@ in { ) print("OCSP Responder URL:", result) if "${caDomain}:4002" not in result.lower(): retries = backoff.handle_fail(retries, "OCSP Stapling check failed") return check_stapling(node, domain, retries) assert "${caDomain}:4002" in result.lower(), "OCSP Stapling check failed" def download_ca_certs(node, retries=0): exit_code, _ = node.execute("curl https://${caDomain}:15000/roots/0 > /tmp/ca.crt") exit_code_2, _ = node.execute( "curl https://${caDomain}:15000/intermediate-keys/0 >> /tmp/ca.crt" ) @backoff.protect def download_ca_certs(node): node.succeed("curl https://${caDomain}:15000/roots/0 > /tmp/ca.crt") node.succeed("curl https://${caDomain}:15000/intermediate-keys/0 >> /tmp/ca.crt") if exit_code + exit_code_2 > 0: retries = backoff.handle_fail(retries, "Failed to connect to pebble to download root CA certs") return download_ca_certs(node, retries) @backoff.protect def set_a_record(node): node.succeed( 'curl --data \'{"host": "${caDomain}", "addresses": ["${nodes.acme.networking.primaryIPAddress}"]}\' http://${dnsServerIP nodes}:8055/add-a' ) start_all() Loading @@ -543,9 +556,7 @@ in { dnsserver.wait_for_unit("pebble-challtestsrv.service") client.wait_for_unit("default.target") client.succeed( 'curl --data \'{"host": "${caDomain}", "addresses": ["${nodes.acme.networking.primaryIPAddress}"]}\' http://${dnsServerIP nodes}:8055/add-a' ) set_a_record(client) acme.systemctl("start network-online.target") acme.wait_for_unit("network-online.target") Loading Loading @@ -642,7 +653,7 @@ in { webserver.wait_for_unit("acme-finished-lego.example.test.target") webserver.wait_for_unit("nginx.service") webserver.succeed("echo HENLO && systemctl cat nginx.service") webserver.succeed("test \"$(stat -c '%U' /var/lib/acme/* | uniq)\" = \"root\"") webserver.succeed('test "$(stat -c \'%U\' /var/lib/acme/* | uniq)" = "root"') check_connection(client, "a.example.test") check_connection(client, "lego.example.test") Loading Loading
nixos/tests/acme.nix +41 −30 Original line number Diff line number Diff line Loading @@ -407,7 +407,6 @@ in { '' import time TOTAL_RETRIES = 20 Loading @@ -428,6 +427,16 @@ in { return retries + 1 def protect(self, func): def wrapper(*args, retries: int = 0, **kwargs): try: return func(*args, **kwargs) except Exception as err: retries = self.handle_fail(retries, err.args) return wrapper(*args, retries=retries, **kwargs) return wrapper backoff = BackoffTracker() Loading @@ -441,7 +450,9 @@ in { f" || ln -s $(readlink /run/current-system)/specialisation {root_specs}" ) switcher_path = f"/run/current-system/specialisation/{name}/bin/switch-to-configuration" switcher_path = ( f"/run/current-system/specialisation/{name}/bin/switch-to-configuration" ) rc, _ = node.execute(f"test -e '{switcher_path}'") if rc > 0: switcher_path = f"/tmp/specialisation/{name}/bin/switch-to-configuration" Loading @@ -465,8 +476,9 @@ in { actual_issuer = node.succeed( f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}" ).partition("=")[2] print(f"{fname} issuer: {actual_issuer}") assert issuer.lower() in actual_issuer.lower() assert ( issuer.lower() in actual_issuer.lower() ), f"{fname} issuer mismatch. Expected {issuer} got {actual_issuer}" # Ensure cert comes before chain in fullchain.pem Loading @@ -488,19 +500,21 @@ in { assert False def check_connection(node, domain, retries=0): @backoff.protect def check_connection(node, domain): result = node.succeed( "openssl s_client -brief -verify 2 -CAfile /tmp/ca.crt" f" -servername {domain} -connect {domain}:443 < /dev/null 2>&1" ) for line in result.lower().split("\n"): if "verification" in line and "error" in line: retries = backoff.handle_fail(retries, f"Failed to connect to https://{domain}") return check_connection(node, domain, retries) assert not ( "verification" in line and "error" in line ), f"Failed to connect to https://{domain}" def check_connection_key_bits(node, domain, bits, retries=0): @backoff.protect def check_connection_key_bits(node, domain, bits): result = node.succeed( "openssl s_client -CAfile /tmp/ca.crt" f" -servername {domain} -connect {domain}:443 < /dev/null" Loading @@ -508,12 +522,11 @@ in { ) print("Key type:", result) if bits not in result: retries = backoff.handle_fail(retries, f"Did not find expected number of bits ({bits}) in key") return check_connection_key_bits(node, domain, bits, retries) assert bits in result, f"Did not find expected number of bits ({bits}) in key" def check_stapling(node, domain, retries=0): @backoff.protect def check_stapling(node, domain): # Pebble doesn't provide a full OCSP responder, so just check the URL result = node.succeed( "openssl s_client -CAfile /tmp/ca.crt" Loading @@ -522,20 +535,20 @@ in { ) print("OCSP Responder URL:", result) if "${caDomain}:4002" not in result.lower(): retries = backoff.handle_fail(retries, "OCSP Stapling check failed") return check_stapling(node, domain, retries) assert "${caDomain}:4002" in result.lower(), "OCSP Stapling check failed" def download_ca_certs(node, retries=0): exit_code, _ = node.execute("curl https://${caDomain}:15000/roots/0 > /tmp/ca.crt") exit_code_2, _ = node.execute( "curl https://${caDomain}:15000/intermediate-keys/0 >> /tmp/ca.crt" ) @backoff.protect def download_ca_certs(node): node.succeed("curl https://${caDomain}:15000/roots/0 > /tmp/ca.crt") node.succeed("curl https://${caDomain}:15000/intermediate-keys/0 >> /tmp/ca.crt") if exit_code + exit_code_2 > 0: retries = backoff.handle_fail(retries, "Failed to connect to pebble to download root CA certs") return download_ca_certs(node, retries) @backoff.protect def set_a_record(node): node.succeed( 'curl --data \'{"host": "${caDomain}", "addresses": ["${nodes.acme.networking.primaryIPAddress}"]}\' http://${dnsServerIP nodes}:8055/add-a' ) start_all() Loading @@ -543,9 +556,7 @@ in { dnsserver.wait_for_unit("pebble-challtestsrv.service") client.wait_for_unit("default.target") client.succeed( 'curl --data \'{"host": "${caDomain}", "addresses": ["${nodes.acme.networking.primaryIPAddress}"]}\' http://${dnsServerIP nodes}:8055/add-a' ) set_a_record(client) acme.systemctl("start network-online.target") acme.wait_for_unit("network-online.target") Loading Loading @@ -642,7 +653,7 @@ in { webserver.wait_for_unit("acme-finished-lego.example.test.target") webserver.wait_for_unit("nginx.service") webserver.succeed("echo HENLO && systemctl cat nginx.service") webserver.succeed("test \"$(stat -c '%U' /var/lib/acme/* | uniq)\" = \"root\"") webserver.succeed('test "$(stat -c \'%U\' /var/lib/acme/* | uniq)" = "root"') check_connection(client, "a.example.test") check_connection(client, "lego.example.test") Loading