small pixel drawing of a pufferfish cascade

main.go

// design touchstones
//   configured entirely via environment variables
//   minimal configurable options
//   a single global cluster
//   easy cluster formation
// todo
//   dns resolver for services

package main

import (
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/exp/slog"
)

// gracefulTimeout is how long handleSignals waits for a graceful leave to
// finish before giving up and reporting a timeout.
// Note that this value interacts with serfConfig.LeavePropagateDelay.
const gracefulTimeout = 10 * time.Second

// main wires the process together: it reads configuration from the
// environment, builds the logger and the agent, starts the agent, joins any
// configured startup nodes, launches the debug print loop, and then blocks
// in handleSignals. Every error along the way is treated as fatal and
// reported via panic.
func main() {
	cfg := readConfig()
	log := setupLogger(cfg)
	a := setupAgent(cfg, log)

	err := a.Start()
	if err != nil {
		panic(err)
	}
	// Deferred calls still run while a panic unwinds, so the agent is shut
	// down on both the normal and the failing paths below.
	defer a.Shutdown()

	// join any specified startup nodes
	err = startupJoin(cfg, a)
	if err != nil {
		panic(err)
	}

	go debugPrints(a)

	err = handleSignals(cfg, a)
	if err != nil {
		panic(err)
	}
}

// setupLogger builds a text-format slog logger that writes to stderr at the
// level reported by config.GetLogLevel, installs it as the slog default, and
// returns it. Source locations are attached to records only when
// config.LogLevel is exactly "DEBUG".
func setupLogger(config *Config) *slog.Logger {
	handlerOpts := slog.HandlerOptions{
		Level:     config.GetLogLevel(),
		AddSource: config.LogLevel == "DEBUG",
	}
	l := slog.New(handlerOpts.NewTextHandler(os.Stderr))
	slog.SetDefault(l)
	return l
}


// debugPrints logs every known cluster member (name, address, status) at
// debug level, once immediately and then every five seconds.
//
// It is meant to run in its own goroutine and returns once the agent's
// shutdown channel fires, so the loop does not keep polling a stopped agent.
// NOTE(review): this relies on ShutdownCh becoming readable when the agent
// shuts down, as handleSignals already assumes — confirm against Agent.
func debugPrints(a *Agent) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		for _, m := range a.serf.Members() {
			a.logger.Debug("debug-loop", "name", m.Name, "addr", m.Addr, "status", m.Status)
		}

		select {
		case <-a.ShutdownCh():
			return
		case <-ticker.C:
		}
	}
}

// handleSignals blocks until the process receives SIGINT or SIGTERM, or
// until the agent's shutdown channel fires, and then attempts a graceful
// leave from the cluster.
//
// It returns nil when the agent has already shut down or when the leave
// completes within gracefulTimeout. It returns an error when the leave
// itself fails, when another signal arrives while leaving, or when the
// leave does not finish within gracefulTimeout.
//
// config is currently unused.
func handleSignals(config *Config, agent *Agent) error {
	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

	// Wait for a signal
	var sig os.Signal
	select {
	case sig = <-signalCh:
	case <-agent.ShutdownCh():
		// Agent is already shutdown!
		return nil
	}
	agent.logger.Info("caught signal", "signal", sig)

	// Only these signals trigger a graceful leave. They are also the only
	// ones registered above, so the fast-exit path below only matters if
	// further signals are ever added to the Notify call.
	graceful := sig == os.Interrupt || sig == syscall.SIGTERM

	// Bail fast if not doing a graceful leave
	if !graceful {
		agent.logger.Warn("leave cluster with zero grace")
		return nil
	}

	// Attempt a graceful leave. The channel is buffered so the goroutine can
	// always deliver its result and exit, even after we stopped waiting.
	leaveCh := make(chan error, 1)
	agent.logger.Info("shut down agent gracefully")
	go func() {
		leaveCh <- agent.Leave()
	}()

	// Wait for leave or another signal
	select {
	case sig = <-signalCh:
		return fmt.Errorf("graceful leave interrupted by signal: %v", sig)
	case <-time.After(gracefulTimeout):
		return fmt.Errorf("leave timed out")
	case err := <-leaveCh:
		// Surface a failed leave right away instead of waiting out the
		// timeout and masking the real cause.
		if err != nil {
			return fmt.Errorf("leave cluster: %w", err)
		}
		return nil
	}
}

