main.go 12.6 KB
Newer Older
Adam Harrison's avatar
Adam Harrison committed
1
2
3
package main

import (
4
	"context"
5
	"fmt"
Adam Harrison's avatar
Adam Harrison committed
6
	"math/rand"
7
	"net/http"
Adam Harrison's avatar
Adam Harrison committed
8
9
10
11
12
	"os"
	"os/exec"
	"regexp"
	"time"

13
	log "github.com/sirupsen/logrus"
Adam Harrison's avatar
Adam Harrison committed
14
	"github.com/spf13/cobra"
15
	v1 "k8s.io/api/core/v1"
16
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Adam Harrison's avatar
Adam Harrison committed
17
18
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
19
	kubectldrain "k8s.io/kubectl/pkg/drain"
Adam Harrison's avatar
Adam Harrison committed
20

21
22
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
Adam Harrison's avatar
Adam Harrison committed
23
24
25
	"github.com/weaveworks/kured/pkg/alerts"
	"github.com/weaveworks/kured/pkg/daemonsetlock"
	"github.com/weaveworks/kured/pkg/delaytick"
26
	"github.com/weaveworks/kured/pkg/notifications/slack"
27
	"github.com/weaveworks/kured/pkg/taints"
JJ Jordan's avatar
JJ Jordan committed
28
	"github.com/weaveworks/kured/pkg/timewindow"
Adam Harrison's avatar
Adam Harrison committed
29
30
31
)

var (
32
33
34
	version = "unreleased"

	// Command line flags
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
	period                    time.Duration
	dsNamespace               string
	dsName                    string
	lockAnnotation            string
	lockTTL                   time.Duration
	prometheusURL             string
	alertFilter               *regexp.Regexp
	rebootSentinel            string
	preferNoScheduleTaintName string
	slackHookURL              string
	slackUsername             string
	slackChannel              string
	messageTemplateDrain      string
	messageTemplateReboot     string
	podSelectors              []string
50

JJ Jordan's avatar
JJ Jordan committed
51
52
53
54
55
	rebootDays  []string
	rebootStart string
	rebootEnd   string
	timezone    string

56
57
58
59
60
61
	// Metrics
	rebootRequiredGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "kured",
		Name:      "reboot_required",
		Help:      "OS requires reboot due to software updates.",
	}, []string{"node"})
Adam Harrison's avatar
Adam Harrison committed
62
63
)

64
65
66
67
// init registers rebootRequiredGauge with the Prometheus client library at
// program start-up.
func init() {
	prometheus.MustRegister(rebootRequiredGauge)
}

Adam Harrison's avatar
Adam Harrison committed
68
69
70
71
72
73
func main() {
	rootCmd := &cobra.Command{
		Use:   "kured",
		Short: "Kubernetes Reboot Daemon",
		Run:   root}

74
75
	rootCmd.PersistentFlags().DurationVar(&period, "period", time.Minute*60,
		"reboot check period")
76
	rootCmd.PersistentFlags().StringVar(&dsNamespace, "ds-namespace", "kube-system",
Adam Harrison's avatar
Adam Harrison committed
77
		"namespace containing daemonset on which to place lock")
78
	rootCmd.PersistentFlags().StringVar(&dsName, "ds-name", "kured",
Adam Harrison's avatar
Adam Harrison committed
79
80
81
		"name of daemonset on which to place lock")
	rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", "weave.works/kured-node-lock",
		"annotation in which to record locking node")
82
83
	rootCmd.PersistentFlags().DurationVar(&lockTTL, "lock-ttl", 0,
		"expire lock annotation after this duration (default: 0, disabled)")
Adam Harrison's avatar
Adam Harrison committed
84
85
86
87
88
89
	rootCmd.PersistentFlags().StringVar(&prometheusURL, "prometheus-url", "",
		"Prometheus instance to probe for active alerts")
	rootCmd.PersistentFlags().Var(&regexpValue{&alertFilter}, "alert-filter-regexp",
		"alert names to ignore when checking for active alerts")
	rootCmd.PersistentFlags().StringVar(&rebootSentinel, "reboot-sentinel", "/var/run/reboot-required",
		"path to file whose existence signals need to reboot")
90
91
	rootCmd.PersistentFlags().StringVar(&preferNoScheduleTaintName, "prefer-no-schedule-taint", "weave.works/kured-node-reboot",
		"taint name applied during pending node reboot (to prevent receiving additional pods from other rebooting nodes)")
Adam Harrison's avatar
Adam Harrison committed
92

