Loading nixos/tests/vector/dnstap.nix +138 −29 Original line number Diff line number Diff line Loading @@ -8,6 +8,40 @@ in meta.maintainers = [ pkgs.lib.maintainers.happysalada ]; nodes = { clickhouse = { config, pkgs, ... }: { networking.firewall.allowedTCPPorts = [ 6000 ]; services.vector = { enable = true; settings = { sources = { vector_dnstap_source = { type = "vector"; address = "[::]:6000"; }; }; sinks = { clickhouse = { type = "clickhouse"; inputs = [ "vector_dnstap_source" ]; endpoint = "http://localhost:8123"; database = "dnstap"; table = "records"; date_time_best_effort = true; }; }; }; }; services.clickhouse.enable = true; }; unbound = { config, pkgs, ... }: { Loading Loading @@ -37,6 +71,12 @@ in codec = "json"; }; }; vector_dnstap_sink = { type = "vector"; inputs = [ "dnstap" ]; address = "clickhouse:6000"; }; }; }; }; Loading Loading @@ -99,7 +139,72 @@ in }; }; testScript = '' testScript = let # work around quote/substitution complexity by Nix, Perl, bash and SQL. databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS dnstap"; tableDDL = pkgs.writeText "table.sql" '' CREATE TABLE IF NOT EXISTS dnstap.records ( timestamp DateTime64(6), dataType LowCardinality(String), dataTypeId UInt8, messageType LowCardinality(String), messageTypeId UInt8, requestData Nullable(JSON), responseData Nullable(JSON), responsePort UInt16, serverId LowCardinality(String), serverVersion LowCardinality(String), socketFamily LowCardinality(String), socketProtocol LowCardinality(String), sourceAddress String, sourcePort UInt16, ) ENGINE = MergeTree() ORDER BY (serverId, timestamp) PARTITION BY toYYYYMM(timestamp) ''; tableView = pkgs.writeText "view.sql" '' CREATE MATERIALIZED VIEW dnstap.domains_view ( timestamp DateTime64(6), serverId LowCardinality(String), domain String, record_type LowCardinality(String) ) ENGINE = MergeTree() PARTITION BY toYYYYMM(timestamp) ORDER BY (serverId, timestamp) POPULATE AS SELECT timestamp, serverId, JSONExtractString(requestData.question[1]::String, 'domainName') as domain, JSONExtractString(requestData.question[1]::String, 'questionType') as record_type FROM dnstap.records WHERE messageTypeId = 5 # ClientQuery ''; selectQuery = pkgs.writeText "select.sql" '' SELECT domain, count(domain) FROM dnstap.domains_view GROUP BY domain ''; in '' clickhouse.wait_for_unit("clickhouse") clickhouse.wait_for_open_port(6000) clickhouse.wait_for_open_port(8123) clickhouse.succeed( "cat ${databaseDDL} | clickhouse-client", "cat ${tableDDL} | clickhouse-client", "cat ${tableView} | clickhouse-client", ) unbound.wait_for_unit("unbound") unbound.wait_for_unit("vector") Loading Loading @@ -127,5 +232,9 @@ in unbound.wait_until_succeeds( "grep ClientResponse /var/lib/vector/logs.log | grep '\"domainName\":\"test.local.\"' | grep '\"rData\":\"192.168.123.5\"'" ) clickhouse.log(clickhouse.wait_until_succeeds( "cat ${selectQuery} | clickhouse-client | grep 'test.local.'" )) ''; } Loading
nixos/tests/vector/dnstap.nix +138 −29 Original line number Diff line number Diff line Loading @@ -8,6 +8,40 @@ in meta.maintainers = [ pkgs.lib.maintainers.happysalada ]; nodes = { clickhouse = { config, pkgs, ... }: { networking.firewall.allowedTCPPorts = [ 6000 ]; services.vector = { enable = true; settings = { sources = { vector_dnstap_source = { type = "vector"; address = "[::]:6000"; }; }; sinks = { clickhouse = { type = "clickhouse"; inputs = [ "vector_dnstap_source" ]; endpoint = "http://localhost:8123"; database = "dnstap"; table = "records"; date_time_best_effort = true; }; }; }; }; services.clickhouse.enable = true; }; unbound = { config, pkgs, ... }: { Loading Loading @@ -37,6 +71,12 @@ in codec = "json"; }; }; vector_dnstap_sink = { type = "vector"; inputs = [ "dnstap" ]; address = "clickhouse:6000"; }; }; }; }; Loading Loading @@ -99,7 +139,72 @@ in }; }; testScript = '' testScript = let # work around quote/substitution complexity by Nix, Perl, bash and SQL. databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS dnstap"; tableDDL = pkgs.writeText "table.sql" '' CREATE TABLE IF NOT EXISTS dnstap.records ( timestamp DateTime64(6), dataType LowCardinality(String), dataTypeId UInt8, messageType LowCardinality(String), messageTypeId UInt8, requestData Nullable(JSON), responseData Nullable(JSON), responsePort UInt16, serverId LowCardinality(String), serverVersion LowCardinality(String), socketFamily LowCardinality(String), socketProtocol LowCardinality(String), sourceAddress String, sourcePort UInt16, ) ENGINE = MergeTree() ORDER BY (serverId, timestamp) PARTITION BY toYYYYMM(timestamp) ''; tableView = pkgs.writeText "view.sql" '' CREATE MATERIALIZED VIEW dnstap.domains_view ( timestamp DateTime64(6), serverId LowCardinality(String), domain String, record_type LowCardinality(String) ) ENGINE = MergeTree() PARTITION BY toYYYYMM(timestamp) ORDER BY (serverId, timestamp) POPULATE AS SELECT timestamp, serverId, JSONExtractString(requestData.question[1]::String, 'domainName') as domain, JSONExtractString(requestData.question[1]::String, 'questionType') as record_type FROM dnstap.records WHERE messageTypeId = 5 # ClientQuery ''; selectQuery = pkgs.writeText "select.sql" '' SELECT domain, count(domain) FROM dnstap.domains_view GROUP BY domain ''; in '' clickhouse.wait_for_unit("clickhouse") clickhouse.wait_for_open_port(6000) clickhouse.wait_for_open_port(8123) clickhouse.succeed( "cat ${databaseDDL} | clickhouse-client", "cat ${tableDDL} | clickhouse-client", "cat ${tableView} | clickhouse-client", ) unbound.wait_for_unit("unbound") unbound.wait_for_unit("vector") Loading Loading @@ -127,5 +232,9 @@ in unbound.wait_until_succeeds( "grep ClientResponse /var/lib/vector/logs.log | grep '\"domainName\":\"test.local.\"' | grep '\"rData\":\"192.168.123.5\"'" ) clickhouse.log(clickhouse.wait_until_succeeds( "cat ${selectQuery} | clickhouse-client | grep 'test.local.'" )) ''; }