small pixel drawing of a pufferfish cascade

cli ux rework, members -> nodes, register/deregister
Jes Olson j3s@c3f.net
Sat, 23 May 2026 12:20:18 -0500
commit

f15a15ac82a2459745970c202a8a809fddb93a5c

parent

5b06d720bee5aa72d73763dffb7833ada689ec50

A api/catalog.go

@@ -0,0 +1,101 @@

+package api + +import "fmt" + +// Node is a Consul-shaped representation of a cluster member. +type Node struct { + Node string + Address string + Meta map[string]string +} + +// CatalogService describes a single service instance as exposed by the +// /v1/catalog/service/<name> endpoint. +type CatalogService struct { + Node string + Address string + ServiceID string + ServiceName string + ServiceAddress string + ServicePort int + ServiceTags []string + ServiceMeta map[string]string +} + +// CatalogNode bundles a node and the services it owns, returned by +// /v1/catalog/node/<name>. +type CatalogNode struct { + Node *Node + Services map[string]*AgentService +} + +// Catalog provides access to the cluster-wide read views over gossipped +// service state. +type Catalog struct { + c *Client +} + +// Catalog returns a handle to the catalog endpoints. +func (c *Client) Catalog() *Catalog { + return &Catalog{c: c} +} + +// Services returns the cluster-wide service-name -> tag-union view. +func (cat *Catalog) Services() (map[string][]string, error) { + r := cat.c.newRequest("GET", "/v1/catalog/services") + _, resp, err := requireOK(cat.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + var out map[string][]string + if err := decodeJSONBody(resp.Body, &out); err != nil { + return nil, err + } + return out, nil +} + +// Service returns every known instance of the named service. +func (cat *Catalog) Service(name string) ([]*CatalogService, error) { + r := cat.c.newRequest("GET", fmt.Sprintf("/v1/catalog/service/%s", name)) + _, resp, err := requireOK(cat.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + var out []*CatalogService + if err := decodeJSONBody(resp.Body, &out); err != nil { + return nil, err + } + return out, nil +} + +// Nodes returns every node serf knows about. +func (cat *Catalog) Nodes() ([]*Node, error) { + r := cat.c.newRequest("GET", "/v1/catalog/nodes") + _, resp, err := requireOK(cat.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + var out []*Node + if err := decodeJSONBody(resp.Body, &out); err != nil { + return nil, err + } + return out, nil +} + +// Node returns the node and the services it owns. +func (cat *Catalog) Node(name string) (*CatalogNode, error) { + r := cat.c.newRequest("GET", fmt.Sprintf("/v1/catalog/node/%s", name)) + _, resp, err := requireOK(cat.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + var out CatalogNode + if err := decodeJSONBody(resp.Body, &out); err != nil { + return nil, err + } + return &out, nil +}
M internal/agent/agent.gointernal/agent/agent.go

@@ -231,6 +231,51 @@ func (a *Agent) OwnedServices() map[string]*api.AgentService {

return a.services.Owned() } +// CatalogServices returns the cluster-wide service-name -> tag-union view. +func (a *Agent) CatalogServices() map[string][]string { + return a.services.AllServices() +} + +// CatalogServiceInstances returns every known instance of the named service. +func (a *Agent) CatalogServiceInstances(name string) []NodeService { + return a.services.ServiceInstances(name) +} + +// CatalogNodeServices returns the services owned by the named node. +func (a *Agent) CatalogNodeServices(node string) map[string]*api.AgentService { + return a.services.NodeServices(node) +} + +// CatalogNodes returns every node serf currently knows about, regardless of +// whether it has registered services. Nodes that have gossipped services +// but are not yet visible to serf (rare race during join) are also included. +func (a *Agent) CatalogNodes() []serf.Member { + members := a.serf.Members() + seen := map[string]struct{}{} + for _, m := range members { + seen[m.Name] = struct{}{} + } + for _, n := range a.services.Nodes() { + if _, ok := seen[n]; ok { + continue + } + members = append(members, serf.Member{Name: n, Status: serf.StatusNone}) + seen[n] = struct{}{} + } + return members +} + +// MemberByName returns the serf member with the given name, or zero value +// if no such member is known. +func (a *Agent) MemberByName(name string) (serf.Member, bool) { + for _, m := range a.serf.Members() { + if m.Name == name { + return m, true + } + } + return serf.Member{}, false +} + // antiEntropyLoop periodically re-broadcasts this agent's owned services so // peers that missed the original user event eventually catch up. func (a *Agent) antiEntropyLoop() {
A internal/agent/catalog_endpoint.go