93
94
95
96
	rootCmd.PersistentFlags().StringVar(&slackHookURL, "slack-hook-url", "",
		"slack hook URL for reboot notfications")
	rootCmd.PersistentFlags().StringVar(&slackUsername, "slack-username", "kured",
		"slack username for reboot notfications")
97
98
	rootCmd.PersistentFlags().StringVar(&slackChannel, "slack-channel", "",
		"slack channel for reboot notfications")
99
100
101
102
	rootCmd.PersistentFlags().StringVar(&messageTemplateDrain, "message-template-drain", "Draining node %s",
		"message template used to notify about a node being drained")
	rootCmd.PersistentFlags().StringVar(&messageTemplateReboot, "message-template-reboot", "Rebooting node %s",
		"message template used to notify about a node being rebooted")
103

104
105
106
	rootCmd.PersistentFlags().StringArrayVar(&podSelectors, "blocking-pod-selector", nil,
		"label selector identifying pods whose presence should prevent reboots")

JJ Jordan's avatar
JJ Jordan committed
107
108
	rootCmd.PersistentFlags().StringSliceVar(&rebootDays, "reboot-days", timewindow.EveryDay,
		"schedule reboot on these days")
JJ Jordan's avatar
JJ Jordan committed
109
	rootCmd.PersistentFlags().StringVar(&rebootStart, "start-time", "0:00",
JJ Jordan's avatar
JJ Jordan committed
110
		"schedule reboot only after this time of day")
111
	rootCmd.PersistentFlags().StringVar(&rebootEnd, "end-time", "23:59:59",
JJ Jordan's avatar
JJ Jordan committed
112
		"schedule reboot only before this time of day")
JJ Jordan's avatar
JJ Jordan committed
113
	rootCmd.PersistentFlags().StringVar(&timezone, "time-zone", "UTC",
JJ Jordan's avatar
JJ Jordan committed
114
		"use this timezone for schedule inputs")
JJ Jordan's avatar
JJ Jordan committed
115

Adam Harrison's avatar
Adam Harrison committed
116
117
118
119
120
	if err := rootCmd.Execute(); err != nil {
		log.Fatal(err)
	}
}

121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// newCommand builds an exec.Cmd for name and arg whose stdout and stderr are
// forwarded to the standard logger. Both streams are tagged with the command
// name ("cmd") and the stream ("std"); stdout is written at info level and
// stderr at warn level.
func newCommand(name string, arg ...string) *exec.Cmd {
	c := exec.Command(name, arg...)

	// Both writers derive from one base entry carrying the command name.
	base := log.NewEntry(log.StandardLogger()).WithField("cmd", c.Args[0])
	c.Stdout = base.WithField("std", "out").WriterLevel(log.InfoLevel)
	c.Stderr = base.WithField("std", "err").WriterLevel(log.WarnLevel)

	return c
}

138
func sentinelExists() bool {
Adam Harrison's avatar
Adam Harrison committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
	// Relies on hostPID:true and privileged:true to enter host mount space
	sentinelCmd := newCommand("/usr/bin/nsenter", "-m/proc/1/ns/mnt", "--", "/usr/bin/test", "-f", rebootSentinel)
	if err := sentinelCmd.Run(); err != nil {
		switch err := err.(type) {
		case *exec.ExitError:
			// We assume a non-zero exit code means 'reboot not required', but of course
			// the user could have misconfigured the sentinel command or something else
			// went wrong during its execution. In that case, not entering a reboot loop
			// is the right thing to do, and we are logging stdout/stderr of the command
			// so it should be obvious what is wrong.
			return false
		default:
			// Something was grossly misconfigured, such as the command path being wrong.
			log.Fatalf("Error invoking sentinel command: %v", err)
		}
Adam Harrison's avatar
Adam Harrison committed
154
	}
Adam Harrison's avatar
Adam Harrison committed
155
	return true
Adam Harrison's avatar
Adam Harrison committed
156
157
}

158
159
160
161
162
func rebootRequired() bool {
	if sentinelExists() {
		log.Infof("Reboot required")
		return true
	}
Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
163
164
	log.Infof("Reboot not required")
	return false
165
166
}

