small pixel drawing of a pufferfish cascade

agent.go

package main

import (
	"log"
	"os"
	"sync"
	"time"

	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
)

// Agent wraps a Serf instance and layers service discovery on top of it.
type Agent struct {
	// conf is the agent configuration and serfConf the Serf configuration
	// this agent was created with.
	conf     *Config
	serfConf *serf.Config

	// eventCh is the channel installed as serfConf.EventCh; Serf events
	// arrive on it.
	eventCh chan serf.Event

	// Event handlers are probably not needed; the fields are kept here,
	// commented out, in case that changes.
	//	eventHandlerList  []EventHandler
	//	eventHandlersLock sync.Mutex

	// serf is the underlying Serf instance being wrapped.
	serf *serf.Serf

	// shutdown records that Shutdown has completed. shutdownCh is closed at
	// that point to signal listeners. shutdownLock guards both.
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}

// Create builds an Agent from the given agent and Serf configurations.
//
// It allocates a buffered event channel, installs it as serfConf.EventCh
// (mutating serfConf) so the agent can receive Serf events, and prepares the
// channel used to signal shutdown. The agent's serf field is left unset.
func Create(conf *Config, serfConf *serf.Config) *Agent {
	// Buffered so Serf can queue a burst of events for the agent.
	events := make(chan serf.Event, 64)
	serfConf.EventCh = events

	return &Agent{
		conf:       conf,
		serfConf:   serfConf,
		eventCh:    events,
		shutdownCh: make(chan struct{}),
	}
}

// SetupAgent builds the Serf configuration for cascade from config and
// returns a new Agent created from it via Create.
//
// If the CASCADE_BIND environment variable is non-empty it replaces
// config.BindAddr (config is mutated) before the address is split into the
// IP and port that Serf binds to. A bind address that cannot be parsed is
// treated as fatal: the function panics via log.Panic.
func SetupAgent(config *Config) *Agent {
	// Apply the environment override first so the bind IP/port parsed below
	// actually reflect it.
	if bind := os.Getenv("CASCADE_BIND"); bind != "" {
		config.BindAddr = bind
	}

	bindIP, bindPort, err := config.AddrParts(config.BindAddr)
	if err != nil {
		log.Panic(err)
	}

	serfConfig := serf.DefaultConfig()

	// Hardcode DefaultWANConfig because cascade is designed to be used as a
	// single global system. This replaces MemberlistConfig wholesale, so it
	// must happen before any individual memberlist field is set.
	//
	// NOTE(review): earlier code also cleared AdvertiseAddr/AdvertisePort, but
	// those writes were discarded when MemberlistConfig was replaced. They are
	// left at the WAN defaults here so the effective configuration is
	// unchanged; confirm whether an explicit advertise address is wanted.
	serfConfig.MemberlistConfig = memberlist.DefaultWANConfig()
	serfConfig.MemberlistConfig.BindAddr = bindIP
	serfConfig.MemberlistConfig.BindPort = bindPort

	serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax)

	// Event coalescing windows for member and user events.
	serfConfig.CoalescePeriod = 3 * time.Second
	serfConfig.QuiescentPeriod = time.Second
	serfConfig.UserCoalescePeriod = 3 * time.Second
	serfConfig.UserQuiescentPeriod = time.Second

	// Payload size limits.
	serfConfig.QueryResponseSizeLimit = 1024
	serfConfig.QuerySizeLimit = 1024
	serfConfig.UserEventSizeLimit = 512

	// TODO: look at reconnect/tombstone settings w more scrutiny
	// NOTE(review): all four are zeroed rather than left at Serf's defaults;
	// verify against the Serf documentation that zero is acceptable here.
	serfConfig.ReconnectInterval = 0
	serfConfig.ReconnectTimeout = 0
	serfConfig.TombstoneTimeout = 0
	serfConfig.BroadcastTimeout = 0

	// TODO: what are the implications of true here o_O
	serfConfig.EnableNameConflictResolution = true

	return Create(config, serfConfig)
}

// eventLoop receives Serf events from a.eventCh and logs them until either
// the underlying Serf instance or the agent itself shuts down.
//
// When Serf's shutdown channel fires, the agent is shut down as well and the
// loop exits; a failure during that shutdown is logged because there is no
// caller to return it to. When a.shutdownCh fires, the loop simply exits.
//
// It reads a.serf immediately, so a.serf must be set before this is called.
func (a *Agent) eventLoop() {
	serfShutdownCh := a.serf.ShutdownCh()
	for {
		select {
		case e := <-a.eventCh:
			log.Printf("[INFO] agent: Received event: %s", e.String())

		case <-serfShutdownCh:
			log.Printf("[WARN] agent: Serf shutdown detected, quitting")
			if err := a.Shutdown(); err != nil {
				log.Printf("[ERR] agent: shutdown failed: %s", err)
			}
			return

		case <-a.shutdownCh:
			return
		}
	}
}

// Shutdown stops the wrapped Serf instance, if there is one, and then marks
// the agent as shut down by closing shutdownCh. For a graceful shutdown it
// should be preceded by a Leave.
//
// Calls are serialized by shutdownLock. Once a shutdown has succeeded, later
// calls return nil without doing anything. If Serf fails to shut down, its
// error is returned and the agent is not marked as shut down (shutdownCh
// stays open).
func (a *Agent) Shutdown() error {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	// Already shut down: nothing left to do.
	if a.shutdown {
		return nil
	}

	// Only talk to Serf when an instance exists.
	if a.serf != nil {
		log.Println("[INFO] agent: requesting serf shutdown")
		if err := a.serf.Shutdown(); err != nil {
			return err
		}
	}

	log.Println("[INFO] agent: shutdown complete")
	a.shutdown = true
	close(a.shutdownCh)
	return nil
}