small pixel drawing of a pufferfish cascade

use global logger
Jes Olson j3s@c3f.net
Sat, 18 Feb 2023 12:42:55 -0800
commit

2bf8d03280225626fb125ae2f499d01bef339916

parent

3766ee81b160ab5ebc2dbccb531ecd0ed5c7333d

5 files changed, 92 insertions(+), 39 deletions(-)

jump to
M agent.goagent.go

@@ -2,27 +2,27 @@ package main

import ( "fmt" - "log" "sync" "github.com/hashicorp/serf/serf" + "golang.org/x/exp/slog" ) // Agent starts and manages a Serf & adds service discovery (TODO) type Agent struct { serfConf *serf.Config eventCh chan serf.Event - // This is the underlying Serf we are wrapping serf *serf.Serf - + // a lil logger used by the agent, you know how it be + logger *slog.Logger // this channel receiving a signal = shutdown shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex } -func Create(serfConf *serf.Config) *Agent { +func Create(config *Config, serfConf *serf.Config, logger *slog.Logger) *Agent { // Create a channel to listen for events from Serf eventCh := make(chan serf.Event, 64) serfConf.EventCh = eventCh

@@ -31,6 +31,7 @@ // Setup the agent

agent := &Agent{ serfConf: serfConf, eventCh: eventCh, + logger: logger, shutdownCh: make(chan struct{}), }

@@ -42,10 +43,10 @@ serfShutdownCh := a.serf.ShutdownCh()

for { select { case e := <-a.eventCh: - log.Printf("[INFO] cascade: Received event: %s", e.String()) + a.logger.Info("receive event", "event", e.String()) case <-serfShutdownCh: - log.Printf("[INFO] cascade: serf shutdown detected, quitting eventloop") + a.logger.Info("detect serf shutdown, turning off event loop") a.Shutdown() return

@@ -56,12 +57,12 @@ }

} func (a *Agent) Start() error { - log.Printf("[INFO] cascade: Serf agent starting") + a.logger.Info("start serf agent") // Create serf first serf, err := serf.Create(a.serfConf) if err != nil { - return fmt.Errorf("Error creating Serf: %s", err) + return err } a.serf = serf

@@ -84,13 +85,13 @@ if a.serf == nil {

goto EXIT } - log.Println("[INFO] cascade: requesting serf shutdown") + a.logger.Info("request serf shutdown") if err := a.serf.Shutdown(); err != nil { return err } EXIT: - log.Println("[INFO] cascade: shutdown complete") + a.logger.Info("complete serf shutdown") a.shutdown = true close(a.shutdownCh) return nil

@@ -98,12 +99,12 @@ }

// Join asks the Serf instance to join. See the Serf.Join function. func (a *Agent) Join(addrs []string) (n int, err error) { - log.Printf("[INFO] cascade: issuing join request to '%v'", addrs) + a.logger.Info("issue join request", "addrs", addrs) // we always ignore old events because cascade don't // care about the past n, err = a.serf.Join(addrs, true) if err != nil { - return n, fmt.Errorf("[ERR] cascade: error joining: %v\n", err) + return n, fmt.Errorf("Error joining: %v\n", err) } // TODO: when joining fails, we don't get an error here - serf & memberlist // just print to stdout and serf.Join returns without issue.

@@ -116,7 +117,7 @@ if a.serf == nil {

return nil } - log.Println("[INFO] cascade: requesting graceful leave from Serf") + a.logger.Info("request graceful serf leave") return a.serf.Leave() }
M config.goconfig.go

@@ -4,6 +4,8 @@ import (

"fmt" "net" "os" + + "golang.org/x/exp/slog" ) const DefaultBindPort int = 4449

@@ -16,6 +18,7 @@ }

return &Config{ BindAddr: "0.0.0.0", + LogLevel: "INFO", NodeName: hostname, } }

@@ -30,6 +33,7 @@ }

type Config struct { BindAddr string + LogLevel string NodeName string StartJoin []string Services []Service

@@ -57,3 +61,20 @@ }

return addr.IP.String(), addr.Port, nil } + +func (c *Config) GetLogLevel() slog.Level { + var lvl slog.Level + if c.LogLevel == "DEBUG" { + lvl = slog.LevelDebug + } + if c.LogLevel == "INFO" { + lvl = slog.LevelInfo + } + if c.LogLevel == "WARN" || c.LogLevel == "WARNING" { + lvl = slog.LevelWarn + } + if c.LogLevel == "ERR" || c.LogLevel == "ERROR" { + lvl = slog.LevelError + } + return lvl +}
M go.modgo.mod

@@ -2,7 +2,11 @@ module git.j3s.sh/cascade