167
func rebootBlocked(client *kubernetes.Clientset, nodeID string) bool {
Adam Harrison's avatar
Adam Harrison committed
168
	if prometheusURL != "" {
169
		alertNames, err := alerts.PrometheusActiveAlerts(prometheusURL, alertFilter)
Adam Harrison's avatar
Adam Harrison committed
170
171
172
173
		if err != nil {
			log.Warnf("Reboot blocked: prometheus query error: %v", err)
			return true
		}
174
175
176
177
		count := len(alertNames)
		if count > 10 {
			alertNames = append(alertNames[:10], "...")
		}
Adam Harrison's avatar
Adam Harrison committed
178
		if count > 0 {
179
			log.Warnf("Reboot blocked: %d active alerts: %v", count, alertNames)
Adam Harrison's avatar
Adam Harrison committed
180
181
182
			return true
		}
	}
183
184
185

	fieldSelector := fmt.Sprintf("spec.nodeName=%s", nodeID)
	for _, labelSelector := range podSelectors {
186
		podList, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
			LabelSelector: labelSelector,
			FieldSelector: fieldSelector,
			Limit:         10})
		if err != nil {
			log.Warnf("Reboot blocked: pod query error: %v", err)
			return true
		}

		if len(podList.Items) > 0 {
			podNames := make([]string, 0, len(podList.Items))
			for _, pod := range podList.Items {
				podNames = append(podNames, pod.Name)
			}
			if len(podList.Continue) > 0 {
				podNames = append(podNames, "...")
			}
			log.Warnf("Reboot blocked: matching pods: %v", podNames)
			return true
		}
	}

Adam Harrison's avatar
Adam Harrison committed
208
209
210
211
212
213
214
215
216
217
218
219
220
221
	return false
}

// holding reports the result of lock.Test, passing metadata through to it.
// A failed test is fatal; a held lock is logged at info level.
func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}) bool {
	held, err := lock.Test(metadata)
	if err != nil {
		log.Fatalf("Error testing lock: %v", err)
	}

	if held {
		log.Infof("Holding lock")
	}

	return held
}

Michal Schott's avatar
Michal Schott committed
222
223
func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}, TTL time.Duration) bool {
	holding, holder, err := lock.Acquire(metadata, TTL)
Adam Harrison's avatar
Adam Harrison committed
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
	switch {
	case err != nil:
		log.Fatalf("Error acquiring lock: %v", err)
		return false
	case !holding:
		log.Warnf("Lock already held: %v", holder)
		return false
	default:
		log.Infof("Acquired reboot lock")
		return true
	}
}

// release gives up the lock. A failure to release is fatal.
func release(lock *daemonsetlock.DaemonSetLock) {
	log.Infof("Releasing lock")

	err := lock.Release()
	if err == nil {
		return
	}
	log.Fatalf("Error releasing lock: %v", err)
}

Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
244
func drain(client *kubernetes.Clientset, node *v1.Node) {
245
246
247
	nodename := node.GetName()

	log.Infof("Draining node %s", nodename)
248
249

	if slackHookURL != "" {
250
		if err := slack.NotifyDrain(slackHookURL, slackUsername, slackChannel, messageTemplateDrain, nodename); err != nil {
251
252
253
254
			log.Warnf("Error notifying slack: %v", err)
		}
	}

Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
255
256
	drainer := &kubectldrain.Helper{
		Client:              client,
257
		GracePeriodSeconds:  -1,
Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
258
259
260
261
262
263
		Force:               true,
		DeleteLocalData:     true,
		IgnoreAllDaemonSets: true,
		ErrOut:              os.Stderr,
		Out:                 os.Stdout,
	}
264
	if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
265
		log.Fatalf("Error cordonning %s: %v", nodename, err)
266
	}
267

268
	if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil {
269
		log.Fatalf("Error draining %s: %v", nodename, err)
Adam Harrison's avatar
Adam Harrison committed
270
271
272
	}
}

Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
273
func uncordon(client *kubernetes.Clientset, node *v1.Node) {
274
275
	nodename := node.GetName()
	log.Infof("Uncordoning node %s", nodename)
Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
276
277
278
279
280
	drainer := &kubectldrain.Helper{
		Client: client,
		ErrOut: os.Stderr,
		Out:    os.Stdout,
	}
281
	if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil {
282
		log.Fatalf("Error uncordonning %s: %v", nodename, err)
Adam Harrison's avatar
Adam Harrison committed
283
284
285
	}
}

286
func commandReboot(nodeID string) {
287
	log.Infof("Commanding reboot for node: %s", nodeID)
288
289

	if slackHookURL != "" {
290
		if err := slack.NotifyReboot(slackHookURL, slackUsername, slackChannel, messageTemplateReboot, nodeID); err != nil {
291
292
293
294
			log.Warnf("Error notifying slack: %v", err)
		}
	}

Adam Harrison's avatar
Adam Harrison committed
295
296
	// Relies on hostPID:true and privileged:true to enter host mount space
	rebootCmd := newCommand("/usr/bin/nsenter", "-m/proc/1/ns/mnt", "/bin/systemctl", "reboot")
Adam Harrison's avatar
Adam Harrison committed
297
298
299
300
301
	if err := rebootCmd.Run(); err != nil {
		log.Fatalf("Error invoking reboot command: %v", err)
	}
}

