small pixel drawing of a pufferfish cascade

import preliminary API structs & methods
Jes Olson j3s@c3f.net
Mon, 20 Feb 2023 19:00:50 -0800
commit

8b9dc8eaa4ff0455d6e5fdff42495cb40bebe293

parent

2d61578f336b9f69e87747b987752dc22461b427

3 files changed, 649 insertions(+), 0 deletions(-)

jump to
A api/agent.go

@@ -0,0 +1,106 @@

+package api + +// AgentMember represents a cluster member known to the agent +type AgentMember struct { + Name string + Addr string + Port uint16 + Tags map[string]string + // Status of the Member which corresponds to github.com/hashicorp/serf/serf.MemberStatus + // Value is one of: + // + // AgentMemberNone = 0 + // AgentMemberAlive = 1 + // AgentMemberLeaving = 2 + // AgentMemberLeft = 3 + // AgentMemberFailed = 4 + Status int + ProtocolMin uint8 + ProtocolMax uint8 + ProtocolCur uint8 + DelegateMin uint8 + DelegateMax uint8 + DelegateCur uint8 +} + +// XXX: this may be needed for consul compatability, but I'm leaving it +// out for now in case it isn't. +// const AllSegments = "_all" + +// Agent can be used to query the Agent endpoints +type Agent struct { + c *Client + + // cache the node name + nodeName string +} + +// Agent returns a handle to the agent endpoints +func (c *Client) Agent() *Agent { + return &Agent{c: c} +} + +// Self is used to query the agent we are speaking to for +// information about itself +func (a *Agent) Self() (map[string]map[string]interface{}, error) { + r := a.c.newRequest("GET", "/v1/agent/self") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + + var out map[string]map[string]interface{} + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +} + +// Host is used to retrieve information about the host the +// agent is running on such as CPU, memory, and disk. Requires +// a operator:read ACL token. +func (a *Agent) Host() (map[string]interface{}, error) { + r := a.c.newRequest("GET", "/v1/agent/host") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + + var out map[string]interface{} + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +} + +// NodeName is used to get the node name of the agent +func (a *Agent) NodeName() (string, error) { + if a.nodeName != "" { + return a.nodeName, nil + } + info, err := a.Self() + if err != nil { + return "", err + } + name := info["Config"]["NodeName"].(string) + a.nodeName = name + return name, nil +} + +// Members returns the known serf gossip members +func (a *Agent) Members() ([]*AgentMember, error) { + r := a.c.newRequest("GET", "/v1/agent/members") + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + + var out []*AgentMember + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +}
M api/api.goapi/api.go

@@ -0,0 +1,483 @@

