main.go 8.04 KB
Newer Older
Adam Harrison's avatar
Adam Harrison committed
1
2
3
4
package main

import (
	"math/rand"
5
	"net/http"
Adam Harrison's avatar
Adam Harrison committed
6
7
8
9
10
	"os"
	"os/exec"
	"regexp"
	"time"

11
	log "github.com/sirupsen/logrus"
Adam Harrison's avatar
Adam Harrison committed
12
	"github.com/spf13/cobra"
13
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Adam Harrison's avatar
Adam Harrison committed
14
15
16
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

17
18
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
Adam Harrison's avatar
Adam Harrison committed
19
20
21
	"github.com/weaveworks/kured/pkg/alerts"
	"github.com/weaveworks/kured/pkg/daemonsetlock"
	"github.com/weaveworks/kured/pkg/delaytick"
22
	"github.com/weaveworks/kured/pkg/notifications/slack"
Adam Harrison's avatar
Adam Harrison committed
23
24
25
)

var (
26
27
28
	version = "unreleased"

	// Command line flags
29
30
31
32
33
34
35
36
37
38
	period              time.Duration
	dsNamespace         string
	dsName              string
	lockAnnotation      string
	prometheusURL       string
	alertFilter         *regexp.Regexp
	rebootSentinel      string
	forceRebootSentinel string
	slackHookURL        string
	slackUsername       string
39
40
41
42
43
44
45

	// Metrics
	rebootRequiredGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "kured",
		Name:      "reboot_required",
		Help:      "OS requires reboot due to software updates.",
	}, []string{"node"})
Adam Harrison's avatar
Adam Harrison committed
46
47
)

48
49
50
51
// init registers rebootRequiredGauge with the Prometheus client library at
// package load time, before main runs, so the gauge is part of what the
// /metrics handler installed in root serves.
func init() {
	prometheus.MustRegister(rebootRequiredGauge)
}

Adam Harrison's avatar
Adam Harrison committed
52
53
54
55
56
57
func main() {
	rootCmd := &cobra.Command{
		Use:   "kured",
		Short: "Kubernetes Reboot Daemon",
		Run:   root}

58
59
	rootCmd.PersistentFlags().DurationVar(&period, "period", time.Minute*60,
		"reboot check period")
60
	rootCmd.PersistentFlags().StringVar(&dsNamespace, "ds-namespace", "kube-system",
Adam Harrison's avatar
Adam Harrison committed
61
		"namespace containing daemonset on which to place lock")
62
	rootCmd.PersistentFlags().StringVar(&dsName, "ds-name", "kured",
Adam Harrison's avatar
Adam Harrison committed
63
64
65
66
67
68
69
70
71
		"name of daemonset on which to place lock")
	rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", "weave.works/kured-node-lock",
		"annotation in which to record locking node")
	rootCmd.PersistentFlags().StringVar(&prometheusURL, "prometheus-url", "",
		"Prometheus instance to probe for active alerts")
	rootCmd.PersistentFlags().Var(&regexpValue{&alertFilter}, "alert-filter-regexp",
		"alert names to ignore when checking for active alerts")
	rootCmd.PersistentFlags().StringVar(&rebootSentinel, "reboot-sentinel", "/var/run/reboot-required",
		"path to file whose existence signals need to reboot")
72
73
	rootCmd.PersistentFlags().StringVar(&forceRebootSentinel, "force-reboot-sentinel", "/var/run/force-reboot-required",
		"path to file whose existence signals need to force reboot")
74
75
76
77
78
	rootCmd.PersistentFlags().StringVar(&slackHookURL, "slack-hook-url", "",
		"slack hook URL for reboot notfications")
	rootCmd.PersistentFlags().StringVar(&slackUsername, "slack-username", "kured",
		"slack username for reboot notfications")

Adam Harrison's avatar
Adam Harrison committed
79
80
81
82
83
	if err := rootCmd.Execute(); err != nil {
		log.Fatal(err)
	}
}

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// newCommand builds an *exec.Cmd for name/arg whose output streams are routed
// into the standard logger: stdout is logged at info level and stderr at
// warning level. Every line is tagged with the command (cmd.Args[0]) and the
// stream it came from ("std" = "out" or "err"). The command is not started.
//
// NOTE(review): the writers returned by WriterLevel are never closed here;
// confirm against the logrus documentation whether that matters for the
// short-lived commands this daemon runs.
func newCommand(name string, arg ...string) *exec.Cmd {
	cmd := exec.Command(name, arg...)

	// Both streams share the "cmd" field; WithField derives a fresh entry
	// each time, so the two writers do not interfere with each other.
	tagged := log.NewEntry(log.StandardLogger()).WithField("cmd", cmd.Args[0])

	cmd.Stdout = tagged.WithField("std", "out").WriterLevel(log.InfoLevel)
	cmd.Stderr = tagged.WithField("std", "err").WriterLevel(log.WarnLevel)

	return cmd
}

