cascade [logo: small pixel drawing of a pufferfish]

agent/agent.go

// design touchstones
//   simple to configure/implement
//     - minimal configurable options
//     - sparse use of commands/flags/env vars
//     - fast to start/join a cluster
//     - http-only api
//       api auth/encryption is best implemented by a reverse proxy (like caddy or nginx)
//     - no RPC (it's too annoying to deal with)
//   predictable state
//     - no runtime configuration allowed (no reload support)
//       for a gossip cluster to work, nodes in the cluster must
//       be the source of truth. we do not allow the user to change
//       the cluster's view of reality in any way - the user may
//       only inspect it.
//     - HTTP API only supports read-only commands
//       this ensures that cascade's starting state never
//       diverges from its running state.
//       (restarts are very fast + interruptionless)
//       this also helps with security
//   one massive global cluster
//     - for simplicity, operability, and because gossip can handle it
//   compatibility
//     - attempt to have a consul-compatible API where it matters?
// todo
//   dns resolver for services
//   read-write exception for maintenance mode?
//   opentelemetry metrics

// key differences from consul
//   - services are gossip'd

// in cascade, node == member, and catalog == agent
// agent api (http-only)
//   GET /v1/agent/metrics  -> opentelemetry metrics for this agent
//   GET /v1/agent/members  -> show serf cluster membership state
//   GET /v1/agent/services -> show services this agent owns
//     aliases: GET /v1/catalog/nodes
// catalog api (http & dns)
//   GET /v1/catalog/nodes
//   GET /v1/catalog/services
//     dns: dig @127.0.0.1 -p 8600 nostromo.node.cascade. ANY
//     dns: dig @127.0.0.1 -p 8600 redis.service.cascade. ANY
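//   example http queries ($AGENT_ADDR is a placeholder for wherever the
//   agent's http api listens):
//     curl http://$AGENT_ADDR/v1/agent/members
//     curl http://$AGENT_ADDR/v1/catalog/services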

// consul uses /catalog because the masters maintain a service
// catalog separate from the agent state

// in cascade, the agents _are_ expected to maintain service catalogues,
// so we do away with the catalog concept entirely.

package agent

import (
	"fmt"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/exp/slog"
)

const (
	// gracefulTimeout controls how long we wait before forcefully terminating
	// note that this value interacts with serfConfig.LeavePropagateDelay
	gracefulTimeout = 10 * time.Second
)

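// Run is cascade's entrypoint: it reads config from the environment,
// sets up logging and the serf-backed agent, joins any startup nodes,
// and then blocks until a shutdown signal arrives.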
func Run() {
	config := readConfig()
	logger := config.SetupLogger()
	agent := setupAgent(config, logger)
	if err := agent.Start(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer agent.Shutdown()
	// join any specified startup nodes
	if err := startupJoin(config, agent); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	if err := handleSignals(config, agent); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

// handleSignals blocks until we get an exit-causing signal
func handleSignals(config *Config, agent *Agent) error {
	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

	// Wait for a signal
	var sig os.Signal
	select {
	case s := <-signalCh:
		sig = s
	case <-agent.ShutdownCh():
		// Agent is already shut down!
		return nil
	}
	agent.logger.Info("caught signal", "signal", sig)

	// Check if we should do a graceful leave
	graceful := false
	if sig == os.Interrupt || sig == syscall.SIGTERM {
		graceful = true
	}

	// Bail fast if not doing a graceful leave
	if !graceful {
		agent.logger.Warn("leave cluster with zero grace")
		return nil
	}

	// Attempt a graceful leave
	gracefulCh := make(chan struct{})
	agent.logger.Info("shut down agent gracefully")
	go func() {
		if err := agent.Leave(); err != nil {
			agent.logger.Error("error leaving serf cluster", err)
			return
		}
		close(gracefulCh)
	}()

	// Wait for leave or another signal
	select {
	case <-signalCh:
		return fmt.Errorf("received a second signal during graceful leave, exiting immediately")
	case <-time.After(gracefulTimeout):
		return fmt.Errorf("leave timed out")
	case <-gracefulCh:
		return nil
	}
}

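// readConfig builds the agent config from DefaultConfig plus the
// CASCADE_* environment variables documented inline below. example
// invocation (values are illustrative, and assume the binary is named
// "cascade"):
//
//	CASCADE_NAME=nostromo.j3s.sh CASCADE_BIND=192.168.0.15:12345 \
//	CASCADE_JOIN=127.0.0.1,127.0.0.5 cascade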
func readConfig() *Config {
	config := DefaultConfig()
	// CASCADE_LOGLEVEL=info
	l := os.Getenv("CASCADE_LOGLEVEL")
	if l != "" {
		config.LogLevel = l
	}
	// CASCADE_BIND=192.168.0.15:12345
	if os.Getenv("CASCADE_BIND") != "" {
		config.BindAddr = os.Getenv("CASCADE_BIND")
	}
	// CASCADE_JOIN=127.0.0.1,127.0.0.5
	if os.Getenv("CASCADE_JOIN") != "" {
		config.StartJoin = strings.Split(os.Getenv("CASCADE_JOIN"), ",")
	}
	// CASCADE_NAME=nostromo.j3s.sh
	if os.Getenv("CASCADE_NAME") != "" {
		config.NodeName = os.Getenv("CASCADE_NAME")
	}
	return config
}

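// startupJoin joins the nodes listed in config.StartJoin, if any.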
func startupJoin(config *Config, agent *Agent) error {
	if len(config.StartJoin) == 0 {
		return nil
	}

	n, err := agent.Join(config.StartJoin)
	if err != nil {
		return err
	}
	if n > 0 {
		agent.logger.Info("join completed", "nodes", n)
	}

	return nil
}

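// setupAgent translates cascade's config into serf & memberlist configs
// and wraps them in an Agent. It panics if the bind address cannot be
// parsed, since there is no sane way to continue without one.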
func setupAgent(config *Config, logger *slog.Logger) *Agent {
	bindIP, bindPort, err := config.AddrParts(config.BindAddr)
	if err != nil {
		panic(err)
	}

	serfConfig := serf.DefaultConfig()
	// logLogger bridges the standard library log.Logger that serf and
	// memberlist expect to our slog handler.
	logLogger := slog.NewLogLogger(logger.Handler(), config.GetLogLevel())
	serfConfig.Logger = logLogger
	serfConfig.NodeName = config.NodeName
	serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax)
	// TODO: how should cascade handle name conflicts?
	//       defaulting to just blowing up for now, but
	//       we _could_ take the tailscale route & append
	//       -1 or whatever to the node. that would be more user friendly.
	// TODO: some of these serf settings were pulled
	// from consul[1]. re-examine them eventually.
	serfConfig.EnableNameConflictResolution = false
	serfConfig.LeavePropagateDelay = 3 * time.Second
	serfConfig.MinQueueDepth = 4096
	serfConfig.QueueDepthWarning = 1000000
	serfConfig.ReconnectTimeout = 3 * 24 * time.Hour

	serfConfig.MemberlistConfig = memberlist.DefaultWANConfig()
	serfConfig.MemberlistConfig.Logger = logLogger
	serfConfig.MemberlistConfig.DeadNodeReclaimTime = 30 * time.Second
	serfConfig.MemberlistConfig.BindAddr = bindIP
	serfConfig.MemberlistConfig.BindPort = bindPort

	agent := Create(config, serfConfig, logger)
	agent.logger.Info("setup cascade agent", "nodename", config.NodeName, "bindport", bindPort, "bindip", bindIP, "loglevel", config.GetLogLevel())
	return agent
}

// [1]: sources for consul serf tweaks
//      https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go
//      https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go

// Agent starts and manages a Serf & adds service discovery (TODO)
type Agent struct {
	serfConf *serf.Config
	eventCh  chan serf.Event
	// This is the underlying Serf we are wrapping
	serf *serf.Serf
	// a lil logger used by the agent, you know how it be
	logger *slog.Logger
	// shutdown records whether Shutdown has already run; shutdownCh is
	// closed to signal shutdown to anything selecting on it
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}

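// Create wires up an Agent around the given serf config. Nothing is
// started until Start is called.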
func Create(config *Config, serfConf *serf.Config, logger *slog.Logger) *Agent {
	// Create a channel to listen for events from Serf
	eventCh := make(chan serf.Event, 64)
	serfConf.EventCh = eventCh

	// Setup the agent
	agent := &Agent{
		serfConf:   serfConf,
		eventCh:    eventCh,
		logger:     logger,
		shutdownCh: make(chan struct{}),
	}

	return agent
}

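// eventLoop logs incoming serf events and shuts the agent down if serf
// itself shuts down underneath us.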
func (a *Agent) eventLoop() {
	serfShutdownCh := a.serf.ShutdownCh()
	for {
		select {
		case e := <-a.eventCh:
			a.logger.Info("receive event", "event", e.String())

		case <-serfShutdownCh:
			a.logger.Info("detect serf shutdown, turning off event loop")
			a.Shutdown()
			return

		case <-a.shutdownCh:
			return
		}
	}
}

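// Start creates the underlying Serf instance and starts the event loop.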
func (a *Agent) Start() error {
	// Create serf first
	serf, err := serf.Create(a.serfConf)
	if err != nil {
		return err
	}
	a.serf = serf

	// Start event loop
	go a.eventLoop()
	return nil
}

// Shutdown closes this agent and all of its processes. Should be preceded
// by a Leave for a graceful shutdown.
func (a *Agent) Shutdown() error {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	if a.shutdown {
		return nil
	}

	if a.serf == nil {
		goto EXIT
	}

	a.logger.Info("request serf shutdown")
	if err := a.serf.Shutdown(); err != nil {
		return err
	}

EXIT:
	a.logger.Info("complete serf shutdown")
	a.shutdown = true
	close(a.shutdownCh)
	return nil
}

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) Join(addrs []string) (n int, err error) {
	a.logger.Info("issue join request", "addrs", addrs)
	// we always ignore old events because cascade don't
	// care about the past
	n, err = a.serf.Join(addrs, true)
	if err != nil {
		return n, fmt.Errorf("error joining: %w", err)
	}
	// TODO: when joining fails, we don't get an error here - serf & memberlist
	// just print to stdout and serf.Join returns without issue.
	return n, err
}

// Leave prepares for a graceful shutdown of the agent and its processes
func (a *Agent) Leave() error {
	if a.serf == nil {
		return nil
	}

	a.logger.Info("request graceful serf leave")
	return a.serf.Leave()
}

// ShutdownCh returns a channel that can be selected to wait
// for the agent to perform a shutdown.
func (a *Agent) ShutdownCh() <-chan struct{} {
	return a.shutdownCh
}

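// SetupLogger builds a slog text logger at the configured level (adding
// source locations in DEBUG) and installs it as the default logger.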
func (c *Config) SetupLogger() *slog.Logger {
	opts := slog.HandlerOptions{
		Level: c.GetLogLevel(),
	}
	if c.LogLevel == "DEBUG" {
		opts.AddSource = true
	}
	handler := opts.NewTextHandler(os.Stderr)
	logger := slog.New(handler)
	slog.SetDefault(logger)
	return logger
}