@@ -0,0 +1,106 @@

+package agent + +import ( + "fmt" + "net/http" + "strings" + + "git.j3s.sh/cascade/api" +) + +// catalogServices serves GET /v1/catalog/services. Returns the cluster-wide +// map of service name -> union of tags seen on any instance. +func (s *HTTPHandlers) catalogServices(w http.ResponseWriter, r *http.Request) { + out := s.agent.CatalogServices() + if out == nil { + out = map[string][]string{} + } + s.writeJSON(w, r, out) +} + +// catalogService serves GET /v1/catalog/service/<name>. Returns one entry +// per known instance, joined with the owning node's serf membership info. +func (s *HTTPHandlers) catalogService(w http.ResponseWriter, r *http.Request) { + name := strings.TrimPrefix(r.URL.Path, "/v1/catalog/service/") + if name == "" || strings.Contains(name, "/") { + http.Error(w, "service name required in path", http.StatusBadRequest) + return + } + + instances := s.agent.CatalogServiceInstances(name) + out := make([]api.CatalogService, 0, len(instances)) + for _, inst := range instances { + entry := api.CatalogService{ + Node: inst.Node, + ServiceID: inst.Service.ID, + ServiceName: inst.Service.Service, + ServiceAddress: inst.Service.Address, + ServicePort: inst.Service.Port, + ServiceTags: inst.Service.Tags, + ServiceMeta: inst.Service.Meta, + } + if m, ok := s.agent.MemberByName(inst.Node); ok { + entry.Address = m.Addr.String() + } + out = append(out, entry) + } + s.writeJSON(w, r, out) +} + +// catalogNodes serves GET /v1/catalog/nodes. Returns the serf membership +// list shaped as Consul-style Node entries. +func (s *HTTPHandlers) catalogNodes(w http.ResponseWriter, r *http.Request) { + members := s.agent.CatalogNodes() + out := make([]api.Node, 0, len(members)) + for _, m := range members { + out = append(out, api.Node{ + Node: m.Name, + Address: m.Addr.String(), + Meta: m.Tags, + }) + } + s.writeJSON(w, r, out) +} + +// catalogNode serves GET /v1/catalog/node/<name>. Returns the node and the +// services it owns. +func (s *HTTPHandlers) catalogNode(w http.ResponseWriter, r *http.Request) { + name := strings.TrimPrefix(r.URL.Path, "/v1/catalog/node/") + if name == "" || strings.Contains(name, "/") { + http.Error(w, "node name required in path", http.StatusBadRequest) + return + } + + out := api.CatalogNode{ + Services: s.agent.CatalogNodeServices(name), + } + if out.Services == nil { + out.Services = map[string]*api.AgentService{} + } + if m, ok := s.agent.MemberByName(name); ok { + out.Node = &api.Node{ + Node: m.Name, + Address: m.Addr.String(), + Meta: m.Tags, + } + } else if len(out.Services) > 0 { + // Node has gossipped services but serf hasn't surfaced it yet. + out.Node = &api.Node{Node: name} + } else { + http.Error(w, fmt.Sprintf("unknown node %q", name), http.StatusNotFound) + return + } + s.writeJSON(w, r, out) +} + +// writeJSON is the standard 200 + application/json response helper. +func (s *HTTPHandlers) writeJSON(w http.ResponseWriter, r *http.Request, obj interface{}) { + buf, err := s.marshalJSON(r, obj) + if err != nil { + s.agent.logger.Error("marshal response", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(buf) +}
M internal/agent/http.gointernal/agent/http.go

@@ -110,11 +110,15 @@ }