101
func sentinelExists() bool {
Adam Harrison's avatar
Adam Harrison committed
102
103
104
105
106
107
108
	_, err := os.Stat(rebootSentinel)
	switch {
	case err == nil:
		return true
	case os.IsNotExist(err):
		return false
	default:
109
		log.Fatalf("Unable to determine existence of sentinel: %v", err)
Adam Harrison's avatar
Adam Harrison committed
110
111
112
		return false // unreachable; prevents compilation error
	}
}
113
114
115
116
117
118
119
120
121
122
123
124
// forceRebootsentinelExists reports whether the file named by
// forceRebootSentinel exists. As with sentinelExists, any stat failure other
// than "does not exist" terminates the process via log.Fatalf.
func forceRebootsentinelExists() bool {
	if _, statErr := os.Stat(forceRebootSentinel); statErr != nil {
		if !os.IsNotExist(statErr) {
			log.Fatalf("Unable to determine existence of force reboot sentinel: %v", statErr)
		}
		return false
	}
	return true
}
Adam Harrison's avatar
Adam Harrison committed
125

126
func rebootRequired() bool {
127
	if sentinelExists() || forceRebootsentinelExists() {
128
129
130
131
132
133
134
135
		log.Infof("Reboot required")
		return true
	} else {
		log.Infof("Reboot not required")
		return false
	}
}

Adam Harrison's avatar
Adam Harrison committed
136
func rebootBlocked() bool {
137
	if forceRebootsentinelExists() {
138
		log.Infof("Force reboot sentinel %v exists, force rebooting activated", forceRebootSentinel)
139
140
		return false
	}
Adam Harrison's avatar
Adam Harrison committed
141
	if prometheusURL != "" {
142
		alertNames, err := alerts.PrometheusActiveAlerts(prometheusURL, alertFilter)
Adam Harrison's avatar
Adam Harrison committed
143
144
145
146
		if err != nil {
			log.Warnf("Reboot blocked: prometheus query error: %v", err)
			return true
		}
147
148
149
150
		count := len(alertNames)
		if count > 10 {
			alertNames = append(alertNames[:10], "...")
		}
Adam Harrison's avatar
Adam Harrison committed
151
		if count > 0 {
152
			log.Warnf("Reboot blocked: %d active alerts: %v", count, alertNames)
Adam Harrison's avatar
Adam Harrison committed
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
			return true
		}
	}
	return false
}

// holding reports whether this node currently holds the daemonset lock.
// metadata is handed to lock.Test unchanged (rebootAsRequired passes a
// *nodeMeta here). A failure to test the lock is fatal.
func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}) bool {
	held, testErr := lock.Test(metadata)
	if testErr != nil {
		log.Fatalf("Error testing lock: %v", testErr)
	}

	if held {
		log.Infof("Holding lock")
	}

	return held
}

// acquire attempts to take the daemonset lock, passing metadata through to
// lock.Acquire. It returns true only when the lock was obtained. When the
// lock is already held, the current holder is logged and false is returned.
// An error from the lock is fatal.
func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}) bool {
	acquired, holder, err := lock.Acquire(metadata)
	if err != nil {
		log.Fatalf("Error acquiring lock: %v", err)
		return false
	}

	if !acquired {
		log.Warnf("Lock already held: %v", holder)
		return false
	}

	log.Infof("Acquired reboot lock")
	return true
}

// release gives up the daemonset lock. Failing to release it is fatal.
func release(lock *daemonsetlock.DaemonSetLock) {
	log.Infof("Releasing lock")

	err := lock.Release()
	if err != nil {
		log.Fatalf("Error releasing lock: %v", err)
	}
}

