small pixel drawing of a pufferfish cascade

internal/agent/services.go

package agent

import (
	"encoding/json"
	"fmt"
	"sort"
	"sync"

	"j3s.sh/cascade/api"
	"go.etcd.io/bbolt"
)

// serfRegisterEvent and serfDeregisterEvent are the user event names we
// gossip when an agent's owned service set changes: the first when a
// service is registered, the second when one is deregistered.
const (
	serfRegisterEvent   = "cascade:svc-register"
	serfDeregisterEvent = "cascade:svc-deregister"
)

// servicesBucket is the name of the bbolt bucket holding this agent's owned
// services. Keys are service IDs; values are the JSON encoding of the
// corresponding api.AgentService.
var servicesBucket = []byte("services")

// registerPayload is the JSON wire format for svc-register events. It
// carries a node name under "node" (presumably the node that owns the
// service — confirm against the sender) and the service under "service".
type registerPayload struct {
	Node    string            `json:"node"`
	Service *api.AgentService `json:"service"`
}

// deregisterPayload is the JSON wire format for svc-deregister events. It
// carries a node name under "node" (presumably the node that owned the
// service — confirm against the sender) and the service ID under "id".
type deregisterPayload struct {
	Node string `json:"node"`
	ID   string `json:"id"`
}

// ServiceStore tracks services known to this agent — both services this
// agent owns (which are persisted) and services owned by peers (which
// are kept in memory only and refreshed via gossip).
//
// mu guards the services index. nodeName is the local node's name; the
// entries indexed under it are the ones this agent owns. db is the bbolt
// handle used to persist owned services and is set to nil by Close.
type ServiceStore struct {
	mu       sync.RWMutex
	nodeName string
	db       *bbolt.DB

	// services is the in-memory index: node name -> service ID -> service.
	services map[string]map[string]*api.AgentService
}

// NewServiceStore opens the bbolt database at dbPath and returns a store for
// the node called nodeName, pre-populated with every service previously
// persisted there.
//
// The services bucket is created if it does not exist yet. Each persisted
// value is decoded as a JSON api.AgentService and indexed under nodeName; a
// record that fails to decode aborts the load. If anything fails after the
// database has been opened, the handle is closed again and a nil store is
// returned. Every returned error wraps its underlying cause.
//
// NOTE(review): Open is called with nil options, so no lock timeout is
// configured — presumably this blocks for as long as another process holds
// dbPath; confirm against the bbolt documentation.
func NewServiceStore(nodeName, dbPath string) (*ServiceStore, error) {
	db, err := bbolt.Open(dbPath, 0o600, nil)
	if err != nil {
		return nil, fmt.Errorf("open service db %q: %w", dbPath, err)
	}

	s := &ServiceStore{
		nodeName: nodeName,
		db:       db,
		services: map[string]map[string]*api.AgentService{},
	}

	// s has not been handed to any other goroutine yet, so the index can be
	// filled through putLocked without taking s.mu.
	load := func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(servicesBucket)
		if err != nil {
			return fmt.Errorf("create bucket %q: %w", servicesBucket, err)
		}
		return b.ForEach(func(k, v []byte) error {
			svc := new(api.AgentService)
			if err := json.Unmarshal(v, svc); err != nil {
				return fmt.Errorf("decode persisted service %q: %w", string(k), err)
			}
			s.putLocked(nodeName, svc)
			return nil
		})
	}
	if err := db.Update(load); err != nil {
		// The store is unusable, so release the file handle. The Close error
		// is dropped deliberately: the load failure is the one worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("load services from %q: %w", dbPath, err)
	}

	return s, nil
}

// Close releases the on-disk handle and clears it from the store. Calling
// Close again afterwards is a no-op that returns nil.
func (s *ServiceStore) Close() error {
	db := s.db
	if db == nil {
		return nil
	}
	s.db = nil
	return db.Close()
}

// Register adds or replaces a service owned by this agent. Persists to disk
// before returning so a crash after the response can't lose the write.
//
// svc must be non-nil and carry a non-empty ID and Service name; otherwise
// an error is returned and nothing is written. The store keeps its own
// shallow copy of *svc, so later changes the caller makes to the struct's
// fields do not reach the in-memory index (reference-typed fields such as
// Tags still share their backing storage with the caller).
//
// Errors from encoding or from the database are returned wrapped with the
// service ID; the in-memory index is only updated after the disk write
// succeeds.
//
// NOTE(review): the disk write and the in-memory update happen under
// different locks, so a concurrent Deregister of the same ID can leave the
// two views disagreeing until the next write for that ID.
func (s *ServiceStore) Register(svc *api.AgentService) error {
	switch {
	case svc == nil:
		return fmt.Errorf("service required")
	case svc.ID == "":
		return fmt.Errorf("service ID required")
	case svc.Service == "":
		return fmt.Errorf("service name required")
	}

	// Snapshot the caller's value once so that what is persisted and what is
	// indexed are the same data, and so the index never aliases memory the
	// caller may go on to modify.
	stored := *svc

	buf, err := json.Marshal(&stored)
	if err != nil {
		return fmt.Errorf("encode service %q: %w", stored.ID, err)
	}

	if err := s.db.Update(func(tx *bbolt.Tx) error {
		return tx.Bucket(servicesBucket).Put([]byte(stored.ID), buf)
	}); err != nil {
		return fmt.Errorf("persist service %q: %w", stored.ID, err)
	}

	s.mu.Lock()
	s.putLocked(s.nodeName, &stored)
	s.mu.Unlock()
	return nil
}

