small pixel drawing of a pufferfish cascade

reorganize everything for my sanity
Jes Olson j3s@c3f.net
Mon, 06 Mar 2023 21:57:09 -0800
commit

9adc0c6c1fc0becd3f6f73cf35144bebf1592bc4

parent

485812f5ede798d62cd842a726abf6250a23fe70

D command/agent/agent.go

@@ -1,232 +0,0 @@

-package agent - -import ( - "flag" - "fmt" - "log" - "net" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - - "git.j3s.sh/cascade/agent" -) - -// gracefulTimeout controls how long we wait before forcefully terminating -// note that this value interacts with serf's LeavePropagateDelay config -const gracefulTimeout = 10 * time.Second - -const usage = `cascade agent [options] - - this command starts the cascade agent, which is responsible - for basically everything, including service registration, - health checking, cluster membership, and hosting the API. - -options: - -bind-dns=<addr> - address the DNS server binds to (default = 127.0.0.1:8600) - - -bind-http=<addr> - address the http api/interace binds to (default = 127.0.0.1:8500) - - -bind-serf=<addr> - address the serf agent binds to (default = 0.0.0.0:8301) - - -join=<addrs> - comma-separated address of agents to join at start time (default = nil) - - -node=<name> - name of this node, must be globally unique (default = hostname) -` - -type Flags struct { - bindDNS string - bindHTTP string - bindSerf string - join string - node string -} - -var agentFlags Flags - -func Run(args []string) { - flags := flag.NewFlagSet("agent", flag.ContinueOnError) - flags.Usage = func() { fmt.Printf(usage) } - flags.StringVar(&agentFlags.bindDNS, "bind-dns", "", "") - flags.StringVar(&agentFlags.bindHTTP, "bind-http", "", "") - flags.StringVar(&agentFlags.bindSerf, "bind-serf", "", "") - flags.StringVar(&agentFlags.join, "join", "", "") - flags.StringVar(&agentFlags.node, "node", "", "") - if err := flags.Parse(args); err != nil { - os.Exit(1) - } - - config, err := getAgentConfig() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - agent := agent.New(config) - - fmt.Printf("-> starting cascade agent\n") - fmt.Printf(" node '%s'\n", config.NodeName) - if len(config.StartJoin) > 0 { - fmt.Printf(" join '%s'\n", config.StartJoin) - } - fmt.Printf(" bind addrs:\n") - fmt.Printf(" dns '%s'\n", config.DNSBindAddr) - fmt.Printf(" http '%s'\n", config.HTTPBindAddr) - fmt.Printf(" serf '%s'\n", config.SerfBindAddr) - fmt.Printf("\n-> logs\n") - - if err := agent.Start(); err != nil { - fmt.Println(err) - os.Exit(1) - } - defer agent.Shutdown() - - if err := startupJoin(agent); err != nil { - fmt.Println(err) - os.Exit(1) - } - if err := handleSignals(agent); err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -// handleSignals blocks until we get an exit-causing signal -func handleSignals(agent *agent.Agent) error { - signalCh := make(chan os.Signal, 4) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - - // Wait for a signal - var sig os.Signal - select { - case s := <-signalCh: - sig = s - case <-agent.ShutdownCh(): - // Agent is already shutdown! - return nil - } - fmt.Fprintf(os.Stderr, "caught signal %s", sig) - - // Check if we should do a graceful leave - graceful := false - if sig == os.Interrupt || sig == syscall.SIGTERM { - graceful = true - } - - // Bail fast if not doing a graceful leave - if !graceful { - fmt.Fprintf(os.Stderr, "leave cluster with zero grace") - return nil - } - - // Attempt a graceful leave - gracefulCh := make(chan struct{}) - fmt.Printf("shutting down gracefully") - go func() { - if err := agent.Leave(); err != nil { - fmt.Println("Error: ", err) - return - } - close(gracefulCh) - }() - - // Wait for leave or another signal - select { - case <-signalCh: - return fmt.Errorf("idfk") - case <-time.After(gracefulTimeout): - return fmt.Errorf("leave timed out") - case <-gracefulCh: - return nil - } -} - -func startupJoin(a *agent.Agent) error { - if len(a.Config.StartJoin) == 0 { - return nil - } - - n, err := a.Join(a.Config.StartJoin) - if err != nil { - return err - } - if n > 0 { - log.Printf("issue join request nodes=%d\n", n) - } - - return nil -} - -// getAgentConfig takes a default agent config and modifies it based -// on user specified flags. It also does some a little input validation. -func getAgentConfig() (*agent.Config, error) { - config := agent.DefaultConfig() - - if agentFlags.bindDNS != "" { - if err := parseFlagAddress(agentFlags.bindDNS, config.DNSBindAddr); err != nil { - return nil, err - } - } - if agentFlags.bindHTTP != "" { - if err := parseFlagAddress(agentFlags.bindHTTP, config.HTTPBindAddr); err != nil { - return nil, err - } - } - if agentFlags.bindSerf != "" { - if err := parseFlagAddress(agentFlags.bindSerf, config.SerfBindAddr); err != nil { - return nil, err - } - } - if agentFlags.join != "" { - // TODO: moar validation - config.StartJoin = strings.Split(agentFlags.join, ",") - } - - if agentFlags.node != "" { - config.NodeName = agentFlags.node - } - return config, nil -} - -// parseFlagAddress takes a colon-delimited host:port pair as a string, parses -// out the ip (and optionally, the port), and modifies the passed TCPAddr with -// the resulting values. -func parseFlagAddress(hostPort string, tcpAddr *net.TCPAddr) error { - addr, portStr, err := net.SplitHostPort(hostPort) - if err != nil { - if !strings.Contains(err.Error(), "missing port in address") { - return fmt.Errorf("Error parsing address: %v", err) - } - - // If we get a missing port error, we try to coerce the whole hostPort - // into an address. This allows the user to supply just a host address - // instead of always requiring a host:ip pair. - addr = hostPort - } - - if addr == "" { - return fmt.Errorf("Error parsing blank address") - } - ip := net.ParseIP(addr) - if ip == nil { - return fmt.Errorf("Error parsing address %q: not a valid IP address", ip) - } - - if portStr != "" { - port, err := strconv.Atoi(portStr) - if err != nil { - return fmt.Errorf("Error parsing port: %s", err) - } - tcpAddr.Port = port - } - tcpAddr.IP = ip - - return nil -}
M command/agent/agent_test.gointernal/cli/agent_test.go

