package httpd

import "github.com/influxdata/influxdb/services/httpd"

Package httpd implements the HTTP service and REST API for InfluxDB.

Index

Constants

// Default values for HTTP service settings.
const (
	// DefaultBindAddress is the default address to bind to.
	// The value ":8086" has an empty host part and port 8086.
	DefaultBindAddress = ":8086"

	// DefaultRealm is the default realm sent back when issuing a basic auth challenge.
	DefaultRealm = "InfluxDB"

	// DefaultBindSocket is the default unix socket to bind to.
	DefaultBindSocket = "/var/run/influxdb.sock"

	// DefaultMaxBodySize is the default maximum size of a client request body, in bytes. Specify 0 for no limit.
	// 25e6 is an untyped constant equal to 25,000,000 bytes.
	DefaultMaxBodySize = 25e6

	// DefaultEnqueuedWriteTimeout is the maximum time a write request can wait to be processed.
	// The default is 30 seconds.
	DefaultEnqueuedWriteTimeout = 30 * time.Second
)
const (
	// DefaultChunkSize specifies the maximum number of points that will
	// be read before sending results back to the engine.
	//
	// This has no relation to the number of bytes that are returned.
	DefaultChunkSize = 10000

	// DefaultDebugRequestsInterval is a 10-second duration.
	// NOTE(review): presumably the default sampling window used when
	// tracking debug requests; its consumer is not visible here — confirm.
	DefaultDebugRequestsInterval = 10 * time.Second

	// MaxDebugRequestsInterval is a 6-hour duration.
	// NOTE(review): presumably the upper bound accepted for the debug
	// requests sampling window; its consumer is not visible here — confirm.
	MaxDebugRequestsInterval = 6 * time.Hour
)

Variables

// ErrDiagnosticsValueMissing is a sentinel error whose message is
// "expected diagnostic value missing".
// NOTE(review): the code path that returns it is not visible here.
var ErrDiagnosticsValueMissing = errors.New("expected diagnostic value missing")

Functions

func LimitListener

func LimitListener(l net.Listener, n int) net.Listener

LimitListener returns a Listener that accepts at most n simultaneous connections from the provided Listener and will drop extra connections.

Types

type AuthenticationMethod

// AuthenticationMethod is an int-backed enumeration. Its declared values are
// UserAuthentication and BearerAuthentication.
type AuthenticationMethod int

AuthenticationMethod defines the type of authentication used.

const (
	// UserAuthentication authenticates using basic authentication.
	// It is the zero value of AuthenticationMethod (iota starts at 0).
	UserAuthentication AuthenticationMethod = iota

	// BearerAuthentication authenticates with a JWT. Its value is 1.
	BearerAuthentication
)

Supported authentication methods.

type Bucket

// Bucket is the JSON representation of a bucket. It embeds BucketsBody and
// adds an ID, a type, a v1 retention policy name, and creation/update
// timestamps.
type Bucket struct {
	BucketsBody
	ID                  string    `json:"id,omitempty"`
	Type                string    `json:"type"`
	// NOTE: the embedded BucketsBody.Rp field is also tagged with the JSON
	// key "rp". Under encoding/json's field-selection rules the less deeply
	// nested field (this one) is the one used for that key.
	RetentionPolicyName string    `json:"rp,omitempty"` // This is to support v1 sources.
	CreatedAt           time.Time `json:"createdAt"`
	UpdatedAt           time.Time `json:"updatedAt"`
}

type BucketUpdate

// BucketUpdate carries a bucket's description, name, and retention rules
// as a JSON payload. It is embedded in BucketsBody.
type BucketUpdate struct {
	Description    string          `json:"description"`
	Name           string          `json:"name"`
	RetentionRules []RetentionRule `json:"retentionRules"`
}

type Buckets

// Buckets is a JSON envelope holding a list of Bucket values under the
// "buckets" key.
type Buckets struct {
	Buckets []Bucket `json:"buckets"`
}

type BucketsBody

