small pixel drawing of a pufferfish cascade

THEY JOIN!
Jes Olson j3s@c3f.net
Fri, 17 Feb 2023 21:05:27 -0800
commit

c816221ade03a4019d605c91950cd81241aa7c7d

parent

eec039d450abf878a6f9257dbff6ad14eea0d255

4 files changed, 77 insertions(+), 7 deletions(-)

jump to
M .gitignore.gitignore

@@ -1,1 +1,1 @@

-,* +cascade
M agent.goagent.go

@@ -128,3 +128,9 @@

log.Println("[INFO] cascade: requesting graceful leave from Serf") return a.serf.Leave() } + +// ShutdownCh returns a channel that can be selected to wait +// for the agent to perform a shutdown. +func (a *Agent) ShutdownCh() <-chan struct{} { + return a.shutdownCh +}
M config.goconfig.go

@@ -23,6 +23,7 @@ }

type Config struct { BindAddr string + NodeName string StartJoin []string Services []Service }
M main.gomain.go

@@ -9,15 +9,23 @@

package main import ( + "fmt" "log" "os" + "os/signal" "strings" + "syscall" "time" "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" ) +const ( + // gracefulTimeout controls how long we wait before forcefully terminating + gracefulTimeout = 3 * time.Second +) + func main() { config := readConfig() agent := setupAgent(config)

@@ -29,11 +37,64 @@ // join any specified startup nodes

if err := startupJoin(config, agent); err != nil { log.Panic(err) } -// handleSignals(config, agent) + if err := handleSignals(config, agent); err != nil { + log.Panic(err) + } +} + +// handleSignals blocks until we get an exit-causing signal +func handleSignals(config *Config, agent *Agent) error { + signalCh := make(chan os.Signal, 4) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Wait for a signal + var sig os.Signal + select { + case s := <-signalCh: + sig = s + case <-agent.ShutdownCh(): + // Agent is already shutdown! + return nil + } + log.Printf("Caught signal: %v", sig) + + // Check if we should do a graceful leave + graceful := false + if sig == os.Interrupt || sig == syscall.SIGTERM { + graceful = true + } + + // Bail fast if not doing a graceful leave + if !graceful { + log.Printf("[WARN] cascade: non-graceful leave detected") + return nil + } + + // Attempt a graceful leave + gracefulCh := make(chan struct{}) + log.Println("Gracefully shutting down agent...") + go func() { + if err := agent.Leave(); err != nil { + log.Printf("Error: %s", err) + return + } + close(gracefulCh) + }() + + // Wait for leave or another signal + select { + case <-signalCh: + return fmt.Errorf("[WARN] cascade: idk") + case <-time.After(gracefulTimeout): + return fmt.Errorf("[WARN] cascade: leave timed out") + case <-gracefulCh: + return nil + } } func readConfig() *Config { config := DefaultConfig() + // CASCADE_BIND=127.0.0.1:1234 if os.Getenv("CASCADE_BIND") != "" { config.BindAddr = os.Getenv("CASCADE_BIND") }

@@ -41,6 +102,10 @@ // CASCADE_JOIN=127.0.0.1,127.0.0.5

if os.Getenv("CASCADE_JOIN") != "" { config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",") } + // CASCADE_NAME=nostromo.j3s.sh + if os.Getenv("CASCADE_NAME") != "" { + config.NodeName = os.Getenv("CASCADE_NAME") + } return config }

@@ -65,10 +130,7 @@ if err != nil {

log.Panic(err) } serfConfig := serf.DefaultConfig() - serfConfig.MemberlistConfig.BindAddr = bindIP - serfConfig.MemberlistConfig.BindPort = bindPort - serfConfig.MemberlistConfig.AdvertiseAddr = "" - serfConfig.MemberlistConfig.AdvertisePort = 0 + serfConfig.NodeName = config.NodeName serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) serfConfig.CoalescePeriod = 3 * time.Second serfConfig.QuiescentPeriod = time.Second

@@ -84,12 +146,13 @@ serfConfig.TombstoneTimeout = 0

serfConfig.BroadcastTimeout = 0 // TODO: what are the implications of true here o_O serfConfig.EnableNameConflictResolution = true - // hardcode DefaultWANConfig because cascade is designed to be // used as a single global system. serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() serfConfig.MemberlistConfig.BindAddr = bindIP serfConfig.MemberlistConfig.BindPort = bindPort + serfConfig.MemberlistConfig.AdvertiseAddr = "" + serfConfig.MemberlistConfig.AdvertisePort = 0 agent := Create(serfConfig) return agent