small pixel drawing of a pufferfish cascade

initial api template; subject to massive change
Jes Olson j3s@c3f.net
Tue, 21 Feb 2023 02:26:46 -0800
commit

8153ee2551607e2480556b58abb85f9f83605b6a

parent

d56da6b2faba403e4aa06305a5abcfd27d7bf1b0

M agent/agent.goagent/agent.go

@@ -56,6 +56,7 @@ package agent

import ( "fmt" + "net/http" "os" "sync" "time"

@@ -73,6 +74,9 @@

// This is the underlying Serf we are wrapping serf *serf.Serf + // This is the http api handler - see http.go and *_endpoint.go + httpHandlers *HTTPHandlers + // We receive serf events on this channel eventCh chan serf.Event logger *slog.Logger

@@ -85,16 +89,7 @@ }

// New returns an agent lmao func New(config *Config) *Agent { - agent := Agent{} - serfConfig := serf.DefaultConfig() - eventCh := make(chan serf.Event, 1024) - agent.eventCh = eventCh - serfConfig.EventCh = eventCh - - // XXX: why do serf and cascade use the same event channel? - agent.eventCh = eventCh - serfConfig.EventCh = eventCh // TODO: make log level configurable // XXX: reconsider this agent-scoped logger? what does consul do?

@@ -102,9 +97,11 @@ // logLogger is a bridge from the log.Logger to slog.Logger

h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(os.Stderr) logger := slog.New(h) slog.SetDefault(logger) - agent.logger = logger - serfConfig.Logger = slog.NewLogLogger(h, slog.LevelInfo) + // serf + serfConfig := serf.DefaultConfig() + serfConfig.EventCh = eventCh + serfConfig.Logger = slog.NewLogLogger(h, slog.LevelInfo) // TODO: some of these serf settings were cargo-culted // from consul[1]. re-examine them eventually. serfConfig.EnableNameConflictResolution = false

@@ -118,9 +115,18 @@ serfConfig.ProtocolVersion = uint8(serf.ProtocolVersionMax)

serfConfig.QueueDepthWarning = 1000000 serfConfig.ReconnectTimeout = 3 * 24 * time.Hour - agent.Config = config - agent.serfConfig = serfConfig - agent.shutdownCh = make(chan struct{}) + agent := Agent{ + Config: config, + serfConfig: serfConfig, + httpHandlers: &HTTPHandlers{}, + eventCh: eventCh, + logger: logger, + shutdownCh: make(chan struct{}), + } + + // we put the completed agent inside of httpHandlers because + // it needs access to a bunch of Agent{} funcs. basically a hardlink. + agent.httpHandlers.agent = &agent return &agent }

@@ -143,7 +149,7 @@ }

} // Start creates and starts an agent's serf client, eventloop, api, and more! -// The agent config should not be modified after an agend has been started. +// The agent config should not be modified after an agent has been started. func (a *Agent) Start() error { // Create serf first serf, err := serf.Create(a.serfConfig)

@@ -154,9 +160,21 @@ a.serf = serf

// Start event loop go a.eventLoop() + + // Start API Server + apiHandler := a.httpHandlers.handler() + go a.serveAPI(apiHandler) + return nil } +func (a *Agent) serveAPI(handler http.Handler) { + portStr := fmt.Sprintf(":%d", a.Config.HTTPBindAddr.Port) + if err := http.ListenAndServe(portStr, handler); err != nil { + a.logger.Error("api error", err) + } +} + // Shutdown closes this agent and all of its processes. Should be preceded // by a Leave for a graceful shutdown. func (a *Agent) Shutdown() error {

@@ -209,6 +227,17 @@ // ShutdownCh returns a channel that can be selected to wait

// for the agent to perform a shutdown. func (a *Agent) ShutdownCh() <-chan struct{} { return a.shutdownCh +} + +func (a *Agent) GetCoordinate() error { + return nil +} + +// TODO: filter support +// Members returns all of the serf members in the gossip cluster. +func (a *Agent) Members() []serf.Member { + // todo: move this to cascade/client perhaps + return a.serf.Members() } // [1]: sources for consul serf tweaks
A agent/agent_endpoint.go

@@ -0,0 +1,1100 @@

