Commit 56973b2d authored by Sarah Brofeldt's avatar Sarah Brofeldt
Browse files

nixos/tests/kafka: test KRaft mode

parent 45f84cdf
Loading
Loading
Loading
Loading
+61 −26
Original line number Diff line number Diff line
@@ -6,45 +6,79 @@
with pkgs.lib;

let
  makeKafkaTest = name: kafkaPackage: (import ./make-test-python.nix ({
  makeKafkaTest = name: { kafkaPackage, mode ? "zookeeper" }: (import ./make-test-python.nix ({
    inherit name;
    meta = with pkgs.lib.maintainers; {
      maintainers = [ nequissimus ];
    };

    nodes = {
      zookeeper1 = { ... }: {
        services.zookeeper = {
          enable = true;
        };

        networking.firewall.allowedTCPPorts = [ 2181 ];
      };
      kafka = { ... }: {
        services.apache-kafka = {
        services.apache-kafka = mkMerge [
          ({
            enable = true;
            package = kafkaPackage;
            settings = {
              "offsets.topic.replication.factor" = 1;
              "log.dirs" = [
                "/var/lib/kafka/logdir1"
                "/var/lib/kafka/logdir2"
              ];
            };
          })
          (mkIf (mode == "zookeeper") {
            settings = {
              "zookeeper.session.timeout.ms" = 600000;
              "zookeeper.connect" = [ "zookeeper1:2181" ];
            "log.dirs" = [ "/tmp/apache-kafka" ];
            };

          package = kafkaPackage;
          })
          (mkIf (mode == "kraft") {
            clusterId = "ak2fIHr4S8WWarOF_ODD0g";
            formatLogDirs = true;
            settings = {
              "node.id" = 1;
              "process.roles" = [
                "broker"
                "controller"
              ];
              "listeners" = [
                "PLAINTEXT://:9092"
                "CONTROLLER://:9093"
              ];
              "listener.security.protocol.map" = [
                "PLAINTEXT:PLAINTEXT"
                "CONTROLLER:PLAINTEXT"
              ];
              "controller.quorum.voters" = [
                "1@kafka:9093"
              ];
              "controller.listener.names" = [ "CONTROLLER" ];
            };
          })
        ];

        networking.firewall.allowedTCPPorts = [ 9092 ];
        networking.firewall.allowedTCPPorts = [ 9092 9093 ];
        # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
        virtualisation.memorySize = 2047;
      };
    } // optionalAttrs (mode == "zookeeper") {
      zookeeper1 = { ... }: {
        services.zookeeper = {
          enable = true;
        };

        networking.firewall.allowedTCPPorts = [ 2181 ];
      };
    };

    testScript = ''
      start_all()

      ${optionalString (mode == "zookeeper") ''
      zookeeper1.wait_for_unit("default.target")
      zookeeper1.wait_for_unit("zookeeper.service")
      zookeeper1.wait_for_open_port(2181)
      ''}

      kafka.wait_for_unit("default.target")
      kafka.wait_for_unit("apache-kafka.service")
@@ -69,12 +103,13 @@ let
  }) { inherit system; });

in with pkgs; {
  kafka_2_8  = makeKafkaTest "kafka_2_8"  apacheKafka_2_8;
  kafka_3_0  = makeKafkaTest "kafka_3_0"  apacheKafka_3_0;
  kafka_3_1  = makeKafkaTest "kafka_3_1"  apacheKafka_3_1;
  kafka_3_2  = makeKafkaTest "kafka_3_2"  apacheKafka_3_2;
  kafka_3_3  = makeKafkaTest "kafka_3_3"  apacheKafka_3_3;
  kafka_3_4  = makeKafkaTest "kafka_3_4"  apacheKafka_3_4;
  kafka_3_5  = makeKafkaTest "kafka_3_5"  apacheKafka_3_5;
  kafka  = makeKafkaTest "kafka"  apacheKafka;
  kafka_2_8 = makeKafkaTest "kafka_2_8" { kafkaPackage = apacheKafka_2_8; };
  kafka_3_0 = makeKafkaTest "kafka_3_0" { kafkaPackage = apacheKafka_3_0; };
  kafka_3_1 = makeKafkaTest "kafka_3_1" { kafkaPackage = apacheKafka_3_1; };
  kafka_3_2 = makeKafkaTest "kafka_3_2" { kafkaPackage = apacheKafka_3_2; };
  kafka_3_3 = makeKafkaTest "kafka_3_3" { kafkaPackage = apacheKafka_3_3; };
  kafka_3_4 = makeKafkaTest "kafka_3_4" { kafkaPackage = apacheKafka_3_4; };
  kafka_3_5 = makeKafkaTest "kafka_3_5" { kafkaPackage = apacheKafka_3_5; };
  kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
  kafka_kraft = makeKafkaTest "kafka_kraft" { kafkaPackage = apacheKafka; mode = "kraft"; };
}