Unverified Commit 110afff2 authored by Peder Bergebakken Sundt's avatar Peder Bergebakken Sundt Committed by GitHub
Browse files

nixos/kafka: Added cluster/MirrorMaker testcase (#317496)

parents 2ab76280 4fd8bff9
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -655,7 +655,7 @@ in
  jool = import ./jool.nix { inherit pkgs runTest; };
  jotta-cli = handleTest ./jotta-cli.nix { };
  k3s = handleTest ./k3s { };
  kafka = handleTest ./kafka.nix { };
  kafka = handleTest ./kafka { };
  kanboard = runTest ./web-apps/kanboard.nix;
  kanidm = handleTest ./kanidm.nix { };
  kanidm-provisioning = handleTest ./kanidm-provisioning.nix { };
+5 −7
Original line number Diff line number Diff line
{
  system ? builtins.currentSystem,
  config ? { },
  pkgs ? import ../.. { inherit system config; },
}:
{ pkgs, ... }:

with pkgs.lib;

@@ -13,7 +9,7 @@ let
      kafkaPackage,
      mode ? "kraft",
    }:
    (import ./make-test-python.nix ({
    (import ../make-test-python.nix ({
      inherit name;
      meta = with pkgs.lib.maintainers; {
        maintainers = [ nequissimus ];
@@ -71,6 +67,7 @@ let
                9092
                9093
              ];
              virtualisation.diskSize = 1024;
              # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
              virtualisation.memorySize = 2047;
            };
@@ -84,6 +81,7 @@ let
              };

              networking.firewall.allowedTCPPorts = [ 2181 ];
              virtualisation.diskSize = 1024;
            };
        };

@@ -116,7 +114,7 @@ let
            + "--from-beginning --max-messages 1"
        )
      '';
    }) { inherit system; });
    }));

in
with pkgs;
+199 −0
Original line number Diff line number Diff line
import ../make-test-python.nix (
  { lib, pkgs, ... }:

  let
    inherit (lib) mkMerge;

    # Generate with `kafka-storage.sh random-uuid`
    clusterId = "ii5pZE5LRkSeWrnyBhMOYQ";

    # Configuration shared by all four combined broker/controller nodes.
    kafkaConfig = {
      networking.firewall.allowedTCPPorts = [
        9092 # broker (PLAINTEXT) listener
        9093 # KRaft controller listener
      ];

      virtualisation.diskSize = 1024;
      virtualisation.memorySize = 1024 * 2;

      environment.systemPackages = [ pkgs.apacheKafka ];

      services.apache-kafka = {
        enable = true;

        inherit clusterId;

        formatLogDirs = true;

        settings = {
          listeners = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          # imap1 over the (sorted) node names yields voter ids 1..4, which
          # line up with the per-node "node.id" values set below.
          "controller.quorum.voters" = lib.imap1 (i: name: "${toString i}@${name}:9093") (
            builtins.attrNames kafkaNodes
          );
          "controller.listener.names" = [ "CONTROLLER" ];

          # Every node is both a broker and a KRaft controller.
          "process.roles" = [
            "broker"
            "controller"
          ];

          "log.dirs" = [ "/var/lib/apache-kafka" ];
          "num.partitions" = 6;
          "offsets.topic.replication.factor" = 2;
          "transaction.state.log.replication.factor" = 2;
          "transaction.state.log.min.isr" = 2;
        };
      };

      systemd.services.apache-kafka = {
        after = [ "network-online.target" ];
        requires = [ "network-online.target" ];
        serviceConfig.StateDirectory = "apache-kafka";
      };
    };

    # Per-node settings: unique node.id per node; kafka3 and kafka4 share
    # rack 3 so the rack-awareness assertion in testScript has a pair that
    # must never hold both replicas of a partition.
    extraKafkaConfig = {
      kafka1 = {
        services.apache-kafka.settings = {
          "node.id" = 1;
          "broker.rack" = 1;
        };
      };

      kafka2 = {
        services.apache-kafka.settings = {
          "node.id" = 2;
          "broker.rack" = 2;
        };
      };

      kafka3 = {
        services.apache-kafka.settings = {
          "node.id" = 3;
          "broker.rack" = 3;
        };
      };

      kafka4 = {
        services.apache-kafka.settings = {
          "node.id" = 4;
          "broker.rack" = 3;
        };
      };
    };

    # Merge the shared configuration into each per-node attribute set.
    kafkaNodes = builtins.mapAttrs (
      _: val:
      mkMerge [
        val
        kafkaConfig
      ]
    ) extraKafkaConfig;
  in
  {
    name = "kafka-cluster";
    meta = with pkgs.lib.maintainers; {
      maintainers = [ jpds ];
    };

    nodes = {
      inherit (kafkaNodes)
        kafka1
        kafka2
        kafka3
        kafka4
        ;

      # Plain client machine used to produce/consume via the CLI tools.
      client =
        { ... }:
        {
          environment.systemPackages = [ pkgs.apacheKafka ];
          virtualisation.diskSize = 1024;
        };
    };

    testScript = ''
      import json

      for machine in kafka1, kafka2, kafka3, kafka4:
        machine.wait_for_unit("apache-kafka")

      for machine in kafka1, kafka2, kafka3, kafka4:
        machine.wait_for_open_port(9092)
        machine.wait_for_open_port(9093)

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
        )

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
        )

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'BrokerLifecycleManager' | grep 'Incarnation [[:graph:]]\+ of broker [[:digit:]] in cluster ${clusterId}'"
        )

      current_voters_json = kafka1.wait_until_succeeds(
        "kafka-metadata-quorum.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 describe --status | grep CurrentVoters"
      ).replace("CurrentVoters:", "")

      voters = json.loads(current_voters_json)

      assert len(voters) == 4

      kafka1.wait_until_succeeds(
        "kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic test-123 --replication-factor 2"
      )

      for machine in kafka1, kafka2, kafka3, kafka4:
        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep -E 'Created log for partition test-123-[[:digit:]] in /var/lib/apache-kafka/test-123-[[:digit:]] with properties'"
        )

      kafka1.wait_until_succeeds(
        "kafka-topics.sh --bootstrap-server=kafka1:9092 --describe --topic test-123 | "
        + "grep 'PartitionCount: 6'"
      )

      # Should never see a replica on both 3 and 4 as they're in the same rack
      kafka1.fail(
        "kafka-topics.sh --bootstrap-server=kafka1:9092 --describe --topic test-123 | "
        + "grep -E 'Replicas: (3,4|4,3)'"
      )

      client.succeed(
          "echo 'test 2' | "
          + "kafka-console-producer.sh "
          + "--bootstrap-server kafka1:9092 "
          + "--topic test-123"
      )
      assert "test 2" in client.succeed(
          "kafka-console-consumer.sh "
          + "--bootstrap-server kafka2:9092 --topic test-123 "
          + "--group readtest "
          + "--from-beginning --max-messages 1"
      )

      client.succeed(
          "echo 'test 3' | "
          + "kafka-console-producer.sh "
          + "--bootstrap-server kafka2:9092 "
          + "--topic test-123"
      )
      assert "test 3" in client.succeed(
          "kafka-console-consumer.sh "
          + "--bootstrap-server kafka3:9092 --topic test-123 "
          + "--group readtest "
          + "--max-messages 1"
      )
    '';
  }
)
+11 −0
Original line number Diff line number Diff line
{
  system ? builtins.currentSystem,
  config ? { },
  pkgs ? import ../../.. { inherit system config; },
}:

