small pixel drawing of a pufferfish cascade

logging, shutdown, etc
Jes Olson j3s@c3f.net
Fri, 17 Feb 2023 20:39:29 -0800
commit

eec039d450abf878a6f9257dbff6ad14eea0d255

parent

878dba56eedb1a99e98789363b6067cea6b52127

3 files changed, 55 insertions(+), 23 deletions(-)

jump to
M agent.goagent.go

@@ -11,7 +11,7 @@

// Agent starts and manages a Serf & adds // service discovery type Agent struct { - conf *Config +// conf *Config serfConf *serf.Config eventCh chan serf.Event

@@ -29,14 +29,13 @@ shutdownCh chan struct{}

shutdownLock sync.Mutex } -func Create(conf *Config, serfConf *serf.Config) *Agent { +func Create(serfConf *serf.Config) *Agent { // Create a channel to listen for events from Serf eventCh := make(chan serf.Event, 64) serfConf.EventCh = eventCh // Setup the agent agent := &Agent{ - conf: conf, serfConf: serfConf, eventCh: eventCh, shutdownCh: make(chan struct{}),

@@ -51,10 +50,10 @@ serfShutdownCh := a.serf.ShutdownCh()

for { select { case e := <-a.eventCh: - log.Printf("[INFO] agent: Received event: %s", e.String()) + log.Printf("[INFO] cascade: Received event: %s", e.String()) case <-serfShutdownCh: - log.Printf("[WARN] agent: Serf shutdown detected, quitting") + log.Printf("[WARN] cascade: Serf shutdown detected, quitting") a.Shutdown() return

@@ -65,7 +64,7 @@ }

} func (a *Agent) Start() error { - log.Printf("[INFO] agent: Serf agent starting") + log.Printf("[INFO] cascade: Serf agent starting") // Create serf first serf, err := serf.Create(a.serfConf)

@@ -93,30 +92,31 @@ if a.serf == nil {

goto EXIT } - log.Println("[INFO] agent: requesting serf shutdown") + log.Println("[INFO] cascade: requesting serf shutdown") if err := a.serf.Shutdown(); err != nil { return err } EXIT: - log.Println("[INFO] agent: shutdown complete") + log.Println("[INFO] cascade: shutdown complete") a.shutdown = true close(a.shutdownCh) return nil } // Join asks the Serf instance to join. See the Serf.Join function. -func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { - log.Printf("[INFO] agent: joining: %v replay: %v", addrs, replay) - ignoreOld := !replay - n, err = a.serf.Join(addrs, ignoreOld) +func (a *Agent) Join(addrs []string) (n int, err error) { + log.Printf("[INFO] cascade: joining: %v", addrs) + // we always ignore old events because cascade don't + // care about the past + n, err = a.serf.Join(addrs, true) if n > 0 { - log.Printf("[INFO] agent: joined: %d nodes", n) + log.Printf("[INFO] cascade: joined: %d nodes", n) } if err != nil { - log.Printf("[WARN] agent: error joining: %v", err) + log.Printf("[WARN] cascade: error joining: %v", err) } - return + return n, err } // Leave prepares for a graceful shutdown of the agent and its processes

@@ -125,6 +125,6 @@ if a.serf == nil {

return nil } - log.Println("[INFO] agent: requesting graceful leave from Serf") + log.Println("[INFO] cascade: requesting graceful leave from Serf") return a.serf.Leave() }
M config.goconfig.go

@@ -23,6 +23,7 @@ }

type Config struct { BindAddr string + StartJoin []string Services []Service }
M main.gomain.go

@@ -11,6 +11,7 @@

import ( "log" "os" + "strings" "time" "github.com/hashicorp/memberlist"

@@ -18,14 +19,44 @@ "github.com/hashicorp/serf/serf"

) func main() { - c := DefaultConfig() + config := readConfig() + agent := setupAgent(config) + if err := agent.Start(); err != nil { + log.Panic(err) + } + defer agent.Shutdown() + // join any specified startup nodes + if err := startupJoin(config, agent); err != nil { + log.Panic(err) + } +// handleSignals(config, agent) +} + +func readConfig() *Config { + config := DefaultConfig() if os.Getenv("CASCADE_BIND") != "" { - c.BindAddr = os.Getenv("CASCADE_BIND") + config.BindAddr = os.Getenv("CASCADE_BIND") + } + // CASCADE_JOIN=127.0.0.1,127.0.0.5 + if os.Getenv("CASCADE_JOIN") != "" { + config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",") + } + return config +} + +func startupJoin(config *Config, agent *Agent) error { + if len(config.StartJoin) == 0 { + return nil } - // agent does everything tbh - agent := setupAgent(c) - log.Println(agent) - // agent.Run() + + log.Printf("Joining cluster...") + n, err := agent.Join(config.StartJoin) + if err != nil { + return err + } + + log.Printf("Join completed. Synced with %d initial agents", n) + return nil } func setupAgent(config *Config) *Agent {

@@ -60,6 +91,6 @@ serfConfig.MemberlistConfig = memberlist.DefaultWANConfig()

serfConfig.MemberlistConfig.BindAddr = bindIP serfConfig.MemberlistConfig.BindPort = bindPort - agent := Create(config, serfConfig) + agent := Create(serfConfig) return agent }