small pixel drawing of a pufferfish cascade

add bbolt, datadir, service endpoints, services cmd
Jes Olson j3s@c3f.net
Sat, 23 May 2026 09:08:08 -0500
commit

f96d10a3733a136c56b543525ff55f64c31a6ccb

parent

f082682140b7e38eb9b22f4d466cd732484184de

M .gitignore.gitignore

@@ -1,2 +1,3 @@

cascade *.prof +cascade-data
A api/service.go

@@ -0,0 +1,68 @@

+package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" +) + +// AgentService describes a service registered on an agent. The JSON shape +// mirrors Consul's so existing SDKs round-trip cleanly. +type AgentService struct { + ID string + Service string + Tags []string + Address string + Port int + Meta map[string]string +} + +// ServiceRegister registers a service with the local agent. The agent owns +// the entry and gossips it to the rest of the cluster. +func (a *Agent) ServiceRegister(svc *AgentService) error { + body, err := json.Marshal(svc) + if err != nil { + return err + } + r := a.c.newRequest("PUT", "/v1/agent/service/register") + r.body = bytes.NewReader(body) + r.header.Set("Content-Type", "application/json") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return err + } + defer closeResponseBody(resp) + return nil +} + +// ServiceDeregister removes a service the local agent owns. +func (a *Agent) ServiceDeregister(id string) error { + r := a.c.newRequest("PUT", fmt.Sprintf("/v1/agent/service/deregister/%s", id)) + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return err + } + defer closeResponseBody(resp) + return nil +} + +// Services lists services registered on the local agent. +func (a *Agent) Services() (map[string]*AgentService, error) { + r := a.c.newRequest("GET", "/v1/agent/services") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + + var out map[string]*AgentService + if err := decodeJSONBody(resp.Body, &out); err != nil { + return nil, err + } + return out, nil +} + +func decodeJSONBody(body io.Reader, out interface{}) error { + return json.NewDecoder(body).Decode(out) +}
M go.modgo.mod

@@ -1,6 +1,6 @@