+package agent + +import ( + "fmt" + "net/http" + + "github.com/hashicorp/serf/coordinate" + "github.com/hashicorp/serf/serf" +) + +type Self struct { + Config interface{} + DebugConfig map[string]interface{} + Coord *coordinate.Coordinate + Member serf.Member + Stats map[string]map[string]string + Meta map[string]string +} + +func (s *HTTPHandlers) agentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var err error + if err = s.agent.GetCoordinate(); err != nil { + return nil, err + } + + config := struct { + NodeName string + Version string + BuildDate string + }{ + NodeName: s.agent.Config.NodeName, + // We expect the ent version to be part of the reported version string, and that's now part of the metadata, not the actual version. + Version: s.agent.Config.VersionWithMetadata(), + } + + return Self{ + Config: config, + // Coord: cs[s.agent.Config.SegmentName], + // Member: s.agent.AgentLocalMember(), + // Stats: s.agent.Stats(), + // Meta: s.agent.State.Metadata(), + }, nil +} + +// return s.agent.Members() +// func (s *HTTPHandlers) Index(resp http.ResponseWriter, req *http.Request) { +func (s *HTTPHandlers) agentMembers(w http.ResponseWriter, r *http.Request) { + members := s.agent.Members() + json, err := s.marshalJSON(r, members) + if err != nil { + fmt.Println(err) + } + fmt.Fprintf(w, string(json)) +} + +// func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Fetch the ACL token, if any, and enforce agent policy. +// var token string +// s.parseToken(req, &token) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// +// // Authorize using the agent's own enterprise meta, not the token. +// var authzContext acl.AuthorizerContext +// s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) +// +// if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil { +// return nil, err +// } +// +// // Get the request partition and default to that of the agent. +// entMeta := s.agent.AgentEnterpriseMeta() +// if err := s.parseEntMetaPartition(req, entMeta); err != nil { +// return nil, err +// } +// +// // Check if the WAN is being queried +// wan := false +// if other := req.URL.Query().Get("wan"); other != "" { +// wan = true +// } +// +// // Get the address +// addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/join/") +// +// if wan { +// if s.agent.config.ConnectMeshGatewayWANFederationEnabled { +// return nil, fmt.Errorf("WAN join is disabled when wan federation via mesh gateways is enabled") +// } +// _, err = s.agent.JoinWAN([]string{addr}) +// } else { +// _, err = s.agent.JoinLAN([]string{addr}, entMeta) +// } +// return nil, err +// } +// +// func (s *HTTPHandlers) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Fetch the ACL token, if any, and enforce agent policy. +// var token string +// s.parseToken(req, &token) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// +// // Authorize using the agent's own enterprise meta, not the token. +// var authzContext acl.AuthorizerContext +// s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) +// if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil { +// return nil, err +// } +// +// if err := s.agent.Leave(); err != nil { +// return nil, err +// } +// return nil, s.agent.ShutdownAgent() +// } +// +// func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Fetch the ACL token, if any, and enforce agent policy. +// var token string +// s.parseToken(req, &token) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// // TODO(partitions): should this be possible in a partition? +// if err := authz.ToAllowAuthorizer().OperatorWriteAllowed(nil); err != nil { +// return nil, err +// } +// +// // Get the request partition and default to that of the agent. +// entMeta := s.agent.AgentEnterpriseMeta() +// if err := s.parseEntMetaPartition(req, entMeta); err != nil { +// return nil, err +// } +// +// // Check the value of the prune query +// _, prune := req.URL.Query()["prune"] +// +// // Check if the WAN is being queried +// _, wan := req.URL.Query()["wan"] +// +// addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/") +// if wan { +// return nil, s.agent.ForceLeaveWAN(addr, prune, entMeta) +// } else { +// return nil, s.agent.ForceLeave(addr, prune, entMeta) +// } +// } +// +// // syncChanges is a helper function which wraps a blocking call to sync +// // services and checks to the server. If the operation fails, we only +// // only warn because the write did succeed and anti-entropy will sync later. +// func (s *HTTPHandlers) syncChanges() { +// if err := s.agent.State.SyncChanges(); err != nil { +// s.agent.logger.Error("failed to sync changes", "error", err) +// } +// } +// +// func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// var token string +// s.parseToken(req, &token) +// +// var args structs.CheckDefinition +// if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// if err := decodeBody(req.Body, &args); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)} +// } +// +// // Verify the check has a name. +// if args.Name == "" { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing check name"} +// } +// +// if args.Status != "" && !structs.ValidStatus(args.Status) { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Bad check status"} +// } +// +// s.defaultMetaPartitionToAgent(&args.EnterpriseMeta) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &args.EnterpriseMeta, nil) +// if err != nil { +// return nil, err +// } +// +// if !s.validateRequestPartition(resp, &args.EnterpriseMeta) { +// return nil, nil +// } +// +// // Construct the health check. +// health := args.HealthCheck(s.agent.config.NodeName) +// +// // Verify the check type. +// chkType := args.CheckType() +// err = chkType.Validate() +// if err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check: %v", err)} +// } +// +// // Store the type of check based on the definition +// health.Type = chkType.Type() +// +// if health.ServiceID != "" { +// // fixup the service name so that vetCheckRegister requires the right ACLs +// cid := health.CompoundServiceID() +// service := s.agent.State.Service(cid) +// if service != nil { +// health.ServiceName = service.Service +// } else { +// return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("ServiceID %q does not exist", cid.String())} +// } +// } +// +// // Get the provided token, if any, and vet against any ACL policies. +// if err := s.agent.vetCheckRegisterWithAuthorizer(authz, health); err != nil { +// return nil, err +// } +// +// // Add the check. +// if err := s.agent.AddCheck(health, chkType, true, token, ConfigSourceRemote); err != nil { +// return nil, err +// } +// s.syncChanges() +// return nil, nil +// } +// +// func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/") +// +// entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "") +// checkID := structs.NewCheckID(types.CheckID(id), &entMeta) +// +// // Get the provided token, if any, and vet against any ACL policies. +// var token string +// s.parseToken(req, &token) +// +// if err := s.parseEntMetaNoWildcard(req, &checkID.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &checkID.EnterpriseMeta, nil) +// if err != nil { +// return nil, err +// } +// +// checkID.Normalize() +// +// if !s.validateRequestPartition(resp, &checkID.EnterpriseMeta) { +// return nil, nil +// } +// +// if err := s.agent.vetCheckUpdateWithAuthorizer(authz, checkID); err != nil { +// return nil, err +// } +// +// if err := s.agent.RemoveCheck(checkID, true); err != nil { +// return nil, err +// } +// s.syncChanges() +// return nil, nil +// } +// +// func (s *HTTPHandlers) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/") +// checkID := types.CheckID(id) +// note := req.URL.Query().Get("note") +// return s.agentCheckUpdate(resp, req, checkID, api.HealthPassing, note) +// } +// +// func (s *HTTPHandlers) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/") +// checkID := types.CheckID(id) +// note := req.URL.Query().Get("note") +// +// return s.agentCheckUpdate(resp, req, checkID, api.HealthWarning, note) +// +// } +// +// func (s *HTTPHandlers) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/") +// checkID := types.CheckID(id) +// note := req.URL.Query().Get("note") +// +// return s.agentCheckUpdate(resp, req, checkID, api.HealthCritical, note) +// } +// +// // checkUpdate is the payload for a PUT to AgentCheckUpdate. +// type checkUpdate struct { +// // Status us one of the api.Health* states, "passing", "warning", or +// // "critical". +// Status string +// +// // Output is the information to post to the UI for operators as the +// // output of the process that decided to hit the TTL check. This is +// // different from the note field that's associated with the check +// // itself. +// Output string +// } +// +// // AgentCheckUpdate is a PUT-based alternative to the GET-based Pass/Warn/Fail +// // APIs. +// func (s *HTTPHandlers) AgentCheckUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// var update checkUpdate +// if err := decodeBody(req.Body, &update); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)} +// } +// +// switch update.Status { +// case api.HealthPassing: +// case api.HealthWarning: +// case api.HealthCritical: +// default: +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check status: '%s'", update.Status)} +// } +// +// id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/") +// checkID := types.CheckID(id) +// +// return s.agentCheckUpdate(resp, req, checkID, update.Status, update.Output) +// } +// +// func (s *HTTPHandlers) agentCheckUpdate(resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) { +// entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "") +// cid := structs.NewCheckID(checkID, &entMeta) +// +// // Get the provided token, if any, and vet against any ACL policies. +// var token string +// s.parseToken(req, &token) +// +// if err := s.parseEntMetaNoWildcard(req, &cid.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &cid.EnterpriseMeta, nil) +// if err != nil { +// return nil, err +// } +// +// cid.Normalize() +// +// if err := s.agent.vetCheckUpdateWithAuthorizer(authz, cid); err != nil { +// return nil, err +// } +// +// if !s.validateRequestPartition(resp, &cid.EnterpriseMeta) { +// return nil, nil +// } +// +// if err := s.agent.updateTTLCheck(cid, status, output); err != nil { +// return nil, err +// } +// s.syncChanges() +// return nil, nil +// } +// +// // agentHealthService Returns Health for a given service ID +// func agentHealthService(serviceID structs.ServiceID, s *HTTPHandlers) (int, string, api.HealthChecks) { +// checks := s.agent.State.ChecksForService(serviceID, true) +// serviceChecks := make(api.HealthChecks, 0) +// for _, c := range checks { +// // TODO: harmonize struct.HealthCheck and api.HealthCheck (or at least extract conversion function) +// healthCheck := &api.HealthCheck{ +// Node: c.Node, +// CheckID: string(c.CheckID), +// Name: c.Name, +// Status: c.Status, +// Notes: c.Notes, +// Output: c.Output, +// ServiceID: c.ServiceID, +// ServiceName: c.ServiceName, +// ServiceTags: c.ServiceTags, +// } +// fillHealthCheckEnterpriseMeta(healthCheck, &c.EnterpriseMeta) +// serviceChecks = append(serviceChecks, healthCheck) +// } +// status := serviceChecks.AggregatedStatus() +// switch status { +// case api.HealthWarning: +// return http.StatusTooManyRequests, status, serviceChecks +// case api.HealthPassing: +// return http.StatusOK, status, serviceChecks +// default: +// return http.StatusServiceUnavailable, status, serviceChecks +// } +// } +// +// func returnTextPlain(req *http.Request) bool { +// if contentType := req.Header.Get("Accept"); strings.HasPrefix(contentType, "text/plain") { +// return true +// } +// if format := req.URL.Query().Get("format"); format != "" { +// return format == "text" +// } +// return false +// } +// +// // AgentHealthServiceByID return the local Service Health given its ID +// func (s *HTTPHandlers) AgentHealthServiceByID(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Pull out the service id (service id since there may be several instance of the same service on this host) +// serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/id/") +// if serviceID == "" { +// return nil, &HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing serviceID"} +// } +// +// var entMeta acl.EnterpriseMeta +// if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { +// return nil, err +// } +// +// var token string +// s.parseToken(req, &token) +// +// // need to resolve to default the meta +// s.defaultMetaPartitionToAgent(&entMeta) +// var authzContext acl.AuthorizerContext +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext) +// if err != nil { +// return nil, err +// } +// +// if !s.validateRequestPartition(resp, &entMeta) { +// return nil, nil +// } +// +// sid := structs.NewServiceID(serviceID, &entMeta) +// +// dc := s.agent.config.Datacenter +// +// if service := s.agent.State.Service(sid); service != nil { +// if err := authz.ToAllowAuthorizer().ServiceReadAllowed(service.Service, &authzContext); err != nil { +// return nil, err +// } +// code, status, healthChecks := agentHealthService(sid, s) +// if returnTextPlain(req) { +// return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"} +// } +// serviceInfo := buildAgentService(service, dc) +// result := &api.AgentServiceChecksInfo{ +// AggregatedStatus: status, +// Checks: healthChecks, +// Service: &serviceInfo, +// } +// return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"} +// } +// notFoundReason := fmt.Sprintf("ServiceId %s not found", sid.String()) +// if returnTextPlain(req) { +// return notFoundReason, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "text/plain"} +// } +// return &api.AgentServiceChecksInfo{ +// AggregatedStatus: api.HealthCritical, +// Checks: nil, +// Service: nil, +// }, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "application/json"} +// } +// +// // AgentHealthServiceByName return the worse status of all the services with given name on an agent +// func (s *HTTPHandlers) AgentHealthServiceByName(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Pull out the service name +// serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/name/") +// if serviceName == "" { +// return nil, &HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service Name"} +// } +// +// var entMeta acl.EnterpriseMeta +// if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { +// return nil, err +// } +// +// var token string +// s.parseToken(req, &token) +// +// s.defaultMetaPartitionToAgent(&entMeta) +// // need to resolve to default the meta +// var authzContext acl.AuthorizerContext +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext) +// if err != nil { +// return nil, err +// } +// +// if err := authz.ToAllowAuthorizer().ServiceReadAllowed(serviceName, &authzContext); err != nil { +// return nil, err +// } +// +// if !s.validateRequestPartition(resp, &entMeta) { +// return nil, nil +// } +// +// dc := s.agent.config.Datacenter +// +// code := http.StatusNotFound +// status := fmt.Sprintf("ServiceName %s Not Found", serviceName) +// +// services := s.agent.State.ServicesByName(structs.NewServiceName(serviceName, &entMeta)) +// result := make([]api.AgentServiceChecksInfo, 0, 16) +// for _, service := range services { +// sid := structs.NewServiceID(service.ID, &entMeta) +// +// scode, sstatus, healthChecks := agentHealthService(sid, s) +// serviceInfo := buildAgentService(service, dc) +// res := api.AgentServiceChecksInfo{ +// AggregatedStatus: sstatus, +// Checks: healthChecks, +// Service: &serviceInfo, +// } +// result = append(result, res) +// // When service is not found, we ignore it and keep existing HTTP status +// if code == http.StatusNotFound { +// code = scode +// status = sstatus +// } +// // We take the worst of all statuses, so we keep iterating +// // passing: 200 < warning: 429 < critical: 503 +// if code < scode { +// code = scode +// status = sstatus +// } +// } +// if returnTextPlain(req) { +// return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"} +// } +// return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"} +// } +// +// func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// var args structs.ServiceDefinition +// // Fixup the type decode of TTL or Interval if a check if provided. +// +// if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// if err := decodeBody(req.Body, &args); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)} +// } +// +// // Verify the service has a name. +// if args.Name == "" { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"} +// } +// +// // Check the service address here and in the catalog RPC endpoint +// // since service registration isn't synchronous. +// if ipaddr.IsAny(args.Address) { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Invalid service address"} +// } +// +// var token string +// s.parseToken(req, &token) +// +// s.defaultMetaPartitionToAgent(&args.EnterpriseMeta) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &args.EnterpriseMeta, nil) +// if err != nil { +// return nil, err +// } +// +// if !s.validateRequestPartition(resp, &args.EnterpriseMeta) { +// return nil, nil +// } +// +// // Get the node service. +// ns := args.NodeService() +// if ns.Weights != nil { +// if err := structs.ValidateWeights(ns.Weights); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid Weights: %v", err)} +// } +// } +// if err := structs.ValidateServiceMetadata(ns.Kind, ns.Meta, false); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid Service Meta: %v", err)} +// } +// +// // Run validation. This same validation would happen on the catalog endpoint, +// // so it helps ensure the sync will work properly. +// if err := ns.Validate(); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Validation failed: %v", err.Error())} +// } +// +// // Verify the check type. +// chkTypes, err := args.CheckTypes() +// if err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check: %v", err)} +// } +// for _, check := range chkTypes { +// if check.Status != "" && !structs.ValidStatus(check.Status) { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Status for checks must 'passing', 'warning', 'critical'"} +// } +// } +// +// // Verify the sidecar check types +// if args.Connect != nil && args.Connect.SidecarService != nil { +// chkTypes, err := args.Connect.SidecarService.CheckTypes() +// if err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check in sidecar_service: %v", err)} +// } +// for _, check := range chkTypes { +// if check.Status != "" && !structs.ValidStatus(check.Status) { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Status for checks must 'passing', 'warning', 'critical'"} +// } +// } +// } +// +// // Get the provided token, if any, and vet against any ACL policies. +// if err := s.agent.vetServiceRegisterWithAuthorizer(authz, ns); err != nil { +// return nil, err +// } +// +// // See if we have a sidecar to register too +// sidecar, sidecarChecks, sidecarToken, err := sidecarServiceFromNodeService(ns, token) +// if err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid SidecarService: %s", err)} +// } +// if sidecar != nil { +// if err := sidecar.ValidateForAgent(); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Failed Validation: %v", err.Error())} +// } +// // Make sure we are allowed to register the sidecar using the token +// // specified (might be specific to sidecar or the same one as the overall +// // request). +// if err := s.agent.vetServiceRegister(sidecarToken, sidecar); err != nil { +// return nil, err +// } +// // We parsed the sidecar registration, now remove it from the NodeService +// // for the actual service since it's done it's job and we don't want to +// // persist it in the actual state/catalog. SidecarService is meant to be a +// // registration syntax sugar so don't propagate it any further. +// ns.Connect.SidecarService = nil +// } +// +// // Add the service. +// replaceExistingChecks := false +// +// query := req.URL.Query() +// if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") { +// replaceExistingChecks = true +// } +// +// addReq := AddServiceRequest{ +// Service: ns, +// chkTypes: chkTypes, +// persist: true, +// token: token, +// Source: ConfigSourceRemote, +// replaceExistingChecks: replaceExistingChecks, +// } +// if err := s.agent.AddService(addReq); err != nil { +// return nil, err +// } +// +// if sidecar != nil { +// addReq := AddServiceRequest{ +// Service: sidecar, +// chkTypes: sidecarChecks, +// persist: true, +// token: sidecarToken, +// Source: ConfigSourceRemote, +// replaceExistingChecks: replaceExistingChecks, +// } +// if err := s.agent.AddService(addReq); err != nil { +// return nil, err +// } +// } +// s.syncChanges() +// return nil, nil +// } +// +// func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/") +// entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "") +// sid := structs.NewServiceID(serviceID, &entMeta) +// +// // Get the provided token, if any, and vet against any ACL policies. +// var token string +// s.parseToken(req, &token) +// +// if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &sid.EnterpriseMeta, nil) +// if err != nil { +// return nil, err +// } +// +// sid.Normalize() +// +// if !s.validateRequestPartition(resp, &sid.EnterpriseMeta) { +// return nil, nil +// } +// +// if err := s.agent.vetServiceUpdateWithAuthorizer(authz, sid); err != nil { +// return nil, err +// } +// +// if err := s.agent.RemoveService(sid); err != nil { +// return nil, err +// } +// +// s.syncChanges() +// return nil, nil +// } +// +// func (s *HTTPHandlers) AgentServiceMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Ensure we have a service ID +// serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/maintenance/") +// entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "") +// sid := structs.NewServiceID(serviceID, &entMeta) +// +// if sid.ID == "" { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service ID"} +// } +// +// // Ensure we have some action +// params := req.URL.Query() +// if _, ok := params["enable"]; !ok { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing value for enable"} +// } +// +// raw := params.Get("enable") +// enable, err := strconv.ParseBool(raw) +// if err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid value for enable: %q", raw)} +// } +// +// // Get the provided token, if any, and vet against any ACL policies. +// var token string +// s.parseToken(req, &token) +// +// if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &sid.EnterpriseMeta, nil) +// if err != nil { +// return nil, err +// } +// +// sid.Normalize() +// +// if !s.validateRequestPartition(resp, &sid.EnterpriseMeta) { +// return nil, nil +// } +// +// if err := s.agent.vetServiceUpdateWithAuthorizer(authz, sid); err != nil { +// return nil, err +// } +// +// if enable { +// reason := params.Get("reason") +// if err = s.agent.EnableServiceMaintenance(sid, reason, token); err != nil { +// return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: err.Error()} +// } +// } else { +// if err = s.agent.DisableServiceMaintenance(sid); err != nil { +// return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: err.Error()} +// } +// } +// s.syncChanges() +// return nil, nil +// } +// +// func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Ensure we have some action +// params := req.URL.Query() +// if _, ok := params["enable"]; !ok { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing value for enable"} +// } +// +// raw := params.Get("enable") +// enable, err := strconv.ParseBool(raw) +// if err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid value for enable: %q", raw)} +// } +// +// // Get the provided token, if any, and vet against any ACL policies. +// var token string +// s.parseToken(req, &token) +// +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// +// var authzContext acl.AuthorizerContext +// s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) +// if err := authz.ToAllowAuthorizer().NodeWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil { +// return nil, err +// } +// +// if enable { +// s.agent.EnableNodeMaintenance(params.Get("reason"), token) +// } else { +// s.agent.DisableNodeMaintenance() +// } +// s.syncChanges() +// return nil, nil +// } +// +// func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Fetch the ACL token, if any, and enforce agent policy. +// var token string +// s.parseToken(req, &token) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// +// // Authorize using the agent's own enterprise meta, not the token. +// var authzContext acl.AuthorizerContext +// s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) +// if err := authz.ToAllowAuthorizer().AgentReadAllowed(s.agent.config.NodeName, &authzContext); err != nil { +// return nil, err +// } +// +// // Get the provided loglevel. +// logLevel := req.URL.Query().Get("loglevel") +// if logLevel == "" { +// logLevel = "INFO" +// } +// +// var logJSON bool +// if _, ok := req.URL.Query()["logjson"]; ok { +// logJSON = true +// } +// +// if !logging.ValidateLogLevel(logLevel) { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Unknown log level: %s", logLevel)} +// } +// +// flusher, ok := resp.(http.Flusher) +// if !ok { +// return nil, fmt.Errorf("Streaming not supported") +// } +// +// monitor := monitor.New(monitor.Config{ +// BufferSize: 512, +// Logger: s.agent.logger, +// LoggerOptions: &hclog.LoggerOptions{ +// Level: logging.LevelFromString(logLevel), +// JSONFormat: logJSON, +// }, +// }) +// logsCh := monitor.Start() +// +// // Send header so client can start streaming body +// resp.WriteHeader(http.StatusOK) +// +// // 0 byte write is needed before the Flush call so that if we are using +// // a gzip stream it will go ahead and write out the HTTP response header +// resp.Write([]byte("")) +// flusher.Flush() +// const flushDelay = 200 * time.Millisecond +// flushTicker := time.NewTicker(flushDelay) +// defer flushTicker.Stop() +// +// // Stream logs until the connection is closed. +// for { +// select { +// case <-req.Context().Done(): +// droppedCount := monitor.Stop() +// if droppedCount > 0 { +// s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount) +// } +// flusher.Flush() +// return nil, nil +// +// case log := <-logsCh: +// fmt.Fprint(resp, string(log)) +// +// case <-flushTicker.C: +// flusher.Flush() +// } +// } +// } +// +// func (s *HTTPHandlers) AgentToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// if s.checkACLDisabled() { +// return nil, HTTPError{StatusCode: http.StatusUnauthorized, Reason: "ACL support disabled"} +// } +// +// // Fetch the ACL token, if any, and enforce agent policy. +// var token string +// s.parseToken(req, &token) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// +// // Authorize using the agent's own enterprise meta, not the token. +// var authzContext acl.AuthorizerContext +// s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) +// if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil { +// return nil, err +// } +// +// // The body is just the token, but it's in a JSON object so we can add +// // fields to this later if needed. +// var args api.AgentToken +// if err := decodeBody(req.Body, &args); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)} +// } +// +// // Figure out the target token. +// target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/") +// +// err = s.agent.tokens.WithPersistenceLock(func() error { +// triggerAntiEntropySync := false +// switch target { +// case "acl_token", "default": +// changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI) +// if changed { +// triggerAntiEntropySync = true +// } +// +// case "acl_agent_token", "agent": +// changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI) +// if changed { +// triggerAntiEntropySync = true +// } +// +// case "acl_agent_master_token", "agent_master", "agent_recovery": +// s.agent.tokens.UpdateAgentRecoveryToken(args.Token, token_store.TokenSourceAPI) +// +// case "acl_replication_token", "replication": +// s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI) +// +// case "config_file_service_registration": +// s.agent.tokens.UpdateConfigFileRegistrationToken(args.Token, token_store.TokenSourceAPI) +// +// default: +// return HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("Token %q is unknown", target)} +// } +// +// // TODO: is it safe to move this out of WithPersistenceLock? +// if triggerAntiEntropySync { +// s.agent.sync.SyncFull.Trigger() +// } +// return nil +// }) +// if err != nil { +// return nil, err +// } +// +// s.agent.logger.Info("Updated agent's ACL token", "token", target) +// return nil, nil +// } +// +// // AgentConnectCARoots returns the trusted CA roots. +// func (s *HTTPHandlers) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// var args structs.DCSpecificRequest +// if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { +// return nil, nil +// } +// +// raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCARootName, &args) +// if err != nil { +// return nil, err +// } +// defer setCacheMeta(resp, &m) +// +// // Add cache hit +// +// reply, ok := raw.(*structs.IndexedCARoots) +// if !ok { +// // This should never happen, but we want to protect against panics +// return nil, fmt.Errorf("internal error: response type not correct") +// } +// defer setMeta(resp, &reply.QueryMeta) +// +// return *reply, nil +// } +// +// // AgentConnectCALeafCert returns the certificate bundle for a service +// // instance. This endpoint ignores all "Cache-Control" attributes. +// // This supports blocking queries to update the returned bundle. +// // Non-blocking queries will always verify that the cache entry is still valid. +// func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Get the service name. Note that this is the name of the service, +// // not the ID of the service instance. +// serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/ca/leaf/") +// +// // TODO(peering): expose way to get kind=mesh-gateway type cert with appropriate ACLs +// +// args := cachetype.ConnectCALeafRequest{ +// Service: serviceName, // Need name not ID +// } +// var qOpts structs.QueryOptions +// +// if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// // Store DC in the ConnectCALeafRequest but query opts separately +// if done := s.parse(resp, req, &args.Datacenter, &qOpts); done { +// return nil, nil +// } +// args.MinQueryIndex = qOpts.MinQueryIndex +// args.MaxQueryTime = qOpts.MaxQueryTime +// args.Token = qOpts.Token +// +// // TODO(ffmmmm): maybe set MustRevalidate in ConnectCALeafRequest (as part of CacheInfo()) +// // We don't want non-blocking queries to return expired leaf certs +// // or leaf certs not valid under the current CA. So always revalidate +// // the leaf cert on non-blocking queries (ie when MinQueryIndex == 0) +// if args.MinQueryIndex == 0 { +// args.MustRevalidate = true +// } +// +// if !s.validateRequestPartition(resp, &args.EnterpriseMeta) { +// return nil, nil +// } +// +// raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args) +// if err != nil { +// return nil, err +// } +// defer setCacheMeta(resp, &m) +// +// reply, ok := raw.(*structs.IssuedCert) +// if !ok { +// // This should never happen, but we want to protect against panics +// return nil, fmt.Errorf("internal error: response type not correct") +// } +// setIndex(resp, reply.ModifyIndex) +// +// return reply, nil +// } +// +// // AgentConnectAuthorize +// // +// // POST /v1/agent/connect/authorize +// // +// // NOTE: This endpoint treats any L7 intentions as DENY. +// // +// // Note: when this logic changes, consider if the Intention.Check RPC method +// // also needs to be updated. +// func (s *HTTPHandlers) AgentConnectAuthorize(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Fetch the token +// var token string +// s.parseToken(req, &token) +// +// var authReq structs.ConnectAuthorizeRequest +// +// if err := s.parseEntMetaNoWildcard(req, &authReq.EnterpriseMeta); err != nil { +// return nil, err +// } +// +// if err := decodeBody(req.Body, &authReq); err != nil { +// return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)} +// } +// +// if !s.validateRequestPartition(resp, &authReq.EnterpriseMeta) { +// return nil, nil +// } +// +// authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq) +// if err != nil { +// return nil, err +// } +// setCacheMeta(resp, cacheMeta) +// +// return &connectAuthorizeResp{ +// Authorized: authz, +// Reason: reason, +// }, nil +// } +// +// // connectAuthorizeResp is the response format/structure for the +// // /v1/agent/connect/authorize endpoint. +// type connectAuthorizeResp struct { +// Authorized bool // True if authorized, false if not +// Reason string // Reason for the Authorized value (whether true or false) +// } +// +// // AgentHost +// // +// // GET /v1/agent/host +// // +// // Retrieves information about resources available and in-use for the +// // host the agent is running on such as CPU, memory, and disk usage. Requires +// // a operator:read ACL token. +// func (s *HTTPHandlers) AgentHost(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// // Fetch the ACL token, if any, and enforce agent policy. +// var token string +// s.parseToken(req, &token) +// authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) +// if err != nil { +// return nil, err +// } +// +// // TODO(partitions): should this be possible in a partition? +// if err := authz.ToAllowAuthorizer().OperatorReadAllowed(nil); err != nil { +// return nil, err +// } +// +// return debug.CollectHostInfo(), nil +// }
M agent/config.goagent/config.go

@@ -28,9 +28,18 @@ return &cfg

} type Config struct { + //// user-configurable DNSBindAddr *net.TCPAddr HTTPBindAddr *net.TCPAddr SerfBindAddr *net.TCPAddr NodeName string StartJoin []string + + //// non user-configurable + // HTTPResponseHeaders are used to add HTTP header response fields to the HTTP API responses. + HTTPResponseHeaders map[string]string +} + +func (c *Config) VersionWithMetadata() string { + return "todo" }
A agent/http.go

@@ -0,0 +1,256 @@

+package agent + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strings" + "time" + + "git.j3s.sh/cascade/lib" + "git.j3s.sh/cascade/lib/cleanhttp" +) + +// MethodNotAllowedError should be returned by a handler when the HTTP method is not allowed. +type MethodNotAllowedError struct { + Method string + Allow []string +} + +func (e MethodNotAllowedError) Error() string { + return fmt.Sprintf("method %s not allowed", e.Method) +} + +// BadRequestError should be returned by a handler when parameters or the payload are not valid +type BadRequestError struct { + Reason string +} + +func (e BadRequestError) Error() string { + return fmt.Sprintf("Bad request: %s", e.Reason) +} + +// NotFoundError should be returned by a handler when a resource specified does not exist +type NotFoundError struct { + Reason string +} + +func (e NotFoundError) Error() string { + return e.Reason +} + +// CodeWithPayloadError allow returning non HTTP 200 +// Error codes while not returning PlainText payload +type CodeWithPayloadError struct { + Reason string + StatusCode int + ContentType string +} + +func (e CodeWithPayloadError) Error() string { + return e.Reason +} + +type ForbiddenError struct { +} + +func (e ForbiddenError) Error() string { + return "Access is restricted" +} + +// HTTPHandlers provides an HTTP api for an agent. +// agent is copied into this struct because we need +// to call some of its functions & access some of its data +type HTTPHandlers struct { + h http.Handler + agent *Agent +} + +// handler is used to initialize the Handler. +// In agent code we only ever call this once. +func (s *HTTPHandlers) handler() http.Handler { + mux := http.NewServeMux() + + // handleFuncMetrics takes the given pattern and handler and wraps to produce + // metrics based on the pattern and request. + handleFuncMetrics := func(pattern string, handler http.HandlerFunc) { + // Get the parts of the pattern. We omit any initial empty for the + // leading slash, and put an underscore as a "thing" placeholder if we + // see a trailing slash, which means the part after is parsed. This lets + // us distinguish from things like /v1/query and /v1/query/<query id>. + var parts []string + for i, part := range strings.Split(pattern, "/") { + if part == "" { + if i == 0 { + continue + } + part = "_" + } + parts = append(parts, part) + } + + // Tranform the pattern to a valid label by replacing the '/' by '_'. + // Omit the leading slash. + // Distinguish thing like /v1/query from /v1/query/<query_id> by having + // an extra underscore. + path_label := strings.Replace(pattern[1:], "/", "_", -1) + + // Register the wrapper. + wrapper := func(resp http.ResponseWriter, req *http.Request) { + start := time.Now() + handler(resp, req) + + s.agent.logger.Warn("request metrics", "method", req.Method, "path", path_label, "latency", time.Since(start)) + } + + mux.Handle(pattern, http.HandlerFunc(wrapper)) + } + + mux.HandleFunc("/", s.Index) + endpoints := map[string]func(resp http.ResponseWriter, req *http.Request){ + "/v1/agent/members": s.agentMembers, + } + + for pattern, fn := range endpoints { + handleFuncMetrics(pattern, fn) + } + + // This handler bans URLs with non-printable characters + h := cleanhttp.PrintablePathCheckHandler(mux, nil) + return h +} + +// wrap is used to wrap functions to make them more convenient +// func (s *HTTPHandlers) wrap(handler http.HandlerFunc) http.HandlerFunc { +// httpLogger := s.agent.logger +// return func(resp http.ResponseWriter, req *http.Request) { +// setHeaders(resp, s.agent.Config.HTTPResponseHeaders) +// +// addAllowHeader := func(methods []string) { +// resp.Header().Add("Allow", strings.Join(methods, ",")) +// } +// +// logURL := req.URL.String() +// +// handleErr := func(err error) { +// httpLogger.Error("Request error", err, +// "method", req.Method, +// "url", logURL, +// "from", req.RemoteAddr, +// "error", err, +// ) +// switch { +// default: +// resp.WriteHeader(http.StatusInternalServerError) +// fmt.Fprint(resp, err.Error()) +// } +// } +// +// start := time.Now() +// defer func() { +// httpLogger.Debug("Request finished", +// "method", req.Method, +// "url", logURL, +// "from", req.RemoteAddr, +// "latency", time.Since(start).String(), +// ) +// }() +// +// contentType := "application/json" +// httpCode := http.StatusOK +// var buf []byte +// if contentType == "application/json" { +// buf, err = s.marshalJSON(req, handler(resp, req)) +// if err != nil { +// handleErr(err) +// return +// } +// } else { +// if strings.HasPrefix(contentType, "text/") { +// if val, ok := obj.(string); ok { +// buf = []byte(val) +// } +// } +// } +// resp.Header().Set("Content-Type", contentType) +// resp.WriteHeader(httpCode) +// resp.Write(buf) +// } +// } + +// marshalJSON marshals the object into JSON +func (s *HTTPHandlers) marshalJSON(req *http.Request, obj interface{}) ([]byte, error) { + buf, err := json.Marshal(obj) + if err != nil { + return nil, err + } + return buf, nil +} + +// Renders a simple index page +func (s *HTTPHandlers) Index(resp http.ResponseWriter, req *http.Request) { + // Send special headers too since this endpoint isn't wrapped with something + // that sends them. + // setHeaders(resp, s.agent.Config.HTTPResponseHeaders) + + // Check if this is a non-index path + if req.URL.Path != "/" { + resp.WriteHeader(http.StatusNotFound) + return + } + + // Give them something helpful if there's no UI so they at least know + // what this server is. + fmt.Fprint(resp, "cascade agent\n") + return +} + +func decodeBody(body io.Reader, out interface{}) error { + return lib.DecodeJSON(body, out) +} + +// setHeaders is used to set canonical response header fields +func setHeaders(resp http.ResponseWriter, headers map[string]string) { + for field, value := range headers { + resp.Header().Set(http.CanonicalHeaderKey(field), value) + } +} + +// serveHandlerWithHeaders is used to serve a http.Handler with the specified headers +func serveHandlerWithHeaders(h http.Handler, headers map[string]string) http.HandlerFunc { + return func(resp http.ResponseWriter, req *http.Request) { + setHeaders(resp, headers) + h.ServeHTTP(resp, req) + } +} + +func sourceAddrFromRequest(req *http.Request) string { + xff := req.Header.Get("X-Forwarded-For") + forwardHosts := strings.Split(xff, ",") + if len(forwardHosts) > 0 { + forwardIp := net.ParseIP(strings.TrimSpace(forwardHosts[0])) + if forwardIp != nil { + return forwardIp.String() + } + } + + host, _, err := net.SplitHostPort(req.RemoteAddr) + if err != nil { + return "" + } + + ip := net.ParseIP(host) + if ip != nil { + return ip.String() + } else { + return "" + } +} + +func (s *HTTPHandlers) parseFilter(req *http.Request, filter *string) { + if other := req.URL.Query().Get("filter"); other != "" { + *filter = other + } +}
A agent/http_register.go

@@ -0,0 +1,29 @@

+package agent + +func init() { + // registerEndpoint("/v1/agent/members", []string{"GET"}, (*HTTPHandlers).AgentMembers) + // TODO soon + // registerEndpoint("/v1/agent/join/", []string{"PUT"}, (*HTTPHandlers).AgentJoin) + // registerEndpoint("/v1/agent/leave", []string{"PUT"}, (*HTTPHandlers).AgentLeave) + // registerEndpoint("/v1/agent/force-leave/", []string{"PUT"}, (*HTTPHandlers).AgentForceLeave) + // registerEndpoint("/v1/agent/service/register", []string{"PUT"}, (*HTTPHandlers).AgentRegisterService) + // registerEndpoint("/v1/agent/service/deregister/", []string{"PUT"}, (*HTTPHandlers).AgentDeregisterService) + // registerEndpoint("/v1/catalog/nodes", []string{"GET"}, (*HTTPHandlers).CatalogNodes) + // registerEndpoint("/v1/catalog/services", []string{"GET"}, (*HTTPHandlers).CatalogServices) + // registerEndpoint("/v1/catalog/service/", []string{"GET"}, (*HTTPHandlers).CatalogServiceNodes) + // registerEndpoint("/v1/catalog/node/", []string{"GET"}, (*HTTPHandlers).CatalogNodeServices) + // registerEndpoint("/v1/catalog/node-services/", []string{"GET"}, (*HTTPHandlers).CatalogNodeServiceList) + // TODO later + // registerEndpoint("/v1/agent/health/service/id/", []string{"GET"}, (*HTTPHandlers).AgentHealthServiceByID) + // registerEndpoint("/v1/agent/health/service/name/", []string{"GET"}, (*HTTPHandlers).AgentHealthServiceByName) + // registerEndpoint("/v1/agent/check/register", []string{"PUT"}, (*HTTPHandlers).AgentRegisterCheck) + // registerEndpoint("/v1/agent/check/deregister/", []string{"PUT"}, (*HTTPHandlers).AgentDeregisterCheck) + // registerEndpoint("/v1/agent/check/pass/", []string{"PUT"}, (*HTTPHandlers).AgentCheckPass) + // registerEndpoint("/v1/agent/check/warn/", []string{"PUT"}, (*HTTPHandlers).AgentCheckWarn) + // registerEndpoint("/v1/agent/check/fail/", []string{"PUT"}, (*HTTPHandlers).AgentCheckFail) + // registerEndpoint("/v1/agent/check/update/", []string{"PUT"}, (*HTTPHandlers).AgentCheckUpdate) + // registerEndpoint("/v1/agent/service/maintenance/", []string{"PUT"}, (*HTTPHandlers).AgentServiceMaintenance) + // registerEndpoint("/v1/catalog/register", []string{"PUT"}, (*HTTPHandlers).CatalogRegister) + // registerEndpoint("/v1/config/", []string{"GET", "DELETE"}, (*HTTPHandlers).Config) + // registerEndpoint("/v1/config", []string{"PUT"}, (*HTTPHandlers).ConfigApply) +}
M api/agent.goapi/agent.go

@@ -25,7 +25,7 @@ }

// XXX: this may be needed for consul compatability, but I'm leaving it // out for now in case it isn't. -// const AllSegments = "_all" +const AllSegments = "_all" // Agent can be used to query the Agent endpoints type Agent struct {
M api/api.goapi/api.go

@@ -371,42 +371,12 @@ }