// readConfig starts from DefaultConfig and overrides fields from the
// environment. Variables that are unset or empty leave the default in place.
//
//	CASCADE_LOGLEVEL  log level name; upper-cased before being stored
//	CASCADE_BIND      bind address, stored verbatim
//	CASCADE_JOIN      comma-separated list of nodes to join at startup
//	CASCADE_NAME      node name, stored verbatim
//
// Entries in CASCADE_JOIN are trimmed of surrounding whitespace and blank
// entries are dropped, so "a, b" and "a,b," both yield clean addresses. If
// no address remains, the default StartJoin is kept.
func readConfig() *Config {
	config := DefaultConfig()
	// CASCADE_LOGLEVEL=info
	if v := os.Getenv("CASCADE_LOGLEVEL"); v != "" {
		config.LogLevel = strings.ToUpper(v)
	}
	// CASCADE_BIND=192.168.0.15:12345
	if v := os.Getenv("CASCADE_BIND"); v != "" {
		config.BindAddr = v
	}
	// CASCADE_JOIN=127.0.0.1,127.0.0.5
	if v := os.Getenv("CASCADE_JOIN"); v != "" {
		var addrs []string
		for _, addr := range strings.Split(v, ",") {
			if addr = strings.TrimSpace(addr); addr != "" {
				addrs = append(addrs, addr)
			}
		}
		if len(addrs) > 0 {
			config.StartJoin = addrs
		}
	}
	// CASCADE_NAME=nostromo.j3s.sh
	if v := os.Getenv("CASCADE_NAME"); v != "" {
		config.NodeName = v
	}
	return config
}

// startupJoin asks the agent to join the nodes listed in config.StartJoin.
// With no nodes configured it does nothing. A join error is returned as-is;
// otherwise, when the count reported by Join is positive, it is logged at
// info level.
func startupJoin(config *Config, agent *Agent) error {
	if len(config.StartJoin) > 0 {
		joined, err := agent.Join(config.StartJoin)
		switch {
		case err != nil:
			return err
		case joined > 0:
			agent.logger.Info("issue join request", "nodes", joined)
		}
	}
	return nil
}

// setupAgent translates config into a serf configuration and hands it to
// Create, returning the resulting agent. It panics if config.BindAddr
// cannot be split into its address parts by config.AddrParts.
func setupAgent(config *Config, logger *slog.Logger) *Agent {
	ip, port, err := config.AddrParts(config.BindAddr)
	if err != nil {
		panic(err)
	}

	sc := serf.DefaultConfig()
	// bridge adapts the old-style logger that serf and memberlist expect
	// onto the new slog handler.
	bridge := slog.NewLogLogger(logger.Handler(), config.GetLogLevel())
	sc.Logger = bridge
	sc.NodeName = config.NodeName
	sc.ProtocolVersion = uint8(serf.ProtocolVersionMax)
	// TODO: how should cascade handle name conflicts?
	//       defaulting to just blowing up for now, but
	//       we _could_ take the tailscale route & append
	//       -1 or whatever to the node. that would be more user friendly.
	sc.EnableNameConflictResolution = false
	// TODO: some of these serf settings were pulled
	// from consul[1]. re-examine them eventually.
	sc.LeavePropagateDelay = 3 * time.Second
	sc.MinQueueDepth = 4096
	sc.QueueDepthWarning = 1000000
	sc.ReconnectTimeout = 3 * 24 * time.Hour

	// Start from memberlist's WAN defaults and override the pieces we care
	// about before attaching the result to the serf config.
	ml := memberlist.DefaultWANConfig()
	ml.Logger = bridge
	ml.DeadNodeReclaimTime = 30 * time.Second
	ml.BindAddr = ip
	ml.BindPort = port
	sc.MemberlistConfig = ml

	return Create(config, sc, logger)
}

// [1]: sources for consul serf tweaks
//      https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go
//      https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go