module git.j3s.sh/cascade -go 1.20 +go 1.23 require ( github.com/hashicorp/serf v0.10.1

@@ -19,6 +19,7 @@ github.com/hashicorp/golang-lru v0.5.0 // indirect

github.com/hashicorp/memberlist v0.5.0 // indirect github.com/miekg/dns v1.1.41 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect + go.etcd.io/bbolt v1.4.3 // indirect golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 // indirect - golang.org/x/sys v0.1.0 // indirect + golang.org/x/sys v0.29.0 // indirect )
M go.sumgo.sum

@@ -59,6 +59,9 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w=

@@ -72,6 +75,7 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=

golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

@@ -86,6 +90,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
M internal/agent/agent.gointernal/agent/agent.go

@@ -55,16 +55,23 @@

package agent import ( + "encoding/json" "fmt" "net/http" "os" + "path/filepath" "sync" "time" + "git.j3s.sh/cascade/api" "github.com/hashicorp/serf/serf" "golang.org/x/exp/slog" ) +// antiEntropyInterval is how often each agent re-broadcasts its owned +// service state so peers that missed the original user event catch up. +const antiEntropyInterval = 60 * time.Second + // Agent starts and manages a Serf & adds service discovery (TODO) type Agent struct { // this is the agent's configuration

@@ -76,6 +83,9 @@ serf *serf.Serf

// This is the http api handler - see http.go and *_endpoint.go httpHandlers *HTTPHandlers + + // services holds the agent's own + gossipped service catalog. + services *ServiceStore // We receive serf events on this channel eventCh chan serf.Event

@@ -135,7 +145,7 @@ serfShutdownCh := a.serf.ShutdownCh()

for { select { case e := <-a.eventCh: - a.logger.Info("receive event", "event", e.String()) + a.handleSerfEvent(e) case <-serfShutdownCh: a.logger.Info("detect serf shutdown, turning off event loop")

@@ -148,18 +158,130 @@ }

} } +func (a *Agent) handleSerfEvent(e serf.Event) { + switch ev := e.(type) { + case serf.UserEvent: + a.handleUserEvent(ev) + case serf.MemberEvent: + if ev.Type == serf.EventMemberFailed || ev.Type == serf.EventMemberLeave || ev.Type == serf.EventMemberReap { + for _, m := range ev.Members { + a.logger.Info("drop services for departed member", "node", m.Name, "event", ev.Type.String()) + a.services.DropNode(m.Name) + } + } + default: + a.logger.Debug("receive event", "event", e.String()) + } +} + +func (a *Agent) handleUserEvent(ev serf.UserEvent) { + switch ev.Name { + case serfRegisterEvent: + var p registerPayload + if err := json.Unmarshal(ev.Payload, &p); err != nil { + a.logger.Warn("decode svc-register event", "err", err) + return + } + if p.Service == nil { + return + } + a.services.ApplyRegister(p.Node, p.Service) + case serfDeregisterEvent: + var p deregisterPayload + if err := json.Unmarshal(ev.Payload, &p); err != nil { + a.logger.Warn("decode svc-deregister event", "err", err) + return + } + a.services.ApplyDeregister(p.Node, p.ID) + } +} + +// RegisterService stores a service in the local agent and gossips the new +// state to the cluster. +func (a *Agent) RegisterService(svc *api.AgentService) error { + if err := a.services.Register(svc); err != nil { + return err + } + return a.broadcastRegister(svc) +} + +// DeregisterService removes a service from the local agent and gossips the +// removal to the cluster. +func (a *Agent) DeregisterService(id string) error { + if err := a.services.Deregister(id); err != nil { + return err + } + payload, err := json.Marshal(deregisterPayload{Node: a.Config.NodeName, ID: id}) + if err != nil { + return err + } + return a.serf.UserEvent(serfDeregisterEvent, payload, false) +} + +func (a *Agent) broadcastRegister(svc *api.AgentService) error { + payload, err := json.Marshal(registerPayload{Node: a.Config.NodeName, Service: svc}) + if err != nil { + return err + } + return a.serf.UserEvent(serfRegisterEvent, payload, false) +} + +// OwnedServices returns this agent's currently-owned service registrations. +func (a *Agent) OwnedServices() map[string]*api.AgentService { + return a.services.Owned() +} + +// antiEntropyLoop periodically re-broadcasts this agent's owned services so +// peers that missed the original user event eventually catch up. +func (a *Agent) antiEntropyLoop() { + t := time.NewTicker(antiEntropyInterval) + defer t.Stop() + for { + select { + case <-t.C: + for _, svc := range a.services.Owned() { + if err := a.broadcastRegister(svc); err != nil { + a.logger.Warn("anti-entropy rebroadcast", "id", svc.ID, "err", err) + } + } + case <-a.shutdownCh: + return + } + } +} + // Start creates and starts an agent's serf client, eventloop, api, and more! // The agent config should not be modified after an agent has been started. func (a *Agent) Start() error { + // open the service store before serf so we never gossip without it + if err := os.MkdirAll(a.Config.DataDir, 0o700); err != nil { + return fmt.Errorf("create data dir %q: %w", a.Config.DataDir, err) + } + dbPath := filepath.Join(a.Config.DataDir, a.Config.NodeName+".db") + store, err := NewServiceStore(a.Config.NodeName, dbPath) + if err != nil { + return err + } + a.services = store + // Create serf first - serf, err := serf.Create(a.serfConfig) + srf, err := serf.Create(a.serfConfig) if err != nil { return err } - a.serf = serf + a.serf = srf // Start event loop go a.eventLoop() + go a.antiEntropyLoop() + + // Re-broadcast any services that survived restart so peers catch up + // without waiting for the first anti-entropy tick. + for _, svc := range a.services.Owned() { + if err := a.broadcastRegister(svc); err != nil { + a.logger.Warn("startup rebroadcast", "id", svc.ID, "err", err) + } + } // Start API Server apiHandler := a.httpHandlers.handler()

@@ -196,6 +318,11 @@ }

EXIT: a.logger.Info("complete serf shutdown") + if a.services != nil { + if err := a.services.Close(); err != nil { + a.logger.Warn("close service store", "err", err) + } + } a.shutdown = true close(a.shutdownCh) return nil
M internal/agent/agent_endpoint.gointernal/agent/agent_endpoint.go

