package mqtt

import "github.com/influxdata/flux/stdlib/experimental/mqtt"

Index

Constants

// Package-level defaults.
//
// DefaultConnectMQTTTimeout is a duration of one second and DefaultClientID is
// the string "flux-mqtt".
//
// NOTE(review): judging by the names, these are presumably the fallbacks for
// CommonMQTTOpSpec.Timeout and CommonMQTTOpSpec.ClientID when the caller
// supplies neither — the code that applies them is not shown here; confirm
// against ReadArgs.
const (
	DefaultConnectMQTTTimeout = 1 * time.Second
	DefaultClientID           = "flux-mqtt"
)
// Identifiers used by the toMQTT operation.
//
// ToMQTTKind is the string "toMQTT"; it is presumably the value returned by
// the Kind methods of ToMQTTOpSpec and ToMQTTProcedureSpec (TODO confirm, the
// method bodies are not shown here).
//
// DefaultNameColLabel is "_measurement", the column that the NameColumn field
// comment on ToMQTTOpSpec names as the fallback when neither Name nor
// NameColumn is set.
const (
	ToMQTTKind          = "toMQTT"
	DefaultNameColLabel = "_measurement"
)

Types

type CommonMQTTOpSpec

// CommonMQTTOpSpec groups the MQTT settings that ToMQTTOpSpec embeds.
//
// Each field is serialized under the JSON key given in its struct tag. Note
// that ClientID uses the all-lowercase key "clientid", unlike the camelCase
// key "noKeepAlive".
//
// NOTE(review): the field meanings (broker address, client identifier,
// credentials, MQTT QoS level, retain flag, connection timeout, keep-alive
// switch) are inferred from the names only — confirm against ReadArgs and the
// publishing code before relying on them.
type CommonMQTTOpSpec struct {
	Broker      string        `json:"broker"`
	ClientID    string        `json:"clientid"`
	Username    string        `json:"username"`
	Password    string        `json:"password"`
	QoS         int64         `json:"qos"`
	Retain      bool          `json:"retain"`
	Timeout     time.Duration `json:"timeout"`
	NoKeepAlive bool          `json:"noKeepAlive"`
}

func (*CommonMQTTOpSpec) ReadArgs

func (o *CommonMQTTOpSpec) ReadArgs(args flux.Arguments) error

type ToMQTTOpSpec

// ToMQTTOpSpec is the operation spec for toMQTT. It embeds CommonMQTTOpSpec
// and adds the topic plus the columns that describe the data to publish.
//
// Per the ReadArgs documentation, TimeColumn defaults to execute.TimeColLabel
// and ValueColumns defaults to []string{execute.DefaultValueColLabel} when
// they are not set.
type ToMQTTOpSpec struct {
	CommonMQTTOpSpec
	Topic        string   `json:"topic"` // optional in this spec
	Name         string   `json:"name"`
	NameColumn   string   `json:"nameColumn"` // either Name or NameColumn must be set; if neither is set, the "_measurement" column is tried.
	TimeColumn   string   `json:"timeColumn"`
	TagColumns   []string `json:"tagColumns"`
	ValueColumns []string `json:"valueColumns"`
}

func (ToMQTTOpSpec) Kind

func (*ToMQTTOpSpec) ReadArgs

func (o *ToMQTTOpSpec) ReadArgs(args flux.Arguments) error

ReadArgs loads a flux.Arguments into ToMQTTOpSpec. It sets several default values: if the time column (TimeColumn) isn't set, it defaults to execute.TimeColLabel; if the value columns (ValueColumns) aren't set, they default to []string{execute.DefaultValueColLabel}.

func (*ToMQTTOpSpec) UnmarshalJSON

func (o *ToMQTTOpSpec) UnmarshalJSON(b []byte) (err error)

UnmarshalJSON unmarshals JSON into a ToMQTTOpSpec and validates the result.

type ToMQTTProcedureSpec

// ToMQTTProcedureSpec is the procedure spec for toMQTT. It embeds
// plan.DefaultCost and carries a pointer to the ToMQTTOpSpec it was built
// from.
//
// NOTE(review): Spec is a pointer; whether Copy duplicates the pointed-to spec
// or shares it is not visible here — confirm before mutating a copied spec.
type ToMQTTProcedureSpec struct {
	plan.DefaultCost
	Spec *ToMQTTOpSpec
}

func (*ToMQTTProcedureSpec) Copy

func (*ToMQTTProcedureSpec) Kind

type ToMQTTTransformation

// ToMQTTTransformation is the execution-time transformation for toMQTT. It
// embeds execute.ExecutionNode; its remaining fields are unexported and so are
// omitted from this listing. Instances are created with
// NewToMQTTTransformation.
type ToMQTTTransformation struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

func NewToMQTTTransformation

func NewToMQTTTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, spec *ToMQTTProcedureSpec) *ToMQTTTransformation

func (*ToMQTTTransformation) Finish

func (t *ToMQTTTransformation) Finish(id execute.DatasetID, err error)

func (*ToMQTTTransformation) Process

func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) error

func (*ToMQTTTransformation) RetractTable

func (t *ToMQTTTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*ToMQTTTransformation) UpdateProcessingTime

func (t *ToMQTTTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error

func (*ToMQTTTransformation) UpdateWatermark

func (t *ToMQTTTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error

Source Files

mqtt.go publish.go to.go

Version
v0.196.1 (latest)
Published
Feb 19, 2025
Platform
linux/amd64
Imports
18 packages
Last checked
1 day ago

Tools for package owners.