package kafkapubsub
import "gocloud.dev/pubsub/kafkapubsub"
Package kafkapubsub provides an implementation of pubsub for Kafka. It requires a minimum Kafka version of 0.11.x for Header support. Some functionality may work with earlier versions of Kafka.
See https://kafka.apache.org/documentation.html#semantics for a discussion of message semantics in Kafka. sarama.Config exposes many knobs that can affect performance and semantics, so review and set them carefully.
kafkapubsub does not support Message.Nack; Message.Nackable will return false, and Message.Nack will panic if called.
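Because Nack panics for this driver, portable code may want to check Nackable before calling it. The following is a minimal sketch using only the standard library: the `settler` interface, the `settle` helper, and `fakeMessage` are hypothetical names introduced here, with `settler` listing the Ack, Nack, and Nackable methods that a *pubsub.Message provides. What to do when a message cannot be nacked is application-specific; this sketch simply leaves it unacknowledged.

```go
package main

import "fmt"

// settler is a hypothetical interface listing the message methods this
// sketch relies on: Ack, Nack, and Nackable.
type settler interface {
	Ack()
	Nack()
	Nackable() bool
}

// settle acknowledges m when processing succeeded. On failure it calls Nack
// only if the driver supports it; otherwise it leaves the message
// unacknowledged. It returns a short description of the action taken.
func settle(m settler, processed bool) string {
	switch {
	case processed:
		m.Ack()
		return "acked"
	case m.Nackable():
		m.Nack()
		return "nacked"
	default:
		return "left unacked"
	}
}

// fakeMessage stands in for a real message so the sketch runs without a
// broker. With nackable set to false it behaves like the driver described
// above: Nackable reports false and Nack panics.
type fakeMessage struct{ nackable bool }

func (f fakeMessage) Ack() {}

func (f fakeMessage) Nack() {
	if !f.nackable {
		panic("Nack is not supported")
	}
}

func (f fakeMessage) Nackable() bool { return f.nackable }

func main() {
	kafkaLike := fakeMessage{nackable: false}
	fmt.Println(settle(kafkaLike, true))
	fmt.Println(settle(kafkaLike, false))
}
```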
URLs
For pubsub.OpenTopic and pubsub.OpenSubscription, kafkapubsub registers for the scheme "kafka". The default URL opener will connect to a default set of Kafka brokers based on the environment variable "KAFKA_BROKERS", expected to be a comma-delimited set of server addresses. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
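It can be useful to check the contents of KAFKA_BROKERS before opening a URL. The following sketch shows one way to split a comma-delimited broker list using only the standard library. The `parseBrokers` helper and the addresses are hypothetical, and the whitespace trimming is a choice made for this example rather than documented behavior of the default URL opener.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseBrokers splits a comma-delimited list of broker addresses, trimming
// surrounding whitespace and dropping empty entries.
// Hypothetical helper for illustration; not part of kafkapubsub.
func parseBrokers(s string) []string {
	var brokers []string
	for _, b := range strings.Split(s, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	// Set here only so the example is self-contained; normally the
	// environment provides this value.
	os.Setenv("KAFKA_BROKERS", "10.0.0.1:9092,10.0.0.2:9092")

	brokers := parseBrokers(os.Getenv("KAFKA_BROKERS"))
	if len(brokers) == 0 {
		fmt.Println("KAFKA_BROKERS is empty")
		return
	}
	fmt.Println(brokers)
}
```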
Escaping
Go CDK supports all UTF-8 strings. No escaping is required for Kafka. Message metadata is supported through Kafka Headers, which allow arbitrary []byte for both key and value. These are converted to string for use in Message.Metadata.
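The conversion from byte-valued headers to string metadata can be pictured with the sketch below. It uses only the standard library: the `header` type is a hypothetical stand-in for a Kafka record header, and `toMetadata` is an illustrative helper, not the package's own code. How repeated header keys are handled is an assumption of this sketch (the last value wins).

```go
package main

import "fmt"

// header is a hypothetical stand-in for a Kafka record header, whose key and
// value are both raw bytes.
type header struct {
	Key   []byte
	Value []byte
}

// toMetadata converts headers into a string map of the kind used for
// Message.Metadata. If a key repeats, the last value wins in this sketch.
func toMetadata(headers []header) map[string]string {
	md := make(map[string]string, len(headers))
	for _, h := range headers {
		md[string(h.Key)] = string(h.Value)
	}
	return md
}

func main() {
	md := toMetadata([]header{
		{Key: []byte("trace-id"), Value: []byte("abc123")},
		{Key: []byte("content-type"), Value: []byte("application/json")},
	})
	fmt.Println(md["trace-id"], md["content-type"])
}
```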
As
kafkapubsub exposes the following types for As:
- Topic: sarama.SyncProducer
- Subscription: sarama.ConsumerGroup, sarama.ConsumerGroupSession (may be nil during session renegotiation, and session may go stale at any time)
- Message: *sarama.ConsumerMessage
- Message.BeforeSend: *sarama.ProducerMessage
- Error: sarama.ConsumerError, sarama.ConsumerErrors, sarama.ProducerError, sarama.ProducerErrors, sarama.ConfigurationError, sarama.PacketDecodingError, sarama.PacketEncodingError, sarama.KError
Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// This example is used in https://gocloud.dev/howto/pubsub/subscribe/#kafka
	// import _ "gocloud.dev/pubsub/kafkapubsub"

	// Variables set up elsewhere:
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// The host + path are used as the consumer group name.
	// The "topic" query parameter sets one or more topics to subscribe to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	subscription, err := pubsub.OpenSubscription(ctx,
		"kafka://my-group?topic=my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// This example is used in https://gocloud.dev/howto/pubsub/publish/#kafka
	// import _ "gocloud.dev/pubsub/kafkapubsub"

	// Variables set up elsewhere:
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// The host + path are the topic name to send to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Index
- Constants
- func MinimalConfig() *sarama.Config
- func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
- func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Constants
const Scheme = "kafka"
Scheme is the URL scheme that kafkapubsub registers its URLOpeners under on pubsub.DefaultMux.
Functions
func MinimalConfig
func MinimalConfig() *sarama.Config
MinimalConfig returns a minimal sarama.Config.
func OpenSubscription
func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription creates a pubsub.Subscription that joins group, receiving messages from topics.
It uses a sarama.ConsumerGroup to receive messages. Consumer options can
be configured in the Consumer section of the sarama.Config:
https://godoc.org/github.com/Shopify/sarama#Config.
Example
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub/kafkapubsub"
)

func main() {
	// This example is used in https://gocloud.dev/howto/pubsub/subscribe/#kafka-ctor

	// Variables set up elsewhere:
	ctx := context.Background()

	// The set of brokers in the Kafka cluster.
	addrs := []string{"1.2.3.4:9092"}
	// The Kafka client configuration to use.
	config := kafkapubsub.MinimalConfig()

	// Construct a *pubsub.Subscription, joining the consumer group "my-group"
	// and receiving messages from "my-topic".
	subscription, err := kafkapubsub.OpenSubscription(
		addrs, config, "my-group", []string{"my-topic"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
func OpenTopic
func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)
OpenTopic creates a pubsub.Topic that sends to a Kafka topic.
It uses a sarama.SyncProducer to send messages. Producer options can be configured in the Producer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.
Config.Producer.Return.Successes must be set to true.
Example
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub/kafkapubsub"
)

func main() {
	// This example is used in https://gocloud.dev/howto/pubsub/publish/#kafka-ctor

	// Variables set up elsewhere:
	ctx := context.Background()

	// The set of brokers in the Kafka cluster.
	addrs := []string{"1.2.3.4:9092"}
	// The Kafka client configuration to use.
	config := kafkapubsub.MinimalConfig()

	// Construct a *pubsub.Topic.
	topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Types
type SubscriptionOptions
type SubscriptionOptions struct {
	// KeyName optionally sets the Message.Metadata key in which to store the
	// Kafka message key. If set, and if the Kafka message key is non-empty,
	// the key value will be stored in Message.Metadata under KeyName.
	KeyName string

	// WaitForJoin causes OpenSubscription to wait for up to WaitForJoin
	// to allow the client to join the consumer group.
	// Messages sent to the topic before the client joins the group
	// may not be received by this subscription.
	// OpenSubscription will succeed even if WaitForJoin elapses and
	// the subscription still hasn't been joined successfully.
	WaitForJoin time.Duration
}
SubscriptionOptions contains configuration for subscriptions.
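The WaitForJoin behavior described above (wait for a bounded time, then proceed either way) can be pictured with the standard-library sketch below. The `waitForJoin` helper and the `joined` channel are hypothetical names introduced for illustration; this is not the package's implementation, only one way to express "wait up to a duration, but do not fail if it elapses".

```go
package main

import (
	"fmt"
	"time"
)

// waitForJoin blocks until joined is closed or timeout elapses, whichever
// comes first, and reports whether the join completed in time.
// Hypothetical helper for illustration; not part of kafkapubsub.
func waitForJoin(joined <-chan struct{}, timeout time.Duration) bool {
	if timeout <= 0 {
		// No waiting requested: only report a join that already happened.
		select {
		case <-joined:
			return true
		default:
			return false
		}
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-joined:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	joined := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate the group join
		close(joined)
	}()
	fmt.Println("joined in time:", waitForJoin(joined, time.Second))

	// A join that never completes: the caller carries on after the timeout
	// instead of treating it as an error.
	never := make(chan struct{})
	fmt.Println("joined in time:", waitForJoin(never, 20*time.Millisecond))
}
```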
type TopicOptions
type TopicOptions struct {
	// KeyName optionally sets the Message.Metadata key to use as the optional
	// Kafka message key. If set, and if a matching Message.Metadata key is found,
	// the value for that key will be used as the message key when sending to
	// Kafka, instead of being added to the message headers.
	KeyName string
}
TopicOptions contains configuration options for topics.
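The effect of KeyName on an outgoing message can be sketched as follows. The example uses only the standard library: `splitKey` is a hypothetical helper that mirrors the documented rule (the matching metadata entry becomes the message key and is left out of the headers), and the metadata names are made up for illustration. It is not the package's own code.

```go
package main

import "fmt"

// splitKey separates the entry stored under keyName from the rest of the
// metadata. The matching value becomes the message key; all other entries
// remain as headers. An empty keyName leaves every entry in headers.
// Hypothetical helper for illustration; not part of kafkapubsub.
func splitKey(metadata map[string]string, keyName string) (key string, headers map[string]string) {
	headers = make(map[string]string, len(metadata))
	for k, v := range metadata {
		if keyName != "" && k == keyName {
			key = v
			continue
		}
		headers[k] = v
	}
	return key, headers
}

func main() {
	metadata := map[string]string{
		"partition-key": "customer-42",
		"trace-id":      "abc123",
	}
	key, headers := splitKey(metadata, "partition-key")
	fmt.Println("key:", key)
	fmt.Println("header count:", len(headers))
}
```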
type URLOpener
type URLOpener struct {
	// Brokers is the slice of brokers in the Kafka cluster.
	Brokers []string
	// Config is the Sarama Config.
	// Config.Producer.Return.Successes must be set to true.
	Config *sarama.Config

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}
URLOpener opens Kafka URLs like "kafka://mytopic" for topics and "kafka://group?topic=mytopic" for subscriptions.
For topics, the URL's host+path is used as the topic name.
For subscriptions, the URL's host+path is used as the group name, and the "topic" query parameter(s) are used as the set of topics to subscribe to.
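To make the URL layout concrete, the sketch below pulls the group name and topics out of a subscription URL with the standard net/url package. The `parseSubscriptionURL` helper is hypothetical and only illustrates the mapping described above; URLOpener may apply additional validation that this sketch does not attempt.

```go
package main

import (
	"fmt"
	"net/url"
)

// parseSubscriptionURL extracts the consumer group name (host + path) and
// the topics (every "topic" query parameter) from a kafka:// URL.
// Hypothetical helper for illustration; not part of kafkapubsub.
func parseSubscriptionURL(raw string) (group string, topics []string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", nil, err
	}
	if u.Scheme != "kafka" {
		return "", nil, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	return u.Host + u.Path, u.Query()["topic"], nil
}

func main() {
	group, topics, err := parseSubscriptionURL("kafka://my-group?topic=orders&topic=payments")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("group:", group)
	fmt.Println("topics:", topics)
}
```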
func (*URLOpener) OpenSubscriptionURL
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (*URLOpener) OpenTopicURL
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)
OpenTopicURL opens a pubsub.Topic based on u.
Source Files
kafka.go
- Version: v0.14.0
- Published: May 29, 2019