@@ -1,9 +1,12 @@

package agent import ( + "encoding/json" "fmt" "net/http" + "strings" + "git.j3s.sh/cascade/api" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" )

@@ -58,3 +61,64 @@ return

} fmt.Fprintf(w, string(json)) } + +func (s *HTTPHandlers) agentServices(w http.ResponseWriter, r *http.Request) { + services := s.agent.OwnedServices() + buf, err := s.marshalJSON(r, services) + if err != nil { + s.agent.logger.Error("marshal services", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(buf) +} + +func (s *HTTPHandlers) agentServiceRegister(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPut && r.Method != http.MethodPost { + w.Header().Set("Allow", "PUT, POST") + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + var svc api.AgentService + if err := json.NewDecoder(r.Body).Decode(&svc); err != nil { + http.Error(w, fmt.Sprintf("bad request: %s", err), http.StatusBadRequest) + return + } + if svc.ID == "" { + svc.ID = svc.Service + } + if svc.Service == "" { + http.Error(w, "Service name required", http.StatusBadRequest) + return + } + + if err := s.agent.RegisterService(&svc); err != nil { + s.agent.logger.Error("register service", err, "id", svc.ID) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) +} + +func (s *HTTPHandlers) agentServiceDeregister(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPut && r.Method != http.MethodPost { + w.Header().Set("Allow", "PUT, POST") + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + id := strings.TrimPrefix(r.URL.Path, "/v1/agent/service/deregister/") + if id == "" || strings.Contains(id, "/") { + http.Error(w, "service ID required in path", http.StatusBadRequest) + return + } + + if err := s.agent.DeregisterService(id); err != nil { + s.agent.logger.Error("deregister service", err, "id", id) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) +}
M internal/agent/config.gointernal/agent/config.go

@@ -6,9 +6,10 @@ "os"

) const ( - DefaultDNSPort int = 8600 - DefaultHTTPPort int = 8500 - DefaultSerfPort int = 8301 + DefaultDNSPort int = 8600 + DefaultHTTPPort int = 8500 + DefaultSerfPort int = 8301 + DefaultDataDir string = "./cascade-data" ) func DefaultConfig() *Config {

@@ -22,6 +23,7 @@ cfg.DNSBindAddr = &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: DefaultDNSPort}

cfg.HTTPBindAddr = &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: DefaultHTTPPort} cfg.SerfBindAddr = &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: DefaultSerfPort} cfg.NodeName = hostname + cfg.DataDir = DefaultDataDir return &cfg }

@@ -33,6 +35,7 @@ HTTPBindAddr *net.TCPAddr

SerfBindAddr *net.TCPAddr NodeName string StartJoin []string + DataDir string //// non user-configurable // HTTPResponseHeaders are used to add HTTP header response fields to the HTTP API responses.
M internal/agent/http.gointernal/agent/http.go

@@ -110,8 +110,11 @@ }

