// design touchstones
//   simple to configure/implement
//     - minimal configurable options
//     - sparse use of commands/flags/env vars
//     - fast to start/join a cluster
//     - http-only api
//       api auth/encryption is best implemented by a reverse proxy (like caddy or nginx)
//     - no RPC (it's too annoying to deal with)
//   predictable state
//     - no runtime configuration allowed (no reload support)
//       for a gossip cluster to work, nodes in the cluster must
//       be the source of truth. we do not allow the user to change
//       the cluster's view of reality in any way - the user may
//       only inspect it.
//     - HTTP API only supports read-only commands
//       this ensures that cascade's starting state never
//       diverges from its running state.
//       (restarts are very fast + interruptionless)
//       this also helps with security
//   one massive global cluster
//     - for simplicity, operability, and because gossip can handle it
//   compatibility
//     - attempt to have a consul-compatible API where it matters?
// todo
//   think about checks
//   think about DNS weights
//   dns resolver for services
//   read-write exception for maintenance mode?
//   opentelemetry metrics
//   how should cascade handle name conflicts? defaulting to just blowing up
//     for now, but we _could_ take the tailscale route & append -1 or whatever
//     to the node. that would be more user friendly.

// key differences from consul
//   - services are gossip'd

// in cascade, node == member, and catalog == agent
// agent api (http-only)
//   GET /v1/agent/metrics  -> opentelemetry metrics for this agent
//   GET /v1/agent/members  -> show serf cluster membership state
//     alias: GET /v1/catalog/nodes (node == member)
//   GET /v1/agent/services -> show services this agent owns
//     alias: GET /v1/catalog/services
// catalog api (http & dns)
//   GET /v1/catalog/nodes
//   GET /v1/catalog/services
//     dns: dig @127.0.0.1 -p 8600 nostromo.node.cascade. ANY
//     dns: dig @127.0.0.1 -p 8600 redis.service.cascade. ANY
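//     http examples, assuming an agent with its HTTP API bound to
//     127.0.0.1:8500 (an illustrative address, not a cascade default):
//       curl http://127.0.0.1:8500/v1/catalog/nodes
//       curl http://127.0.0.1:8500/v1/catalog/services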

// consul uses /catalog because the masters maintain a service
// catalog separate from the agent state

// in cascade, the agents _are_ expected to maintain service catalogs,
// so we do away with the catalog concept entirely.

package agent

import (
	"fmt"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/hashicorp/serf/serf"
	"golang.org/x/exp/slog"
)

// Agent starts and manages a Serf & adds service discovery (TODO)
type Agent struct {
	// this is the agent's configuration
	Config     *Config
	serfConfig *serf.Config

	// This is the underlying Serf we are wrapping
	serf *serf.Serf

	// This is the http api handler - see http.go and *_endpoint.go
	httpHandlers *HTTPHandlers

	// We receive serf events on this channel
	eventCh chan serf.Event
	logger  *slog.Logger

	// shutdown records whether Shutdown has already run;
	// shutdownCh is closed (not sent on) to signal shutdown
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}

// New returns a configured, not-yet-started agent lmao
func New(config *Config) *Agent {
	eventCh := make(chan serf.Event, 1024)

	// TODO: make log level configurable
	// XXX: reconsider this agent-scoped logger? what does consul do?
	h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(os.Stderr)
	logger := slog.New(h)
	slog.SetDefault(logger)

	// serf
	serfConfig := serf.DefaultConfig()
	serfConfig.EventCh = eventCh
	// serf only accepts a *log.Logger, so bridge our slog handler into one
	serfConfig.Logger = slog.NewLogLogger(h, slog.LevelInfo)
	// TODO: some of these serf settings were cargo-culted
	// from consul[1]. re-examine them eventually.
	serfConfig.EnableNameConflictResolution = false
	serfConfig.LeavePropagateDelay = 3 * time.Second
	serfConfig.MemberlistConfig.BindAddr = config.SerfBindAddr.IP.String()
	serfConfig.MemberlistConfig.BindPort = config.SerfBindAddr.Port
	serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second
	serfConfig.MinQueueDepth = 4096
	serfConfig.NodeName = config.NodeName
	serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax)
	serfConfig.QueueDepthWarning = 1000000
	serfConfig.ReconnectTimeout = 3 * 24 * time.Hour

	agent := Agent{
		Config:       config,
		serfConfig:   serfConfig,
		httpHandlers: &HTTPHandlers{},
		eventCh:      eventCh,
		logger:       logger,
		shutdownCh:   make(chan struct{}),
	}

	// we hand httpHandlers a pointer back to the completed agent because
	// the endpoints need access to a bunch of Agent methods. basically a hardlink.
	agent.httpHandlers.agent = &agent
	return &agent
}
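
// A minimal lifecycle sketch. The Config construction here is an assumption:
// it presumes SerfBindAddr/HTTPBindAddr are *net.TCPAddr (consistent with how
// this file dereferences .IP and .Port) and that the fields are set directly:
//
//	cfg := &Config{
//		NodeName:     "nostromo",
//		SerfBindAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 7946},
//		HTTPBindAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 8500},
//	}
//	a := New(cfg)
//	if err := a.Start(); err != nil {
//		// handle startup failure
//	}
//	if _, err := a.Join([]string{"10.0.0.5:7946"}); err != nil {
//		// joining is optional for the first node in a cluster
//	}
//	<-a.ShutdownCh()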

func (a *Agent) eventLoop() {
	serfShutdownCh := a.serf.ShutdownCh()
	for {
		select {
		case e := <-a.eventCh:
			a.logger.Info("receive event", "event", e.String())

		case <-serfShutdownCh:
			a.logger.Info("detect serf shutdown, turning off event loop")
			a.Shutdown()
			return

		case <-a.shutdownCh:
			return
		}
	}
}
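
// Today the loop only logs events. A sketch of how handling could dispatch on
// the concrete event type (serf.MemberEvent and serf.UserEvent are real serf
// types; the dispatch itself is a TODO, not current behavior):
//
//	switch e.(type) {
//	case serf.MemberEvent:
//		// member join/leave/failed/update/reap
//	case serf.UserEvent:
//		// application-defined gossip payloads (e.g. service announcements)
//	}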

// Start creates and starts an agent's serf client, eventloop, api, and more!
// The agent config should not be modified after an agent has been started.
func (a *Agent) Start() error {
	// Create serf first
	serf, err := serf.Create(a.serfConfig)
	if err != nil {
		return err
	}
	a.serf = serf

	// Start event loop
	go a.eventLoop()

	// Start API Server
	apiHandler := a.httpHandlers.handler()
	go a.serveAPI(apiHandler)

	return nil
}

func (a *Agent) serveAPI(handler http.Handler) {
	addr := fmt.Sprintf("%s:%d", a.Config.HTTPBindAddr.IP.String(), a.Config.HTTPBindAddr.Port)
	if err := http.ListenAndServe(addr, handler); err != nil {
		a.logger.Error("api error", err)
	}
}
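
// http.ListenAndServe above has no shutdown path. A shutdown-aware sketch
// (an assumption about where this would be wired, not current behavior)
// would hold an *http.Server and drain it when the agent stops:
//
//	srv := &http.Server{Addr: addr, Handler: handler}
//	go func() {
//		<-a.shutdownCh
//		srv.Shutdown(context.Background())
//	}()
//	srv.ListenAndServe()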

// Shutdown closes this agent and all of its processes. Should be preceded
// by a Leave for a graceful shutdown.
func (a *Agent) Shutdown() error {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	if a.shutdown {
		return nil
	}

	if a.serf != nil {
		a.logger.Info("request serf shutdown")
		if err := a.serf.Shutdown(); err != nil {
			return err
		}
	}

	a.logger.Info("complete serf shutdown")
	a.shutdown = true
	close(a.shutdownCh)
	return nil
}

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) Join(addrs []string) (n int, err error) {
	a.logger.Info("issue join request", "addrs", addrs)
	// we always ignore old events because cascade don't
	// care about the past
	n, err = a.serf.Join(addrs, true)
	if err != nil {
		return n, fmt.Errorf("error joining: %w", err)
	}
	return n, err
}

// Leave prepares for a graceful shutdown of the agent and its processes
func (a *Agent) Leave() error {
	if a.serf == nil {
		return nil
	}

	a.logger.Info("request graceful serf leave")
	return a.serf.Leave()
}
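
// Graceful stop sketch, per the Shutdown doc comment: Leave first so peers
// record a clean departure instead of a failure, then Shutdown:
//
//	if err := a.Leave(); err != nil {
//		// log and continue; Shutdown should still run
//	}
//	if err := a.Shutdown(); err != nil {
//		// handle shutdown failure
//	}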

// ShutdownCh returns a channel that can be selected to wait
// for the agent to perform a shutdown.
func (a *Agent) ShutdownCh() <-chan struct{} {
	return a.shutdownCh
}

// GetCoordinate is an unimplemented stub. TODO: implement (serf exposes
// network coordinates).
func (a *Agent) GetCoordinate() error {
	return nil
}

// Members returns all of the serf members in the gossip cluster.
// TODO: filter support
func (a *Agent) Members() []serf.Member {
	// todo: move this to cascade/client perhaps
	return a.serf.Members()
}
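
// A sketch of the kind of filtering the TODO above refers to, e.g. keeping
// only live members (serf.StatusAlive is a real serf constant; the filter
// itself is illustrative):
//
//	var alive []serf.Member
//	for _, m := range a.Members() {
//		if m.Status == serf.StatusAlive {
//			alive = append(alive, m)
//		}
//	}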

// [1]: sources for consul serf tweaks
//      https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go
//      https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go