// BucketsBody embeds BucketUpdate and adds organization ID, retention
// policy, and schema type fields, all serialized as JSON strings.
type BucketsBody struct {
	BucketUpdate
	OrgID      string `json:"orgId"`
	// Rp shares the JSON key "rp" with Bucket.RetentionPolicyName when this
	// struct is embedded in Bucket.
	Rp         string `json:"rp"`
	SchemaType string `json:"schemaType"`
}

BucketsBody and RetentionRule should match the 2.0 API definition.

type Config

// Config holds the settings of the HTTP service. Every field except TLS is
// mapped to a TOML key by its struct tag; TLS is tagged "-" and is therefore
// not read from or written to TOML.
type Config struct {
	Enabled                 bool              `toml:"enabled"`
	BindAddress             string            `toml:"bind-address"`
	AuthEnabled             bool              `toml:"auth-enabled"`
	LogEnabled              bool              `toml:"log-enabled"`
	SuppressWriteLog        bool              `toml:"suppress-write-log"`
	WriteTracing            bool              `toml:"write-tracing"`
	FluxEnabled             bool              `toml:"flux-enabled"`
	FluxLogEnabled          bool              `toml:"flux-log-enabled"`
	FluxTesting             bool              `toml:"flux-testing"`
	PprofEnabled            bool              `toml:"pprof-enabled"`
	PprofAuthEnabled        bool              `toml:"pprof-auth-enabled"`
	DebugPprofEnabled       bool              `toml:"debug-pprof-enabled"`
	PingAuthEnabled         bool              `toml:"ping-auth-enabled"`
	PromReadAuthEnabled     bool              `toml:"prom-read-auth-enabled"`
	HTTPHeaders             map[string]string `toml:"headers"`
	HTTPSEnabled            bool              `toml:"https-enabled"`
	HTTPSCertificate        string            `toml:"https-certificate"`
	HTTPSPrivateKey         string            `toml:"https-private-key"`
	MaxRowLimit             int               `toml:"max-row-limit"`
	MaxConnectionLimit      int               `toml:"max-connection-limit"`
	SharedSecret            string            `toml:"shared-secret"`
	Realm                   string            `toml:"realm"`
	UnixSocketEnabled       bool              `toml:"unix-socket-enabled"`
	UnixSocketGroup         *toml.Group       `toml:"unix-socket-group"`
	UnixSocketPermissions   toml.FileMode     `toml:"unix-socket-permissions"`
	BindSocket              string            `toml:"bind-socket"`
	MaxBodySize             int               `toml:"max-body-size"`
	AccessLogPath           string            `toml:"access-log-path"`
	AccessLogStatusFilters  []StatusFilter    `toml:"access-log-status-filters"`
	MaxConcurrentWriteLimit int               `toml:"max-concurrent-write-limit"`
	MaxEnqueuedWriteLimit   int               `toml:"max-enqueued-write-limit"`
	EnqueuedWriteTimeout    time.Duration     `toml:"enqueued-write-timeout"`
	TLS                     *tls.Config       `toml:"-"`
}

Config represents a configuration for an HTTP service.

func NewConfig

func NewConfig() Config

NewConfig returns a new Config with default settings.

func (Config) Diagnostics

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

type Controller

// Controller is the interface held in Handler.Controller, which Handler
// groups under its Flux services.
type Controller interface {
	// Query takes a context and a Flux compiler and returns a flux.Query
	// or an error.
	Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error)
	// PrometheusCollectors returns the Prometheus collectors exposed by
	// the implementation.
	PrometheusCollectors() []prometheus.Collector
}

type DeleteBody

// DeleteBody is a JSON payload with "start", "stop" and "predicate" string
// fields.
// NOTE(review): presumably the request body of a delete endpoint; the
// expected formats of the three strings are not visible here — confirm.
type DeleteBody struct {
	Start     string `json:"start"`
	Stop      string `json:"stop"`
	Predicate string `json:"predicate"`
}

type Handler

