Unverified Commit efd138d0 authored by misuzu's avatar misuzu Committed by GitHub
Browse files

nixos/temporal: init module (#436466)

parents 97dc7178 f20852e4
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -98,6 +98,10 @@

- [KMinion](https://github.com/redpanda-data/kminion), feature-rich Prometheus exporter for Apache Kafka. Available as [services.prometheus.exporters.kafka](options.html#opt-services.prometheus.exporters.kafka).

- [Temporal](https://temporal.io/), a durable execution platform that enables
  developers to build scalable applications without sacrificing productivity or
  reliability. Available as [services.temporal](#opt-services.temporal.enable).

## Backward Incompatibilities {#sec-release-25.11-incompatibilities}

<!-- To avoid merge conflicts, consider adding your item at an arbitrary place in the list instead. -->
+1 −0
Original line number Diff line number Diff line
@@ -484,6 +484,7 @@
  ./services/cluster/patroni/default.nix
  ./services/cluster/rke2/default.nix
  ./services/cluster/spark/default.nix
  ./services/cluster/temporal/default.nix
  ./services/computing/boinc/client.nix
  ./services/computing/foldingathome/client.nix
  ./services/computing/slurm/slurm.nix
+146 −0
Original line number Diff line number Diff line
{
  config,
  lib,
  pkgs,
  ...
}:
let
  cfg = config.services.temporal;

  settingsFormat = pkgs.formats.yaml { };

  usingDefaultDataDir = cfg.dataDir == "/var/lib/temporal";
  usingDefaultUserAndGroup = cfg.user == "temporal" && cfg.group == "temporal";
in
{
  meta.maintainers = [ lib.maintainers.jpds ];

  options.services.temporal = {
    enable = lib.mkEnableOption "Temporal";

    package = lib.mkPackageOption pkgs "Temporal" {
      default = [ "temporal" ];
    };

    settings = lib.mkOption {
      type = lib.types.submodule {
        freeformType = settingsFormat.type;
      };

      description = ''
        Temporal configuration.

        See <https://docs.temporal.io/references/configuration> for more
        information about Temporal configuration options
      '';
    };

    dataDir = lib.mkOption {
      type = lib.types.path;
      default = "/var/lib/temporal";
      apply = lib.converge (lib.removeSuffix "/");
      description = ''
        Data directory for Temporal. If you change this, you need to
        manually create the directory. You also need to create the
        `temporal` user and group, or change
        [](#opt-services.temporal.user) and
        [](#opt-services.temporal.group) to existing ones with
        access to the directory.
      '';
    };

    user = lib.mkOption {
      type = lib.types.str;
      default = "temporal";
      description = ''
        The user Temporal runs as. Should be left at default unless
        you have very specific needs.
      '';
    };

    group = lib.mkOption {
      type = lib.types.str;
      default = "temporal";
      description = ''
        The group temporal runs as. Should be left at default unless
        you have very specific needs.
      '';
    };

    restartIfChanged = lib.mkOption {
      type = lib.types.bool;
      description = ''
        Automatically restart the service on config change.
        This can be set to false to defer restarts on a server or cluster.
        Please consider the security implications of inadvertently running an older version,
        and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
      '';
      default = true;
    };
  };

  config = lib.mkIf cfg.enable {
    environment.etc."temporal/temporal-server.yaml".source =
      settingsFormat.generate "temporal-server.yaml" cfg.settings;

    systemd.services.temporal = {
      description = "Temporal server";
      wantedBy = [ "multi-user.target" ];
      after = [ "network.target" ];
      inherit (cfg) restartIfChanged;
      restartTriggers = [ config.environment.etc."temporal/temporal-server.yaml".source ];
      environment = {
        HOME = cfg.dataDir;
      };
      serviceConfig = {
        ExecStart = ''
          ${cfg.package}/bin/temporal-server --root / --config /etc/temporal/ -e temporal-server start
        '';
        User = cfg.user;
        Group = cfg.group;
        Restart = "on-failure";
        DynamicUser = usingDefaultUserAndGroup && usingDefaultDataDir;
        CapabilityBoundingSet = [ "" ];
        DevicePolicy = "closed";
        LockPersonality = true;
        MemoryDenyWriteExecute = true;
        NoNewPrivileges = true;
        PrivateDevices = true;
        ProcSubset = "pid";
        ProtectClock = true;
        ProtectHome = true;
        ProtectHostname = true;
        ProtectControlGroups = true;
        ProtectKernelLogs = true;
        ProtectKernelModules = true;
        ProtectKernelTunables = true;
        ProtectProc = "invisible";
        ProtectSystem = "strict";
        ReadWritePaths = [
          cfg.dataDir
        ];
        RestrictAddressFamilies = [
          "AF_NETLINK"
          "AF_INET"
          "AF_INET6"
        ];
        RestrictNamespaces = true;
        RestrictRealtime = true;
        RestrictSUIDSGID = true;
        SystemCallArchitectures = "native";
        SystemCallFilter = [
          # 1. allow a reasonable set of syscalls
          "@system-service @resources"
          # 2. and deny unreasonable ones
          "~@privileged"
          # 3. then allow the required subset within denied groups
          "@chown"
        ];
      }
      // (lib.optionalAttrs (usingDefaultDataDir) {
        StateDirectory = "temporal";
        StateDirectoryMode = "0700";
      });
    };
  };
}
+1 −0
Original line number Diff line number Diff line
@@ -1488,6 +1488,7 @@ in
  teleport = handleTest ./teleport.nix { };
  teleports = runTest ./teleports.nix;
  thelounge = handleTest ./thelounge.nix { };
  temporal = runTest ./temporal.nix;
  terminal-emulators = handleTest ./terminal-emulators.nix { };
  thanos = runTest ./thanos.nix;
  tiddlywiki = runTest ./tiddlywiki.nix;
+311 −0
Original line number Diff line number Diff line
(
  { lib, pkgs, ... }:

  {
    name = "temporal";
    meta.maintainers = [ pkgs.lib.maintainers.jpds ];

    nodes = {
      temporal =
        { config, pkgs, ... }:
        {
          networking.firewall.allowedTCPPorts = [ 7233 ];

          environment.systemPackages = [
            (pkgs.writers.writePython3Bin "temporal-hello-workflow.py"
              {
                libraries = [ pkgs.python3Packages.temporalio ];
              }
              # Graciously taken from https://github.com/temporalio/samples-python/blob/main/hello/hello_activity.py
              ''
                import asyncio
                from concurrent.futures import ThreadPoolExecutor
                from dataclasses import dataclass
                from datetime import timedelta

                from temporalio import activity, workflow
                from temporalio.client import Client
                from temporalio.worker import Worker


                # While we could use multiple parameters in the activity, Temporal strongly
                # encourages using a single dataclass instead which can have fields added to it
                # in a backwards-compatible way.
                @dataclass
                class ComposeGreetingInput:
                    greeting: str
                    name: str


                # Basic activity that logs and does string concatenation
                @activity.defn
                def compose_greeting(input: ComposeGreetingInput) -> str:
                    activity.logger.info("Running activity with parameter %s" % input)
                    return f"{input.greeting}, {input.name}!"


                # Basic workflow that logs and invokes an activity
                @workflow.defn
                class GreetingWorkflow:
                    @workflow.run
                    async def run(self, name: str) -> str:
                        workflow.logger.info("Running workflow with parameter %s" % name)
                        return await workflow.execute_activity(
                            compose_greeting,
                            ComposeGreetingInput("Hello", name),
                            start_to_close_timeout=timedelta(seconds=10),
                        )


                async def main():
                    # Uncomment the lines below to see logging output
                    # import logging
                    # logging.basicConfig(level=logging.INFO)

                    # Start client
                    client = await Client.connect("localhost:7233")

                    # Run a worker for the workflow
                    async with Worker(
                        client,
                        task_queue="hello-activity-task-queue",
                        workflows=[GreetingWorkflow],
                        activities=[compose_greeting],
                        # Non-async activities require an executor;
                        # a thread pool executor is recommended.
                        # This same thread pool could be passed to multiple workers if desired.
                        activity_executor=ThreadPoolExecutor(5),
                    ):

                        # While the worker is running, use the client to run the workflow and
                        # print out its result. Note, in many production setups, the client
                        # would be in a completely separate process from the worker.
                        result = await client.execute_workflow(
                            GreetingWorkflow.run,
                            "World",
                            id="hello-activity-workflow-id",
                            task_queue="hello-activity-task-queue",
                        )
                        print(f"Result: {result}")


                if __name__ == "__main__":
                    asyncio.run(main())
              ''
            )
            pkgs.temporal-cli
          ];

          services.temporal = {
            enable = true;
            settings = {
              # Based on https://github.com/temporalio/temporal/blob/main/config/development-sqlite.yaml
              log = {
                stdout = true;
                level = "info";
              };
              services = {
                frontend = {
                  rpc = {
                    grpcPort = 7233;
                    membershipPort = 6933;
                    bindOnLocalHost = true;
                    httpPort = 7243;
                  };
                };
                matching = {
                  rpc = {
                    grpcPort = 7235;
                    membershipPort = 6935;
                    bindOnLocalHost = true;
                  };
                };
                history = {
                  rpc = {
                    grpcPort = 7234;
                    membershipPort = 6934;
                    bindOnLocalHost = true;
                  };
                };
                worker = {
                  rpc = {
                    grpcPort = 7239;
                    membershipPort = 6939;
                    bindOnLocalHost = true;
                  };
                };
              };

              persistence = {
                defaultStore = "sqlite-default";
                visibilityStore = "sqlite-visibility";
                numHistoryShards = 1;
                datastores = {
                  sqlite-default = {
                    sql = {
                      user = "";
                      password = "";
                      pluginName = "sqlite";
                      databaseName = "default";
                      connectAddr = "localhost";
                      connectProtocol = "tcp";
                      connectAttributes = {
                        mode = "memory";
                        cache = "private";
                      };
                      maxConns = 1;
                      maxIdleConns = 1;
                      maxConnLifetime = "1h";
                      tls = {
                        enabled = false;
                        caFile = "";
                        certFile = "";
                        keyFile = "";
                        enableHostVerification = false;
                        serverName = "";
                      };
                    };
                  };
                  sqlite-visibility = {
                    sql = {
                      user = "";
                      password = "";
                      pluginName = "sqlite";
                      databaseName = "default";
                      connectAddr = "localhost";
                      connectProtocol = "tcp";
                      connectAttributes = {
                        mode = "memory";
                        cache = "private";
                      };
                      maxConns = 1;
                      maxIdleConns = 1;
                      maxConnLifetime = "1h";
                      tls = {
                        enabled = false;
                        caFile = "";
                        certFile = "";
                        keyFile = "";
                        enableHostVerification = false;
                        serverName = "";
                      };
                    };
                  };
                };
              };
              clusterMetadata = {
                enableGlobalNamespace = false;
                failoverVersionIncrement = 10;
                masterClusterName = "active";
                currentClusterName = "active";
                clusterInformation = {
                  active = {
                    enabled = true;
                    initialFailoverVersion = 1;
                    rpcName = "frontend";
                    rpcAddress = "localhost:7233";
                    httpAddress = "localhost:7243";
                  };
                };
              };

              dcRedirectionPolicy = {
                policy = "noop";
              };

              archival = {
                history = {
                  state = "enabled";
                  enableRead = true;
                  provider = {
                    filestore = {
                      fileMode = "0666";
                      dirMode = "0766";
                    };
                    gstorage = {
                      credentialsPath = "/tmp/gcloud/keyfile.json";
                    };
                  };
                };
                visibility = {
                  state = "enabled";
                  enableRead = true;
                  provider = {
                    filestore = {
                      fileMode = "0666";
                      dirMode = "0766";
                    };
                  };
                };
              };

              namespaceDefaults = {
                archival = {
                  history = {
                    state = "disabled";
                    URI = "file:///tmp/temporal_archival/development";
                  };
                  visibility = {
                    state = "disabled";
                    URI = "file:///tmp/temporal_vis_archival/development";
                  };
                };
              };
            };
          };
        };
    };

    testScript = ''
      temporal.wait_for_unit("temporal")
      temporal.wait_for_open_port(6933)
      temporal.wait_for_open_port(6934)
      temporal.wait_for_open_port(6935)
      temporal.wait_for_open_port(7233)
      temporal.wait_for_open_port(7234)
      temporal.wait_for_open_port(7235)

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'server-version' | grep '${pkgs.temporal.version}'"
      )

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'Frontend is now healthy'"
      )

      import json
      cluster_list_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster list --output json"))
      assert cluster_list_json[0]['clusterName'] == "active"

      cluster_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster describe --output json"))
      assert cluster_describe_json['serverVersion'] in "${pkgs.temporal.version}"

      temporal.log(temporal.wait_until_succeeds("temporal operator namespace create --namespace default"))

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'Register namespace succeeded'"
      )

      namespace_list_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace list --output json"))
      assert len(namespace_list_json) == 2

      namespace_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace describe --output json --namespace default"))
      assert namespace_describe_json['namespaceInfo']['name'] == "default"
      assert namespace_describe_json['namespaceInfo']['state'] == "NAMESPACE_STATE_REGISTERED"

      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
      assert len(workflow_json) == 0

      out = temporal.wait_until_succeeds("temporal-hello-workflow.py")
      assert "Result: Hello, World!" in out

      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
      assert workflow_json[0]['execution']['workflowId'] == "hello-activity-workflow-id"
      assert workflow_json[0]['status'] == "WORKFLOW_EXECUTION_STATUS_COMPLETED"

      temporal.log(temporal.succeed(
        "systemd-analyze security temporal.service | grep -v '✓'"
      ))
    '';
  }
)
Loading