defer closeResponseBody(resp) qm := &QueryMeta{} - err = parseQueryMeta(resp, qm) - if err != nil { - return nil, err - } qm.RequestTime = rtt if err := decodeBody(resp, out); err != nil { return nil, err } return qm, nil -} - -// parseQueryMeta is used to help parse query meta-data -func parseQueryMeta(resp *http.Response, q *QueryMeta) error { - header := resp.Header - - // Parse the X-Consul-Index (if it's set - hash based blocking queries don't - // set this) - // if indexStr := header.Get("X-Consul-Index"); indexStr != "" { - // index, err := strconv.ParseUint(indexStr, 10, 64) - // if err != nil { - // return fmt.Errorf("Failed to parse X-Consul-Index: %v", err) - // } - // q.LastIndex = index - // } - // q.LastContentHash = header.Get("X-Consul-ContentHash") - - // Parse X-Consul-Translate-Addresses - switch header.Get("X-Consul-Translate-Addresses") { - case "true": - q.AddressTranslationEnabled = true - default: - q.AddressTranslationEnabled = false - } - - return nil } // decodeBody is used to JSON decode a body
A lib/cleanhttp/handlers.go

@@ -0,0 +1,48 @@

+package cleanhttp + +import ( + "net/http" + "strings" + "unicode" +) + +// HandlerInput provides input options to cleanhttp's handlers +type HandlerInput struct { + ErrStatus int +} + +// PrintablePathCheckHandler is a middleware that ensures the request path +// contains only printable runes. +func PrintablePathCheckHandler(next http.Handler, input *HandlerInput) http.Handler { + // Nil-check on input to make it optional + if input == nil { + input = &HandlerInput{ + ErrStatus: http.StatusBadRequest, + } + } + + // Default to http.StatusBadRequest on error + if input.ErrStatus == 0 { + input.ErrStatus = http.StatusBadRequest + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r != nil { + // Check URL path for non-printable characters + idx := strings.IndexFunc(r.URL.Path, func(c rune) bool { + return !unicode.IsPrint(c) + }) + + if idx != -1 { + w.WriteHeader(input.ErrStatus) + return + } + + if next != nil { + next.ServeHTTP(w, r) + } + } + + return + }) +}
A lib/json.go

