small pixel drawing of a pufferfish cascade

agent.go

package main

import (
	"fmt"
	"log"
	"sync"

	"github.com/hashicorp/serf/serf"
)

// Agent starts and manages a Serf & adds
// service discovery
type Agent struct {
	conf     *Config
	serfConf *serf.Config
	eventCh  chan serf.Event

	// i doubt we care about handling events,
	// but i'm leaving this here just in case
	//	eventHandlerList  []EventHandler
	//	eventHandlersLock sync.Mutex

	// This is the underlying Serf we are wrapping
	serf *serf.Serf

	// this channel receiving a signal = shutdown
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}

func Create(conf *Config, serfConf *serf.Config) *Agent {
	// Create a channel to listen for events from Serf
	eventCh := make(chan serf.Event, 64)
	serfConf.EventCh = eventCh

	// Setup the agent
	agent := &Agent{
		conf:       conf,
		serfConf:   serfConf,
		eventCh:    eventCh,
		shutdownCh: make(chan struct{}),
	}

	return agent
}


func (a *Agent) eventLoop() {
	serfShutdownCh := a.serf.ShutdownCh()
	for {
		select {
		case e := <-a.eventCh:
			log.Printf("[INFO] agent: Received event: %s", e.String())

		case <-serfShutdownCh:
			log.Printf("[WARN] agent: Serf shutdown detected, quitting")
			a.Shutdown()
			return

		case <-a.shutdownCh:
			return
		}
	}
}

func (a *Agent) Start() error {
	log.Printf("[INFO] agent: Serf agent starting")

	// Create serf first
	serf, err := serf.Create(a.serfConf)
	if err != nil {
		return fmt.Errorf("Error creating Serf: %s", err)
	}
	a.serf = serf

	// Start event loop
	go a.eventLoop()
	return nil
}

// Shutdown closes this agent and all of its processes. Should be preceded
// by a Leave for a graceful shutdown.
func (a *Agent) Shutdown() error {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	if a.shutdown {
		return nil
	}

	if a.serf == nil {
		goto EXIT
	}

	log.Println("[INFO] agent: requesting serf shutdown")
	if err := a.serf.Shutdown(); err != nil {
		return err
	}

EXIT:
	log.Println("[INFO] agent: shutdown complete")
	a.shutdown = true
	close(a.shutdownCh)
	return nil
}

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
	log.Printf("[INFO] agent: joining: %v replay: %v", addrs, replay)
	ignoreOld := !replay
	n, err = a.serf.Join(addrs, ignoreOld)
	if n > 0 {
		log.Printf("[INFO] agent: joined: %d nodes", n)
	}
	if err != nil {
		log.Printf("[WARN] agent: error joining: %v", err)
	}
	return
}

// Leave prepares for a graceful shutdown of the agent and its processes
func (a *Agent) Leave() error {
	if a.serf == nil {
		return nil
	}

	log.Println("[INFO] agent: requesting graceful leave from Serf")
	return a.serf.Leave()
}