pretty things up
Jes Olson j3s@c3f.net
Fri, 17 Feb 2023 19:59:19 -0800
M
agent.go
→
agent.go
@@ -1,11 +1,10 @@
package main import ( + "fmt" "log" "sync" - "time" - "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" )@@ -46,43 +45,6 @@
return agent } -func setupAgent(config *Config) *Agent { - bindIP, bindPort, err := config.AddrParts(config.BindAddr) - if err != nil { - log.Panic(err) - } - - serfConfig := serf.DefaultConfig() - serfConfig.MemberlistConfig.BindAddr = bindIP - serfConfig.MemberlistConfig.BindPort = bindPort - serfConfig.MemberlistConfig.AdvertiseAddr = "" - serfConfig.MemberlistConfig.AdvertisePort = 0 - serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) - serfConfig.CoalescePeriod = 3 * time.Second - serfConfig.QuiescentPeriod = time.Second - serfConfig.QueryResponseSizeLimit = 1024 - serfConfig.QuerySizeLimit = 1024 - serfConfig.UserEventSizeLimit = 512 - serfConfig.UserCoalescePeriod = 3 * time.Second - serfConfig.UserQuiescentPeriod = time.Second - // TODO: look at reconnect/tombstone settings w more scrutiny - serfConfig.ReconnectInterval = 0 - serfConfig.ReconnectTimeout = 0 - serfConfig.TombstoneTimeout = 0 - serfConfig.BroadcastTimeout = 0 - // TODO: what are the implications of true here o_O - serfConfig.EnableNameConflictResolution = true - - // hardcode DefaultWANConfig because cascade is designed to be - // used as a single global system. - serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() - serfConfig.MemberlistConfig.BindAddr = bindIP - serfConfig.MemberlistConfig.BindPort = bindPort - - agent := Create(config, serfConfig) - - return agent -} func (a *Agent) eventLoop() { serfShutdownCh := a.serf.ShutdownCh()@@ -102,6 +64,21 @@ }
} } +func (a *Agent) Start() error { + log.Printf("[INFO] agent: Serf agent starting") + + // Create serf first + serf, err := serf.Create(a.serfConf) + if err != nil { + return fmt.Errorf("Error creating Serf: %s", err) + } + a.serf = serf + + // Start event loop + go a.eventLoop() + return nil +} + // Shutdown closes this agent and all of its processes. Should be preceded // by a Leave for a graceful shutdown. func (a *Agent) Shutdown() error {@@ -127,3 +104,27 @@ a.shutdown = true
close(a.shutdownCh) return nil } + +// Join asks the Serf instance to join. See the Serf.Join function. +func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { + log.Printf("[INFO] agent: joining: %v replay: %v", addrs, replay) + ignoreOld := !replay + n, err = a.serf.Join(addrs, ignoreOld) + if n > 0 { + log.Printf("[INFO] agent: joined: %d nodes", n) + } + if err != nil { + log.Printf("[WARN] agent: error joining: %v", err) + } + return +} + +// Leave prepares for a graceful shutdown of the agent and its processes +func (a *Agent) Leave() error { + if a.serf == nil { + return nil + } + + log.Println("[INFO] agent: requesting graceful leave from Serf") + return a.serf.Leave() +}
M
main.go
→
main.go
@@ -11,6 +11,10 @@
import ( "log" "os" + "time" + + "github.com/hashicorp/memberlist" + "github.com/hashicorp/serf/serf" ) func main() {@@ -23,3 +27,39 @@ agent := setupAgent(c)
log.Println(agent) // agent.Run() } + +func setupAgent(config *Config) *Agent { + bindIP, bindPort, err := config.AddrParts(config.BindAddr) + if err != nil { + log.Panic(err) + } + serfConfig := serf.DefaultConfig() + serfConfig.MemberlistConfig.BindAddr = bindIP + serfConfig.MemberlistConfig.BindPort = bindPort + serfConfig.MemberlistConfig.AdvertiseAddr = "" + serfConfig.MemberlistConfig.AdvertisePort = 0 + serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax) + serfConfig.CoalescePeriod = 3 * time.Second + serfConfig.QuiescentPeriod = time.Second + serfConfig.QueryResponseSizeLimit = 1024 + serfConfig.QuerySizeLimit = 1024 + serfConfig.UserEventSizeLimit = 512 + serfConfig.UserCoalescePeriod = 3 * time.Second + serfConfig.UserQuiescentPeriod = time.Second + // TODO: look at reconnect/tombstone settings w more scrutiny + serfConfig.ReconnectInterval = 0 + serfConfig.ReconnectTimeout = 0 + serfConfig.TombstoneTimeout = 0 + serfConfig.BroadcastTimeout = 0 + // TODO: what are the implications of true here o_O + serfConfig.EnableNameConflictResolution = true + + // hardcode DefaultWANConfig because cascade is designed to be + // used as a single global system. + serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() + serfConfig.MemberlistConfig.BindAddr = bindIP + serfConfig.MemberlistConfig.BindPort = bindPort + + agent := Create(config, serfConfig) + return agent +}