// Handler holds the exported dependencies and settings of the HTTP handler.
// NewHandler returns a *Handler; how each dependency is populated is not
// visible here.
type Handler struct {
	// Version and BuildType are plain strings describing the build.
	Version   string
	BuildType string

	// MetaClient gives access to database, user and retention policy
	// metadata: lookups, user authentication, and creating, updating and
	// dropping retention policies.
	MetaClient interface {
		Database(name string) *meta.DatabaseInfo
		Databases() []meta.DatabaseInfo
		Authenticate(username, password string) (ui meta.User, err error)
		User(username string) (meta.User, error)
		AdminUserExists() bool
		CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
		DropRetentionPolicy(database, name string) error
		CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
		UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
	}

	// QueryAuthorizer performs the authorization checks declared by the
	// QueryAuthorizer interface.
	QueryAuthorizer QueryAuthorizer

	// WriteAuthorizer checks a write by username and database name,
	// returning an error.
	WriteAuthorizer interface {
		AuthorizeWrite(username, database string) error
	}

	QueryExecutor *query.Executor

	// Monitor exposes statistics (optionally tagged) and diagnostics.
	Monitor interface {
		Statistics(tags map[string]string) ([]*monitor.Statistic, error)
		Diagnostics() (map[string]*diagnostics.Diagnostics, error)
	}

	// PointsWriter writes a batch of points to a database and retention
	// policy with a given consistency level on behalf of a user.
	PointsWriter interface {
		WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
	}

	Store Store

	// Flux services
	Controller       Controller
	CompilerMappings flux.CompilerMappings

	Config    *Config
	Logger    *zap.Logger
	// NOTE(review): "CLF" presumably stands for Common Log Format — confirm.
	CLFLogger *log.Logger
	// contains filtered or unexported fields
}

Handler represents an HTTP handler for the InfluxDB server.

func NewHandler

func NewHandler(c Config) *Handler

NewHandler returns a new instance of handler with routes.

func (*Handler) AddRoutes

func (h *Handler) AddRoutes(routes ...Route)

AddRoutes sets the provided routes on the handler.

func (*Handler) Close

func (h *Handler) Close()

func (*Handler) Open

func (h *Handler) Open()

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP responds to HTTP requests sent to the handler.

func (*Handler) SetHeadersHandler

func (h *Handler) SetHeadersHandler(handler http.Handler) http.Handler

func (*Handler) SetHeadersWrapper

func (h *Handler) SetHeadersWrapper(f func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request)

SetHeadersWrapper returns a wrapper around f that adds user-supplied headers to the response.

func (*Handler) Statistics

func (h *Handler) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

type QueryAuthorizer

// QueryAuthorizer is the set of authorization checks held in
// Handler.QueryAuthorizer. Every method takes the meta.User being checked.
type QueryAuthorizer interface {
	// AuthorizeQuery checks an InfluxQL query against a database and
	// returns a query.FineAuthorizer or an error.
	AuthorizeQuery(u meta.User, query *influxql.Query, database string) (query.FineAuthorizer, error)
	// AuthorizeDatabase checks a privilege on a database.
	AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error
	// AuthorizeCreateDatabase checks whether the user may create a database.
	AuthorizeCreateDatabase(u meta.User) error
	// AuthorizeCreateRetentionPolicy checks creation of a retention policy on db.
	AuthorizeCreateRetentionPolicy(u meta.User, db string) error
	// AuthorizeDeleteRetentionPolicy checks deletion of a retention policy on db.
	AuthorizeDeleteRetentionPolicy(u meta.User, db string) error
}

type RequestInfo

// RequestInfo pairs an IP address with a username. It is used as the map
// key of RequestProfile.Requests.
type RequestInfo struct {
	IPAddr   string
	Username string
}

func (*RequestInfo) String

func (r *RequestInfo) String() string

type RequestProfile

// RequestProfile holds per-request-source statistics. It is returned by
// RequestTracker.TrackRequests and updated through AddQuery and AddWrite.
type RequestProfile struct {
	// Requests maps each RequestInfo to its RequestStats.
	Requests map[RequestInfo]*RequestStats
	// contains filtered or unexported fields
}

func (*RequestProfile) AddQuery

func (p *RequestProfile) AddQuery(info RequestInfo)

func (*RequestProfile) AddWrite

func (p *RequestProfile) AddWrite(info RequestInfo)

func (*RequestProfile) Stop

func (p *RequestProfile) Stop()

Stop informs the RequestTracker to stop collecting statistics for this profile.

