small pixel drawing of a pufferfish cascade

main.go

// design touchstones
//   configured entirely via environment variables
//   minimal configurable options
//   a single global cluster
//   easy cluster formation
// todo
//   dns resolver for services

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
)

// gracefulTimeout bounds how long a graceful leave may take before the
// process gives up and exits with an error. Keep it larger than
// serfConfig.LeavePropagateDelay, since the leave has to fit inside it.
const gracefulTimeout = 5 * time.Second

// main wires the process together: read configuration from the environment,
// build and start the agent, join any configured peers, then block until a
// signal (or an agent shutdown) ends the run. Any failure along the way
// panics through log.Panic, which still lets the deferred Shutdown run.
func main() {
	cfg := readConfig()
	node := setupAgent(cfg)

	startErr := node.Start()
	if startErr != nil {
		log.Panic(startErr)
	}
	// registered only after a successful Start
	defer node.Shutdown()

	// contact the peers requested at startup, if any
	joinErr := startupJoin(cfg, node)
	if joinErr != nil {
		log.Panic(joinErr)
	}

	// park here until it is time to exit
	sigErr := handleSignals(cfg, node)
	if sigErr != nil {
		log.Panic(sigErr)
	}
}

// handleSignals blocks until the process should exit.
//
// It waits for either an exit-causing signal (os.Interrupt or SIGTERM) or for
// the agent to shut down on its own. If the agent shuts down first it returns
// nil. On a signal it attempts a graceful leave and returns:
//   - nil if the leave completes within gracefulTimeout,
//   - an error wrapping the leave failure if agent.Leave fails,
//   - an error if the leave does not finish within gracefulTimeout,
//   - an error if a second signal arrives while the leave is in progress.
//
// config is currently unused; it is kept so the signature stays stable.
func handleSignals(config *Config, agent *Agent) error {
	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

	// Wait for a signal. Only os.Interrupt and SIGTERM are registered above,
	// and both call for a graceful leave, so no per-signal dispatch is needed.
	select {
	case sig := <-signalCh:
		log.Printf("Caught signal: %v", sig)
	case <-agent.ShutdownCh():
		// Agent is already shutdown!
		return nil
	}

	// Attempt a graceful leave. The channel is buffered so the goroutine can
	// always deliver its result and exit, even if we stopped waiting for it
	// because of a timeout or a second signal.
	leaveCh := make(chan error, 1)
	log.Println("Gracefully shutting down agent...")
	go func() {
		leaveCh <- agent.Leave()
	}()

	// Wait for leave or another signal
	select {
	case sig := <-signalCh:
		return fmt.Errorf("[WARN] cascade: graceful leave interrupted by signal: %v", sig)
	case <-time.After(gracefulTimeout):
		return fmt.Errorf("[WARN] cascade: leave timed out")
	case err := <-leaveCh:
		if err != nil {
			// Report the real failure right away instead of waiting for the
			// timeout and blaming it on that.
			return fmt.Errorf("[WARN] cascade: leave failed: %w", err)
		}
		return nil
	}
}

// readConfig builds the runtime configuration. It starts from DefaultConfig
// and overrides a field only when the matching environment variable is set
// to a non-empty value.
func readConfig() *Config {
	config := DefaultConfig()

	// CASCADE_BIND=127.0.0.1:1234
	if bind := os.Getenv("CASCADE_BIND"); bind != "" {
		config.BindAddr = bind
	}

	// CASCADE_JOIN=127.0.0.1,127.0.0.5
	// Whitespace around entries is trimmed and empty entries (for example
	// from a trailing comma) are dropped, so they never reach the join call
	// as bogus addresses.
	if join := os.Getenv("CASCADE_JOIN"); join != "" {
		parts := strings.Split(join, ",")
		addrs := make([]string, 0, len(parts))
		for _, part := range parts {
			if addr := strings.TrimSpace(part); addr != "" {
				addrs = append(addrs, addr)
			}
		}
		config.StartJoin = addrs
	}

	// CASCADE_NAME=nostromo.j3s.sh
	if name := os.Getenv("CASCADE_NAME"); name != "" {
		config.NodeName = name
	}

	return config
}

// startupJoin asks the agent to join the peers listed in config.StartJoin.
// With no peers configured it does nothing and returns nil; otherwise it
// returns whatever error agent.Join reports, logging the number of agents
// synced on success.
func startupJoin(config *Config, agent *Agent) error {
	peers := config.StartJoin
	if len(peers) == 0 {
		// nothing to join
		return nil
	}

	log.Printf("Joining cluster...")
	synced, err := agent.Join(peers)
	if err == nil {
		log.Printf("Join completed. Synced with %d initial agents", synced)
	}
	return err
}

// setupAgent translates the cascade Config into a serf configuration and
// passes it to Create. It panics via log.Panic when config.BindAddr cannot
// be split into its address parts.
func setupAgent(config *Config) *Agent {
	ip, port, err := config.AddrParts(config.BindAddr)
	if err != nil {
		log.Panic(err)
	}

	sc := serf.DefaultConfig()
	sc.NodeName = config.NodeName
	sc.ProtocolVersion = uint8(serf.ProtocolVersionMax)

	// TODO: how should cascade handle name conflicts?
	//       defaulting to just blowing up for now, but
	//       we _could_ take the tailscale route & append
	//       -1 or whatever to the node. that would be more user friendly.
	sc.EnableNameConflictResolution = false

	// TODO: some of these serf settings were pulled
	// from consul[1]. re-examine them eventually.
	sc.LeavePropagateDelay = 3 * time.Second
	sc.MinQueueDepth = 4096
	sc.QueueDepthWarning = 1000000
	sc.ReconnectTimeout = 3 * 24 * time.Hour

	// start from memberlist's WAN defaults, then bind to the
	// address the user configured
	gossip := memberlist.DefaultWANConfig()
	gossip.DeadNodeReclaimTime = 30 * time.Second
	gossip.BindAddr = ip
	gossip.BindPort = port
	sc.MemberlistConfig = gossip

	return Create(sc)
}

// [1]: sources for consul serf tweaks
//      https://github.com/hashicorp/consul/blob/v1.14.4/agent/consul/config.go
//      https://github.com/hashicorp/consul/blob/v1.14.4/lib/serf/serf.go