// Deregister removes the service with the given ID from this agent's owned
// set: first from the on-disk bucket, then from the in-memory index. If the
// database update fails its error is returned as-is and the in-memory index
// is left untouched.
func (s *ServiceStore) Deregister(id string) error {
	remove := func(tx *bbolt.Tx) error {
		return tx.Bucket(servicesBucket).Delete([]byte(id))
	}
	if err := s.db.Update(remove); err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.deleteLocked(s.nodeName, id)
	return nil
}

// ApplyRegister handles a gossipped service registration from a peer,
// recording svc in memory under the given node name.
//
// It is a no-op if the event is one we originated (node equals this agent's
// own name, so the service is already in our store) or if svc is nil — for
// example when a payload arrives without a "service" field — so a malformed
// event cannot crash the agent.
func (s *ServiceStore) ApplyRegister(node string, svc *api.AgentService) {
	if svc == nil || node == s.nodeName {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.putLocked(node, svc)
}

// ApplyDeregister handles a gossipped service deregistration from a peer by
// dropping the service with the given ID from that node's in-memory entry.
// Events naming this agent's own node are ignored.
func (s *ServiceStore) ApplyDeregister(node, id string) {
	if node != s.nodeName {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.deleteLocked(node, id)
	}
}

// DropNode forgets every service owned by the given node. Called when a
// peer leaves or is declared failed. A request to drop this agent's own
// node is ignored.
func (s *ServiceStore) DropNode(node string) {
	isSelf := node == s.nodeName
	if isSelf {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.services, node)
}

// Owned returns the services this agent owns, keyed by service ID. The map
// and each service struct in it are fresh copies, so the caller may modify
// them; the struct copies are shallow. The result is never nil.
func (s *ServiceStore) Owned() map[string]*api.AgentService {
	s.mu.RLock()
	defer s.mu.RUnlock()

	mine := s.services[s.nodeName]
	owned := make(map[string]*api.AgentService, len(mine))
	for id := range mine {
		dup := *mine[id]
		owned[id] = &dup
	}
	return owned
}

// NodeService pairs a service with the node that owns it. Node is the
// owning node's name and Service is the service instance itself.
type NodeService struct {
	Node    string
	Service *api.AgentService
}

// AllServices returns, for every service name known on any node, the sorted
// and de-duplicated union of the tags carried by its instances. A service
// whose instances have no tags maps to an empty, non-nil slice.
func (s *ServiceStore) AllServices() map[string][]string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// First pass: gather a tag set per service name.
	seen := make(map[string]map[string]struct{})
	for _, instances := range s.services {
		for _, inst := range instances {
			tagSet := seen[inst.Service]
			if tagSet == nil {
				tagSet = make(map[string]struct{})
				seen[inst.Service] = tagSet
			}
			for _, tag := range inst.Tags {
				tagSet[tag] = struct{}{}
			}
		}
	}

	// Second pass: flatten each set into a sorted slice.
	result := make(map[string][]string, len(seen))
	for name, tagSet := range seen {
		sorted := make([]string, 0, len(tagSet))
		for tag := range tagSet {
			sorted = append(sorted, tag)
		}
		sort.Strings(sorted)
		result[name] = sorted
	}
	return result
}

// ServiceInstances returns every instance of the named service held in the
// store, across all nodes, ordered by node name and then by service ID.
// Each returned Service is a shallow copy of the stored value. The result
// is nil when no instance matches.
func (s *ServiceStore) ServiceInstances(name string) []NodeService {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var matches []NodeService
	for owner, instances := range s.services {
		for _, inst := range instances {
			if inst.Service == name {
				dup := *inst
				matches = append(matches, NodeService{Node: owner, Service: &dup})
			}
		}
	}

	sort.Slice(matches, func(a, b int) bool {
		x, y := matches[a], matches[b]
		if x.Node == y.Node {
			return x.Service.ID < y.Service.ID
		}
		return x.Node < y.Node
	})
	return matches
}

// NodeServices returns every service owned by the named node, keyed by
// service ID. The values are shallow copies of the stored services. An
// unknown node yields an empty, non-nil map.
func (s *ServiceStore) NodeServices(node string) map[string]*api.AgentService {
	s.mu.RLock()
	defer s.mu.RUnlock()

	result := make(map[string]*api.AgentService)
	for id, stored := range s.services[node] {
		dup := new(api.AgentService)
		*dup = *stored
		result[id] = dup
	}
	return result
}

// Nodes returns the sorted names of all nodes that currently have an entry
// in the store, i.e. at least one known service. The result is never nil.
func (s *ServiceStore) Nodes() []string {
	s.mu.RLock()
	names := make([]string, 0, len(s.services))
	for name := range s.services {
		names = append(names, name)
	}
	s.mu.RUnlock()

	// names is private to this call, so it can be sorted outside the lock.
	sort.Strings(names)
	return names
}

// putLocked stores svc under node, keyed by svc.ID, creating the node's
// entry on first use and replacing any service already held under that ID.
// It must be called with s.mu held.
func (s *ServiceStore) putLocked(node string, svc *api.AgentService) {
	byID := s.services[node]
	if byID == nil {
		byID = make(map[string]*api.AgentService)
		s.services[node] = byID
	}
	byID[svc.ID] = svc
}

// deleteLocked removes the service with the given ID from node's entry and
// drops the entry altogether once it is empty. Unknown nodes and IDs are
// ignored. It must be called with s.mu held.
func (s *ServiceStore) deleteLocked(node, id string) {
	byID, found := s.services[node]
	if !found {
		return
	}
	delete(byID, id)
	if len(byID) == 0 {
		delete(s.services, node)
	}
}