type RequestStats

// RequestStats holds two int64 values, Writes and Queries, serialized under
// the JSON keys "writes" and "queries".
type RequestStats struct {
	Writes  int64 `json:"writes"`
	Queries int64 `json:"queries"`
}

type RequestTracker

// RequestTracker has no exported fields. Create one with NewRequestTracker;
// TrackRequests returns a *RequestProfile and Add takes a request and user.
type RequestTracker struct {
	// contains filtered or unexported fields
}

func NewRequestTracker

func NewRequestTracker() *RequestTracker

func (*RequestTracker) Add

func (rt *RequestTracker) Add(req *http.Request, user meta.User)

func (*RequestTracker) TrackRequests

func (rt *RequestTracker) TrackRequests() *RequestProfile

type Response

// Response is a list of statement results plus an error value. It has
// custom JSON encoding via its MarshalJSON and UnmarshalJSON methods.
type Response struct {
	// Results holds pointers to the statement results.
	Results []*query.Result
	// Err is an error stored on the response itself.
	// NOTE(review): how Err and per-result errors combine in Error() is
	// not visible here — confirm.
	Err     error
}

Response represents a list of statement results.

func (*Response) Error

func (r *Response) Error() error

Error returns the first error from any statement. Returns nil if no errors occurred on any statements.

func (Response) MarshalJSON

func (r Response) MarshalJSON() ([]byte, error)

MarshalJSON encodes a Response struct into JSON.

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Response struct.

type ResponseWriter

// ResponseWriter extends http.ResponseWriter with a method for writing a
// Response value.
type ResponseWriter interface {
	// WriteResponse writes a response.
	// It returns an int and an error.
	// NOTE(review): the int is presumably the number of bytes written — confirm.
	WriteResponse(resp Response) (int, error)

	// The embedded http.ResponseWriter supplies the standard Header, Write
	// and WriteHeader methods.
	http.ResponseWriter
}

ResponseWriter is an interface for writing a response.

func NewResponseWriter

func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter

NewResponseWriter creates a new ResponseWriter, based on the Accept header in the request, that wraps the provided http.ResponseWriter.

type RetentionRule

// RetentionRule is one element of BucketUpdate.RetentionRules, serialized
// as JSON with a type string and two durations expressed in whole seconds.
type RetentionRule struct {
	Type                      string `json:"type"`
	EverySeconds              int64  `json:"everySeconds"`
	ShardGroupDurationSeconds int64  `json:"shardGroupDurationSeconds"`
}

type Route

// Route describes one route: a name, an HTTP method, a URL pattern, two
// boolean switches, and the handler function. Routes are added to a Handler
// with Handler.AddRoutes.
type Route struct {
	Name           string
	Method         string
	Pattern        string
	Gzipped        bool
	LoggingEnabled bool
	// HandlerFunc is typed as interface{}.
	// NOTE(review): the concrete function types accepted are not visible
	// here — confirm.
	HandlerFunc    interface{}
}

Route specifies how to handle an HTTP verb for a given endpoint.

type Service

// Service exposes its Handler and Logger; all other state is unexported.
// Create one with NewService.
type Service struct {
	// Handler is the HTTP handler managed by the service.
	Handler *Handler

	// Logger is the service's zap logger; WithLogger sets the logger for
	// the service.
	Logger *zap.Logger
	// contains filtered or unexported fields
}

Service manages the listener and handler for an HTTP endpoint.

func NewService

func NewService(c Config) *Service

NewService returns a new instance of Service.

func (*Service) Addr

func (s *Service) Addr() net.Addr

Addr returns the listener's address. Returns nil if listener is closed.

func (*Service) BoundHTTPAddr

func (s *Service) BoundHTTPAddr() string

BoundHTTPAddr returns the string version of the address that the HTTP server is listening on. This is useful if you start an ephemeral server in a test with the bind address localhost:0.

func (*Service) Close

func (s *Service) Close() error

Close closes the underlying listener.

func (*Service) Err

func (s *Service) Err() <-chan error

Err returns a channel for fatal errors that occur on the listener.

func (*Service) Open

func (s *Service) Open() error

