Unverified Commit 9ec67884 authored by Stefan Gränitz's avatar Stefan Gränitz Committed by GitHub
Browse files

[lldb] Add HTTPS tests for SymbolLocatorSymStore (#192274)

Using self-signed certificates is the only way forward for testing 
security features on the HTTPS path. As we don't want to allow
any arbitrary certificate, we add a new property that pins a
fingerprint and any self-signed certificate is only accepted if it
matches this fingerprint.
parent 35480b22
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -1380,3 +1380,14 @@ def skipUnlessArm64eSupported(func):
        return None

    return skipTestIfFn(can_build_and_run_arm64e)(func)


def skipUnlessPackageAvailable(name):
    """Skip the test case if the named package is not available on the system."""
    available = True
    try:
        __import__(name)
    except ImportError:
        available = False

    return unittest.skipUnless(available, f"requires the '{name}' package")
+21 −0
Original line number Diff line number Diff line
@@ -67,6 +67,25 @@ public:
      return s->GetCurrentValue();
    return SymbolLocatorSymStore::GetSystemDefaultCachePath();
  }

  std::optional<std::string> GetTLSCertFingerprint() const {
    OptionValueString *s =
        m_collection_sp->GetPropertyAtIndexAsOptionValueString(
            ePropertyTLSCertFingerprint);
    if (!s)
      return {};
    llvm::StringRef val = s->GetCurrentValueAsRef();
    if (val.empty())
      return {};
    if (val.size() != 64 || !llvm::all_of(val, llvm::isHexDigit)) {
      Debugger::ReportWarning(llvm::formatv(
          "plugin.symbol-locator.symstore.tls-cert-fingerprint: expected a "
          "64-character hex string (SHA-256), but got '{0}', ignoring",
          val));
      return {};
    }
    return val.lower();
  }
};

} // namespace
@@ -269,6 +288,8 @@ RequestFileFromSymStoreServerHTTP(llvm::StringRef base_url, llvm::StringRef key,
      client);

  llvm::HTTPRequest request(source_url);
  request.PinnedCertFingerprint =
      GetGlobalPluginProperties().GetTLSCertFingerprint();
  if (llvm::Error Err = client.perform(request, Handler)) {
    Debugger::ReportWarning(
        llvm::formatv("failed to download from SymStore '{0}': {1}", source_url,
+3 −0
Original line number Diff line number Diff line
@@ -8,4 +8,7 @@ let Definition = "symbollocatorsymstore", Path = "plugin.symbol-locator.symstore
  def CachePath : Property<"cache", "String">,
    DefaultStringValue<"">,
    Desc<"Default cache directory for downloaded symbol files. Used when no cache is specified in _NT_SYMBOL_PATH.">;
  def TLSCertFingerprint : Property<"tls-cert-fingerprint", "String">,
    DefaultStringValue<"">,
    Desc<"SHA-256 fingerprint (lowercase hex, no separators) of the symbol server's TLS certificate. When set, LLDB will accept an HTTPS server that presents this self-signed certificate even if it is not trusted by the system certificate store (Windows only).">;
}
+162 −2
Original line number Diff line number Diff line
import datetime
import http.server
import ipaddress
import os
import shutil
import socketserver
import ssl
import sys
import threading
from functools import partial
@@ -118,6 +121,107 @@ class HTTPServer:
            self._thread.join()


class HTTPSServer:
    """
    Context Manager to serve a local directory tree via HTTPS.
    """

    class ErrorAwareTCPServer(socketserver.ThreadingTCPServer):
        """TCP layer that will suppress errors of the given type."""

        def __init__(self, address, handler, err):
            self.err = err
            super().__init__(address, handler)

        def handle_error(self, request, client_address):
            if isinstance(sys.exc_info()[1], self.err):
                return
            super().handle_error(request, client_address)

    def __init__(self, dir=None, handler=None, cert=None):
        if handler is None:
            handler = partial(http.server.SimpleHTTPRequestHandler, directory=dir)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(cert.file, cert.key_file)
        address = ("localhost", 0)  # auto-select free port
        self._server = self.ErrorAwareTCPServer(address, handler, ssl.SSLError)
        self._server.socket = ctx.wrap_socket(self._server.socket, server_side=True)
        self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)

    def __enter__(self):
        self._thread.start()
        host, port = self._server.server_address
        return f"https://{host}:{port}"

    def __exit__(self, *exc_info):
        if self._server:
            self._server.shutdown()
            self._server.server_close()
        if self._thread:
            self._thread.join()


class SelfSignedCert:
    """
    Self-signed cert/key pair for localhost.
    """

    def __init__(self, tmpdir):
        from cryptography import x509
        from cryptography.x509.oid import NameOID
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.asymmetric import rsa

        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")])
        now = datetime.datetime.now(datetime.timezone.utc)
        cert = (
            x509.CertificateBuilder()
            .subject_name(name)
            .issuer_name(name)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=1))
            .add_extension(
                x509.SubjectAlternativeName(
                    [
                        x509.DNSName("localhost"),
                        x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
                    ]
                ),
                critical=False,
            )
            .sign(key, hashes.SHA256())
        )
        os.makedirs(tmpdir, exist_ok=False)
        self.key_file = os.path.join(tmpdir, "server.key")
        self.file = os.path.join(tmpdir, "server.crt")
        with open(self.key_file, "wb") as f:
            f.write(
                key.private_bytes(
                    serialization.Encoding.PEM,
                    serialization.PrivateFormat.TraditionalOpenSSL,
                    serialization.NoEncryption(),
                )
            )
        with open(self.file, "wb") as f:
            f.write(cert.public_bytes(serialization.Encoding.PEM))
        self.fingerprint = cert.fingerprint(hashes.SHA256()).hex()