let
  # Import a test file from this directory, forwarding the arguments given
  # to this entry point.
  callTest = path: import path { inherit system pkgs; };
in
{
  base = callTest ./base.nix;
  cluster = callTest ./cluster.nix;
  mirrormaker = callTest ./mirrormaker.nix;
}
+240 −0
Original line number Diff line number Diff line
import ../make-test-python.nix (
  { lib, pkgs, ... }:

  let
    inherit (lib) mkMerge;

    # Generate with `kafka-storage.sh random-uuid`
    clusterAId = "ihzlrasUQ9O3Yy0ZWYkd6w";

    clusterBId = "Bnu_zrzKRH6-7KcK7t3I5Q";

    # Configuration shared by the single-node source (A) and target (B)
    # clusters; cluster id and quorum voters are set per node below.
    kafkaConfig = {
      networking.firewall.allowedTCPPorts = [
        9092 # broker (PLAINTEXT) listener
        9093 # KRaft controller listener
      ];

      virtualisation.diskSize = 1024;
      virtualisation.memorySize = 1024 * 2;

      environment.systemPackages = [ pkgs.apacheKafka ];

      services.apache-kafka = {
        enable = true;

        formatLogDirs = true;

        settings = {
          listeners = [
            "PLAINTEXT://:9092"
            "CONTROLLER://:9093"
          ];
          "listener.security.protocol.map" = [
            "PLAINTEXT:PLAINTEXT"
            "CONTROLLER:PLAINTEXT"
          ];
          "controller.listener.names" = [ "CONTROLLER" ];

          "process.roles" = [
            "broker"
            "controller"
          ];

          "log.dirs" = [ "/var/lib/apache-kafka" ];
          # Single-broker clusters, so all replication factors are 1.
          "num.partitions" = 1;
          "offsets.topic.replication.factor" = 1;
          "transaction.state.log.replication.factor" = 1;
          "transaction.state.log.min.isr" = 1;
        };
      };

      systemd.services.apache-kafka = {
        after = [ "network-online.target" ];
        requires = [ "network-online.target" ];
        serviceConfig.StateDirectory = "apache-kafka";
      };
    };

    # Per-cluster settings: each node is the sole controller-quorum voter of
    # its own cluster.
    extraKafkaConfig = {
      kafkaa1 = {
        services.apache-kafka = {
          clusterId = clusterAId;

          settings = {
            "node.id" = 1;
            "controller.quorum.voters" = [ "1@kafkaa1:9093" ];
          };
        };
      };

      kafkab1 = {
        services.apache-kafka = {
          clusterId = clusterBId;

          settings = {
            "node.id" = 1;
            "controller.quorum.voters" = [ "1@kafkab1:9093" ];
          };
        };
      };
    };

    # Merge the shared configuration into each per-node attribute set.
    kafkaNodes = builtins.mapAttrs (
      _: val:
      mkMerge [
        val
        kafkaConfig
      ]
    ) extraKafkaConfig;

    # MirrorMaker 2 configuration: replicate every topic from A to B only
    # (B->A is explicitly disabled).
    mirrorMakerProperties = pkgs.writeText "mm2.properties" ''
      name = A->B

      clusters = A, B

      A.bootstrap.servers = kafkaa1:9092
      B.bootstrap.servers = kafkab1:9092

      A->B.enabled = true
      A->B.topics = .*

      B->A.enabled = false
      B->A.topics = .*

      replication.factor=1
      replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

      tasks.max = 2
      refresh.topics.enabled = true
      refresh.topics.interval.seconds = 5
      sync.topic.configs.enabled = true

      checkpoints.topic.replication.factor=1
      heartbeats.topic.replication.factor=1
      offset-syncs.topic.replication.factor=1

      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      config.storage.replication.factor=1

      emit.checkpoints.enabled = true
      emit.checkpoints.interval.seconds = 5
    '';
  in
  {
    name = "kafka-mirrormaker";
    meta = with pkgs.lib.maintainers; {
      maintainers = [ jpds ];
    };

    nodes = {
      inherit (kafkaNodes) kafkaa1 kafkab1;

      mirrormaker =
        { ... }:
        {
          virtualisation.diskSize = 1024;
          virtualisation.memorySize = 1024 * 2;

          # Define a mirrormaker systemd service
          systemd.services.kafka-connect-mirror-maker = {
            after = [ "network-online.target" ];
            requires = [ "network-online.target" ];
            wantedBy = [ "multi-user.target" ];

            serviceConfig = {
              ExecStart = ''
                ${pkgs.apacheKafka}/bin/connect-mirror-maker.sh ${mirrorMakerProperties}
              '';
              Restart = "on-failure";
              RestartSec = "5s";
            };
          };
        };
    };

    testScript = ''
      import json

      for machine in kafkaa1, kafkab1:
        machine.wait_for_unit("apache-kafka")

      for machine in kafkaa1, kafkab1:
        machine.wait_for_open_port(9092)
        machine.wait_for_open_port(9093)

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Transition from STARTING to STARTED'"
        )

        machine.wait_until_succeeds(
          "journalctl -o cat -u apache-kafka.service | grep 'Kafka Server started'"
        )

      for machine in kafkaa1, kafkab1:
        current_voters_json = machine.wait_until_succeeds(
          f"kafka-metadata-quorum.sh --bootstrap-server {machine.name}:9092 describe --status | grep CurrentVoters"
        ).replace("CurrentVoters:", "")

        voters = json.loads(current_voters_json)

        assert len(voters) == 1

      mirrormaker.wait_for_unit("kafka-connect-mirror-maker")

      mirrormaker.wait_until_succeeds(
        "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Kafka MirrorMaker initializing'"
      )
      mirrormaker.wait_until_succeeds(
        "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Targeting clusters \[A, B\]'"
      )
      mirrormaker.wait_until_succeeds(
        "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[Worker clientId=A->B, groupId=A-mm2\] Finished starting connectors and tasks'"
      )

      mirrormaker.wait_until_succeeds(
        """
          journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'INFO \[MirrorSourceConnector\|task-0\] \[Producer clientId=A->B\|A->B-0\|offset-syncs-source-producer\] Cluster ID: ${clusterAId}'
        """
      )

      kafkaa1.wait_until_succeeds(
        "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group B-mm2'"
      )

      kafkab1.wait_until_succeeds(
        "journalctl -o cat -u apache-kafka.service | grep 'Stabilized group A-mm2'"
      )

      kafkaa1.wait_until_succeeds(
        "kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mm-1 --partitions 1 --replication-factor 1"
      )

      for machine in kafkaa1, kafkab1:
        machine.succeed(
          "kafka-topics.sh --bootstrap-server localhost:9092 --list | grep 'test-mm-1'"
        )

      mirrormaker.wait_until_succeeds(
        "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'replicating [[:digit:]]\+ topic-partitions A->B: \[test-mm-1-0\]'"
      )

      mirrormaker.wait_until_succeeds(
        "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Found [[:digit:]]\+ new topic-partitions on A'"
      )

      kafkaa1.wait_until_succeeds(
        "kafka-verifiable-producer.sh --bootstrap-server kafkaa1:9092 --throughput 10 --max-messages 100 --topic test-mm-1"
      )

      mirrormaker.wait_until_succeeds(
        "journalctl -o cat -u kafka-connect-mirror-maker.service | grep 'Committing offsets for [[:digit:]]\+ acknowledged messages'"
      )

      kafkab1.wait_until_succeeds(
        "kafka-verifiable-consumer.sh --bootstrap-server kafkab1:9092 --topic test-mm-1 --group-id testreplication --max-messages 100"
      )
    '';
  }
)