gocloud.devgocloud.dev/pubsub/natspubsub Index | Examples | Files

package natspubsub

import "gocloud.dev/pubsub/natspubsub"

Package natspubsub provides a pubsub implementation for NATS.io. Use CreateTopic to construct a *pubsub.Topic, and/or CreateSubscription to construct a *pubsub.Subscription. This package uses msgPack and the ugorji driver to encode and decode driver.Message to []byte.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats". The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://godoc.org/gocloud.dev#hdr-URLs for background information.

As

natspubsub exposes the following types for As:

Example (OpenfromURL)

Code:play 

package main

import (
	"context"

	"gocloud.dev/pubsub"
)

func main() {
	ctx := context.Background()

	// OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the NATS server at the URL in the environment
	// variable NATS_SERVER_URL and send messages with subject "mytopic".
	t, err := pubsub.OpenTopic(ctx, "nats://mytopic")

	// Similarly, OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will use the same connection and receive messages with subject
	// "mytopic".
	// Note that by default, s.Ack will result in a panic, as Ack is a meaningless
	// no-op for NATS. You can disable the panic using "?ackfunc=log" or
	// "?ackfunc=noop".
	s, err := pubsub.OpenSubscription(ctx, "nats://mytopic")
	_, _, _ = t, s, err
}

Index

Examples

Constants

const AckWarning = "" /* 190 byte string literal not displayed */

AckWarning is a message that may be used in ackFuncs.

const Scheme = "nats"

Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.

Functions

func CreateSubscription

func CreateSubscription(nc *nats.Conn, subscriptionName string, ackFunc func(), _ *SubscriptionOptions) (*pubsub.Subscription, error)

CreateSubscription returns a *pubsub.Subscription representing a NATS subscription.

ackFunc will be called when the application calls pubsub.Topic.Ack on a received message; Ack is a meaningless no-op for NATS. You can provide an empty function to leave it a no-op, or panic/log a warning if you don't expect Ack to be called.

TODO(dlc) - Options for queue groups?

Example

Code:

{
	ctx := context.Background()

	// Create a connection to NATS
	// For use with NGS and credentials.
	// nc, err := nats.Connect("connect.ngs/global", nats.UserCredentials("path_to_creds_file")
	nc, err := nats.Connect("nats://demo.nats.io")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	ackFunc := func() {
		// This function will be called when the application calls "Ack" on a
		// received message.
		// Since Ack is a meaningless no-op for NATS, you can provide an empty
		// function to do nothing, or panic/log a warning if your application
		// is built for at-most-once semantics and should never call Ack.
	}
	sub, err := natspubsub.CreateSubscription(nc, "go-cloud.example.receive", ackFunc, nil)
	if err != nil {
		// Handle error....
	}

	// Now we can use sub to receive messages.
	msg, err := sub.Receive(ctx)
	if err != nil {
		// Handle error....
	}
	// Handle Message

	// Ack will call ackFunc above. If you're only going to use at-most-once
	// providers, you can omit it.
	msg.Ack()
}

func CreateTopic

func CreateTopic(nc *nats.Conn, topicName string, _ *TopicOptions) (*pubsub.Topic, error)

CreateTopic returns a *pubsub.Topic for use with NATS. For more info, see https://nats.io/documentation/writing_applications/subjects

Example

Code:

{
	ctx := context.Background()

	// Create a connection to NATS
	// For use with NGS and credentials.
	// nc, err := nats.Connect("connect.ngs/global", nats.UserCredentials("path_to_creds_file")
	nc, err := nats.Connect("nats://demo.nats.io")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	pt, err := natspubsub.CreateTopic(nc, "go-cloud.example.send", nil)
	if err != nil {
		// Handle error....
	}

	err = pt.Send(ctx, &pubsub.Message{Body: []byte("example message")})
}

Types

type SubscriptionOptions

type SubscriptionOptions struct{}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by NATS.

type TopicOptions

type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by NATS.

type URLOpener

type URLOpener struct {
	// Connection to use for communication with the server.
	Connection *nats.Conn
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens NATS URLs like "nats://mytopic".

The URL host+path is used as the topic name.

The following query parameters are supported:

No query parameters are supported.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Source Files

nats.go

Version
v0.12.0
Published
Mar 20, 2019
Platform
js/wasm
Imports
14 packages
Last checked
1 week ago

Tools for package owners.