302
func maintainRebootRequiredMetric(nodeID string) {
Adam Harrison's avatar
Adam Harrison committed
303
	for {
304
305
306
307
308
		if sentinelExists() {
			rebootRequiredGauge.WithLabelValues(nodeID).Set(1)
		} else {
			rebootRequiredGauge.WithLabelValues(nodeID).Set(0)
		}
Adam Harrison's avatar
Adam Harrison committed
309
310
311
312
313
314
315
316
317
		time.Sleep(time.Minute)
	}
}

// nodeMeta is used to remember information across reboots. rebootAsRequired
// hands it to the daemonset lock when acquiring it and reads it back via
// holding after start-up.
type nodeMeta struct {
	// Unschedulable records node.Spec.Unschedulable as observed just before
	// the lock was acquired; rebootAsRequired only uncordons the node after
	// the reboot when this is false.
	Unschedulable bool `json:"unschedulable"`
}

Michal Schott's avatar
Michal Schott committed
318
func rebootAsRequired(nodeID string, window *timewindow.TimeWindow, TTL time.Duration) {
Adam Harrison's avatar
Adam Harrison committed
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}

	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)

	nodeMeta := nodeMeta{}
	if holding(lock, &nodeMeta) {
		if !nodeMeta.Unschedulable {
334
335
336
337
			node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
			if err != nil {
				log.Fatal(err)
			}
Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
338
			uncordon(client, node)
Adam Harrison's avatar
Adam Harrison committed
339
340
341
342
		}
		release(lock)
	}

343
344
345
346
347
348
349
	preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)

	// Remove taint immediately during startup to quickly allow scheduling again.
	if !rebootRequired() {
		preferNoScheduleTaint.Disable()
	}

Adam Harrison's avatar
Adam Harrison committed
350
	source := rand.NewSource(time.Now().UnixNano())
351
	tick := delaytick.New(source, period)
Jean-Philippe Evrard's avatar
Jean-Philippe Evrard committed
352
	for range tick {
353
		if !window.Contains(time.Now()) {
354
355
			// Remove taint outside the reboot time window to allow for normal operation.
			preferNoScheduleTaint.Disable()
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
			continue
		}

		if !rebootRequired() {
			continue
		}

		if rebootBlocked(client, nodeID) {
			continue
		}

		node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
		if err != nil {
			log.Fatal(err)
		}
		nodeMeta.Unschedulable = node.Spec.Unschedulable

		if !acquire(lock, &nodeMeta, TTL) {
374
375
			// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
			preferNoScheduleTaint.Enable()
376
377
378
379
380
381
382
383
384
385
			continue
		}

		if !nodeMeta.Unschedulable {
			drain(client, node)
		}
		commandReboot(nodeID)
		for {
			log.Infof("Waiting for reboot")
			time.Sleep(time.Minute)
Adam Harrison's avatar
Adam Harrison committed
386
387
		}
	}
388
389
390
391
392
393
394
395
396
397
}

func root(cmd *cobra.Command, args []string) {
	log.Infof("Kubernetes Reboot Daemon: %s", version)

	nodeID := os.Getenv("KURED_NODE_ID")
	if nodeID == "" {
		log.Fatal("KURED_NODE_ID environment variable required")
	}

JJ Jordan's avatar
JJ Jordan committed
398
399
400
401
402
	window, err := timewindow.New(rebootDays, rebootStart, rebootEnd, timezone)
	if err != nil {
		log.Fatalf("Failed to build time window: %v", err)
	}

403
404
	log.Infof("Node ID: %s", nodeID)
	log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
405
406
407
408
409
	if lockTTL > 0 {
		log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
	} else {
		log.Info("Lock TTL not set, lock will remain until being released")
	}
410
	log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)
411
	log.Infof("Reboot Sentinel: %s every %v", rebootSentinel, period)
412
	log.Infof("Blocking Pod Selectors: %v", podSelectors)
JJ Jordan's avatar
JJ Jordan committed
413
	log.Infof("Reboot on: %v", window)
414

415
	go rebootAsRequired(nodeID, window, lockTTL)
416
	go maintainRebootRequiredMetric(nodeID)
Adam Harrison's avatar
Adam Harrison committed
417

418
419
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
Adam Harrison's avatar
Adam Harrison committed
420
}