@@ -0,0 +1,27 @@

+package lib + +import ( + "bytes" + "encoding/json" + "io" +) + +// DecodeJSON is a convenience function to create a JSON decoder +// set it up to disallow unknown fields and then decode into the +// given value +func DecodeJSON(data io.Reader, out interface{}) error { + if data == nil { + return io.EOF + } + + decoder := json.NewDecoder(data) + decoder.DisallowUnknownFields() + return decoder.Decode(&out) +} + +// UnmarshalJSON is a convenience function around calling +// DecodeJSON. It will mainly be useful in many of our +// UnmarshalJSON methods for structs. +func UnmarshalJSON(data []byte, out interface{}) error { + return DecodeJSON(bytes.NewReader(data), out) +}
A lib/rtt.go

@@ -0,0 +1,61 @@

+package lib + +import ( + "math" + "time" + + "github.com/hashicorp/serf/coordinate" +) + +// ComputeDistance returns the distance between the two network coordinates in +// seconds. If either of the coordinates is nil then this will return positive +// infinity. +func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 { + if a == nil || b == nil { + return math.Inf(1.0) + } + + return a.DistanceTo(b).Seconds() +} + +// CoordinateSet holds all the coordinates for a given node, indexed by network +// segment name. +type CoordinateSet map[string]*coordinate.Coordinate + +// Intersect tries to return a pair of coordinates which are compatible with the +// current set and a given set. We employ some special knowledge about network +// segments to avoid doing a full intersection, since this is in several hot +// paths. This might return nil for either coordinate in the output pair if an +// intersection cannot be found. The ComputeDistance function above is designed +// to deal with that. +func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate, *coordinate.Coordinate) { + // Use the empty segment by default. + segment := "" + + // If we have a single segment, then let our segment take priority since + // we are possibly a client. Any node with more than one segment can only + // be a server, which means it should be in all segments. + if len(cs) == 1 { + for s := range cs { + segment = s + } + } + + // Likewise for the other set. + if len(other) == 1 { + for s := range other { + segment = s + } + } + + return cs[segment], other[segment] +} + +// GenerateCoordinate creates a new coordinate with the given distance from the +// origin. This should only be used for tests. +func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate { + coord := coordinate.NewCoordinate(coordinate.DefaultConfig()) + coord.Vec[0] = rtt.Seconds() + coord.Height = 0 + return coord +}