mux.HandleFunc("/", s.Index) endpoints := map[string]func(resp http.ResponseWriter, req *http.Request){ - "/v1/agent/members": s.agentMembers, - "/v1/agent/self": s.agentSelf, - "/v1/agent/services": s.agentServices, - "/v1/agent/service/register": s.agentServiceRegister, + "/v1/agent/members": s.agentMembers, + "/v1/agent/self": s.agentSelf, + "/v1/agent/services": s.agentServices, + "/v1/agent/service/register": s.agentServiceRegister, "/v1/agent/service/deregister/": s.agentServiceDeregister, + "/v1/catalog/services": s.catalogServices, + "/v1/catalog/service/": s.catalogService, + "/v1/catalog/nodes": s.catalogNodes, + "/v1/catalog/node/": s.catalogNode, } for pattern, fn := range endpoints {

@@ -246,7 +250,6 @@

// Give them something helpful if there's no UI so they at least know // what this server is. fmt.Fprint(resp, "cascade agent\n") - return } func decodeBody(body io.Reader, out interface{}) error {
M internal/agent/services.gointernal/agent/services.go

@@ -3,6 +3,7 @@

import ( "encoding/json" "fmt" + "sort" "sync" "git.j3s.sh/cascade/api"

@@ -171,6 +172,92 @@ for id, svc := range s.services[s.nodeName] {

cp := *svc out[id] = &cp } + return out +} + +// NodeService pairs a service with the node that owns it. +type NodeService struct { + Node string + Service *api.AgentService +} + +// AllServices returns the cluster-wide map of service name to the union of +// tags seen across all instances. The empty slice means the service exists +// with no tags somewhere. +func (s *ServiceStore) AllServices() map[string][]string { + s.mu.RLock() + defer s.mu.RUnlock() + tagsByName := map[string]map[string]struct{}{} + for _, byID := range s.services { + for _, svc := range byID { + set, ok := tagsByName[svc.Service] + if !ok { + set = map[string]struct{}{} + tagsByName[svc.Service] = set + } + for _, t := range svc.Tags { + set[t] = struct{}{} + } + } + } + out := make(map[string][]string, len(tagsByName)) + for name, set := range tagsByName { + tags := make([]string, 0, len(set)) + for t := range set { + tags = append(tags, t) + } + sort.Strings(tags) + out[name] = tags + } + return out +} + +// ServiceInstances returns every gossipped instance of the named service, +// across all nodes. +func (s *ServiceStore) ServiceInstances(name string) []NodeService { + s.mu.RLock() + defer s.mu.RUnlock() + var out []NodeService + for node, byID := range s.services { + for _, svc := range byID { + if svc.Service != name { + continue + } + cp := *svc + out = append(out, NodeService{Node: node, Service: &cp}) + } + } + sort.Slice(out, func(i, j int) bool { + if out[i].Node != out[j].Node { + return out[i].Node < out[j].Node + } + return out[i].Service.ID < out[j].Service.ID + }) + return out +} + +// NodeServices returns every service owned by the named node. +func (s *ServiceStore) NodeServices(node string) map[string]*api.AgentService { + s.mu.RLock() + defer s.mu.RUnlock() + out := map[string]*api.AgentService{} + for id, svc := range s.services[node] { + cp := *svc + out[id] = &cp + } + return out +} + +// Nodes returns the set of node names that currently have at least one +// gossipped service. +func (s *ServiceStore) Nodes() []string { + s.mu.RLock() + defer s.mu.RUnlock() + out := make([]string, 0, len(s.services)) + for node := range s.services { + out = append(out, node) + } + sort.Strings(out) return out }
M internal/cli/members.gointernal/cli/nodes.go

@@ -11,24 +11,22 @@

"git.j3s.sh/cascade/api" ) -type membersCommand struct { - usage string - +type nodesCommand struct { flagAPIAddr string flagDetails bool } -func (c membersCommand) Usage() { - fmt.Printf(`usage: cascade members [flags] - list the members of the cascade serf cluster - +func (c nodesCommand) Usage() { + fmt.Printf(`usage: cascade nodes [flags] + list the nodes in the cascade cluster. + flags: -api address of the cascade http api to target (default = 127.0.0.1:8500) `) } -func (c *membersCommand) Init(args []string) { +func (c *nodesCommand) Init(args []string) { flags := flag.NewFlagSet("", flag.ContinueOnError) flags.Usage = c.Usage flags.StringVar(&c.flagAPIAddr, "api", "", "")

@@ -40,17 +38,20 @@ os.Exit(1)

} } -func RunMembers(args []string) { - c := membersCommand{} +func RunNodes(args []string) { + c := nodesCommand{} c.Init(args) cfg := api.DefaultConfig() + if c.flagAPIAddr != "" { + cfg.Address = c.flagAPIAddr + } client, err := api.NewClient(cfg) if err != nil { fmt.Println(err) os.Exit(1) } - members, err := client.Agent().Members() + nodes, err := client.Agent().Members() if err != nil { fmt.Println(err) os.Exit(1)

@@ -59,11 +60,11 @@

var b bytes.Buffer tw := tabwriter.NewWriter(&b, 0, 2, 2, ' ', 0) fmt.Fprintf(tw, "node\taddr\tstatus\ttags\n") - for _, m := range members { - fmt.Fprintf(tw, "%s\t", m.Name) - fmt.Fprintf(tw, "%s:%d\t", m.Addr, m.Port) - fmt.Fprintf(tw, "%s\t", m.StatusPretty()) - fmt.Fprintf(tw, "%s\t", c.printTags(m.Tags)) + for _, n := range nodes { + fmt.Fprintf(tw, "%s\t", n.Name) + fmt.Fprintf(tw, "%s:%d\t", n.Addr, n.Port) + fmt.Fprintf(tw, "%s\t", n.StatusPretty()) + fmt.Fprintf(tw, "%s\t", c.printTags(n.Tags)) fmt.Fprintln(tw) } if err := tw.Flush(); err != nil {

@@ -73,10 +74,13 @@ }

fmt.Print(b.String()) } -func (c membersCommand) printTags(tags map[string]string) string { +func (c nodesCommand) printTags(tags map[string]string) string { var results []string for k, v := range tags { results = append(results, fmt.Sprintf("%s=%s", k, v)) + } + if len(results) == 0 { + return "<none>" } return strings.Join(results, ",") }
M internal/cli/services.gointernal/cli/services.go

@@ -1,9 +1,12 @@

package cli import ( + "bufio" "bytes" + "encoding/json" "flag" "fmt" + "io" "net" "os" "sort"

@@ -19,8 +22,9 @@ flagAPIAddr string

} func (c servicesCommand) Usage() { - fmt.Printf(`usage: cascade services [flags] - list the services registered to a cascade agent. + fmt.Printf(`usage: cascade services [name] [flags] + with no name, list every service in the cluster. + with a name, list every instance of that service. flags: -api

@@ -28,7 +32,7 @@ address of the cascade http api to target (default = 127.0.0.1:8500)

`) } -func (c *servicesCommand) Init(args []string) { +func (c *servicesCommand) Init(args []string) []string { flags := flag.NewFlagSet("", flag.ContinueOnError) flags.Usage = c.Usage flags.StringVar(&c.flagAPIAddr, "api", "", "")

@@ -36,11 +40,12 @@ if err := flags.Parse(args); err != nil {

fmt.Println(err) os.Exit(1) } + return flags.Args() } func RunServices(args []string) { c := servicesCommand{} - c.Init(args) + rest := c.Init(args) cfg := api.DefaultConfig() if c.flagAPIAddr != "" {

@@ -52,42 +57,322 @@ fmt.Println(err)

os.Exit(1) } - services, err := client.Agent().Services() + if len(rest) == 0 { + listAllServices(client) + return + } + listServiceInstances(client, rest[0]) +} + +func listAllServices(client *api.Client) { + services, err := client.Catalog().Services() if err != nil { fmt.Println(err) os.Exit(1) } - if len(services) == 0 { - fmt.Printf("no services registered on %s\n", cfg.Address) + fmt.Println("no services known in the cluster") return } - ids := make([]string, 0, len(services)) - for id := range services { - ids = append(ids, id) + names := make([]string, 0, len(services)) + for n := range services { + names = append(names, n) + } + sort.Strings(names) + + var b bytes.Buffer + tw := tabwriter.NewWriter(&b, 0, 2, 2, ' ', 0) + fmt.Fprintln(tw, "service\ttags") + for _, n := range names { + fmt.Fprintf(tw, "%s\t%s\n", n, joinTagsOrNone(services[n])) } - sort.Strings(ids) + tw.Flush() + fmt.Print(b.String()) +} - nodeName, err := client.Agent().NodeName() +func listServiceInstances(client *api.Client, name string) { + instances, err := client.Catalog().Service(name) if err != nil { fmt.Println(err) os.Exit(1) } + if len(instances) == 0 { + fmt.Printf("no instances of %q known in the cluster\n", name) + return + } + var b bytes.Buffer tw := tabwriter.NewWriter(&b, 0, 2, 2, ' ', 0) - fmt.Fprintf(tw, "node\tid\tname\taddr\ttags\n") - for _, id := range ids { - s := services[id] - addr := s.Address - if s.Port != 0 { - addr = net.JoinHostPort(addr, strconv.Itoa(s.Port)) + fmt.Fprintln(tw, "node\tid\taddr\ttags") + for _, inst := range instances { + addr := inst.ServiceAddress + if addr == "" { + addr = inst.Address + } + if inst.ServicePort != 0 { + addr = net.JoinHostPort(addr, strconv.Itoa(inst.ServicePort)) } - fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", nodeName, s.ID, s.Service, addr, strings.Join(s.Tags, ",")) + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", inst.Node, inst.ServiceID, addr, joinTagsOrNone(inst.ServiceTags)) + } + tw.Flush() + fmt.Print(b.String()) +} + +type registerCmd struct { + flagAPIAddr string + flagName string + flagID string + flagAddr string + flagPort int + flagTags string + flagFile string +} + +func (c registerCmd) Usage() { + fmt.Printf(`usage: cascade register [flags] + register a service on the local cascade agent. with no flags, prompts + interactively. with -f, reads a full AgentService JSON document from a + file (use '-' for stdin). + +flags: + -api address of the cascade http api to target (default = 127.0.0.1:8500) + -name service name (required) + -port service port (required) + -id service id (default = name) + -addr service address (default = empty; consumers use the node address) + -tags comma-separated tags + -f read AgentService JSON from file or '-' for stdin +`) +} + +func RunRegister(args []string) { + c := registerCmd{} + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.Usage = c.Usage + flags.StringVar(&c.flagAPIAddr, "api", "", "") + flags.StringVar(&c.flagName, "name", "", "") + flags.StringVar(&c.flagID, "id", "", "") + flags.StringVar(&c.flagAddr, "addr", "", "") + flags.IntVar(&c.flagPort, "port", 0, "") + flags.StringVar(&c.flagTags, "tags", "", "") + flags.StringVar(&c.flagFile, "f", "", "") + if err := flags.Parse(args); err != nil { + os.Exit(1) + } + + var svc *api.AgentService + var err error + switch { + case c.flagFile != "": + svc, err = readServiceFile(c.flagFile) + case c.flagName != "" && c.flagPort != 0: + svc = c.fromFlags() + default: + svc, err = c.prompt() + } + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if svc.Service == "" { + fmt.Println("service name required") + os.Exit(1) + } + if svc.ID == "" { + svc.ID = svc.Service + } + + cfg := api.DefaultConfig() + if c.flagAPIAddr != "" { + cfg.Address = c.flagAPIAddr + } + client, err := api.NewClient(cfg) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := client.Agent().ServiceRegister(svc); err != nil { + fmt.Printf("register failed: %s\n", err) + os.Exit(1) + } + + addrDesc := svc.Address + if addrDesc == "" { + addrDesc = "<node addr>" + } + if svc.Port != 0 { + addrDesc = net.JoinHostPort(addrDesc, strconv.Itoa(svc.Port)) + } + fmt.Printf("service registered: %s (%s)", svc.Service, addrDesc) + if len(svc.Tags) > 0 { + fmt.Printf(" tags=%s", strings.Join(svc.Tags, ",")) + } + fmt.Printf("\n") +} + +func (c registerCmd) fromFlags() *api.AgentService { + svc := &api.AgentService{ + ID: c.flagID, + Service: c.flagName, + Address: c.flagAddr, + Port: c.flagPort, + } + if c.flagTags != "" { + svc.Tags = splitTags(c.flagTags) + } + return svc +} + +func (c registerCmd) prompt() (*api.AgentService, error) { + if !isTerminal(os.Stdin) { + return nil, fmt.Errorf("register requires -name and -port (or -f) when stdin is not a tty") + } + r := bufio.NewReader(os.Stdin) + + name := promptLine(r, "service name", c.flagName) + if name == "" { + return nil, fmt.Errorf("service name required") } - if err := tw.Flush(); err != nil { - fmt.Printf("error flushing tabwriter: %s", err) + + portDef := "" + if c.flagPort != 0 { + portDef = strconv.Itoa(c.flagPort) + } + portStr := promptLine(r, "port", portDef) + port, err := strconv.Atoi(portStr) + if err != nil || port <= 0 { + return nil, fmt.Errorf("port must be a positive integer") + } + + id := promptLine(r, "id", firstNonEmpty(c.flagID, name)) + addr := promptLine(r, "address (empty = use node addr)", c.flagAddr) + tags := promptLine(r, "tags (comma-separated)", c.flagTags) + + svc := &api.AgentService{ + ID: id, + Service: name, + Address: addr, + Port: port, + } + if tags != "" { + svc.Tags = splitTags(tags) + } + return svc, nil +} + +type deregisterCmd struct { + flagAPIAddr string +} + +func (c deregisterCmd) Usage() { + fmt.Printf(`usage: cascade deregister <id> [flags] + remove a service from the local cascade agent. + +flags: + -api address of the cascade http api to target (default = 127.0.0.1:8500) +`) +} + +func RunDeregister(args []string) { + c := deregisterCmd{} + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.Usage = c.Usage + flags.StringVar(&c.flagAPIAddr, "api", "", "") + if err := flags.Parse(args); err != nil { os.Exit(1) } - fmt.Print(b.String()) + rest := flags.Args() + if len(rest) == 0 { + c.Usage() + os.Exit(1) + } + id := rest[0] + + cfg := api.DefaultConfig() + if c.flagAPIAddr != "" { + cfg.Address = c.flagAPIAddr + } + client, err := api.NewClient(cfg) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := client.Agent().ServiceDeregister(id); err != nil { + fmt.Printf("deregister failed: %s\n", err) + os.Exit(1) + } + fmt.Printf("deregistered %s\n", id) +} + +func readServiceFile(path string) (*api.AgentService, error) { + var r io.Reader + if path == "-" { + r = os.Stdin + } else { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + r = f + } + var svc api.AgentService + if err := json.NewDecoder(r).Decode(&svc); err != nil { + return nil, fmt.Errorf("decode %s: %w", path, err) + } + return &svc, nil +} + +func promptLine(r *bufio.Reader, label, def string) string { + if def != "" { + fmt.Printf("%s [%s]: ", label, def) + } else { + fmt.Printf("%s: ", label) + } + line, err := r.ReadString('\n') + if err != nil && err != io.EOF { + return def + } + line = strings.TrimRight(line, "\r\n") + if line == "" { + return def + } + return line +} + +func joinTagsOrNone(tags []string) string { + if len(tags) == 0 { + return "<none>" + } + return strings.Join(tags, ",") +} + +func splitTags(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +func firstNonEmpty(vals ...string) string { + for _, v := range vals { + if v != "" { + return v + } + } + return "" +} + +func isTerminal(f *os.File) bool { + fi, err := f.Stat() + if err != nil { + return false + } + return (fi.Mode() & os.ModeCharDevice) != 0 }
M internal/cli/status.gointernal/cli/status.go

@@ -6,6 +6,7 @@ "flag"

"fmt" "os" "sort" + "strings" "text/tabwriter" "time"

@@ -54,10 +55,10 @@

agent := client.Agent() start := time.Now() - members, err := agent.Members() + nodes, err := agent.Members() apiRTT := time.Since(start) if err != nil { - fmt.Printf("error fetching members from %s: %s\n", cfg.Address, err) + fmt.Printf("error fetching nodes from %s: %s\n", cfg.Address, err) os.Exit(1) }

@@ -73,14 +74,17 @@ }

statusCounts := map[string]int{} tagKeyCounts := map[string]int{} - for _, m := range members { - statusCounts[m.StatusPretty()]++ - for k := range m.Tags { + for _, n := range nodes { + statusCounts[n.StatusPretty()]++ + for k := range n.Tags { tagKeyCounts[k]++ } } - total := len(members) + localServices, _ := agent.Services() + catalogServices, _ := client.Catalog().Services() + + total := len(nodes) alive := statusCounts["alive"] healthPct := 0.0 if total > 0 {

@@ -94,7 +98,7 @@ fmt.Fprintln(tw, "cluster")

fmt.Fprintf(tw, " agent\t%s\n", nodeName) fmt.Fprintf(tw, " api\t%s\n", cfg.Address) fmt.Fprintf(tw, " api rtt\t%s\n", apiRTT.Round(time.Microsecond)) - fmt.Fprintf(tw, " members\t%d\n", total) + fmt.Fprintf(tw, " nodes\t%d\n", total) fmt.Fprintf(tw, " health\t%.1f%% alive (%d/%d)\n", healthPct, alive, total) fmt.Fprintln(tw)

@@ -106,6 +110,11 @@ fmt.Fprintf(tw, " serf\t%s\n", serfAddr)

fmt.Fprintln(tw) } + fmt.Fprintln(tw, "services") + fmt.Fprintf(tw, " local agent\t%s\n", summarizeLocalServices(localServices)) + fmt.Fprintf(tw, " cluster\t%s\n", summarizeCatalog(catalogServices)) + fmt.Fprintln(tw) + fmt.Fprintln(tw, "status") for _, s := range sortedKeys(statusCounts) { fmt.Fprintf(tw, " %s\t%d\n", s, statusCounts[s])

@@ -138,3 +147,22 @@ }

sort.Strings(out) return out } + +func summarizeLocalServices(services map[string]*api.AgentService) string { + if len(services) == 0 { + return "none" + } + names := make([]string, 0, len(services)) + for _, s := range services { + names = append(names, s.Service) + } + sort.Strings(names) + return fmt.Sprintf("%d (%s)", len(names), strings.Join(names, ", ")) +} + +func summarizeCatalog(services map[string][]string) string { + if len(services) == 0 { + return "none" + } + return fmt.Sprintf("%d unique services", len(services)) +}
M main.gomain.go

@@ -9,14 +9,14 @@ )

const usage = ` usage: cascade [command] [flags] [-h] - common commands: - cascade agent start a cascade agent - cascade members list cascade cluster members - cascade services list services on a particular agent - cascade status overview of the cascade cluster - - advanced: - cascade rtt estimate latency between nodes + commands: + cascade agent start a cascade agent + cascade status overview of the cascade cluster + cascade nodes list nodes + cascade services [name] list services + cascade register register a service to an agent + cascade deregister <id> remove a service from an agent + cascade rtt estimate latency between nodes ` // TODO: rename agent to something cooler

@@ -31,12 +31,16 @@ args := os.Args[2:]

switch os.Args[1] { case "agent": cli.RunAgent(args) - case "members": - cli.RunMembers(args) + case "nodes": + cli.RunNodes(args) case "status": cli.RunStatus(args) case "services": cli.RunServices(args) + case "register": + cli.RunRegister(args) + case "deregister": + cli.RunDeregister(args) default: fmt.Fprintf(os.Stderr, "'%s' is not a valid command\n\n%s", command, usage) os.Exit(1)