package pubsub
import "gocloud.dev/pubsub"
Package pubsub provides an easy and portable way to interact with publish/
subscribe systems.
Code:play
Output: Code:play
Output: Code:play
Output: Code:play
Output:Example (ReceiveWithInvertedWorkerPool)¶
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/mempubsub"
)
// main publishes a batch of messages to an in-memory topic and then drains
// them with an "inverted" worker pool: a single loop calls Receive and hands
// each message to a short-lived goroutine, while a buffered channel caps how
// many of those goroutines run at once.
func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	// Release the context on every exit path, including an early Receive
	// failure where the goroutine below never gets to call cancel. Calling
	// cancel more than once is harmless.
	defer cancel()
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using an inverted worker pool, as described here:
	// https://www.youtube.com/watch?v=5zXAHh5tJqQ&t=26m58s
	// It uses a buffered channel, sem, as a semaphore to manage the maximum
	// number of workers.
	const poolSize = 10
	sem := make(chan struct{}, poolSize)
	for {
		// Read a message. Receive will block until a message is available.
		msg, err := s.Receive(receiveCtx)
		if err != nil {
			// An error from Receive is fatal; Receive will never succeed again
			// so the application should exit.
			// In this example, we expect to get an error here when we've read all
			// the messages and receiveCtx is canceled. Anything that happens
			// while receiveCtx is still live is a genuine failure, so report it
			// instead of silently dropping it.
			if receiveCtx.Err() == nil {
				log.Printf("receiving message: %v", err)
			}
			break
		}

		// Write a token to the semaphore; if there are already poolSize workers
		// active, this will block until one of them completes.
		sem <- struct{}{}

		// Process the message. For many applications, this can be expensive, so
		// we do it in a goroutine, allowing this loop to continue and Receive more
		// messages.
		go func() {
			// Record that we've processed this message, and Ack it.
			msg.Ack()
			wg.Done()
			// Read a token from the semaphore before exiting this goroutine, freeing
			// up the slot for another goroutine.
			<-sem
		}()
	}

	// Wait for all workers to finish: once poolSize tokens have been written,
	// every slot is held by this loop and no worker can still be running.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	fmt.Printf("Read %d messages", nMessages)
}
Read 100 messages
Example (ReceiveWithTraditionalWorkerPool)¶
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/mempubsub"
)
// main publishes a batch of messages to an in-memory topic and then drains
// them with a traditional worker pool: a fixed number of long-lived
// goroutines each loop on Receive until it returns an error.
func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	// Release the context on every exit path, including an early Receive
	// failure where the goroutine below never gets to call cancel. Calling
	// cancel more than once is harmless.
	defer cancel()
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using a traditional worker pool. Consider using an
	// inverted pool instead (see the other example).
	const poolSize = 10
	var workerWg sync.WaitGroup
	for n := 0; n < poolSize; n++ {
		workerWg.Add(1)
		go func() {
			// Deferring Done keeps workerWg balanced on every way out of this
			// goroutine, not only the error return below.
			defer workerWg.Done()
			for {
				// Read a message. Receive will block until a message is available.
				// It's fine to call Receive from many goroutines.
				msg, err := s.Receive(receiveCtx)
				if err != nil {
					// An error from Receive is fatal; Receive will never succeed again
					// so the application should exit.
					// In this example, we expect to get an error here when we've read
					// all the messages and receiveCtx is canceled. Anything that
					// happens while receiveCtx is still live is a genuine failure, so
					// report it instead of silently dropping it.
					if receiveCtx.Err() == nil {
						log.Printf("receiving message: %v", err)
					}
					return
				}

				// Process the message and Ack it.
				msg.Ack()
				wg.Done()
			}
		}()
	}

	// Wait for all workers to finish.
	workerWg.Wait()
	fmt.Printf("Read %d messages", nMessages)
}
Read 100 messages
Example (SendReceive)¶
package main
import (
"context"
"fmt"
"log"
"time"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/mempubsub"
)
// main round-trips a single message: it publishes "Hello, world!" to an
// in-memory topic, receives it from a subscription on that topic, prints the
// body, and acknowledges it.
func main() {
	ctx := context.Background()

	// Create the in-memory topic and a subscription attached to it. Their
	// Shutdown calls are deferred until main returns.
	topic := mempubsub.NewTopic()
	defer topic.Shutdown(ctx)
	sub := mempubsub.NewSubscription(topic, time.Second)
	defer sub.Shutdown(ctx)

	// Publish one message to the topic.
	outgoing := &pubsub.Message{Body: []byte("Hello, world!")}
	err := topic.Send(ctx, outgoing)
	if err != nil {
		log.Fatal(err)
	}

	// Pull the message back out through the subscription.
	incoming, err := sub.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Show the body, then acknowledge the message.
	fmt.Printf("%s\n", incoming.Body)
	incoming.Ack()
}
Hello, world!
Example (SendReceiveMultipleMessages)¶
package main
import (
"context"
"fmt"
"log"
"sort"
"time"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/mempubsub"
)
// main publishes three messages to an in-memory topic, receives and
// acknowledges each of them from a subscription, and prints the bodies in
// sorted order so the output does not depend on delivery order.
func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send messages to the topic.
	ms := []*pubsub.Message{
		{Body: []byte("a")},
		{Body: []byte("b")},
		{Body: []byte("c")},
	}
	for _, m := range ms {
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// Receive exactly as many messages as were sent, acknowledging each one as
	// soon as its body has been recorded. The final length is known up front,
	// so the slice is sized once.
	ms2 := make([]string, 0, len(ms))
	for range ms {
		m2, err := s.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}
		ms2 = append(ms2, string(m2.Body))
		m2.Ack()
	}

	// The messages may be received in a different order than they were
	// sent.
	sort.Strings(ms2)

	// Print out the received messages; they were already acknowledged in the
	// receive loop above.
	for _, m2 := range ms2 {
		fmt.Println(m2)
	}
}
a
b
c
Index ¶
- type Message
- type Subscription
- func NewSubscription(d driver.Subscription, newAckBatcher func(context.Context, *Subscription) driver.Batcher) *Subscription
- func (s *Subscription) As(i interface{}) bool
- func (s *Subscription) Receive(ctx context.Context) (*Message, error)
- func (s *Subscription) Shutdown(ctx context.Context) error
- type Topic
Examples ¶
- package (ReceiveWithInvertedWorkerPool)
- package (ReceiveWithTraditionalWorkerPool)
- package (SendReceive)
- package (SendReceiveMultipleMessages)
Types ¶
type Message ¶
type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Metadata has key/value metadata for the message.
	Metadata map[string]string
	// contains filtered or unexported fields
}
Message contains data to be published.
func (*Message) Ack ¶
func (m *Message) Ack()
Ack acknowledges the message, telling the server that it does not need to be sent again to the associated Subscription. It returns immediately, but the actual ack is sent in the background, and is not guaranteed to succeed.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription receives published messages.
func NewSubscription ¶
func NewSubscription(d driver.Subscription, newAckBatcher func(context.Context, *Subscription) driver.Batcher) *Subscription
NewSubscription creates a Subscription from a driver.Subscription and a function to make a batcher that sends batches of acks to the provider. If newAckBatcher is nil, a default batcher implementation will be used. NewSubscription is for use by provider implementations.
func (*Subscription) As ¶
func (s *Subscription) As(i interface{}) bool
As converts i to provider-specific types. See provider documentation for which type(s) are supported.
See https://github.com/google/go-cloud/blob/master/internal/docs/design.md#as for more background.
func (*Subscription) Receive ¶
func (s *Subscription) Receive(ctx context.Context) (*Message, error)
Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. This method can be called concurrently from multiple goroutines. The Ack() method of the returned Message has to be called once the message has been processed, to prevent it from being received again.
func (*Subscription) Shutdown ¶
func (s *Subscription) Shutdown(ctx context.Context) error
Shutdown flushes pending ack sends and disconnects the Subscription.
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic publishes messages to all its subscribers.
func NewTopic ¶
NewTopic makes a pubsub.Topic from a driver.Topic. It is for use by provider implementations.
func (*Topic) As ¶
As converts i to provider-specific types. See provider documentation for which type(s) are supported.
See https://github.com/google/go-cloud/blob/master/internal/docs/design.md#as for more background.
func (*Topic) Send ¶
Send publishes a message. It only returns after the message has been sent, or failed to be sent. Send can be called from multiple goroutines at once.
func (*Topic) Shutdown ¶
Shutdown flushes pending message sends and disconnects the Topic. It only returns after all pending messages have been sent.
Source Files ¶
pubsub.go
Directories ¶
Path | Synopsis |
---|---|
pubsub/driver | Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services. |
pubsub/drivertest | Package drivertest provides a conformance test for implementations of driver. |
pubsub/gcppubsub | Package gcppubsub provides an implementation of pubsub that uses GCP PubSub. |
pubsub/mempubsub | Package mempubsub provides an in-memory pubsub implementation. |
pubsub/rabbitpubsub | Package rabbitpubsub provides a pubsub driver for RabbitMQ. |
pubsub/samples | package samples contains sample programs using the pubsub API. |
pubsub/samples/gcmsg | gcmsg is a sample application that publishes messages from stdin to an existing topic or receives messages from an existing subscription and sends them to stdout. |
- Version
- v0.9.0
- Published
- Jan 15, 2019
- Platform
- js/wasm
- Imports
- 9 packages
- Last checked
- 4 days ago –
Tools for package owners.