+package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "git.j3s.sh/cascade/lib/cleanhttp" +) + +// Config is used to configure the creation of a client +type Config struct { + // Address is the address of the Consul server + Address string + + // Scheme is the URI scheme for the Consul server + Scheme string + + // Transport is the Transport to use for the http client. + Transport *http.Transport + + // HttpClient is the client to use. Default will be + // used if not provided. + HttpClient *http.Client + + // WaitTime limits how long a Watch will block. If not provided, + // the agent default values will be used. + WaitTime time.Duration +} + +// DefaultConfig returns a default configuration for the client. By default this +// will pool and reuse idle connections to cascade. If you have a long-lived +// client object, this is the desired behavior and should make the most efficient +// use of the connections to cascade. If you don't reuse a client object, which +// is not recommended, then you may notice idle connections building up over +// time. To avoid this, use the DefaultNonPooledConfig() instead. +func DefaultConfig() *Config { + return defaultConfig(cleanhttp.DefaultPooledTransport) +} + +// DefaultNonPooledConfig returns a default configuration for the client which +// does not pool connections. This isn't a recommended configuration because it +// will reconnect to cascade on every request, but this is useful to avoid the +// accumulation of idle connections if you make many client objects during the +// lifetime of your application. +func DefaultNonPooledConfig() *Config { + return defaultConfig(cleanhttp.DefaultTransport) +} + +// defaultConfig returns the default configuration for the client, using the +// given function to make the transport. +func defaultConfig(transportFn func() *http.Transport) *Config { + config := &Config{ + Address: "127.0.0.1:8500", + Scheme: "http", + Transport: transportFn(), + } + + return config +} + +// Client provides a client to the Consul API +type Client struct { + modifyLock sync.RWMutex + headers http.Header + + config Config +} + +// Headers gets the current set of headers used for requests. This returns a +// copy; to modify it call AddHeader or SetHeaders. +func (c *Client) Headers() http.Header { + c.modifyLock.RLock() + defer c.modifyLock.RUnlock() + + if c.headers == nil { + return nil + } + + ret := make(http.Header) + for k, v := range c.headers { + for _, val := range v { + ret[k] = append(ret[k], val) + } + } + + return ret +} + +// AddHeader allows a single header key/value pair to be added +// in a race-safe fashion. +func (c *Client) AddHeader(key, value string) { + c.modifyLock.Lock() + defer c.modifyLock.Unlock() + c.headers.Add(key, value) +} + +// SetHeaders clears all previous headers and uses only the given +// ones going forward. +func (c *Client) SetHeaders(headers http.Header) { + c.modifyLock.Lock() + defer c.modifyLock.Unlock() + c.headers = headers +} + +// NewClient returns a new client +func NewClient(config *Config) (*Client, error) { + // bootstrap the config + defConfig := DefaultConfig() + + if config.Address == "" { + config.Address = defConfig.Address + } + + if config.Scheme == "" { + config.Scheme = defConfig.Scheme + } + + if config.Transport == nil { + config.Transport = defConfig.Transport + } + + if config.HttpClient == nil { + config.HttpClient = NewHttpClient(config.Transport) + } + + parts := strings.SplitN(config.Address, "://", 2) + if len(parts) == 2 { + switch parts[0] { + case "http": + config.Scheme = "http" + // TODO: unix socket support? do i care? no??? + default: + return nil, fmt.Errorf("Unknown protocol scheme: %s", parts[0]) + } + config.Address = parts[1] + } + + return &Client{config: *config, headers: make(http.Header)}, nil +} + +// NewHttpClient returns an http client configured with the given Transport +func NewHttpClient(transport *http.Transport) *http.Client { + client := &http.Client{ + Transport: transport, + } + return client +} + +// request is used to help build up a request +type request struct { + config *Config + method string + url *url.URL + params url.Values + body io.Reader + header http.Header + obj interface{} + ctx context.Context +} + +// QueryMeta is used to return meta data about a query +type QueryMeta struct { + // LastIndex. This can be used as a WaitIndex to perform + // a blocking query + LastIndex uint64 + + // LastContentHash. This can be used as a WaitHash to perform a blocking query + // for endpoints that support hash-based blocking. Endpoints that do not + // support it will return an empty hash. + LastContentHash string + + // How long did the request take + RequestTime time.Duration + + // Is address translation enabled for HTTP responses on this agent + AddressTranslationEnabled bool +} + +// QueryOptions are used to parameterize a query +type QueryOptions struct { + // WaitIndex is used to enable a blocking query. Waits + // until the timeout or the next index is reached + WaitIndex uint64 + + // WaitHash is used by some endpoints instead of WaitIndex to perform blocking + // on state based on a hash of the response rather than a monotonic index. + // This is required when the state being blocked on is not stored in Raft, for + // example agent-local proxy configuration. + WaitHash string + + // WaitTime is used to bound the duration of a wait. + // Defaults to that of the Config, but can be overridden. + WaitTime time.Duration + + // Near is used to provide a node name that will sort the results + // in ascending order based on the estimated round trip time from + // that node. Setting this to "_agent" will use the agent's node + // for the sort. + Near string + + // NodeMeta is used to filter results by nodes with the given + // metadata key/value pairs. Currently, only one key/value pair can + // be provided for filtering. + NodeMeta map[string]string + + // ctx is an optional context pass through to the underlying HTTP + // request layer. Use Context() and WithContext() to manage this. + ctx context.Context + + // Filter requests filtering data prior to it being returned. The string + // is a go-bexpr compatible expression. + Filter string +} + +// setQueryOptions is used to annotate the request with +// additional query options +func (r *request) setQueryOptions(q *QueryOptions) { + if q == nil { + return + } + if q.WaitIndex != 0 { + r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10)) + } + if q.WaitTime != 0 { + r.params.Set("wait", durToMsec(q.WaitTime)) + } + if q.WaitHash != "" { + r.params.Set("hash", q.WaitHash) + } + if q.Near != "" { + r.params.Set("near", q.Near) + } + if q.Filter != "" { + r.params.Set("filter", q.Filter) + } + if len(q.NodeMeta) > 0 { + for key, value := range q.NodeMeta { + r.params.Add("node-meta", key+":"+value) + } + } + + r.ctx = q.ctx +} + +// durToMsec converts a duration to a millisecond specified string. If the +// user selected a positive value that rounds to 0 ms, then we will use 1 ms +// so they get a short delay, otherwise Consul will translate the 0 ms into +// a huge default delay. +func durToMsec(dur time.Duration) string { + ms := dur / time.Millisecond + if dur > 0 && ms == 0 { + ms = 1 + } + return fmt.Sprintf("%dms", ms) +} + +// serverError is a string we look for to detect 500 errors. +const serverError = "Unexpected response code: 500" + +// IsRetryableError returns true for 500 errors from the Consul servers, and +// network connection errors. These are usually retryable at a later time. +// This applies to reads but NOT to writes. This may return true for errors +// on writes that may have still gone through, so do not use this to retry +// any write operations. +func IsRetryableError(err error) bool { + if err == nil { + return false + } + + if _, ok := err.(net.Error); ok { + return true + } + + // TODO (slackpad) - Make a real error type here instead of using + // a string check. + return strings.Contains(err.Error(), serverError) +} + +// toHTTP converts the request to an HTTP request +func (r *request) toHTTP() (*http.Request, error) { + // Encode the query parameters + r.url.RawQuery = r.params.Encode() + + // Check if we should encode the body + if r.body == nil && r.obj != nil { + b, err := encodeBody(r.obj) + if err != nil { + return nil, err + } + r.body = b + } + + // Create the HTTP request + req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body) + if err != nil { + return nil, err + } + + req.URL.Host = r.url.Host + req.URL.Scheme = r.url.Scheme + req.Host = r.url.Host + req.Header = r.header + + // Content-Type must always be set when a body is present + // See https://github.com/hashicorp/consul/issues/10011 + if req.Body != nil && req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/json") + } + + if r.ctx != nil { + return req.WithContext(r.ctx), nil + } + + return req, nil +} + +// newRequest is used to create a new request +func (c *Client) newRequest(method, path string) *request { + r := &request{ + config: &c.config, + method: method, + url: &url.URL{ + Scheme: c.config.Scheme, + Host: c.config.Address, + Path: path, + }, + params: make(map[string][]string), + header: c.Headers(), + } + + if c.config.WaitTime != 0 { + r.params.Set("wait", durToMsec(r.config.WaitTime)) + } + + return r +} + +// doRequest runs a request with our client +func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) { + req, err := r.toHTTP() + if err != nil { + return 0, nil, err + } + start := time.Now() + resp, err := c.config.HttpClient.Do(req) + diff := time.Since(start) + return diff, resp, err +} + +// Query is used to do a GET request against an endpoint +// and deserialize the response into an interface using +// standard cascade conventions. +func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { + r := c.newRequest("GET", endpoint) + r.setQueryOptions(q) + rtt, resp, err := c.doRequest(r) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + + qm := &QueryMeta{} + err = parseQueryMeta(resp, qm) + if err != nil { + return nil, err + } + qm.RequestTime = rtt + + if err := decodeBody(resp, out); err != nil { + return nil, err + } + return qm, nil +} + +// parseQueryMeta is used to help parse query meta-data +func parseQueryMeta(resp *http.Response, q *QueryMeta) error { + header := resp.Header + + // Parse the X-Consul-Index (if it's set - hash based blocking queries don't + // set this) + // if indexStr := header.Get("X-Consul-Index"); indexStr != "" { + // index, err := strconv.ParseUint(indexStr, 10, 64) + // if err != nil { + // return fmt.Errorf("Failed to parse X-Consul-Index: %v", err) + // } + // q.LastIndex = index + // } + // q.LastContentHash = header.Get("X-Consul-ContentHash") + + // Parse X-Consul-Translate-Addresses + switch header.Get("X-Consul-Translate-Addresses") { + case "true": + q.AddressTranslationEnabled = true + default: + q.AddressTranslationEnabled = false + } + + return nil +} + +// decodeBody is used to JSON decode a body +func decodeBody(resp *http.Response, out interface{}) error { + dec := json.NewDecoder(resp.Body) + return dec.Decode(out) +} + +// encodeBody is used to encode a request body +func encodeBody(obj interface{}) (io.Reader, error) { + buf := bytes.NewBuffer(nil) + enc := json.NewEncoder(buf) + if err := enc.Encode(obj); err != nil { + return nil, err + } + return buf, nil +} + +// requireOK is used to wrap doRequest and check for a 200 +func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { + if e != nil { + if resp != nil { + closeResponseBody(resp) + } + return d, nil, e + } + if resp.StatusCode != 200 { + return d, nil, generateUnexpectedResponseCodeError(resp) + } + return d, resp, nil +} + +// closeResponseBody reads resp.Body until EOF, and then closes it. The read +// is necessary to ensure that the http.Client's underlying RoundTripper is able +// to re-use the TCP connection. See godoc on net/http.Client.Do. +func closeResponseBody(resp *http.Response) error { + _, _ = io.Copy(ioutil.Discard, resp.Body) + return resp.Body.Close() +} + +func (req *request) filterQuery(filter string) { + if filter == "" { + return + } + + req.params.Set("filter", filter) +} + +// generateUnexpectedResponseCodeError consumes the rest of the body, closes +// the body stream and generates an error indicating the status code was +// unexpected. +func generateUnexpectedResponseCodeError(resp *http.Response) error { + var buf bytes.Buffer + io.Copy(&buf, resp.Body) + closeResponseBody(resp) + return fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes()) +} + +func requireNotFoundOrOK(d time.Duration, resp *http.Response, e error) (bool, time.Duration, *http.Response, error) { + if e != nil { + if resp != nil { + closeResponseBody(resp) + } + return false, d, nil, e + } + switch resp.StatusCode { + case 200: + return true, d, resp, nil + case 404: + return false, d, resp, nil + default: + return false, d, nil, generateUnexpectedResponseCodeError(resp) + } +}
A lib/cleanhttp/cleanhttp.go