Open starts the service.

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) WithLogger

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger for the service.

type Statistics

// Statistics groups the statistics values of the httpd service. Every field
// is an int64.
// NOTE(review): the units of the *Duration fields and whether the Active*
// fields are gauges are not visible here — confirm.
type Statistics struct {
	Requests                         int64
	CQRequests                       int64
	QueryRequests                    int64
	WriteRequests                    int64
	PingRequests                     int64
	StatusRequests                   int64
	WriteRequestBytesReceived        int64
	QueryRequestBytesTransmitted     int64
	PointsWrittenOK                  int64
	PointsWrittenDropped             int64
	PointsWrittenFail                int64
	AuthenticationFailures           int64
	RequestDuration                  int64
	QueryRequestDuration             int64
	WriteRequestDuration             int64
	ActiveRequests                   int64
	ActiveWriteRequests              int64
	ClientErrors                     int64
	ServerErrors                     int64
	RecoveredPanics                  int64
	PromWriteRequests                int64
	PromReadRequests                 int64
	FluxQueryRequests                int64
	FluxQueryRequestDuration         int64
	FluxQueryRequestBytesTransmitted int64
}

Statistics maintains statistics for the httpd service.

type StatusFilter

// StatusFilter has no exported fields. Values are obtained from
// ParseStatusFilter or by decoding text with UnmarshalText.
type StatusFilter struct {
	// contains filtered or unexported fields
}

StatusFilter will check if an http status code matches a certain pattern.

func ParseStatusFilter

func ParseStatusFilter(s string) (StatusFilter, error)

ParseStatusFilter will create a new status filter from the string.

func (StatusFilter) MarshalText

func (sf StatusFilter) MarshalText() (text []byte, err error)

MarshalText converts a StatusFilter to its text form for encoding as TOML.

func (StatusFilter) Match

func (sf StatusFilter) Match(statusCode int) bool

Match will check if the status code matches this filter.

func (*StatusFilter) UnmarshalText

func (sf *StatusFilter) UnmarshalText(text []byte) error

UnmarshalText parses a TOML value into a StatusFilter value.

type StatusFilters

// StatusFilters is a slice of StatusFilter values with its own Match method.
// NOTE(review): Match presumably reports whether any filter in the slice
// matches the status code; behaviour for an empty slice is not visible
// here — confirm.
type StatusFilters []StatusFilter

func (StatusFilters) Match

func (filters StatusFilters) Match(statusCode int) bool

type Store

// Store is the storage interface held in Handler.Store.
type Store interface {
	// ReadFilter takes a context and a read-filter request and returns a
	// result set or an error.
	ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error)
	// Delete takes a database name, a list of InfluxQL sources and an
	// InfluxQL condition expression.
	Delete(database string, sources []influxql.Source, condition influxql.Expr) error
	// DeleteRetentionPolicy takes a database name and a retention policy name.
	DeleteRetentionPolicy(database, name string) error
}

Store describes the behaviour of the storage packages Store type.

type Throttler

// Throttler exposes its queue timeout and logger; the concurrency and queue
// limits passed to NewThrottler are kept in unexported state.
type Throttler struct {

	// Maximum amount of time requests can wait in queue.
	// Must be set before adding middleware.
	EnqueueTimeout time.Duration

	// Logger is the throttler's zap logger.
	Logger *zap.Logger
	// contains filtered or unexported fields
}

Throttler represents an HTTP throttler that limits the number of concurrent requests being processed as well as the number of enqueued requests.

func NewThrottler

func NewThrottler(concurrentN, maxEnqueueN int) *Throttler

NewThrottler returns a new instance of Throttler that limits to concurrentN requests processed at a time and maxEnqueueN requests waiting to be processed.

func (*Throttler) Handler

func (t *Throttler) Handler(h http.Handler) http.Handler

Handler wraps h in a middleware handler that throttles requests.

Source Files

accept.go config.go flux.go gzip.go handler.go io.go listen.go pprof.go requests.go response_logger.go response_writer.go service.go

Version
v1.12.0 (latest)
Published
Apr 8, 2025
Platform
linux/amd64
Imports
59 packages
Last checked
2 days ago

Tools for package owners.