package coordinator
import "github.com/influxdata/influxdb/coordinator"
Package coordinator contains abstractions for writing points, executing statements, and accessing meta data.
Index ¶
- Constants
- Variables
- type BufferedPointsWriter
- func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter
- func (w *BufferedPointsWriter) Cap() int
- func (w *BufferedPointsWriter) Flush() error
- func (w *BufferedPointsWriter) Len() int
- func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error
- type Config
- type IntoWriteRequest
- type IteratorCreator
- type LocalShardMapper
- type LocalShardMapping
- func (a *LocalShardMapping) Close() error
- func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
- func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
- func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error)
- func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType
- type LocalTSDBStore
- type MetaClient
- type PointsWriter
- func NewPointsWriter() *PointsWriter
- func (w *PointsWriter) Close() error
- func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
- func (w *PointsWriter) Open() error
- func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic
- func (w *PointsWriter) WithLogger(log *zap.Logger)
- func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
- func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error
- func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
- type ShardIteratorCreator
- type ShardMapping
- func NewShardMapping(n int) *ShardMapping
- func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)
- type Source
- type StatementExecutor
- func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt influxql.Statement) error
- func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string) (err error)
- type TSDBStore
- type WritePointsRequest
- type WriteStatistics
- type WriteWindow
Constants ¶
const (
	// DefaultWriteTimeout is the default timeout for a complete write to succeed.
	DefaultWriteTimeout = 10 * time.Second

	// DefaultMaxConcurrentQueries is the maximum number of running queries.
	// A value of zero will make the maximum query limit unlimited.
	DefaultMaxConcurrentQueries = 0

	// DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
	// A value of zero will make the maximum point count unlimited.
	DefaultMaxSelectPointN = 0

	// DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
	// A value of zero will make the maximum series count unlimited.
	DefaultMaxSelectSeriesN = 0
)
Variables ¶
var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrPartialWrite is returned when a write partially succeeds but does
	// not meet the requested consistency level.
	ErrPartialWrite = errors.New("partial write")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
)
ErrDatabaseNameRequired is returned when executing statements that require a database, when a database has not been provided.
Types ¶
type BufferedPointsWriter ¶
type BufferedPointsWriter struct {
// contains filtered or unexported fields
}
BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries write their points to the destination in batches.
func NewBufferedPointsWriter ¶
func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter
NewBufferedPointsWriter returns a new BufferedPointsWriter.
func (*BufferedPointsWriter) Cap ¶
func (w *BufferedPointsWriter) Cap() int
Cap returns the capacity (in points) of the buffer.
func (*BufferedPointsWriter) Flush ¶
func (w *BufferedPointsWriter) Flush() error
Flush writes all buffered points to the underlying writer.
func (*BufferedPointsWriter) Len ¶
func (w *BufferedPointsWriter) Len() int
Len returns the number of points buffered.
func (*BufferedPointsWriter) WritePointsInto ¶
func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error
WritePointsInto implements pointsWriter for BufferedPointsWriter.
type Config ¶
type Config struct { WriteTimeout toml.Duration `toml:"write-timeout"` MaxConcurrentQueries int `toml:"max-concurrent-queries"` QueryTimeout toml.Duration `toml:"query-timeout"` LogQueriesAfter toml.Duration `toml:"log-queries-after"` LogTimedOutQueries bool `toml:"log-timedout-queries"` MaxSelectPointN int `toml:"max-select-point"` MaxSelectSeriesN int `toml:"max-select-series"` MaxSelectBucketsN int `toml:"max-select-buckets"` TerminationQueryLog bool `toml:"termination-query-log"` }
Config represents the configuration for the coordinator service.
func NewConfig ¶
func NewConfig() Config
NewConfig returns an instance of Config with defaults.
func (Config) Diagnostics ¶
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)
Diagnostics returns a diagnostics representation of a subset of the Config.
type IntoWriteRequest ¶
IntoWriteRequest is a partial copy of cluster.WriteRequest.
type IteratorCreator ¶
type IteratorCreator interface { query.IteratorCreator influxql.FieldMapper io.Closer }
IteratorCreator is an interface that combines mapping fields and creating iterators.
type LocalShardMapper ¶
type LocalShardMapper struct { MetaClient interface { ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) } TSDBStore interface { ShardGroup(ids []uint64) tsdb.ShardGroup } }
LocalShardMapper implements a ShardMapper for local shards.
func (*LocalShardMapper) MapShards ¶
func (e *LocalShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRange, opt query.SelectOptions) (query.ShardGroup, error)
MapShards maps the sources to the appropriate shards into an IteratorCreator.
type LocalShardMapping ¶
type LocalShardMapping struct {
	ShardMap map[Source]tsdb.ShardGroup

	// MinTime is the minimum time that this shard mapper will allow.
	// Any attempt to use a time before this one will automatically result in using
	// this time instead.
	MinTime time.Time

	// MaxTime is the maximum time that this shard mapper will allow.
	// Any attempt to use a time after this one will automatically result in using
	// this time instead.
	MaxTime time.Time
}
LocalShardMapping maps data sources to a list of shard information.
func (*LocalShardMapping) Close ¶
func (a *LocalShardMapping) Close() error
Close clears out the list of mapped shards.
func (*LocalShardMapping) CreateIterator ¶
func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error)
func (*LocalShardMapping) FieldDimensions ¶
func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
func (*LocalShardMapping) IteratorCost ¶
func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error)
func (*LocalShardMapping) MapType ¶
func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType
type LocalTSDBStore ¶
LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator to satisfy the TSDBStore interface.
type MetaClient ¶
type MetaClient interface { CreateContinuousQuery(database, name, query string) error CreateDatabase(name string) (*meta.DatabaseInfo, error) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateSubscription(database, rp, name, mode string, destinations []string) error CreateUser(name, password string, admin bool) (meta.User, error) Database(name string) *meta.DatabaseInfo Databases() []meta.DatabaseInfo DropShard(id uint64) error DropContinuousQuery(database, name string) error DropDatabase(name string) error DropRetentionPolicy(database, name string) error DropSubscription(database, rp, name string) error DropUser(name string) error RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) SetAdminPrivilege(username string, admin bool) error SetPrivilege(username, database string, p influxql.Privilege) error ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) TruncateShardGroups(t time.Time) error UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error UpdateUser(name, password string) error UserPrivilege(username, database string) (*influxql.Privilege, error) UserPrivileges(username string) (map[string]influxql.Privilege, error) Users() []meta.UserInfo }
MetaClient is an interface for accessing meta data.
type PointsWriter ¶
type PointsWriter struct { WriteTimeout time.Duration Logger *zap.Logger Node *influxdb.Node MetaClient interface { Database(name string) (di *meta.DatabaseInfo) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) } TSDBStore interface { CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error WriteToShard(ctx tsdb.WriteContext, shardID uint64, points []models.Point) error } Subscriber interface { Send(*WritePointsRequest) } // contains filtered or unexported fields }
PointsWriter handles writes across multiple local and remote data nodes.
func NewPointsWriter ¶
func NewPointsWriter() *PointsWriter
NewPointsWriter returns a new instance of PointsWriter for a node.
func (*PointsWriter) Close ¶
func (w *PointsWriter) Close() error
Close closes the communication channel with the point writer.
func (*PointsWriter) MapShards ¶
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.
func (*PointsWriter) Open ¶
func (w *PointsWriter) Open() error
Open opens the communication channel with the point writer.
func (*PointsWriter) Statistics ¶
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic
Statistics returns statistics for periodic monitoring.
func (*PointsWriter) WithLogger ¶
func (w *PointsWriter) WithLogger(log *zap.Logger)
WithLogger sets the Logger on w.
func (*PointsWriter) WritePoints ¶
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
WritePoints is a wrapper for WritePointsPrivileged(); the user argument is only required for clustering.
func (*PointsWriter) WritePointsInto ¶
func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error
WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency. It is used for 'SELECT INTO' statements.
func (*PointsWriter) WritePointsPrivileged ¶
func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
type ShardIteratorCreator ¶
type ShardIteratorCreator interface { ShardIteratorCreator(id uint64) query.IteratorCreator }
ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.
type ShardMapping ¶
type ShardMapping struct {
	Points  map[uint64][]models.Point  // The points associated with a shard ID
	Shards  map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
	Dropped []models.Point             // Points that were dropped
	// contains filtered or unexported fields
}
ShardMapping contains a mapping of shards to points.
func NewShardMapping ¶
func NewShardMapping(n int) *ShardMapping
NewShardMapping creates an empty ShardMapping.
func (*ShardMapping) MapPoint ¶
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)
MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
type Source ¶
Source contains the database and retention policy source for data.
type StatementExecutor ¶
type StatementExecutor struct {
	MetaClient MetaClient

	// TaskManager holds the StatementExecutor that handles task-related commands.
	TaskManager query.StatementExecutor

	// TSDB storage for local node.
	TSDBStore TSDBStore

	// ShardMapper for mapping shards when executing a SELECT statement.
	ShardMapper query.ShardMapper

	// Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
	Monitor *monitor.Monitor

	// Used for rewriting points back into system for SELECT INTO statements.
	PointsWriter interface {
		WritePointsInto(*IntoWriteRequest) error
	}

	// Disallow INF values in SELECT INTO and other previously ignored errors
	StrictErrorHandling bool

	// Select statement limits
	MaxSelectPointN   int
	MaxSelectSeriesN  int
	MaxSelectBucketsN int
}
StatementExecutor executes a statement in the query.
func (*StatementExecutor) ExecuteStatement ¶
func (e *StatementExecutor) ExecuteStatement(ctx *query.ExecutionContext, stmt influxql.Statement) error
ExecuteStatement executes the given statement with the given execution context.
func (*StatementExecutor) NormalizeStatement ¶
func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string) (err error)
NormalizeStatement adds a default database and policy to the measurements in statement. Parameter defaultRetentionPolicy can be "".
type TSDBStore ¶
type TSDBStore interface { CreateShard(database, policy string, shardID uint64, enabled bool) error WriteToShard(writeCtx tsdb.WriteContext, shardID uint64, points []models.Point) error RestoreShard(id uint64, r io.Reader) error BackupShard(id uint64, since time.Time, w io.Writer) error DeleteDatabase(name string) error DeleteMeasurement(database, name string) error DeleteRetentionPolicy(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, retentionPolicy string, cond influxql.Expr) ([][]byte, error) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) SeriesCardinality(ctx context.Context, database string) (int64, error) MeasurementsCardinality(ctx context.Context, database string) (int64, error) }
TSDBStore is an interface for accessing the time series data store.
type WritePointsRequest ¶
WritePointsRequest represents a request to write point data to the cluster.
func (*WritePointsRequest) AddPoint ¶
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)
AddPoint adds a point to the WritePointsRequest with field key 'value'.
type WriteStatistics ¶
type WriteStatistics struct { WriteReq int64 PointWriteReq int64 PointWriteReqLocal int64 WriteOK int64 WriteDropped int64 WriteTimeout int64 WriteErr int64 SubWriteOK int64 }
WriteStatistics keeps statistics related to the PointsWriter.
type WriteWindow ¶
type WriteWindow struct {
// contains filtered or unexported fields
}
func NewWriteWindow ¶
func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow
func (*WriteWindow) WithinWindow ¶
func (w *WriteWindow) WithinWindow(t time.Time) bool
Source Files ¶
config.go meta_client.go points_writer.go shard_mapper.go statement_executor.go
- Version
- v1.12.0 (latest)
- Published
- Apr 8, 2025
- Platform
- linux/amd64
- Imports
- 23 packages
- Last checked
- 2 days ago –
Tools for package owners.