mux.HandleFunc("/", s.Index) endpoints := map[string]func(resp http.ResponseWriter, req *http.Request){ - "/v1/agent/members": s.agentMembers, - "/v1/agent/self": s.agentSelf, + "/v1/agent/members": s.agentMembers, + "/v1/agent/self": s.agentSelf, + "/v1/agent/services": s.agentServices, + "/v1/agent/service/register": s.agentServiceRegister, + "/v1/agent/service/deregister/": s.agentServiceDeregister, } for pattern, fn := range endpoints {
A internal/agent/services.go

@@ -0,0 +1,193 @@

+package agent + +import ( + "encoding/json" + "fmt" + "sync" + + "git.j3s.sh/cascade/api" + "go.etcd.io/bbolt" +) + +// serfRegisterEvent / serfDeregisterEvent are the user event names we +// gossip when an agent's owned service set changes. +const ( + serfRegisterEvent = "cascade:svc-register" + serfDeregisterEvent = "cascade:svc-deregister" +) + +// servicesBucket is the bbolt bucket holding this agent's owned services, +// keyed by service ID. +var servicesBucket = []byte("services") + +// registerPayload is the wire format for svc-register events. +type registerPayload struct { + Node string `json:"node"` + Service *api.AgentService `json:"service"` +} + +// deregisterPayload is the wire format for svc-deregister events. +type deregisterPayload struct { + Node string `json:"node"` + ID string `json:"id"` +} + +// ServiceStore tracks services known to this agent — both services this +// agent owns (which are persisted) and services owned by peers (which +// are kept in memory only and refreshed via gossip). +type ServiceStore struct { + mu sync.RWMutex + nodeName string + db *bbolt.DB + + // services indexed by node name then service ID. + services map[string]map[string]*api.AgentService +} + +// NewServiceStore opens the on-disk store and loads any previously-owned +// services for this node into memory. +func NewServiceStore(nodeName, dbPath string) (*ServiceStore, error) { + db, err := bbolt.Open(dbPath, 0o600, nil) + if err != nil { + return nil, fmt.Errorf("open service db %q: %w", dbPath, err) + } + + s := &ServiceStore{ + nodeName: nodeName, + db: db, + services: map[string]map[string]*api.AgentService{}, + } + + if err := db.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists(servicesBucket) + if err != nil { + return err + } + return b.ForEach(func(k, v []byte) error { + var svc api.AgentService + if err := json.Unmarshal(v, &svc); err != nil { + return fmt.Errorf("decode persisted service %q: %w", string(k), err) + } + s.putLocked(nodeName, &svc) + return nil + }) + }); err != nil { + _ = db.Close() + return nil, err + } + + return s, nil +} + +// Close releases the on-disk handle. Safe to call multiple times. +func (s *ServiceStore) Close() error { + if s.db == nil { + return nil + } + err := s.db.Close() + s.db = nil + return err +} + +// Register adds or replaces a service owned by this agent. Persists to disk +// before returning so a crash after the response can't lose the write. +func (s *ServiceStore) Register(svc *api.AgentService) error { + if svc.ID == "" { + return fmt.Errorf("service ID required") + } + if svc.Service == "" { + return fmt.Errorf("service name required") + } + + buf, err := json.Marshal(svc) + if err != nil { + return err + } + + if err := s.db.Update(func(tx *bbolt.Tx) error { + return tx.Bucket(servicesBucket).Put([]byte(svc.ID), buf) + }); err != nil { + return err + } + + s.mu.Lock() + s.putLocked(s.nodeName, svc) + s.mu.Unlock() + return nil +} + +// Deregister removes a service owned by this agent. +func (s *ServiceStore) Deregister(id string) error { + if err := s.db.Update(func(tx *bbolt.Tx) error { + return tx.Bucket(servicesBucket).Delete([]byte(id)) + }); err != nil { + return err + } + + s.mu.Lock() + s.deleteLocked(s.nodeName, id) + s.mu.Unlock() + return nil +} + +// ApplyRegister handles a gossipped service registration from a peer. +// No-op if the event is one we originated (already in our store). +func (s *ServiceStore) ApplyRegister(node string, svc *api.AgentService) { + if node == s.nodeName { + return + } + s.mu.Lock() + s.putLocked(node, svc) + s.mu.Unlock() +} + +// ApplyDeregister handles a gossipped service deregistration from a peer. +func (s *ServiceStore) ApplyDeregister(node, id string) { + if node == s.nodeName { + return + } + s.mu.Lock() + s.deleteLocked(node, id) + s.mu.Unlock() +} + +// DropNode forgets every service owned by the given node. Called when a +// peer leaves or is declared failed. +func (s *ServiceStore) DropNode(node string) { + if node == s.nodeName { + return + } + s.mu.Lock() + delete(s.services, node) + s.mu.Unlock() +} + +// Owned returns the services this agent owns. Copy, safe to mutate. +func (s *ServiceStore) Owned() map[string]*api.AgentService { + s.mu.RLock() + defer s.mu.RUnlock() + out := map[string]*api.AgentService{} + for id, svc := range s.services[s.nodeName] { + cp := *svc + out[id] = &cp + } + return out +} + +// putLocked must be called with s.mu held. +func (s *ServiceStore) putLocked(node string, svc *api.AgentService) { + if s.services[node] == nil { + s.services[node] = map[string]*api.AgentService{} + } + s.services[node][svc.ID] = svc +} + +// deleteLocked must be called with s.mu held. +func (s *ServiceStore) deleteLocked(node, id string) { + if m, ok := s.services[node]; ok { + delete(m, id) + if len(m) == 0 { + delete(s.services, node) + } + } +}
M internal/cli/agent.gointernal/cli/agent.go

@@ -23,6 +23,7 @@ flagBindHTTP string

flagBindSerf string flagJoin string flagNode string + flagDataDir string } func (c *agentCommand) Usage() {

@@ -48,6 +49,9 @@ comma-separated address of agents to join at start time (default = nil)

-node=<name> name of this node, must be globally unique (default = hostname) + + -data-dir=<path> + directory for cascade's local state (default = ./cascade-data) `) }

@@ -61,6 +65,7 @@ flags.StringVar(&c.flagBindHTTP, "bind-http", "", "")

flags.StringVar(&c.flagBindSerf, "bind-serf", "", "") flags.StringVar(&c.flagJoin, "join", "", "") flags.StringVar(&c.flagNode, "node", "", "") + flags.StringVar(&c.flagDataDir, "data-dir", "", "") if err := flags.Parse(args); err != nil { fmt.Println(err)

@@ -201,6 +206,9 @@ }

if c.flagNode != "" { config.NodeName = c.flagNode + } + if c.flagDataDir != "" { + config.DataDir = c.flagDataDir } return config, nil }
M internal/cli/services.gointernal/cli/services.go