@@ -1,10 +1,10 @@

-package agent +package cli import ( "strings" "testing" - "git.j3s.sh/cascade/agent" + "git.j3s.sh/cascade/internal/agent" ) func TestParseFlagAddress(t *testing.T) {
D command/ls/ls.go

@@ -1,28 +0,0 @@

-package ls - -import ( - "fmt" - "os" - - "git.j3s.sh/cascade/command/ls/members" -) - -const usage = `cascade ls members -cascade ls nodes -cascade ls services -` - -func Run(args []string) { - if len(args) == 0 { - fmt.Printf(usage) - os.Exit(1) - } - subcommand := args[0] - switch subcommand { - case "members": - members.Run(args[1:]) - default: - fmt.Fprintf(os.Stderr, "'%s' is not a valid subcommand\n\n%s", subcommand, usage) - os.Exit(1) - } -}
D command/ls/members/members.go

@@ -1,84 +0,0 @@

-package members - -import ( - "bytes" - "flag" - "fmt" - "os" - "strings" - "text/tabwriter" - - "git.j3s.sh/cascade/api" -) - -const usage = `cascade ls members [options] - - this command lists the members of the cascade serf - cluster. - -options: - - -api - address of the api to target (default = 127.0.0.1:8500) - - -l | -details - show more details (default = nil) -` - -type Flags struct { - apiAddr string - details bool -} - -var membersFlags Flags - -func Run(args []string) { - flags := flag.NewFlagSet("agent", flag.ContinueOnError) - flags.Usage = func() { fmt.Printf(usage) } - flags.StringVar(&membersFlags.apiAddr, "api", "", "") - flags.BoolVar(&membersFlags.details, "l", false, "") - flags.BoolVar(&membersFlags.details, "details", false, "") - flags.Parse(args) - - cfg := api.DefaultConfig() - client, err := api.NewClient(cfg) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - members, err := client.Agent().Members() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - if membersFlags.details { - var b bytes.Buffer - tw := tabwriter.NewWriter(&b, 0, 2, 2, ' ', 0) - fmt.Fprintf(tw, "node\taddr\tstatus\ttags\n") - for _, m := range members { - fmt.Fprintf(tw, "%s\t", m.Name) - fmt.Fprintf(tw, "%s:%d\t", m.Addr, m.Port) - fmt.Fprintf(tw, "%s\t", m.StatusPretty()) - fmt.Fprintf(tw, "%s\t", printTags(m.Tags)) - fmt.Fprintln(tw) - } - if err := tw.Flush(); err != nil { - fmt.Printf("error flushing tabwriter: %s", err) - os.Exit(1) - } - fmt.Print(b.String()) - } else { - for _, m := range members { - fmt.Println(m.Name) - } - } -} - -func printTags(tags map[string]string) string { - var results []string - for k, v := range tags { - results = append(results, fmt.Sprintf("%s=%s", k, v)) - } - return strings.Join(results, ",") -}
M command/ls/nodes/nodes.gointernal/cli/services.go

@@ -1,4 +1,4 @@

-package nodes +package cli import ( "fmt"
D command/ls/services/services.go

@@ -1,14 +0,0 @@

-package services - -import ( - "fmt" -) - -const usage = `cascade list|ls nodes -cascade list|ls members -cascade list|ls services -` - -func Run(args []string) { - fmt.Printf("%+v", usage) -}
A internal/cli/agent.go

@@ -0,0 +1,206 @@

+package cli + +import ( + "flag" + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "git.j3s.sh/cascade/internal/agent" +) + +type agentCommand struct { + // gracefulTimeout controls how long we wait before forcefully terminating + // note that this value interacts with serf's LeavePropagateDelay config + gracefulTimeout time.Duration + + flagBindDNS string + flagBindHTTP string + flagBindSerf string + flagJoin string + flagNode string +} + +func (c *agentCommand) Usage() { + fmt.Print( + `usage: cascade agent [flags] + + this command starts the cascade agent, which is responsible + for basically everything, including service registration, + health checking, cluster membership, and hosting the API. + +flags: + -bind-dns=<addr> + address the DNS server binds to (default = 127.0.0.1:8600) + + -bind-http=<addr> + address the http api/interace binds to (default = 127.0.0.1:8500) + + -bind-serf=<addr> + address the serf agent binds to (default = 0.0.0.0:8301) + + -join=<addrs> + comma-separated address of agents to join at start time (default = nil) + + -node=<name> + name of this node, must be globally unique (default = hostname) +`) +} + +func (c *agentCommand) Init(args []string) agentCommand { + c.gracefulTimeout = 10 * time.Second + + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.Usage = c.Usage + flags.StringVar(&c.flagBindDNS, "bind-dns", "", "") + flags.StringVar(&c.flagBindHTTP, "bind-http", "", "") + flags.StringVar(&c.flagBindSerf, "bind-serf", "", "") + flags.StringVar(&c.flagJoin, "join", "", "") + flags.StringVar(&c.flagNode, "node", "", "") + + if err := flags.Parse(args); err != nil { + fmt.Println(err) + os.Exit(1) + } + + return *c +} + +func RunAgent(args []string) { + c := agentCommand{} + c.Init(args) + + config, err := c.getAgentConfig() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + agent := agent.New(config) + + fmt.Printf("-> starting cascade agent\n") + fmt.Printf(" node '%s'\n", config.NodeName) + if len(config.StartJoin) > 0 { + fmt.Printf(" join '%s'\n", config.StartJoin) + } + fmt.Printf(" bind addrs:\n") + fmt.Printf(" dns '%s'\n", config.DNSBindAddr) + fmt.Printf(" http '%s'\n", config.HTTPBindAddr) + fmt.Printf(" serf '%s'\n", config.SerfBindAddr) + fmt.Printf("\n-> logs\n") + + if err := agent.Start(); err != nil { + fmt.Println(err) + os.Exit(1) + } + defer agent.Shutdown() + + if err := c.startupJoin(agent); err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := c.handleSignals(agent); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +// handleSignals blocks until we get an exit-causing signal +func (c agentCommand) handleSignals(agent *agent.Agent) error { + signalCh := make(chan os.Signal, 4) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Wait for a signal + var sig os.Signal + select { + case s := <-signalCh: + sig = s + case <-agent.ShutdownCh(): + // Agent is already shutdown! + return nil + } + fmt.Fprintf(os.Stderr, "caught signal %s", sig) + + // Check if we should do a graceful leave + graceful := false + if sig == os.Interrupt || sig == syscall.SIGTERM { + graceful = true + } + + // Bail fast if not doing a graceful leave + if !graceful { + fmt.Fprintf(os.Stderr, "leave cluster with zero grace") + return nil + } + + // Attempt a graceful leave + gracefulCh := make(chan struct{}) + fmt.Printf("shutting down gracefully") + go func() { + if err := agent.Leave(); err != nil { + fmt.Println("Error: ", err) + return + } + close(gracefulCh) + }() + + // Wait for leave or another signal + select { + case <-signalCh: + return fmt.Errorf("idfk") + case <-time.After(c.gracefulTimeout): + return fmt.Errorf("leave timed out") + case <-gracefulCh: + return nil + } +} + +func (c agentCommand) startupJoin(a *agent.Agent) error { + if len(a.Config.StartJoin) == 0 { + return nil + } + + n, err := a.Join(a.Config.StartJoin) + if err != nil { + return err + } + if n > 0 { + log.Printf("issue join request nodes=%d\n", n) + } + + return nil +} + +// getAgentConfig takes a default agent config and modifies it based +// on user specified flags. It also does some a little input validation. +func (c agentCommand) getAgentConfig() (*agent.Config, error) { + config := agent.DefaultConfig() + + if c.flagBindDNS != "" { + if err := parseFlagAddress(c.flagBindDNS, config.DNSBindAddr); err != nil { + return nil, err + } + } + if c.flagBindHTTP != "" { + if err := parseFlagAddress(c.flagBindHTTP, config.HTTPBindAddr); err != nil { + return nil, err + } + } + if c.flagBindSerf != "" { + if err := parseFlagAddress(c.flagBindSerf, config.SerfBindAddr); err != nil { + return nil, err + } + } + if c.flagJoin != "" { + // TODO: moar validation + config.StartJoin = strings.Split(c.flagJoin, ",") + } + + if c.flagNode != "" { + config.NodeName = c.flagNode + } + return config, nil +}
A internal/cli/members.go

@@ -0,0 +1,91 @@

+package cli + +import ( + "bytes" + "flag" + "fmt" + "os" + "strings" + "text/tabwriter" + + "git.j3s.sh/cascade/api" +) + +type membersCommand struct { + usage string + + flagAPIAddr string + flagDetails bool +} + +func (c membersCommand) Usage() { + fmt.Printf(`usage: cascade members [flags] + list the members of the cascade serf cluster + +flags: + -api + address of the cascade http api to target (default = 127.0.0.1:8500) + + -details | -l + show more details (default = nil) +`) +} + +func (c *membersCommand) Init(args []string) { + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.Usage = c.Usage + flags.StringVar(&c.flagAPIAddr, "api", "", "") + flags.BoolVar(&c.flagDetails, "details", false, "") + flags.BoolVar(&c.flagDetails, "l", false, "") + if err := flags.Parse(args); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func RunMembers(args []string) { + c := membersCommand{} + c.Init(args) + + cfg := api.DefaultConfig() + client, err := api.NewClient(cfg) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + members, err := client.Agent().Members() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + if c.flagDetails { + var b bytes.Buffer + tw := tabwriter.NewWriter(&b, 0, 2, 2, ' ', 0) + fmt.Fprintf(tw, "node\taddr\tstatus\ttags\n") + for _, m := range members { + fmt.Fprintf(tw, "%s\t", m.Name) + fmt.Fprintf(tw, "%s:%d\t", m.Addr, m.Port) + fmt.Fprintf(tw, "%s\t", m.StatusPretty()) + fmt.Fprintf(tw, "%s\t", c.printTags(m.Tags)) + fmt.Fprintln(tw) + } + if err := tw.Flush(); err != nil { + fmt.Printf("error flushing tabwriter: %s", err) + os.Exit(1) + } + fmt.Print(b.String()) + } else { + for _, m := range members { + fmt.Println(m.Name) + } + } +} + +func (c membersCommand) printTags(tags map[string]string) string { + var results []string + for k, v := range tags { + results = append(results, fmt.Sprintf("%s=%s", k, v)) + } + return strings.Join(results, ",") +}
A internal/cli/utils.go

@@ -0,0 +1,44 @@

+package cli + +import ( + "fmt" + "net" + "strconv" + "strings" +) + +// parseFlagAddress takes a colon-delimited host:port pair as a string, parses +// out the ip (and optionally, the port), and modifies the passed TCPAddr with +// the resulting values. +func parseFlagAddress(hostPort string, tcpAddr *net.TCPAddr) error { + addr, portStr, err := net.SplitHostPort(hostPort) + if err != nil { + if !strings.Contains(err.Error(), "missing port in address") { + return fmt.Errorf("Error parsing address: %v", err) + } + + // If we get a missing port error, we try to coerce the whole hostPort + // into an address. This allows the user to supply just a host address + // instead of always requiring a host:ip pair. + addr = hostPort + } + + if addr == "" { + return fmt.Errorf("Error parsing blank address") + } + ip := net.ParseIP(addr) + if ip == nil { + return fmt.Errorf("Error parsing address %q: not a valid IP address", ip) + } + + if portStr != "" { + port, err := strconv.Atoi(portStr) + if err != nil { + return fmt.Errorf("Error parsing port: %s", err) + } + tcpAddr.Port = port + } + tcpAddr.IP = ip + + return nil +}
M main.gomain.go

@@ -4,13 +4,16 @@ import (

"fmt" "os" - "git.j3s.sh/cascade/command/agent" - "git.j3s.sh/cascade/command/ls" + "git.j3s.sh/cascade/internal/cli" ) -const usage = `cascade agent start a cascade agent -cascade ls list nodes, services, or members -cascade rtt estimate latency between nodes +const usage = ` usage: cascade [command] [flags] + + commands: + cascade agent start a cascade agent + cascade members list cluster members + cascade status overview of the cascade cluster + cascade rtt estimate latency between nodes ` // TODO: rename agent to something cooler

@@ -24,9 +27,9 @@ command := os.Args[1]

args := os.Args[2:] switch os.Args[1] { case "agent": - agent.Run(args) - case "ls": - ls.Run(args) + cli.RunAgent(args) + case "members": + cli.RunMembers(args) default: fmt.Fprintf(os.Stderr, "'%s' is not a valid command\n\n%s", command, usage) os.Exit(1)