package main import ( "math/rand" "net/http" "os" "os/exec" "regexp" "time" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/weaveworks/kured/pkg/alerts" "github.com/weaveworks/kured/pkg/daemonsetlock" "github.com/weaveworks/kured/pkg/delaytick" "github.com/weaveworks/kured/pkg/notifications/slack" ) var ( version = "unreleased" // Command line flags period time.Duration dsNamespace string dsName string lockAnnotation string prometheusURL string alertFilter *regexp.Regexp rebootSentinel string forceRebootSentinel string slackHookURL string slackUsername string // Metrics rebootRequiredGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "kured", Name: "reboot_required", Help: "OS requires reboot due to software updates.", }, []string{"node"}) ) func init() { prometheus.MustRegister(rebootRequiredGauge) } func main() { rootCmd := &cobra.Command{ Use: "kured", Short: "Kubernetes Reboot Daemon", Run: root} rootCmd.PersistentFlags().DurationVar(&period, "period", time.Minute*60, "reboot check period") rootCmd.PersistentFlags().StringVar(&dsNamespace, "ds-namespace", "kube-system", "namespace containing daemonset on which to place lock") rootCmd.PersistentFlags().StringVar(&dsName, "ds-name", "kured", "name of daemonset on which to place lock") rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", "weave.works/kured-node-lock", "annotation in which to record locking node") rootCmd.PersistentFlags().StringVar(&prometheusURL, "prometheus-url", "", "Prometheus instance to probe for active alerts") rootCmd.PersistentFlags().Var(®expValue{&alertFilter}, "alert-filter-regexp", "alert names to ignore when checking for active alerts") rootCmd.PersistentFlags().StringVar(&rebootSentinel, "reboot-sentinel", "/var/run/reboot-required", "path to file whose existence signals need to reboot") rootCmd.PersistentFlags().StringVar(&forceRebootSentinel, "force-reboot-sentinel", "/var/run/force-reboot-required", "path to file whose existence signals need to force reboot") rootCmd.PersistentFlags().StringVar(&slackHookURL, "slack-hook-url", "", "slack hook URL for reboot notfications") rootCmd.PersistentFlags().StringVar(&slackUsername, "slack-username", "kured", "slack username for reboot notfications") if err := rootCmd.Execute(); err != nil { log.Fatal(err) } } // newCommand creates a new Command with stdout/stderr wired to our standard logger func newCommand(name string, arg ...string) *exec.Cmd { cmd := exec.Command(name, arg...) cmd.Stdout = log.NewEntry(log.StandardLogger()). WithField("cmd", cmd.Args[0]). WithField("std", "out"). WriterLevel(log.InfoLevel) cmd.Stderr = log.NewEntry(log.StandardLogger()). WithField("cmd", cmd.Args[0]). WithField("std", "err"). WriterLevel(log.WarnLevel) return cmd } func sentinelExists() bool { _, err := os.Stat(rebootSentinel) switch { case err == nil: return true case os.IsNotExist(err): return false default: log.Fatalf("Unable to determine existence of sentinel: %v", err) return false // unreachable; prevents compilation error } } func forceRebootsentinelExists() bool { _, err := os.Stat(forceRebootSentinel) switch { case err == nil: return true case os.IsNotExist(err): return false default: log.Fatalf("Unable to determine existence of force reboot sentinel: %v", err) return false // unreachable; prevents compilation error } } func rebootRequired() bool { if sentinelExists() || forceRebootsentinelExists() { log.Infof("Reboot required") return true } else { log.Infof("Reboot not required") return false } } func rebootBlocked() bool { if forceRebootsentinelExists() { log.Infof("Force reboot sentinel %v exists, force rebooting activated", forceRebootSentinel) return false } if prometheusURL != "" { alertNames, err := alerts.PrometheusActiveAlerts(prometheusURL, alertFilter) if err != nil { log.Warnf("Reboot blocked: prometheus query error: %v", err) return true } count := len(alertNames) if count > 10 { alertNames = append(alertNames[:10], "...") } if count > 0 { log.Warnf("Reboot blocked: %d active alerts: %v", count, alertNames) return true } } return false } func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}) bool { holding, err := lock.Test(metadata) if err != nil { log.Fatalf("Error testing lock: %v", err) } if holding { log.Infof("Holding lock") } return holding } func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}) bool { holding, holder, err := lock.Acquire(metadata) switch { case err != nil: log.Fatalf("Error acquiring lock: %v", err) return false case !holding: log.Warnf("Lock already held: %v", holder) return false default: log.Infof("Acquired reboot lock") return true } } func release(lock *daemonsetlock.DaemonSetLock) { log.Infof("Releasing lock") if err := lock.Release(); err != nil { log.Fatalf("Error releasing lock: %v", err) } } func drain(nodeID string) { log.Infof("Draining node %s", nodeID) drainCmd := newCommand("/usr/bin/kubectl", "drain", "--ignore-daemonsets", "--delete-local-data", "--force", nodeID) if err := drainCmd.Run(); err != nil { log.Fatalf("Error invoking drain command: %v", err) } } func uncordon(nodeID string) { log.Infof("Uncordoning node %s", nodeID) uncordonCmd := newCommand("/usr/bin/kubectl", "uncordon", nodeID) if err := uncordonCmd.Run(); err != nil { log.Fatalf("Error invoking uncordon command: %v", err) } } func commandReboot(nodeID string) { log.Infof("Commanding reboot") if slackHookURL != "" { if err := slack.NotifyReboot(slackHookURL, slackUsername, nodeID); err != nil { log.Warnf("Error notifying slack: %v", err) } } // Relies on /var/run/dbus/system_bus_socket bind mount to talk to systemd rebootCmd := newCommand("/bin/systemctl", "reboot") if err := rebootCmd.Run(); err != nil { log.Fatalf("Error invoking reboot command: %v", err) } } func maintainRebootRequiredMetric(nodeID string) { for { if sentinelExists() { rebootRequiredGauge.WithLabelValues(nodeID).Set(1) } else { rebootRequiredGauge.WithLabelValues(nodeID).Set(0) } time.Sleep(time.Minute) } } // nodeMeta is used to remember information across reboots type nodeMeta struct { Unschedulable bool `json:"unschedulable"` } func rebootAsRequired(nodeID string) { config, err := rest.InClusterConfig() if err != nil { log.Fatal(err) } client, err := kubernetes.NewForConfig(config) if err != nil { log.Fatal(err) } lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation) nodeMeta := nodeMeta{} if holding(lock, &nodeMeta) { if !nodeMeta.Unschedulable { uncordon(nodeID) } release(lock) } source := rand.NewSource(time.Now().UnixNano()) tick := delaytick.New(source, period) for _ = range tick { if rebootRequired() && !rebootBlocked() { node, err := client.CoreV1().Nodes().Get(nodeID, metav1.GetOptions{}) if err != nil { log.Fatal(err) } nodeMeta.Unschedulable = node.Spec.Unschedulable if acquire(lock, &nodeMeta) { if !nodeMeta.Unschedulable { drain(nodeID) } commandReboot(nodeID) for { log.Infof("Waiting for reboot") time.Sleep(time.Minute) } } } } } func root(cmd *cobra.Command, args []string) { log.Infof("Kubernetes Reboot Daemon: %s", version) nodeID := os.Getenv("KURED_NODE_ID") if nodeID == "" { log.Fatal("KURED_NODE_ID environment variable required") } log.Infof("Node ID: %s", nodeID) log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation) log.Infof("Reboot Sentinel: %s every %v", rebootSentinel, period) go rebootAsRequired(nodeID) go maintainRebootRequiredMetric(nodeID) http.Handle("/metrics", promhttp.Handler()) log.Fatal(http.ListenAndServe(":8080", nil)) }