gocloud.devgocloud.dev/pubsub/rabbitpubsub Index | Examples | Files

package rabbitpubsub

import "gocloud.dev/pubsub/rabbitpubsub"

Package rabbitpubsub provides an pubsub implementation for RabbitMQ. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription.

RabbitMQ follows the AMQP specification, which uses different terminology than the Go CDK Pub/Sub.

A Pub/Sub topic is an AMQP exchange. The exchange kind should be "fanout" to match the Pub/Sub model, although publishing will work with any kind of exchange.

A Pub/Sub subscription is an AMQP queue. The queue should be bound to the exchange that is the topic of the subscription. See the package example for details.

As

rabbitpubsub exposes the following types for As:

Index

Examples

Functions

func OpenSubscription

func OpenSubscription(conn *amqp.Connection, name string, opts *SubscriptionOptions) *pubsub.Subscription

OpenSubscription returns a *pubsub.Subscription corresponding to the named queue. See the package documentation for an example.

The queue must have been previously created (for instance, by using amqp.Channel.QueueDeclare) and bound to an exchange.

OpenSubscription uses the supplied amqp.Connection for all communication. It is the caller's responsibility to establish this connection before calling OpenSubscription and to close it when Close has been called on all Subscriptions opened with it.

The documentation of the amqp package recommends using separate connections for publishing and subscribing.

Example

Code:play 

package main

import (
	"context"
	"log"

	"github.com/streadway/amqp"
	"gocloud.dev/pubsub/rabbitpubsub"
)

func main() {
	// Connect to RabbitMQ.
	conn, err := amqp.Dial("your-rabbit-url")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Construct a *pubsub.Subscription..
	s := rabbitpubsub.OpenSubscription(conn, "queue-name", nil)

	// Now we can use s to receive messages.
	ctx := context.Background()
	msg, err := s.Receive(ctx)
	if err != nil {
		log.Fatalf("opening subscription: %v", err)
	}
	msg.Ack()
}

func OpenTopic

func OpenTopic(conn *amqp.Connection, name string, opts *TopicOptions) *pubsub.Topic

OpenTopic returns a *pubsub.Topic corresponding to the named exchange. See the package documentation for an example.

The exchange should already exist (for instance, by using amqp.Channel.ExchangeDeclare), although this won't be checked until the first call to SendBatch. For the Go CDK Pub/Sub model to make sense, the exchange should be a fanout exchange, although nothing in this package enforces that.

OpenTopic uses the supplied amqp.Connection for all communication. It is the caller's responsibility to establish this connection before calling OpenTopic, and to close it when Close has been called on all Topics opened with it.

The documentation of the amqp package recommends using separate connections for publishing and subscribing.

Example

Code:play 

package main

import (
	"context"
	"log"

	"github.com/streadway/amqp"
	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/rabbitpubsub"
)

func main() {
	// Connect to RabbitMQ.
	conn, err := amqp.Dial("your-rabbit-url")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Construct a *pubsub.Topic.
	t := rabbitpubsub.OpenTopic(conn, "exchange-name", nil)

	// Now we can use t to send messages.
	ctx := context.Background()
	err = t.Send(ctx, &pubsub.Message{Body: []byte("hello")})
}

Types

type MultiError

type MultiError []error

A MultiError is an error that contains multiple errors.

func (MultiError) Error

func (m MultiError) Error() string

type SubscriptionOptions

type SubscriptionOptions struct{}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by RabbitMQ.

type TopicOptions

type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by RabbitMQ.

Source Files

amqp.go doc.go rabbit.go

Version
v0.10.0
Published
Feb 12, 2019
Platform
js/wasm
Imports
11 packages
Last checked
1 week ago

Tools for package owners.