@@ -0,0 +1,60 @@

+// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package cleanhttp + +import ( + "net" + "net/http" + "runtime" + "time" +) + +// DefaultTransport returns a new http.Transport with similar default values to +// http.DefaultTransport, but with idle connections and keepalives disabled. +func DefaultTransport() *http.Transport { + transport := DefaultPooledTransport() + transport.DisableKeepAlives = true + transport.MaxIdleConnsPerHost = -1 + return transport +} + +// DefaultPooledTransport returns a new http.Transport with similar default +// values to http.DefaultTransport. Do not use this for transient transports as +// it can leak file descriptors over time. Only use this for transports that +// will be re-used for the same host(s). +func DefaultPooledTransport() *http.Transport { + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + ForceAttemptHTTP2: true, + MaxIdleConnsPerHost: runtime.GOMAXPROCS(0) + 1, + } + return transport +} + +// DefaultClient returns a new http.Client with similar default values to +// http.Client, but with a non-shared Transport, idle connections disabled, and +// keepalives disabled. +func DefaultClient() *http.Client { + return &http.Client{ + Transport: DefaultTransport(), + } +} + +// DefaultPooledClient returns a new http.Client with similar default values to +// http.Client, but with a shared Transport. Do not use this function for +// transient clients as it can leak file descriptors over time. Only use this +// for clients that will be re-used for the same host(s). +func DefaultPooledClient() *http.Client { + return &http.Client{ + Transport: DefaultPooledTransport(), + } +}