package natspubsub
import "gocloud.dev/pubsub/natspubsub"
Package natspubsub provides a pubsub implementation for NATS.io. Use CreateTopic to construct a *pubsub.Topic, and/or CreateSubscription to construct a *pubsub.Subscription. This package uses msgPack and the ugorji driver to encode and decode driver.Message to []byte.
URLs
For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats". The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://godoc.org/gocloud.dev#hdr-URLs for background information.
As
natspubsub exposes the following types for As:
- Topic: *nats.Conn
- Subscription: *nats.Subscription
- Message: *nats.Msg
Example (OpenfromURL)¶
Code:
package main

import (
	"context"

	"gocloud.dev/pubsub"
	// The blank import registers the "nats" scheme with pubsub.DefaultMux.
	_ "gocloud.dev/pubsub/natspubsub"
)

func main() {
	ctx := context.Background()

	// OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the NATS server at the URL in the environment
	// variable NATS_SERVER_URL and send messages with subject "mytopic".
	t, err := pubsub.OpenTopic(ctx, "nats://mytopic")

	// Similarly, OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will use the same connection and receive messages with subject
	// "mytopic".
	// Note that by default, s.Ack will result in a panic, as Ack is a meaningless
	// no-op for NATS. You can disable the panic using "?ackfunc=log" or
	// "?ackfunc=noop".
	s, err := pubsub.OpenSubscription(ctx, "nats://mytopic")
	_, _, _ = t, s, err
}
Index ¶
- Constants
- func CreateSubscription(nc *nats.Conn, subscriptionName string, ackFunc func(), _ *SubscriptionOptions) (*pubsub.Subscription, error)
- func CreateTopic(nc *nats.Conn, topicName string, _ *TopicOptions) (*pubsub.Topic, error)
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Constants ¶
const AckWarning = "" /* 190 byte string literal not displayed */
AckWarning is a message that may be used in ackFuncs.
const Scheme = "nats"
Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.
Functions ¶
func CreateSubscription ¶
func CreateSubscription(nc *nats.Conn, subscriptionName string, ackFunc func(), _ *SubscriptionOptions) (*pubsub.Subscription, error)
CreateSubscription returns a *pubsub.Subscription representing a NATS subscription.
ackFunc will be called when the application calls pubsub.Message.Ack on a received message; Ack is a meaningless no-op for NATS. You can provide an empty function to leave it a no-op, or panic/log a warning if you don't expect Ack to be called.
TODO(dlc) - Options for queue groups?
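The ackFunc choices described above (do nothing, log a warning, or panic) can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the helper newAckFunc, its mode names, and the ackWarning text are assumptions made for the example (the real AckWarning string is not shown in this documentation).

```go
package main

import (
	"fmt"
	"log"
)

// ackWarning is a hypothetical stand-in for natspubsub.AckWarning,
// whose actual text is not displayed in the package documentation.
const ackWarning = "Ack was called, but Ack is a no-op for NATS"

// newAckFunc is an illustrative helper (not part of natspubsub) that
// returns a func() suitable for passing as ackFunc to CreateSubscription.
func newAckFunc(mode string) (func(), error) {
	switch mode {
	case "noop":
		// Silently ignore Ack calls.
		return func() {}, nil
	case "log":
		// Emit a warning each time Ack is called.
		return func() { log.Print(ackWarning) }, nil
	case "panic":
		// Treat any Ack call as a programming error.
		return func() { panic(ackWarning) }, nil
	default:
		return nil, fmt.Errorf("unknown ack mode %q", mode)
	}
}

func main() {
	ack, err := newAckFunc("log")
	if err != nil {
		log.Fatal(err)
	}
	ack() // Logs the warning instead of panicking.
}
```

The returned function could then be passed as the ackFunc argument of CreateSubscription. Choosing "panic" surfaces accidental Ack calls early in an at-most-once application, while "noop" suits code that must also run against providers where Ack matters.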
Example¶
Code:
{
	ctx := context.Background()

	// Create a connection to NATS.
	// For use with NGS and credentials:
	// nc, err := nats.Connect("connect.ngs/global", nats.UserCredentials("path_to_creds_file"))
	nc, err := nats.Connect("nats://demo.nats.io")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	ackFunc := func() {
		// This function will be called when the application calls "Ack" on a
		// received message.
		// Since Ack is a meaningless no-op for NATS, you can provide an empty
		// function to do nothing, or panic/log a warning if your application
		// is built for at-most-once semantics and should never call Ack.
	}

	sub, err := natspubsub.CreateSubscription(nc, "go-cloud.example.receive", ackFunc, nil)
	if err != nil {
		// Handle error....
	}

	// Now we can use sub to receive messages.
	msg, err := sub.Receive(ctx)
	if err != nil {
		// Handle error....
	}

	// Handle the message.

	// Ack will call ackFunc above. If you're only going to use at-most-once
	// providers, you can omit it.
	msg.Ack()
}
func CreateTopic ¶
func CreateTopic(nc *nats.Conn, topicName string, _ *TopicOptions) (*pubsub.Topic, error)
CreateTopic returns a *pubsub.Topic for use with NATS.
For more info, see https://nats.io/documentation/writing_applications/subjects
Example¶
Code:
{
	ctx := context.Background()

	// Create a connection to NATS.
	// For use with NGS and credentials:
	// nc, err := nats.Connect("connect.ngs/global", nats.UserCredentials("path_to_creds_file"))
	nc, err := nats.Connect("nats://demo.nats.io")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	pt, err := natspubsub.CreateTopic(nc, "go-cloud.example.send", nil)
	if err != nil {
		// Handle error....
	}

	err = pt.Send(ctx, &pubsub.Message{Body: []byte("example message")})
	if err != nil {
		// Handle error....
	}
}
Types ¶
type SubscriptionOptions ¶
type SubscriptionOptions struct{}
SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by NATS.
type TopicOptions ¶
type TopicOptions struct{}
TopicOptions sets options for constructing a *pubsub.Topic backed by NATS.
type URLOpener ¶
type URLOpener struct {
	// Connection to use for communication with the server.
	Connection *nats.Conn
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}
URLOpener opens NATS URLs like "nats://mytopic".
The URL host+path is used as the topic name.
For subscriptions, the following query parameters are supported:
- ackfunc: One of "log", "noop", "panic"; defaults to "panic". Determines the behavior if pubsub.Message.Ack (which is a meaningless no-op for NATS) is called on a received message. "log" means a log.Printf warning will be emitted; "noop" means nothing will happen; and "panic" means the application will panic.
For topics, no query parameters are supported.
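To make the URL rules above concrete, here is a rough sketch of how a subscription URL could be broken down using only net/url. It is not the package's actual code: parseSubscriptionURL is a hypothetical helper, and joining host and path with path.Join is an assumption about how "host+path" is combined into the topic name.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// parseSubscriptionURL is a hypothetical helper (not part of natspubsub).
// It returns the topic name (URL host+path) and the ackfunc mode,
// which defaults to "panic" when the parameter is absent.
func parseSubscriptionURL(raw string) (string, string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "nats" {
		return "", "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}

	ackMode := "panic"
	for param, values := range u.Query() {
		if param != "ackfunc" {
			return "", "", fmt.Errorf("invalid query parameter %q", param)
		}
		ackMode = values[0]
	}
	switch ackMode {
	case "log", "noop", "panic":
		// Supported values.
	default:
		return "", "", fmt.Errorf("invalid ackfunc value %q", ackMode)
	}

	// Assumption: host and path are joined with a slash to form the name.
	subject := path.Join(u.Host, u.Path)
	return subject, ackMode, nil
}

func main() {
	subject, mode, err := parseSubscriptionURL("nats://mytopic?ackfunc=log")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("subject=%s ackfunc=%s\n", subject, mode)
}
```

Rejecting unknown parameters, rather than ignoring them, means a typo such as "?ackfun=log" fails loudly instead of silently falling back to the panicking default.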
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (*URLOpener) OpenTopicURL ¶
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)
OpenTopicURL opens a pubsub.Topic based on u.
Source Files ¶
nats.go
- Version: v0.12.0
- Published: Mar 20, 2019