idfk, simplify agent? maybe?
Jes Olson j3s@c3f.net
Mon, 20 Feb 2023 03:28:39 -0800
3 files changed,
174 insertions(+),
249 deletions(-)
M
agent/agent.go
→
agent/agent.go
@@ -25,6 +25,9 @@ // todo
// dns resolver for services // read-write exception for maintenance mode? // opentelemetry metrics +// how should cascade handle name conflicts? defaulting to just blowing up +// for now, but we _could_ take the tailscale route & append -1 or whatever +// to the node. that would be more user friendly. // key differences from consul // - services are gossip'd@@ -52,10 +55,7 @@
import ( "fmt" "os" - "os/signal" - "strings" "sync" - "syscall" "time" "github.com/hashicorp/memberlist"@@ -63,191 +63,62 @@ "github.com/hashicorp/serf/serf"
"golang.org/x/exp/slog" ) -const ( - // gracefulTimeout controls how long we wait before forcefully terminating - // note that this value interacts with serfConfig.LeavePropagateDelay - gracefulTimeout = 10 * time.Second -) - -func Run() { - config := readConfig() - logger := config.SetupLogger() - agent := setupAgent(config, logger) - if err := agent.Start(); err != nil { - fmt.Println(err) - os.Exit(1) - } - defer agent.Shutdown() - // join any specified startup nodes - if err := startupJoin(config, agent); err != nil { - fmt.Println(err) - os.Exit(1) - } - if err := handleSignals(config, agent); err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -// handleSignals blocks until we get an exit-causing signal -func handleSignals(config *Config, agent *Agent) error { - signalCh := make(chan os.Signal, 4) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - - // Wait for a signal - var sig os.Signal - select { - case s := <-signalCh: - sig = s - case <-agent.ShutdownCh(): - // Agent is already shutdown! 
- return nil - } - agent.logger.Info("caught signal", "signal", sig) - - // Check if we should do a graceful leave - graceful := false - if sig == os.Interrupt || sig == syscall.SIGTERM { - graceful = true - } - - // Bail fast if not doing a graceful leave - if !graceful { - agent.logger.Warn("leave cluster with zero grace") - return nil - } +// Agent starts and manages a Serf & adds service discovery (TODO) +type Agent struct { + // this is the agent's configuration + Config *Config + serfConfig *serf.Config - // Attempt a graceful leave - gracefulCh := make(chan struct{}) - agent.logger.Info("shut down agent gracefully") - go func() { - if err := agent.Leave(); err != nil { - agent.logger.Error("Error: %s", err) - return - } - close(gracefulCh) - }() + // This is the underlying Serf we are wrapping + serf *serf.Serf - // Wait for leave or another signal - select { - case <-signalCh: - return fmt.Errorf("idfk") - case <-time.After(gracefulTimeout): - return fmt.Errorf("leave timed out") - case <-gracefulCh: - return nil - } -} + // We receive serf events on this channel + eventCh chan serf.Event + logger *slog.Logger -func readConfig() *Config { - config := DefaultConfig() - // CASCADE_LOGLEVEL=info - l := os.Getenv("CASCADE_LOGLEVEL") - if l != "" { - config.LogLevel = l - if l == "DEBUG" { - // config. 
- } - } - // CASCADE_BIND=192.168.0.15:12345 - if os.Getenv("CASCADE_BIND") != "" { - config.BindAddr = os.Getenv("CASCADE_BIND") - } - // CASCADE_JOIN=127.0.0.1,127.0.0.5 - if os.Getenv("CASCADE_JOIN") != "" { - config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",") - } - // CASCADE_NAME=nostromo.j3s.sh - if os.Getenv("CASCADE_NAME") != "" { - config.NodeName = os.Getenv("CASCADE_NAME") - } - return config + // if this channel recieves a signal, cascade shuts down + shutdown bool + shutdownCh chan struct{} + shutdownLock sync.Mutex } -func startupJoin(config *Config, agent *Agent) error { - if len(config.StartJoin) == 0 { - return nil - } +// New returns an agent lmao +func New(config *Config) *Agent { + agent := Agent{} + serfConfig := serf.DefaultConfig() + serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() - n, err := agent.Join(config.StartJoin) - if err != nil { - return err - } - if n > 0 { - agent.logger.Info("issue join request", "nodes", n) - } + // XXX: why do serf and cascade use the same event channel? + eventCh := make(chan serf.Event, 1024) + agent.eventCh = eventCh + serfConfig.EventCh = eventCh - return nil -} + // TODO: make log level configurable + // XXX: reconsider this agent-scoped logger? what does consul do? + // logLogger is a bridge from the log.Logger to slog.Logger + h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(os.Stderr) + logger := slog.New(h) + slog.SetDefault(logger) + agent.logger = logger + serfConfig.Logger = slog.NewLogLogger(h, slog.LevelInfo) -func setupAgent(config *Config, logger *slog.Logger) *Agent { - bindIP, bindPort, err := config.AddrParts(config.BindAddr) - if err != nil { - panic(err) - } - - serfConfig := serf.DefaultConfig() - // logLogger is just a bridge from the old logger to the - // new slog handler. 
- logLogger := slog.NewLogLogger(logger.Handler(), config.GetLogLevel()) - serfConfig.Logger = logLogger - serfConfig.NodeName = config.NodeName - serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) - // TODO: how should cascade handle name conflicts? - // defaulting to just blowing up for now, but - // we _could_ take the tailscale route & append - // -1 or whatever to the node. that would be more user friendly. - // TODO: some of these serf settings were pulled + // TODO: some of these serf settings were cargo-culted // from consul[1]. re-examine them eventually. serfConfig.EnableNameConflictResolution = false serfConfig.LeavePropagateDelay = 3 * time.Second + serfConfig.MemberlistConfig.BindAddr = config.BindAddr.IP.String() + serfConfig.MemberlistConfig.BindPort = config.BindAddr.Port + serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second serfConfig.MinQueueDepth = 4096 + serfConfig.NodeName = config.NodeName + serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) serfConfig.QueueDepthWarning = 1000000 serfConfig.ReconnectTimeout = 3 * 24 * time.Hour - serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() - serfConfig.MemberlistConfig.Logger = logLogger - serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second - serfConfig.MemberlistConfig.BindAddr = bindIP - serfConfig.MemberlistConfig.BindPort = bindPort - - agent := Create(config, serfConfig, logger) - agent.logger.Info("setup cascade agent", "nodename", config.NodeName, "bindport", bindPort, "bindip", bindIP, "loglevel", config.GetLogLevel()) - return agent -} - -// [1]: sources for consul serf tweaks -// https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go -// https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go - -// Agent starts and manages a Serf & adds service discovery (TODO) -type Agent struct { - serfConf *serf.Config - eventCh chan serf.Event - // This is the underlying Serf we are wrapping - serf *serf.Serf - // a lil 
logger used by the agent, you know how it be - logger *slog.Logger - // this channel receiving a signal = shutdown - shutdown bool - shutdownCh chan struct{} - shutdownLock sync.Mutex -} - -func Create(config *Config, serfConf *serf.Config, logger *slog.Logger) *Agent { - // Create a channel to listen for events from Serf - eventCh := make(chan serf.Event, 64) - serfConf.EventCh = eventCh - - // Setup the agent - agent := &Agent{ - serfConf: serfConf, - eventCh: eventCh, - logger: logger, - shutdownCh: make(chan struct{}), - } - - return agent + agent.Config = config + agent.serfConfig = serfConfig + agent.shutdownCh = make(chan struct{}) + return &agent } func (a *Agent) eventLoop() {@@ -268,9 +139,11 @@ }
} } +// Start creates and starts an agent's serf client, eventloop, api, and more! +// The agent config should not be modified after an agent has been started. func (a *Agent) Start() error { // Create serf first - serf, err := serf.Create(a.serfConf) + serf, err := serf.Create(a.serfConfig) if err != nil { return err }@@ -337,15 +210,6 @@ func (a *Agent) ShutdownCh() <-chan struct{} {
return a.shutdownCh } -func (c *Config) SetupLogger() *slog.Logger { - opts := slog.HandlerOptions{ - Level: c.GetLogLevel(), - } - if c.LogLevel == "DEBUG" { - opts.AddSource = true - } - handler := opts.NewTextHandler(os.Stderr) - logger := slog.New(handler) - slog.SetDefault(logger) - return logger -} +// [1]: sources for consul serf tweaks +// https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go +// https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go
M
agent/config.go
→
agent/config.go
@@ -1,14 +1,12 @@
package agent import ( - "fmt" "net" "os" - - "golang.org/x/exp/slog" ) -const DefaultBindPort int = 4440 +const DefaultBindPort int = 4443 +const DefaultClientPort int = 4443 func DefaultConfig() *Config { hostname, err := os.Hostname()@@ -16,67 +14,18 @@ if err != nil {
panic(err) } - return &Config{ - BindAddr: "0.0.0.0", - ClientAddr: "0.0.0.0", - LogLevel: "INFO", - NodeName: hostname, - } -} + // TODO: figure out how to default the listeners + cfg := Config{} + cfg.BindAddr = &net.TCPAddr{IP: []byte{0, 0, 0, 0}, Port: DefaultBindPort} + cfg.ClientAddr = &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: DefaultClientPort} + cfg.NodeName = hostname -// Services are built on top of serf -// tags. The tag format under the hood -// is service=name=<name>,port=<port> -type Service struct { - name string - port int + return &cfg } type Config struct { - BindAddr string - ClientAddr string - LogLevel string + BindAddr *net.TCPAddr + ClientAddr *net.TCPAddr NodeName string StartJoin []string - Services []Service -} - -// lifted from serf, could be simplified -func (c *Config) AddrParts(address string) (string, int, error) { - checkAddr := address - -START: - _, _, err := net.SplitHostPort(checkAddr) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - checkAddr = fmt.Sprintf("%s:%d", checkAddr, DefaultBindPort) - goto START - } - if err != nil { - return "", 0, err - } - - // Get the address - addr, err := net.ResolveTCPAddr("tcp", checkAddr) - if err != nil { - return "", 0, err - } - - return addr.IP.String(), addr.Port, nil -} - -func (c *Config) GetLogLevel() slog.Level { - var lvl slog.Level - if c.LogLevel == "DEBUG" { - lvl = slog.LevelDebug - } - if c.LogLevel == "INFO" { - lvl = slog.LevelInfo - } - if c.LogLevel == "WARN" || c.LogLevel == "WARNING" { - lvl = slog.LevelWarn - } - if c.LogLevel == "ERR" || c.LogLevel == "ERROR" { - lvl = slog.LevelError - } - return lvl }
M
main.go
→
main.go
@@ -2,19 +2,28 @@ package main
import ( "fmt" + "log" "os" + "os/signal" + "strings" + "syscall" + "time" "git.j3s.sh/cascade/agent" "git.j3s.sh/cascade/list" ) -// TODO: rename agent to something cooler const usage = `cascade agent start a cascade agent cascade list|ls list nodes or services cascade members show serf cluster members cascade rtt estimate latency between nodes ` +// gracefulTimeout controls how long we wait before forcefully terminating +// note that this value interacts with serf's LeavePropagateDelay config +const gracefulTimeout = 10 * time.Second + +// TODO: rename agent to something cooler func main() { if len(os.Args) == 1 { fmt.Fprintf(os.Stderr, "%s", usage)@@ -27,7 +36,7 @@
func run(command string, args []string) { switch command { case "agent": - agent.Run() + handleAgent() case "list", "ls": list.Run(args) default:@@ -35,3 +44,106 @@ fmt.Fprintf(os.Stderr, "'%s' is not a valid command\n\n%s", command, usage)
os.Exit(1) } } + +func handleAgent() { + config := getAgentConfig() + agent := agent.New(config) + if err := agent.Start(); err != nil { + fmt.Println(err) + os.Exit(1) + } + defer agent.Shutdown() + // join any specified startup nodes + if err := startupJoin(agent); err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := handleSignals(agent); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +// handleSignals blocks until we get an exit-causing signal +func handleSignals(agent *agent.Agent) error { + signalCh := make(chan os.Signal, 4) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Wait for a signal + var sig os.Signal + select { + case s := <-signalCh: + sig = s + case <-agent.ShutdownCh(): + // Agent is already shutdown! + return nil + } + fmt.Fprintf(os.Stderr, "caught signal %s", sig) + + // Check if we should do a graceful leave + graceful := false + if sig == os.Interrupt || sig == syscall.SIGTERM { + graceful = true + } + + // Bail fast if not doing a graceful leave + if !graceful { + fmt.Fprintf(os.Stderr, "leave cluster with zero grace") + return nil + } + + // Attempt a graceful leave + gracefulCh := make(chan struct{}) + fmt.Printf("shutting down gracefully") + go func() { + if err := agent.Leave(); err != nil { + fmt.Println("Error: ", err) + return + } + close(gracefulCh) + }() + + // Wait for leave or another signal + select { + case <-signalCh: + return fmt.Errorf("idfk") + case <-time.After(gracefulTimeout): + return fmt.Errorf("leave timed out") + case <-gracefulCh: + return nil + } +} + +func startupJoin(a *agent.Agent) error { + if len(a.Config.StartJoin) == 0 { + return nil + } + + n, err := a.Join(a.Config.StartJoin) + if err != nil { + return err + } + if n > 0 { + log.Printf("issue join request nodes=%d\n", n) + } + + return nil +} + +func getAgentConfig() *agent.Config { + config := agent.DefaultConfig() + // CASCADE_BIND=192.168.0.15:12345 + if os.Getenv("CASCADE_BIND") != "" { + //TODO + // config.BindAddr = 
os.Getenv("CASCADE_BIND") + } + // CASCADE_JOIN=127.0.0.1,127.0.0.5 + if os.Getenv("CASCADE_JOIN") != "" { + config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",") + } + // CASCADE_NAME=nostromo.j3s.sh + if os.Getenv("CASCADE_NAME") != "" { + config.NodeName = os.Getenv("CASCADE_NAME") + } + return config +}