internal/agent/agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
// design touchstones
// simple to configure/implement
// - minimal configurable options
// - sparse use of commands/flags/env vars
// - fast to start/join a cluster
// - http-only api
// api auth/encryption is best implemented by a reverse proxy (like caddy or nginx)
// - no RPC (it's too annoying to deal with)
// predictable state
// - no runtime configuration allowed (no reload support)
// for a gossip cluster to work, nodes in the cluster must
// be the source of truth. we do not allow the user to change
// the cluster's view of reality in any way - the user may
// only inspect it.
// - HTTP API only supports read-only commands
// this ensures that cascade's starting state never
// diverges from its running state.
// (restarts are very fast + interruptionless)
// this also helps with security
// one massive global cluster
// - for simplicity, operability, and because gossip can handle it
// compatibility
// - attempt to have a consul-compatible API where it matters?
// todo
// think about checks
// think about DNS weights
// dns resolver for services
// read-write exception for maintenance mode?
// opentelemetry metrics
// how should cascade handle name conflicts? defaulting to just blowing up
// for now, but we _could_ take the tailscale route & append -1 or whatever
// to the node. that would be more user friendly.
// key differences from consul
// - services are gossip'd
// in cascade, node == member, and catalog == agent
// agent api (http-only)
// GET /v1/agent/metrics -> opentelemetry metrics for this agent
// GET /v1/agent/members -> show serf cluster membership state
// GET /v1/agent/services -> show services this agent owns
// aliases: GET /v1/catalog/nodes
// catalog api (http & dns)
// GET /v1/catalog/nodes
// GET /v1/catalog/services
// dns: dig @127.0.0.1 -p 8600 nostromo.node.cascade. ANY
// dns: dig @127.0.0.1 -p 8600 redis.service.cascade. ANY
// consul uses /catalog because the masters maintain a service
// catalog separate from the agent state
// in cascade, the agents _are_ expected to maintain service catalogues,
// so we do away with the catalog concept entirely.
package agent
import (
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"

	"git.j3s.sh/cascade/api"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/exp/slog"
)
// antiEntropyInterval is the period between anti-entropy rounds, in which an
// agent re-broadcasts the services it owns so that peers which missed the
// original user event can catch up.
const antiEntropyInterval = time.Minute
// Agent starts and manages a Serf instance and layers a gossipped service
// catalog, an HTTP API and a DNS server on top of it. Build one with New and
// bring it up with Start.
type Agent struct {
// Config is the agent's configuration. It should not be modified after the
// agent has been started.
Config *Config
// serfConfig is the Serf configuration assembled by New and handed to
// serf.Create in Start.
serfConfig *serf.Config
// serf is the underlying Serf we are wrapping. It is nil until Start has
// created it.
serf *serf.Serf
// httpHandlers is the http api handler - see http.go and *_endpoint.go
httpHandlers *HTTPHandlers
// services holds the agent's own + gossipped service catalog. It is nil
// until Start has opened the store.
services *ServiceStore
// dns is the DNS server that resolves
// <node>.node.<domain> and <svc>.service.<domain>. It is nil until Start
// has created it.
dns *DNSServer
// eventCh is the channel on which we receive serf events; eventLoop
// drains it.
eventCh chan serf.Event
// logger is the agent-scoped logger created by New.
logger *slog.Logger
// shutdown is set once Shutdown has completed, which makes later Shutdown
// calls no-ops.
shutdown bool
// shutdownCh is closed by Shutdown; the background loops and ShutdownCh
// callers select on it to learn that the agent is going away.
shutdownCh chan struct{}
// shutdownLock serialises Shutdown and guards the shutdown flag.
shutdownLock sync.Mutex
}
// New builds an Agent from config without starting anything. It installs a
// debug-level text logger on stderr (also registered as the slog default),
// prepares the Serf configuration and wires up the HTTP handlers. Call Start
// to actually bring the agent up.
func New(config *Config) *Agent {
	// TODO: make log level configurable
	// XXX: reconsider this agent-scoped logger? what does consul do?
	handler := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(os.Stderr)
	logger := slog.New(handler)
	slog.SetDefault(logger)

	// Serf hands its events to the agent over this buffered channel.
	events := make(chan serf.Event, 1024)

	// TODO: some of these serf settings were cargo-culted
	// from consul[1]. re-examine them eventually.
	sc := serf.DefaultConfig()
	sc.NodeName = config.NodeName
	sc.EventCh = events
	// Serf logs through a standard-library logger; bridge it onto the slog
	// handler at info level.
	sc.Logger = slog.NewLogLogger(handler, slog.LevelInfo)
	sc.ProtocolVersion = uint8(serf.ProtocolVersionMax)
	sc.EnableNameConflictResolution = false
	sc.LeavePropagateDelay = 3 * time.Second
	sc.ReconnectTimeout = 3 * 24 * time.Hour
	sc.MinQueueDepth = 4096
	sc.QueueDepthWarning = 1000000

	ml := sc.MemberlistConfig
	ml.BindAddr = config.SerfBindAddr.IP.String()
	ml.BindPort = config.SerfBindAddr.Port
	ml.DeadNodeReclaimTime = 30 * time.Second

	a := &Agent{
		Config:       config,
		serfConfig:   sc,
		httpHandlers: &HTTPHandlers{},
		eventCh:      events,
		logger:       logger,
		shutdownCh:   make(chan struct{}),
	}
	// The handlers need access to a bunch of Agent funcs, so they keep a
	// back-pointer to the completed agent. basically a hardlink.
	a.httpHandlers.agent = a
	return a
}
// eventLoop dispatches serf events to handleSerfEvent until the agent is
// shut down. If serf itself shuts down first, the loop shuts the whole agent
// down and exits.
func (a *Agent) eventLoop() {
	serfDone := a.serf.ShutdownCh()
	for {
		select {
		case <-a.shutdownCh:
			return
		case <-serfDone:
			a.logger.Info("detect serf shutdown, turning off event loop")
			// There is no caller to report a shutdown error to here.
			a.Shutdown()
			return
		case event := <-a.eventCh:
			a.handleSerfEvent(event)
		}
	}
}
// handleSerfEvent routes a single serf event. User events go to
// handleUserEvent. Member failed/leave/reap events drop every service of the
// departed members from the store; other member events are ignored. Any
// other kind of event is only logged at debug level.
func (a *Agent) handleSerfEvent(e serf.Event) {
	switch ev := e.(type) {
	case serf.UserEvent:
		a.handleUserEvent(ev)
	case serf.MemberEvent:
		switch ev.Type {
		case serf.EventMemberFailed, serf.EventMemberLeave, serf.EventMemberReap:
			for _, member := range ev.Members {
				a.logger.Info("drop services for departed member", "node", member.Name, "event", ev.Type.String())
				a.services.DropNode(member.Name)
			}
		}
	default:
		a.logger.Debug("receive event", "event", e.String())
	}
}
// handleUserEvent applies a gossipped service registration or
// deregistration to the local service store. Payloads that fail to decode
// are logged and dropped, a registration without a service is ignored, and
// user events with any other name are ignored.
func (a *Agent) handleUserEvent(ev serf.UserEvent) {
	switch ev.Name {
	case serfRegisterEvent:
		reg := registerPayload{}
		err := json.Unmarshal(ev.Payload, &reg)
		if err != nil {
			a.logger.Warn("decode svc-register event", "err", err)
			return
		}
		if reg.Service != nil {
			a.services.ApplyRegister(reg.Node, reg.Service)
		}
	case serfDeregisterEvent:
		dereg := deregisterPayload{}
		err := json.Unmarshal(ev.Payload, &dereg)
		if err != nil {
			a.logger.Warn("decode svc-deregister event", "err", err)
			return
		}
		a.services.ApplyDeregister(dereg.Node, dereg.ID)
	}
}
// RegisterService records svc in the local service store and, if that
// succeeds, gossips the registration to the cluster. A store error is
// returned without broadcasting; a broadcast error is returned with the
// service still stored locally.
func (a *Agent) RegisterService(svc *api.AgentService) error {
	err := a.services.Register(svc)
	if err == nil {
		err = a.broadcastRegister(svc)
	}
	return err
}
// DeregisterService deletes the service with the given id from the local
// service store and, if that succeeds, gossips the removal to the cluster
// as a user event carrying this node's name and the id.
func (a *Agent) DeregisterService(id string) error {
	if err := a.services.Deregister(id); err != nil {
		return err
	}
	removal := deregisterPayload{Node: a.Config.NodeName, ID: id}
	encoded, err := json.Marshal(removal)
	if err != nil {
		return err
	}
	return a.serf.UserEvent(serfDeregisterEvent, encoded, false)
}
// broadcastRegister gossips svc to the cluster as a register user event
// tagged with this node's name. It does not touch the local service store.
func (a *Agent) broadcastRegister(svc *api.AgentService) error {
	announcement := registerPayload{Node: a.Config.NodeName, Service: svc}
	encoded, err := json.Marshal(announcement)
	if err != nil {
		return err
	}
	return a.serf.UserEvent(serfRegisterEvent, encoded, false)
}
// OwnedServices returns this agent's currently-owned service registrations,
// as reported by the service store's Owned method.
// NOTE(review): a.services is only assigned in Start, so this presumably
// must not be called before Start has opened the store - confirm.
func (a *Agent) OwnedServices() map[string]*api.AgentService {
return a.services.Owned()
}
// CatalogServices returns the cluster-wide service-name -> tag-union view,
// as reported by the service store's AllServices method.
// NOTE(review): a.services is only assigned in Start, so this presumably
// must not be called before Start has opened the store - confirm.
func (a *Agent) CatalogServices() map[string][]string {
return a.services.AllServices()
}
// CatalogServiceInstances returns every known instance of the named service,
// as reported by the service store's ServiceInstances method.
// NOTE(review): a.services is only assigned in Start, so this presumably
// must not be called before Start has opened the store - confirm.
func (a *Agent) CatalogServiceInstances(name string) []NodeService {
return a.services.ServiceInstances(name)
}
// CatalogNodeServices returns the services owned by the named node, as
// reported by the service store's NodeServices method.
// NOTE(review): a.services is only assigned in Start, so this presumably
// must not be called before Start has opened the store - confirm.
func (a *Agent) CatalogNodeServices(node string) map[string]*api.AgentService {
return a.services.NodeServices(node)
}
// CatalogNodes lists every node serf currently knows about, whether or not
// it has registered services. Nodes that appear in the service store but not
// (yet) in serf's member list - a rare race during join - are appended as
// placeholder members carrying only a name and serf.StatusNone.
func (a *Agent) CatalogNodes() []serf.Member {
	nodes := a.serf.Members()

	known := make(map[string]struct{}, len(nodes))
	for i := range nodes {
		known[nodes[i].Name] = struct{}{}
	}

	for _, name := range a.services.Nodes() {
		if _, dup := known[name]; !dup {
			known[name] = struct{}{}
			nodes = append(nodes, serf.Member{Name: name, Status: serf.StatusNone})
		}
	}
	return nodes
}
// MemberByName looks up a serf member by node name. The boolean reports
// whether a member was found; when it is false the returned member is the
// zero value.
func (a *Agent) MemberByName(name string) (serf.Member, bool) {
	members := a.serf.Members()
	for i := range members {
		if members[i].Name == name {
			return members[i], true
		}
	}
	return serf.Member{}, false
}
// antiEntropyLoop re-broadcasts every service this agent owns once per
// antiEntropyInterval, so peers that missed the original user event
// eventually catch up. A failed broadcast is logged and the round continues.
// The loop exits when the agent's shutdown channel is closed.
func (a *Agent) antiEntropyLoop() {
	ticker := time.NewTicker(antiEntropyInterval)
	defer ticker.Stop()

	for {
		select {
		case <-a.shutdownCh:
			return
		case <-ticker.C:
		}

		for _, owned := range a.services.Owned() {
			err := a.broadcastRegister(owned)
			if err != nil {
				a.logger.Warn("anti-entropy rebroadcast", "id", owned.ID, "err", err)
			}
		}
	}
}
// Start creates and starts an agent's serf client, eventloop, api, and more!
// In order, it: creates the data dir and opens the service store at
// <DataDir>/<NodeName>.db, creates the Serf instance, launches the event and
// anti-entropy loops, re-broadcasts the services already in the store, and
// starts the HTTP API and DNS servers.
// The agent config should not be modified after an agent has been started.
//
// A non-nil error means startup stopped part-way. Nothing is rolled back
// here: whatever was set up before the failure stays in place until Shutdown
// is called. Returned errors are wrapped with the step that failed and keep
// the underlying error reachable through errors.Is / errors.As. A failure of
// the HTTP API server is not reported here; it runs in its own goroutine and
// only logs.
func (a *Agent) Start() error {
	// Open the service store before serf so we never gossip without it.
	if err := os.MkdirAll(a.Config.DataDir, 0o700); err != nil {
		return fmt.Errorf("create data dir %q: %w", a.Config.DataDir, err)
	}
	dbPath := filepath.Join(a.Config.DataDir, a.Config.NodeName+".db")
	store, err := NewServiceStore(a.Config.NodeName, dbPath)
	if err != nil {
		return fmt.Errorf("open service store %q: %w", dbPath, err)
	}
	a.services = store

	// Create serf only now that the store is ready to absorb its events.
	srf, err := serf.Create(a.serfConfig)
	if err != nil {
		return fmt.Errorf("create serf: %w", err)
	}
	a.serf = srf

	// Start the background loops.
	go a.eventLoop()
	go a.antiEntropyLoop()

	// Re-broadcast any services that survived restart so peers catch up
	// without waiting for the first anti-entropy tick. Failures are only
	// logged: the anti-entropy loop will retry.
	for _, svc := range a.services.Owned() {
		if err := a.broadcastRegister(svc); err != nil {
			a.logger.Warn("startup rebroadcast", "id", svc.ID, "err", err)
		}
	}

	// Start API Server
	apiHandler := a.httpHandlers.handler()
	go a.serveAPI(apiHandler)

	// Start DNS server
	a.dns = newDNSServer(a)
	if err := a.dns.Start(); err != nil {
		return fmt.Errorf("start dns server: %w", err)
	}
	return nil
}
// serveAPI serves handler over plain HTTP on the configured HTTPBindAddr and
// blocks until the listener fails, at which point the error is logged. It is
// meant to be run in its own goroutine.
func (a *Agent) serveAPI(handler http.Handler) {
	// net.JoinHostPort brackets IPv6 literals ("[::1]:8500"); gluing host and
	// port together with "%s:%d" yields an address that cannot be listened
	// on for IPv6 bind addresses. IPv4 addresses come out unchanged.
	host := a.Config.HTTPBindAddr.IP.String()
	port := fmt.Sprintf("%d", a.Config.HTTPBindAddr.Port)
	addr := net.JoinHostPort(host, port)
	if err := http.ListenAndServe(addr, handler); err != nil {
		a.logger.Error("api error", err)
	}
}
// Shutdown stops this agent and everything it runs: serf, the DNS server and
// the service store, and finally closes the shutdown channel so the
// background loops exit. It should be preceded by a Leave for a graceful
// shutdown.
//
// Calling Shutdown again after it has completed is a no-op that returns nil.
// If serf fails to shut down, that error is returned and nothing else is
// torn down, so the call can be retried. A failure to close the service
// store is only logged.
func (a *Agent) Shutdown() error {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	if a.shutdown {
		return nil
	}

	// serf is nil when Start never got as far as creating it.
	if a.serf != nil {
		a.logger.Info("request serf shutdown")
		if err := a.serf.Shutdown(); err != nil {
			return err
		}
	}
	a.logger.Info("complete serf shutdown")

	if a.dns != nil {
		a.dns.Shutdown()
	}
	if store := a.services; store != nil {
		if closeErr := store.Close(); closeErr != nil {
			a.logger.Warn("close service store", "err", closeErr)
		}
	}

	a.shutdown = true
	close(a.shutdownCh)
	return nil
}
// Join asks the Serf instance to join the cluster through the given
// addresses. See the Serf.Join function. It returns the count reported by
// Serf.Join together with any join error; that error is wrapped, so the
// underlying cause stays reachable through errors.Is / errors.As.
func (a *Agent) Join(addrs []string) (n int, err error) {
	a.logger.Info("issue join request", "addrs", addrs)
	// We always ignore old events because cascade doesn't care about the
	// past.
	n, err = a.serf.Join(addrs, true)
	if err != nil {
		return n, fmt.Errorf("error joining: %w", err)
	}
	return n, nil
}
// Leave asks serf to leave the cluster gracefully, in preparation for a
// shutdown of the agent and its processes. It is a no-op returning nil when
// serf has not been created.
func (a *Agent) Leave() error {
	if a.serf != nil {
		a.logger.Info("request graceful serf leave")
		return a.serf.Leave()
	}
	return nil
}
// ShutdownCh returns a channel that can be selected to wait
// for the agent to perform a shutdown. The channel is closed by Shutdown
// once it has finished tearing the agent down.
func (a *Agent) ShutdownCh() <-chan struct{} {
return a.shutdownCh
}
// GetCoordinate is a placeholder: it currently does nothing and always
// returns nil.
func (a *Agent) GetCoordinate() error {
return nil
}
// Members returns all of the serf members in the gossip cluster, exactly as
// reported by the underlying Serf instance.
// NOTE(review): a.serf is only assigned in Start, so this presumably must
// not be called before Start has created it - confirm.
//
// TODO: filter support
func (a *Agent) Members() []serf.Member {
// todo: move this to cascade/client perhaps
return a.serf.Members()
}
// [1]: sources for consul serf tweaks
// https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go
// https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go