func drain(nodeID string) {
	log.Infof("Draining node %s", nodeID)
194
	drainCmd := newCommand("/usr/bin/kubectl", "drain",
Adam Harrison's avatar
Adam Harrison committed
195
		"--ignore-daemonsets", "--delete-local-data", "--force", nodeID)
196

Adam Harrison's avatar
Adam Harrison committed
197
198
199
200
201
202
203
	if err := drainCmd.Run(); err != nil {
		log.Fatalf("Error invoking drain command: %v", err)
	}
}

func uncordon(nodeID string) {
	log.Infof("Uncordoning node %s", nodeID)
204
	uncordonCmd := newCommand("/usr/bin/kubectl", "uncordon", nodeID)
Adam Harrison's avatar
Adam Harrison committed
205
206
207
208
209
	if err := uncordonCmd.Run(); err != nil {
		log.Fatalf("Error invoking uncordon command: %v", err)
	}
}

210
func commandReboot(nodeID string) {
Adam Harrison's avatar
Adam Harrison committed
211
	log.Infof("Commanding reboot")
212
213
214
215
216
217
218

	if slackHookURL != "" {
		if err := slack.NotifyReboot(slackHookURL, slackUsername, nodeID); err != nil {
			log.Warnf("Error notifying slack: %v", err)
		}
	}

Adam Harrison's avatar
Adam Harrison committed
219
	// Relies on /var/run/dbus/system_bus_socket bind mount to talk to systemd
220
	rebootCmd := newCommand("/bin/systemctl", "reboot")
Adam Harrison's avatar
Adam Harrison committed
221
222
223
224
225
	if err := rebootCmd.Run(); err != nil {
		log.Fatalf("Error invoking reboot command: %v", err)
	}
}

226
func maintainRebootRequiredMetric(nodeID string) {
Adam Harrison's avatar
Adam Harrison committed
227
	for {
228
229
230
231
232
		if sentinelExists() {
			rebootRequiredGauge.WithLabelValues(nodeID).Set(1)
		} else {
			rebootRequiredGauge.WithLabelValues(nodeID).Set(0)
		}
Adam Harrison's avatar
Adam Harrison committed
233
234
235
236
237
238
239
240
241
		time.Sleep(time.Minute)
	}
}

// nodeMeta is used to remember information across reboots. rebootAsRequired
// passes it as the metadata argument when acquiring the lock and reads it
// back when testing the lock at startup.
type nodeMeta struct {
	// Unschedulable holds the node's Spec.Unschedulable value as observed
	// just before the lock is acquired. When it is false the node is drained
	// before rebooting and uncordoned again once the daemon finds itself
	// holding the lock at startup; when true both steps are skipped.
	Unschedulable bool `json:"unschedulable"`
}

242
func rebootAsRequired(nodeID string) {
Adam Harrison's avatar
Adam Harrison committed
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}

	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)

	nodeMeta := nodeMeta{}
	if holding(lock, &nodeMeta) {
		if !nodeMeta.Unschedulable {
			uncordon(nodeID)
		}
		release(lock)
	}

	source := rand.NewSource(time.Now().UnixNano())
264
	tick := delaytick.New(source, period)
Adam Harrison's avatar
Adam Harrison committed
265
	for _ = range tick {
266
		if rebootRequired() && !rebootBlocked() {
267
			node, err := client.CoreV1().Nodes().Get(nodeID, metav1.GetOptions{})
268
269
270
271
272
273
274
275
276
			if err != nil {
				log.Fatal(err)
			}
			nodeMeta.Unschedulable = node.Spec.Unschedulable

			if acquire(lock, &nodeMeta) {
				if !nodeMeta.Unschedulable {
					drain(nodeID)
				}
277
278
279
280
281
				commandReboot(nodeID)
				for {
					log.Infof("Waiting for reboot")
					time.Sleep(time.Minute)
				}
Adam Harrison's avatar
Adam Harrison committed
282
283
284
			}
		}
	}
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
}

func root(cmd *cobra.Command, args []string) {
	log.Infof("Kubernetes Reboot Daemon: %s", version)

	nodeID := os.Getenv("KURED_NODE_ID")
	if nodeID == "" {
		log.Fatal("KURED_NODE_ID environment variable required")
	}

	log.Infof("Node ID: %s", nodeID)
	log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
	log.Infof("Reboot Sentinel: %s every %v", rebootSentinel, period)

	go rebootAsRequired(nodeID)
	go maintainRebootRequiredMetric(nodeID)
Adam Harrison's avatar
Adam Harrison committed
301

302
303
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
Adam Harrison's avatar
Adam Harrison committed
304
}