package kafka

import "github.com/influxdata/flux/stdlib/kafka"

Index

Constants

const (
	// ToKafkaKind is the Kind for the ToKafka Flux function
	ToKafkaKind = "toKafka"
)

Variables

var DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter {
	return kafka.NewWriter(conf)
}

DefaultKafkaWriterFactory is a terrible name for a way to make a kafkaWriter that is injectable for testing

Types

type KafkaWriter

type KafkaWriter interface {
	io.Closer
	WriteMessages(context.Context, ...kafka.Message) error
}

KafkaWriter is an interface for what we need fromDefaultKafkaWriterFactory

type ToKafkaOpSpec

type ToKafkaOpSpec struct {
	Brokers      []string `json:"brokers"`
	Topic        string   `json:"topic"`
	Balancer     string   `json:"balancer"`
	Name         string   `json:"name"`
	NameColumn   string   `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
	TimeColumn   string   `json:"timeColumn"`
	TagColumns   []string `json:"tagColumns"`
	ValueColumns []string `json:"valueColumns"`
	MsgBufSize   int      `json:"msgBufferSize"` // the maximim number of messages to buffer before sending to kafka, the library we use defaults to 100
}

func (ToKafkaOpSpec) Kind

func (*ToKafkaOpSpec) ReadArgs

func (o *ToKafkaOpSpec) ReadArgs(args flux.Arguments) error

ReadArgs loads a flux.Arguments into ToKafkaOpSpec. It sets several default values. If the time_column isn't set, it defaults to execute.TimeColLabel. If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}.

type ToKafkaProcedureSpec

type ToKafkaProcedureSpec struct {
	plan.DefaultCost
	Spec *ToKafkaOpSpec
	// contains filtered or unexported fields
}

func (*ToKafkaProcedureSpec) Copy

func (*ToKafkaProcedureSpec) Kind

type ToKafkaTransformation

type ToKafkaTransformation struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

func NewToKafkaTransformation

func NewToKafkaTransformation(d execute.Dataset, deps flux.Dependencies, cache execute.TableBuilderCache, spec *ToKafkaProcedureSpec) (*ToKafkaTransformation, error)

func (*ToKafkaTransformation) Finish

func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error)

func (*ToKafkaTransformation) Process

func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (err error)

func (*ToKafkaTransformation) RetractTable

func (t *ToKafkaTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*ToKafkaTransformation) UpdateProcessingTime

func (t *ToKafkaTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error

func (*ToKafkaTransformation) UpdateWatermark

func (t *ToKafkaTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error

Source Files

to.go

Version
v0.196.1 (latest)
Published
Feb 19, 2025
Platform
linux/amd64
Imports
19 packages
Last checked
1 day ago

Tools for package owners.