package rabbitpubsub
import "gocloud.dev/pubsub/rabbitpubsub"
Package rabbitpubsub provides a pubsub driver for RabbitMQ.
RabbitMQ follows the AMQP specification, which uses different terminology than Go Cloud Pub/Sub.
A Pub/Sub topic is an AMQP exchange. The exchange kind should be "fanout" to match the Pub/Sub model, although publishing will work with any kind of exchange.
A Pub/Sub subscription is an AMQP queue. The queue should be bound to the exchange that is the topic of the subscription. See the package example for details.
This package exposes the following types for As:
Topic: *amqp.Connection
Subscription: *amqp.Connection
Code:play
Output:Example¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/streadway/amqp"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/rabbitpubsub"
)
const rabbitURL = "amqp://guest:guest@localhost:5672/"
func main() {
// Connect to RabbitMQ.
conn, err := amqp.Dial(rabbitURL)
if err != nil {
// We can't connect to RabbitMQ, most likely because it's not available
// in the current environment. Rather than have the test fail,
// print the line that is expected.
fmt.Println("success")
return
}
defer conn.Close()
// Declare an exchange and a queue. Bind the queue to the exchange.
ch, err := conn.Channel()
if err != nil {
log.Fatalf("creating channel: %v", err)
}
const exchangeName = "my-topic"
err = ch.ExchangeDeclare(
exchangeName,
"fanout", // kind
false, // durable
false, // delete when unused
false, // internal
false, // no-wait
nil) // args
if err != nil {
log.Fatalf("declaring exchange: %v", err)
}
const queueName = "my-subscription"
q, err := ch.QueueDeclare(
queueName,
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil) // arguments
if err != nil {
log.Fatalf("declaring queue: %v", err)
}
err = ch.QueueBind(q.Name, q.Name, exchangeName,
false, // no-wait
nil) // args
if err != nil {
log.Fatalf("binding queue: %v", err)
}
ch.Close() // OpenSubscription will create its own channels.
// Publish a message to the exchange (that is, topic).
topic := rabbitpubsub.OpenTopic(conn, exchangeName)
ctx := context.Background()
err = topic.Send(ctx, &pubsub.Message{Body: []byte("hello")})
if err != nil {
log.Fatalf("sending: %v", err)
}
if err := topic.Shutdown(ctx); err != nil {
log.Fatalf("closing topic: %v", err)
}
// Receive the message from the queue (that is, subscription).
sub := rabbitpubsub.OpenSubscription(conn, queueName)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
msg, err := sub.Receive(ctx)
if err != nil {
log.Fatalf("opening subscription: %v", err)
}
msg.Ack()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := sub.Shutdown(ctx); err != nil {
log.Fatalf("closing subscription: %v", err)
}
fmt.Println("success")
}
success
Index ¶
- func OpenSubscription(conn *amqp.Connection, name string) *pubsub.Subscription
- func OpenTopic(conn *amqp.Connection, name string) *pubsub.Topic
- type MultiError
Examples ¶
Functions ¶
func OpenSubscription ¶
func OpenSubscription(conn *amqp.Connection, name string) *pubsub.Subscription
OpenSubscription returns a *pubsub.Subscription corresponding to the named queue. The queue must have been previously created (for instance, by using amqp.Channel.QueueDeclare) and bound to an exchange.
OpenSubscription uses the supplied amqp.Connection for all communication. It is the caller's responsibility to establish this connection before calling OpenSubscription and to close it when Close has been called on all Subscriptions opened with it.
The documentation of the amqp package recommends using separate connections for publishing and subscribing.
func OpenTopic ¶
func OpenTopic(conn *amqp.Connection, name string) *pubsub.Topic
OpenTopic returns a *pubsub.Topic corresponding to the named exchange. The exchange should already exist (for instance, by using amqp.Channel.ExchangeDeclare), although this won't be checked until the first call to SendBatch. For the model of Go Cloud Pub/Sub to make sense, the exchange should be a fanout exchange, although nothing in this package enforces that.
OpenTopic uses the supplied amqp.Connection for all communication. It is the caller's responsibility to establish this connection before calling OpenTopic, and to close it when Close has been called on all Topics opened with it.
The documentation of the amqp package recommends using separate connections for publishing and subscribing.
Types ¶
type MultiError ¶
type MultiError []error
A MultiError is an error that contains multiple errors.
func (MultiError) Error ¶
func (m MultiError) Error() string
Source Files ¶
amqp.go doc.go rabbit.go
- Version
- v0.9.0
- Published
- Jan 15, 2019
- Platform
- js/wasm
- Imports
- 10 packages
- Last checked
- 1 week ago –
Tools for package owners.