go 1.20 -require github.com/hashicorp/serf v0.10.1 +require ( + github.com/hashicorp/memberlist v0.5.0 + github.com/hashicorp/serf v0.10.1 + golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb +) require ( github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect

@@ -13,9 +17,8 @@ github.com/hashicorp/go-msgpack v0.5.3 // indirect

github.com/hashicorp/go-multierror v1.1.0 // indirect github.com/hashicorp/go-sockaddr v1.0.0 // indirect github.com/hashicorp/golang-lru v0.5.0 // indirect - github.com/hashicorp/memberlist v0.5.0 // indirect github.com/miekg/dns v1.1.41 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 // indirect - golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect + golang.org/x/sys v0.1.0 // indirect )
M go.sumgo.sum

@@ -56,6 +56,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=

github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= +golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w= +golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=

@@ -77,6 +79,8 @@ golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
M main.gomain.go

@@ -10,7 +10,6 @@ package main

import ( "fmt" - "log" "os" "os/signal" "strings"

@@ -19,6 +18,7 @@ "time"

"github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" + "golang.org/x/exp/slog" ) const (

@@ -29,28 +29,42 @@ )

func main() { config := readConfig() - agent := setupAgent(config) + logger := setupLogger(config) + agent := setupAgent(config, logger) if err := agent.Start(); err != nil { - log.Panic(err) + panic(err) } defer agent.Shutdown() // join any specified startup nodes if err := startupJoin(config, agent); err != nil { - log.Fatal(err) + panic(err) } - if os.Getenv("CASCADE_DEBUG") != "" { - go debugPrints(agent) + go debugPrints(agent) + if err := handleSignals(config, agent); err != nil { + panic(err) + } +} + +func setupLogger(config *Config) *slog.Logger { + debugOn := false + if config.LogLevel == "DEBUG" { + debugOn = true } - if err := handleSignals(config, agent); err != nil { - log.Panic(err) + opts := slog.HandlerOptions{ + Level: config.GetLogLevel(), + AddSource: debugOn, } + handler := opts.NewTextHandler(os.Stderr) + logger := slog.New(handler) + slog.SetDefault(logger) + return logger } -func debugPrints(agent *Agent) { + +func debugPrints(a *Agent) { for { - for _, m := range agent.serf.Members() { - log.Printf("[DEBUG] cascade: name '%s' addr '%s' status '%s' \n", - m.Name, m.Addr, m.Status) + for _, m := range a.serf.Members() { + a.logger.Debug("debug-loop", "name", m.Name, "addr", m.Addr, "status", m.Status) } time.Sleep(time.Second * 5) }

@@ -70,7 +84,7 @@ case <-agent.ShutdownCh():

// Agent is already shutdown! return nil } - log.Printf("Caught signal: %v", sig) + agent.logger.Info("caught signal", "signal", sig) // Check if we should do a graceful leave graceful := false

@@ -80,15 +94,16 @@ }

// Bail fast if not doing a graceful leave if !graceful { - log.Fatal("[WARN] cascade: non-graceful leave detected") + agent.logger.Warn("leave cluster with zero grace") + return nil } // Attempt a graceful leave gracefulCh := make(chan struct{}) - log.Println("Gracefully shutting down agent...") + agent.logger.Info("shut down agent gracefully") go func() { if err := agent.Leave(); err != nil { - log.Printf("Error: %s", err) + agent.logger.Error("Error: %s", err) return } close(gracefulCh)

@@ -97,9 +112,9 @@

// Wait for leave or another signal select { case <-signalCh: - return fmt.Errorf("[WARN] cascade: idk") + return fmt.Errorf("idfk") case <-time.After(gracefulTimeout): - return fmt.Errorf("[WARN] cascade: leave timed out") + return fmt.Errorf("leave timed out") case <-gracefulCh: return nil }

@@ -107,7 +122,11 @@ }

func readConfig() *Config { config := DefaultConfig() - // CASCADE_BIND=127.0.0.1:1234 + // CASCADE_LOGLEVEL=info + if os.Getenv("CASCADE_LOGLEVEL") != "" { + config.LogLevel = strings.ToUpper(os.Getenv("CASCADE_LOGLEVEL")) + } + // CASCADE_BIND=192.168.0.15:12345 if os.Getenv("CASCADE_BIND") != "" { config.BindAddr = os.Getenv("CASCADE_BIND") }

@@ -132,19 +151,23 @@ if err != nil {

return err } if n > 0 { - log.Printf("[INFO] cascade: join request issued to %d nodes", n) + agent.logger.Info("issue join request", "nodes", n) } return nil } -func setupAgent(config *Config) *Agent { +func setupAgent(config *Config, logger *slog.Logger) *Agent { bindIP, bindPort, err := config.AddrParts(config.BindAddr) if err != nil { - log.Panic(err) + panic(err) } serfConfig := serf.DefaultConfig() + // logLogger is just a bridge from the old logger to the + // new slog handler. + logLogger := slog.NewLogLogger(logger.Handler(), config.GetLogLevel()) + serfConfig.Logger = logLogger serfConfig.NodeName = config.NodeName serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) // TODO: how should cascade handle name conflicts?

@@ -160,11 +183,12 @@ serfConfig.QueueDepthWarning = 1000000

serfConfig.ReconnectTimeout = 3 * 24 * time.Hour serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() + serfConfig.MemberlistConfig.Logger = logLogger serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second serfConfig.MemberlistConfig.BindAddr = bindIP serfConfig.MemberlistConfig.BindPort = bindPort - agent := Create(serfConfig) + agent := Create(config, serfConfig, logger) return agent }