Unverified Commit c654f663 authored by Yt's avatar Yt Committed by GitHub
Browse files

nixos/vector: Add journald+ClickHouse test (#405185)

parents 06ec618a 8932afec
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@
  file-sink = import ./file-sink.nix { inherit system pkgs; };
  api = import ./api.nix { inherit system pkgs; };
  dnstap = import ./dnstap.nix { inherit system pkgs; };
  journald-clickhouse = import ./journald-clickhouse.nix { inherit system pkgs; };
  nginx-clickhouse = import ./nginx-clickhouse.nix { inherit system pkgs; };
  syslog-quickwit = import ./syslog-quickwit.nix { inherit system pkgs; };
}
+157 −0
Original line number Diff line number Diff line
import ../make-test-python.nix (
  { lib, pkgs, ... }:
  let
    # Take the original journald message and create a new payload which only
    # contains the relevant fields - these must match the database columns.
    journalVrlRemapTransform = {
      journald_remap = {
        inputs = [ "journald" ];
        type = "remap";
        source = ''
          m = {}
          m.app = .SYSLOG_IDENTIFIER
          m.host = .host
          m.severity = to_int(.PRIORITY) ?? 0
          m.level = to_syslog_level(m.severity) ?? ""
          m.message = strip_ansi_escape_codes!(.message)
          m.timestamp = .timestamp
          m.uid = to_int(._UID) ?? 0
          m.pid = to_int(._PID) ?? 0
          . = [m]
        '';
      };
    };
  in
  {
    name = "vector-journald-clickhouse";
    meta.maintainers = [ pkgs.lib.maintainers.happysalada ];

    nodes = {
      clickhouse =
        { config, pkgs, ... }:
        {
          virtualisation.diskSize = 5 * 1024;
          virtualisation.memorySize = 4096;

          networking.firewall.allowedTCPPorts = [ 6000 ];

          services.vector = {
            enable = true;
            journaldAccess = true;

            settings = {
              sources = {
                journald = {
                  type = "journald";
                };

                vector_source = {
                  type = "vector";
                  address = "[::]:6000";
                };
              };

              transforms = journalVrlRemapTransform;

              sinks = {
                clickhouse = {
                  type = "clickhouse";
                  inputs = [
                    "journald_remap"
                    "vector_source"
                  ];
                  endpoint = "http://localhost:8123";
                  database = "journald";
                  table = "logs";
                  date_time_best_effort = true;
                };
              };
            };

          };

          services.clickhouse = {
            enable = true;
          };
        };

      vector =
        { config, pkgs, ... }:
        {
          services.vector = {
            enable = true;
            journaldAccess = true;

            settings = {
              sources = {
                journald = {
                  type = "journald";
                };
              };

              transforms = journalVrlRemapTransform;

              sinks = {
                vector_sink = {
                  type = "vector";
                  inputs = [ "journald_remap" ];
                  address = "clickhouse:6000";
                };
              };
            };
          };
        };
    };

    testScript =
      let
        # work around quote/substitution complexity by Nix, Perl, bash and SQL.
        databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";

        # https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
        tableDDL = pkgs.writeText "table.sql" ''
          CREATE TABLE IF NOT EXISTS journald.logs (
            timestamp DateTime64(6),
            app LowCardinality(String),
            host LowCardinality(String),
            level LowCardinality(String),
            severity UInt8,
            message String,
            uid UInt16,
            pid UInt32,
          )
          ENGINE = MergeTree()
          ORDER BY (host, app, timestamp)
          PARTITION BY toYYYYMM(timestamp)
        '';

        selectQuery = pkgs.writeText "select.sql" ''
          SELECT COUNT(host) FROM journald.logs
          WHERE message LIKE '%Vector has started%'
        '';
      in
      ''
        clickhouse.wait_for_unit("clickhouse")
        clickhouse.wait_for_open_port(6000)
        clickhouse.wait_for_open_port(8123)

        clickhouse.succeed(
          "cat ${databaseDDL} | clickhouse-client"
        )

        clickhouse.succeed(
          "cat ${tableDDL} | clickhouse-client"
        )

        for machine in clickhouse, vector:
          machine.wait_for_unit("vector")
          machine.wait_until_succeeds(
            "journalctl -o cat -u vector.service | grep 'Vector has started'"
          )

        clickhouse.wait_until_succeeds(
          "cat ${selectQuery} | clickhouse-client | grep 2"
        )
      '';
  }
)