huge refactor
Jes Olson j3s@c3f.net
Mon, 20 Feb 2023 00:31:14 -0800
7 files changed,
394 insertions(+),
313 deletions(-)
D
agent.go
@@ -1,128 +0,0 @@
-package main - -import ( - "fmt" - "sync" - - "github.com/hashicorp/serf/serf" - "golang.org/x/exp/slog" -) - -// Agent starts and manages a Serf & adds service discovery (TODO) -type Agent struct { - serfConf *serf.Config - eventCh chan serf.Event - // This is the underlying Serf we are wrapping - serf *serf.Serf - // a lil logger used by the agent, you know how it be - logger *slog.Logger - // this channel receiving a signal = shutdown - shutdown bool - shutdownCh chan struct{} - shutdownLock sync.Mutex -} - -func Create(config *Config, serfConf *serf.Config, logger *slog.Logger) *Agent { - // Create a channel to listen for events from Serf - eventCh := make(chan serf.Event, 64) - serfConf.EventCh = eventCh - - // Setup the agent - agent := &Agent{ - serfConf: serfConf, - eventCh: eventCh, - logger: logger, - shutdownCh: make(chan struct{}), - } - - return agent -} - -func (a *Agent) eventLoop() { - serfShutdownCh := a.serf.ShutdownCh() - for { - select { - case e := <-a.eventCh: - a.logger.Info("receive event", "event", e.String()) - - case <-serfShutdownCh: - a.logger.Info("detect serf shutdown, turning off event loop") - a.Shutdown() - return - - case <-a.shutdownCh: - return - } - } -} - -func (a *Agent) Start() error { - a.logger.Info("start serf agent") - - // Create serf first - serf, err := serf.Create(a.serfConf) - if err != nil { - return err - } - a.serf = serf - - // Start event loop - go a.eventLoop() - return nil -} - -// Shutdown closes this agent and all of its processes. Should be preceded -// by a Leave for a graceful shutdown. -func (a *Agent) Shutdown() error { - a.shutdownLock.Lock() - defer a.shutdownLock.Unlock() - - if a.shutdown { - return nil - } - - if a.serf == nil { - goto EXIT - } - - a.logger.Info("request serf shutdown") - if err := a.serf.Shutdown(); err != nil { - return err - } - -EXIT: - a.logger.Info("complete serf shutdown") - a.shutdown = true - close(a.shutdownCh) - return nil -} - -// Join asks the Serf instance to join. 
See the Serf.Join function. -func (a *Agent) Join(addrs []string) (n int, err error) { - a.logger.Info("issue join request", "addrs", addrs) - // we always ignore old events because cascade don't - // care about the past - n, err = a.serf.Join(addrs, true) - if err != nil { - return n, fmt.Errorf("Error joining: %v\n", err) - } - // TODO: when joining fails, we don't get an error here - serf & memberlist - // just print to stdout and serf.Join returns without issue. - return n, err -} - -// Leave prepares for a graceful shutdown of the agent and its processes -func (a *Agent) Leave() error { - if a.serf == nil { - return nil - } - - a.logger.Info("request graceful serf leave") - return a.serf.Leave() -} - -// ShutdownCh returns a channel that can be selected to wait -// for the agent to perform a shutdown. -func (a *Agent) ShutdownCh() <-chan struct{} { - return a.shutdownCh -}
A
agent/agent.go
@@ -0,0 +1,351 @@
+// design touchstones +// simple to configure/implement +// - minimal configurable options +// - sparse use of commands/flags/env vars +// - fast to start/join a cluster +// - http-only api +// api auth/encryption is best implemented by a reverse proxy (like caddy or nginx) +// - no RPC (it's too annoying to deal with) +// predictable state +// - no runtime configuration allowed (no reload support) +// for a gossip cluster to work, nodes in the cluster must +// be the source of truth. we do not allow the user to change +// the cluster's view of reality in any way - the user may +// only inspect it. +// - HTTP API only supports read-only commands +// this ensures that cascade's starting state never +// diverges from its running state. +// (restarts are very fast + interruptionless) +// this also helps with security +// one massive global cluster +// - for simplicity, operability, and because gossip can handle it +// compatibility +// - attempt to have a consul-compatible API where it matters? +// todo +// dns resolver for services +// read-write exception for maintenance mode? +// opentelemetry metrics + +// key differences from consul +// - services are gossip'd + +// in cascade, node == member, and catalog == agent +// agent api (http-only) +// GET /v1/agent/metrics -> opentelemetry metrics for this agent +// GET /v1/agent/members -> show serf cluster membership state +// GET /v1/agent/services -> show services this agent owns +// aliases: GET /v1/catalog/nodes +// catalog api (http & dns) +// GET /v1/catalog/nodes +// GET /v1/catalog/services +// dns: dig @127.0.0.1 -p 8600 nostromo.node.cascade. ANY +// dns: dig @127.0.0.1 -p 8600 redis.service.cascade. ANY + +// consul uses /catalog because the masters maintain a service +// catalog separate from the agent state + +// in cascade, the agents _are_ expected to maintain service catalogues, +// so we do away with the catalog concept entirely. 
+ +package agent + +import ( + "fmt" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "github.com/hashicorp/memberlist" + "github.com/hashicorp/serf/serf" + "golang.org/x/exp/slog" +) + +const ( + // gracefulTimeout controls how long we wait before forcefully terminating + // note that this value interacts with serfConfig.LeavePropagateDelay + gracefulTimeout = 10 * time.Second +) + +func Run() { + config := readConfig() + logger := config.SetupLogger() + agent := setupAgent(config, logger) + if err := agent.Start(); err != nil { + fmt.Println(err) + os.Exit(1) + } + defer agent.Shutdown() + // join any specified startup nodes + if err := startupJoin(config, agent); err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := handleSignals(config, agent); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +// handleSignals blocks until we get an exit-causing signal +func handleSignals(config *Config, agent *Agent) error { + signalCh := make(chan os.Signal, 4) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Wait for a signal + var sig os.Signal + select { + case s := <-signalCh: + sig = s + case <-agent.ShutdownCh(): + // Agent is already shutdown! 
+ return nil + } + agent.logger.Info("caught signal", "signal", sig) + + // Check if we should do a graceful leave + graceful := false + if sig == os.Interrupt || sig == syscall.SIGTERM { + graceful = true + } + + // Bail fast if not doing a graceful leave + if !graceful { + agent.logger.Warn("leave cluster with zero grace") + return nil + } + + // Attempt a graceful leave + gracefulCh := make(chan struct{}) + agent.logger.Info("shut down agent gracefully") + go func() { + if err := agent.Leave(); err != nil { + agent.logger.Error("Error: %s", err) + return + } + close(gracefulCh) + }() + + // Wait for leave or another signal + select { + case <-signalCh: + return fmt.Errorf("idfk") + case <-time.After(gracefulTimeout): + return fmt.Errorf("leave timed out") + case <-gracefulCh: + return nil + } +} + +func readConfig() *Config { + config := DefaultConfig() + // CASCADE_LOGLEVEL=info + l := os.Getenv("CASCADE_LOGLEVEL") + if l != "" { + config.LogLevel = l + if l == "DEBUG" { + // config. + } + } + // CASCADE_BIND=192.168.0.15:12345 + if os.Getenv("CASCADE_BIND") != "" { + config.BindAddr = os.Getenv("CASCADE_BIND") + } + // CASCADE_JOIN=127.0.0.1,127.0.0.5 + if os.Getenv("CASCADE_JOIN") != "" { + config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",") + } + // CASCADE_NAME=nostromo.j3s.sh + if os.Getenv("CASCADE_NAME") != "" { + config.NodeName = os.Getenv("CASCADE_NAME") + } + return config +} + +func startupJoin(config *Config, agent *Agent) error { + if len(config.StartJoin) == 0 { + return nil + } + + n, err := agent.Join(config.StartJoin) + if err != nil { + return err + } + if n > 0 { + agent.logger.Info("issue join request", "nodes", n) + } + + return nil +} + +func setupAgent(config *Config, logger *slog.Logger) *Agent { + bindIP, bindPort, err := config.AddrParts(config.BindAddr) + if err != nil { + panic(err) + } + + serfConfig := serf.DefaultConfig() + // logLogger is just a bridge from the old logger to the + // new slog handler. 
+ logLogger := slog.NewLogLogger(logger.Handler(), config.GetLogLevel()) + serfConfig.Logger = logLogger + serfConfig.NodeName = config.NodeName + serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) + // TODO: how should cascade handle name conflicts? + // defaulting to just blowing up for now, but + // we _could_ take the tailscale route & append + // -1 or whatever to the node. that would be more user friendly. + // TODO: some of these serf settings were pulled + // from consul[1]. re-examine them eventually. + serfConfig.EnableNameConflictResolution = false + serfConfig.LeavePropagateDelay = 3 * time.Second + serfConfig.MinQueueDepth = 4096 + serfConfig.QueueDepthWarning = 1000000 + serfConfig.ReconnectTimeout = 3 * 24 * time.Hour + + serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() + serfConfig.MemberlistConfig.Logger = logLogger + serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second + serfConfig.MemberlistConfig.BindAddr = bindIP + serfConfig.MemberlistConfig.BindPort = bindPort + + agent := Create(config, serfConfig, logger) + agent.logger.Info("setup cascade agent", "nodename", config.NodeName, "bindport", bindPort, "bindip", bindIP, "loglevel", config.GetLogLevel()) + return agent +} + +// [1]: sources for consul serf tweaks +// https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go +// https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go + +// Agent starts and manages a Serf & adds service discovery (TODO) +type Agent struct { + serfConf *serf.Config + eventCh chan serf.Event + // This is the underlying Serf we are wrapping + serf *serf.Serf + // a lil logger used by the agent, you know how it be + logger *slog.Logger + // this channel receiving a signal = shutdown + shutdown bool + shutdownCh chan struct{} + shutdownLock sync.Mutex +} + +func Create(config *Config, serfConf *serf.Config, logger *slog.Logger) *Agent { + // Create a channel to listen for events from Serf + eventCh := make(chan 
serf.Event, 64) + serfConf.EventCh = eventCh + + // Setup the agent + agent := &Agent{ + serfConf: serfConf, + eventCh: eventCh, + logger: logger, + shutdownCh: make(chan struct{}), + } + + return agent +} + +func (a *Agent) eventLoop() { + serfShutdownCh := a.serf.ShutdownCh() + for { + select { + case e := <-a.eventCh: + a.logger.Info("receive event", "event", e.String()) + + case <-serfShutdownCh: + a.logger.Info("detect serf shutdown, turning off event loop") + a.Shutdown() + return + + case <-a.shutdownCh: + return + } + } +} + +func (a *Agent) Start() error { + // Create serf first + serf, err := serf.Create(a.serfConf) + if err != nil { + return err + } + a.serf = serf + + // Start event loop + go a.eventLoop() + return nil +} + +// Shutdown closes this agent and all of its processes. Should be preceded +// by a Leave for a graceful shutdown. +func (a *Agent) Shutdown() error { + a.shutdownLock.Lock() + defer a.shutdownLock.Unlock() + + if a.shutdown { + return nil + } + + if a.serf == nil { + goto EXIT + } + + a.logger.Info("request serf shutdown") + if err := a.serf.Shutdown(); err != nil { + return err + } + +EXIT: + a.logger.Info("complete serf shutdown") + a.shutdown = true + close(a.shutdownCh) + return nil +} + +// Join asks the Serf instance to join. See the Serf.Join function. +func (a *Agent) Join(addrs []string) (n int, err error) { + a.logger.Info("issue join request", "addrs", addrs) + // we always ignore old events because cascade don't + // care about the past + n, err = a.serf.Join(addrs, true) + if err != nil { + return n, fmt.Errorf("Error joining: %v\n", err) + } + // TODO: when joining fails, we don't get an error here - serf & memberlist + // just print to stdout and serf.Join returns without issue. 
+ return n, err +} + +// Leave prepares for a graceful shutdown of the agent and its processes +func (a *Agent) Leave() error { + if a.serf == nil { + return nil + } + + a.logger.Info("request graceful serf leave") + return a.serf.Leave() +} + +// ShutdownCh returns a channel that can be selected to wait +// for the agent to perform a shutdown. +func (a *Agent) ShutdownCh() <-chan struct{} { + return a.shutdownCh +} + +func (c *Config) SetupLogger() *slog.Logger { + opts := slog.HandlerOptions{ + Level: c.GetLogLevel(), + } + if c.LogLevel == "DEBUG" { + opts.AddSource = true + } + handler := opts.NewTextHandler(os.Stderr) + logger := slog.New(handler) + slog.SetDefault(logger) + return logger +}
M
config.go
→
agent/config.go
@@ -1,4 +1,4 @@
-package main +package agent import ( "fmt"@@ -8,7 +8,7 @@
"golang.org/x/exp/slog" ) -const DefaultBindPort int = 4449 +const DefaultBindPort int = 4440 func DefaultConfig() *Config { hostname, err := os.Hostname()@@ -20,6 +20,7 @@ return &Config{
BindAddr: "0.0.0.0", LogLevel: "INFO", NodeName: hostname, + RpcAddr: "127.0.0.1:4441", } }@@ -35,6 +36,7 @@ type Config struct {
BindAddr string LogLevel string NodeName string + RpcAddr string StartJoin []string Services []Service }
M
go.mod
→
go.mod
@@ -17,6 +17,7 @@ github.com/hashicorp/go-msgpack v0.5.3 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect github.com/hashicorp/go-sockaddr v1.0.0 // indirect github.com/hashicorp/golang-lru v0.5.0 // indirect + github.com/hashicorp/logutils v1.0.0 // indirect github.com/miekg/dns v1.1.41 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 // indirect
M
go.sum
→
go.sum
@@ -26,6 +26,7 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM=
A
list/nodes.go
@@ -0,0 +1,16 @@
+package list + +import ( + "fmt" +) + +const usage = `usage: cascade list <command> + +commands: + cascade list nodes + cascade list services +` + +func Run(args []string) { + fmt.Printf("%+v", usage) +}
M
main.go
→
main.go
@@ -1,197 +1,35 @@
-// design touchstones -// configured entirely via environment variables -// minimal configurable options -// a single global cluster -// easy cluster formation -// todo -// dns resolver for services - package main import ( "fmt" "os" - "os/signal" - "strings" - "syscall" - "time" - "github.com/hashicorp/memberlist" - "github.com/hashicorp/serf/serf" - "golang.org/x/exp/slog" + "git.j3s.sh/cascade/agent" + "git.j3s.sh/cascade/list" ) -const ( - // gracefulTimeout controls how long we wait before forcefully terminating - // note that this value interacts with serfConfig.LeavePropagateDelay - gracefulTimeout = 10 * time.Second -) +// TODO: rename agent to something cooler +const help = `cascade agent start a cascade agent +cascade list|ls list nodes or services +cascade members show serf cluster members +cascade rtt estimate latency between nodes +` func main() { - config := readConfig() - logger := setupLogger(config) - agent := setupAgent(config, logger) - if err := agent.Start(); err != nil { - panic(err) - } - defer agent.Shutdown() - // join any specified startup nodes - if err := startupJoin(config, agent); err != nil { - panic(err) - } - go debugPrints(agent) - if err := handleSignals(config, agent); err != nil { - panic(err) - } -} - -func setupLogger(config *Config) *slog.Logger { - debugOn := false - if config.LogLevel == "DEBUG" { - debugOn = true - } - opts := slog.HandlerOptions{ - Level: config.GetLogLevel(), - AddSource: debugOn, - } - handler := opts.NewTextHandler(os.Stderr) - logger := slog.New(handler) - slog.SetDefault(logger) - return logger -} - - -func debugPrints(a *Agent) { - for { - for _, m := range a.serf.Members() { - a.logger.Debug("debug-loop", "name", m.Name, "addr", m.Addr, "status", m.Status) - } - time.Sleep(time.Second * 5) - } -} - -// handleSignals blocks until we get an exit-causing signal -func handleSignals(config *Config, agent *Agent) error { - signalCh := make(chan os.Signal, 4) - signal.Notify(signalCh, os.Interrupt, 
syscall.SIGTERM) - - // Wait for a signal - var sig os.Signal - select { - case s := <-signalCh: - sig = s - case <-agent.ShutdownCh(): - // Agent is already shutdown! - return nil - } - agent.logger.Info("caught signal", "signal", sig) - - // Check if we should do a graceful leave - graceful := false - if sig == os.Interrupt || sig == syscall.SIGTERM { - graceful = true - } - - // Bail fast if not doing a graceful leave - if !graceful { - agent.logger.Warn("leave cluster with zero grace") - return nil - } - - // Attempt a graceful leave - gracefulCh := make(chan struct{}) - agent.logger.Info("shut down agent gracefully") - go func() { - if err := agent.Leave(); err != nil { - agent.logger.Error("Error: %s", err) - return - } - close(gracefulCh) - }() - - // Wait for leave or another signal - select { - case <-signalCh: - return fmt.Errorf("idfk") - case <-time.After(gracefulTimeout): - return fmt.Errorf("leave timed out") - case <-gracefulCh: - return nil - } -} - -func readConfig() *Config { - config := DefaultConfig() - // CASCADE_LOGLEVEL=info - if os.Getenv("CASCADE_LOGLEVEL") != "" { - config.LogLevel = strings.ToUpper(os.Getenv("CASCADE_LOGLEVEL")) - } - // CASCADE_BIND=192.168.0.15:12345 - if os.Getenv("CASCADE_BIND") != "" { - config.BindAddr = os.Getenv("CASCADE_BIND") - } - // CASCADE_JOIN=127.0.0.1,127.0.0.5 - if os.Getenv("CASCADE_JOIN") != "" { - config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",") - } - // CASCADE_NAME=nostromo.j3s.sh - if os.Getenv("CASCADE_NAME") != "" { - config.NodeName = os.Getenv("CASCADE_NAME") - } - return config -} - -func startupJoin(config *Config, agent *Agent) error { - if len(config.StartJoin) == 0 { - return nil - } - - n, err := agent.Join(config.StartJoin) - if err != nil { - return err - } - if n > 0 { - agent.logger.Info("issue join request", "nodes", n) + if len(os.Args) == 1 { + fmt.Fprintf(os.Stderr, "%s", help) + os.Exit(1) } - return nil -} - -func setupAgent(config *Config, logger *slog.Logger) 
*Agent { - bindIP, bindPort, err := config.AddrParts(config.BindAddr) - if err != nil { - panic(err) + cmd := os.Args[1] + args := os.Args[2:] + switch cmd { + case "agent": + agent.Run() + case "list", "ls": + list.Run(args) + default: + fmt.Fprintf(os.Stderr, "'%s' is not a valid command\n\n%s", cmd, help) + os.Exit(1) } - - serfConfig := serf.DefaultConfig() - // logLogger is just a bridge from the old logger to the - // new slog handler. - logLogger := slog.NewLogLogger(logger.Handler(), config.GetLogLevel()) - serfConfig.Logger = logLogger - serfConfig.NodeName = config.NodeName - serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) - // TODO: how should cascade handle name conflicts? - // defaulting to just blowing up for now, but - // we _could_ take the tailscale route & append - // -1 or whatever to the node. that would be more user friendly. - // TODO: some of these serf settings were pulled - // from consul[1]. re-examine them eventually. - serfConfig.EnableNameConflictResolution = false - serfConfig.LeavePropagateDelay = 3 * time.Second - serfConfig.MinQueueDepth = 4096 - serfConfig.QueueDepthWarning = 1000000 - serfConfig.ReconnectTimeout = 3 * 24 * time.Hour - - serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() - serfConfig.MemberlistConfig.Logger = logLogger - serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second - serfConfig.MemberlistConfig.BindAddr = bindIP - serfConfig.MemberlistConfig.BindPort = bindPort - - agent := Create(config, serfConfig, logger) - return agent } - -// [1]: sources for consul serf tweaks -// https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go -// https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go