@@ -1,14 +1,93 @@

package cli import ( + "bytes" + "flag" "fmt" + "net" + "os" + "sort" + "strconv" + "strings" + "text/tabwriter" + + "git.j3s.sh/cascade/api" ) -const usage = `cascade list|ls nodes -cascade list|ls members -cascade list|ls services -` +type servicesCommand struct { + flagAPIAddr string +} + +func (c servicesCommand) Usage() { + fmt.Printf(`usage: cascade services [flags] + list the services registered to a cascade agent. + +flags: + -api + address of the cascade http api to target (default = 127.0.0.1:8500) +`) +} + +func (c *servicesCommand) Init(args []string) { + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.Usage = c.Usage + flags.StringVar(&c.flagAPIAddr, "api", "", "") + if err := flags.Parse(args); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func RunServices(args []string) { + c := servicesCommand{} + c.Init(args) + + cfg := api.DefaultConfig() + if c.flagAPIAddr != "" { + cfg.Address = c.flagAPIAddr + } + client, err := api.NewClient(cfg) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + services, err := client.Agent().Services() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + if len(services) == 0 { + fmt.Printf("no services registered on %s\n", cfg.Address) + return + } + + ids := make([]string, 0, len(services)) + for id := range services { + ids = append(ids, id) + } + sort.Strings(ids) -func Run(args []string) { - fmt.Printf("%+v", usage) + nodeName, err := client.Agent().NodeName() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + var b bytes.Buffer + tw := tabwriter.NewWriter(&b, 0, 2, 2, ' ', 0) + fmt.Fprintf(tw, "node\tid\tname\taddr\ttags\n") + for _, id := range ids { + s := services[id] + addr := s.Address + if s.Port != 0 { + addr = net.JoinHostPort(addr, strconv.Itoa(s.Port)) + } + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\n", nodeName, s.ID, s.Service, addr, strings.Join(s.Tags, ",")) + } + if err := tw.Flush(); err != nil { + fmt.Printf("error flushing tabwriter: %s", err) + os.Exit(1) + } + fmt.Print(b.String()) }
M main.gomain.go

@@ -7,12 +7,15 @@

"git.j3s.sh/cascade/internal/cli" ) -const usage = ` usage: cascade [command] [flags] +const usage = ` usage: cascade [command] [flags] [-h] - commands: + common commands: cascade agent start a cascade agent cascade members list cascade cluster members + cascade services list services on a particular agent cascade status overview of the cascade cluster + + advanced: cascade rtt estimate latency between nodes `

@@ -32,6 +35,8 @@ case "members", "list":

cli.RunMembers(args) case "status": cli.RunStatus(args) + case "services": + cli.RunServices(args) default: fmt.Fprintf(os.Stderr, "'%s' is not a valid command\n\n%s", command, usage) os.Exit(1)