class RedirectHandler(http.server.BaseHTTPRequestHandler):
    base_url = None

    def do_GET(self):
        self.send_response(301)
        self.send_header("Location", self.base_url + self.path)
        self.end_headers()

    def log_message(self, *args):
        pass  # suppress request logs


class RequestCounter(http.server.SimpleHTTPRequestHandler):
    requests = 0  # class-level so all instances share one counter

@@ -200,8 +304,6 @@ class SymStoreTests(TestBase):
                    warnings = err_file.read().decode()
                self.assertEqual(warnings, "")

    # TODO: Add test coverage for common HTTPS security scenarios, e.g. self-signed
    # certs, non-HTTPS redirects, etc.
    def test_http(self):
        """
        Check that breakpoint resolves with remote SymStore.
@@ -323,3 +425,61 @@ class SymStoreTests(TestBase):
                with NtSymbolPath(dir):
                    self.try_breakpoint(exe, should_have_loc=True)
            self.assertEqual(RequestCounter.requests, 0)

    @skipUnlessPackageAvailable("cryptography")
    def test_https(self):
        """
        Check that breakpoint resolves with remote SymStore via HTTPS.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            cert = SelfSignedCert(self.getBuildArtifact("cert"))
            with HTTPSServer(dir=symstore_dir, cert=cert) as https_url:
                # We accept only the self-signed certificate with this fingerprint
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {cert.fingerprint}"
                )
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.urls {https_url}"
                )
                self.try_breakpoint(exe, should_have_loc=True)

    @skipUnlessPackageAvailable("cryptography")
    def test_https_reject_selfsigned_cert(self):
        """
        Check that LLDB rejects an HTTPS server with an untrusted self-signed cert.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            cert = SelfSignedCert(self.getBuildArtifact("cert"))
            with HTTPSServer(dir=symstore_dir, cert=cert) as https_url:
                # No fingerprint set
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.urls {https_url}"
                )
                self.try_breakpoint(exe, should_have_loc=False)
                # Incorrect fingerprint set
                bogus = "DEADBEEFCAFEBABE"
                self.runCmd(
                    f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {bogus}{bogus}{bogus}{bogus}"
                )
                self.try_breakpoint(exe, should_have_loc=False)

    @skipUnlessPackageAvailable("cryptography")
    def test_https_reject_redirect_http(self):
        """
        Check that LLDB does not retrieve symbols from servers that redirect with security downgrades.
        """
        exe, sym = self.build_inferior()
        with MockedSymStore(self, exe, sym) as symstore_dir:
            with HTTPServer(symstore_dir) as http_url:
                RedirectHandler.base_url = http_url
                cert = SelfSignedCert(self.getBuildArtifact("cert"))
                with HTTPSServer(handler=RedirectHandler, cert=cert) as https_url:
                    self.runCmd(
                        f"settings set plugin.symbol-locator.symstore.urls {https_url}"
                    )
                    self.runCmd(
                        f"settings set plugin.symbol-locator.symstore.tls-cert-fingerprint {cert.fingerprint}"
                    )
                    self.try_breakpoint(exe, should_have_loc=False)
+4 −0
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@
#include "llvm/Support/MemoryBuffer.h"

#include <chrono>
#include <optional>

namespace llvm {

@@ -31,7 +32,10 @@ struct HTTPRequest {
  SmallString<128> Url;
  SmallVector<std::string, 0> Headers;
  HTTPMethod Method = HTTPMethod::GET;
  // Follow redirects without security downgrades.
  bool FollowRedirects = true;
  // Allow self-signed TLS certificates with this SHA-256 (WinHTTP only).
  std::optional<std::string> PinnedCertFingerprint;
  HTTPRequest(StringRef Url);
};

Loading