gocloud.devgocloud.dev/pubsub/awssnssqs Index | Examples | Files

package awssnssqs

import "gocloud.dev/pubsub/awssnssqs"

Package awssnssqs provides two implementations of pubsub.Topic, one that sends messages to AWS SNS (Simple Notification Service), and one that sends messages to SQS (Simple Queuing Service). It also provides an implementation of pubsub.Subscription that receives messages from SQS.

URLs

For pubsub.OpenTopic, awssnssqs registers for the scheme "awssns" for an SNS topic, and "awssqs" for an SQS topic. For pubsub.OpenSubscription, it registers for the scheme "awssqs".

The default URL opener will use an AWS session with the default credentials and configuration.

To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Message Delivery Semantics

AWS SQS supports at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

Escaping

Go CDK supports all UTF-8 strings; to make this work with services lacking full UTF-8 support, strings must be escaped (during writes) and unescaped (during reads). The following escapes are required for awssnssqs:

As

awssnssqs exposes the following types for As:

Example (OpenSNSTopicFromURL)

Code:play 

package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
	// Note the 3 slashes; ARNs have multiple colons and therefore aren't valid
	// as hostnames in the URL.
	topic, err := pubsub.OpenTopic(ctx, "awssns:///"+topicARN+"?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Example (OpenSQSTopicFromURL)

Code:play 

package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
	const queueURL = "sqs.us-east-2.amazonaws.com/123456789012/myqueue"
	topic, err := pubsub.OpenTopic(ctx, "awssqs://"+queueURL+"?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Example (OpenSubscriptionFromURL)

Code:play 

package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will open the subscription with the URL
	// "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue".
	subscription, err := pubsub.OpenSubscription(ctx,
		"awssqs://sqs.us-east-2.amazonaws.com/123456789012/"+
			"myqueue?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Index

Examples

Constants

const (
	MetadataKeyDeduplicationID = "DeduplicationId"
	MetadataKeyMessageGroupID  = "MessageGroupId"
)

Defines values for Metadata keys used by the driver for setting message attributes on SNS (sns.PublishBatchRequestEntry/snstypes.PublishBatchRequestEntry) and SQS (sqs.SendMessageBatchRequestEntry/sqstypes.SendMessageBatchRequestEntry) messages.

For example, to set a deduplication ID and message group ID on a message:

import (
	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/awssnssqs"
)

message := pubsub.Message{
	Body: []byte("Hello, World!"),
	Metadata: map[string]string{
		awssnssqs.MetadataKeyDeduplicationID: "my-dedup-id",
		awssnssqs.MetadataKeyMessageGroupID:  "my-group-id",
	},
}
const SNSScheme = "awssns"

SNSScheme is the URL scheme for pubsub.OpenTopic (for an SNS topic) that awssnssqs registers its URLOpeners under on pubsub.DefaultMux.

const SQSScheme = "awssqs"

SQSScheme is the URL scheme for pubsub.OpenTopic (for an SQS topic) and for pubsub.OpenSubscription that awssnssqs registers its URLOpeners under on pubsub.DefaultMux.

Variables

var OpenSNSTopicV2 = OpenSNSTopic
var OpenSQSTopicV2 = OpenSQSTopic
var OpenSubscriptionV2 = OpenSubscription
var Set = wire.NewSet(
	DialSNS,
	DialSQS,
)

Set holds Wire providers for this package.

Functions

func DialSNS

func DialSNS(cfg aws.Config) *sns.Client

DialSNS gets an AWS SNS service client using the AWS SDK V2.

func DialSQS

func DialSQS(cfg aws.Config) *sqs.Client

DialSQS gets an AWS SQS service client using the AWS SDK V2.

func OpenSNSTopic

func OpenSNSTopic(ctx context.Context, client *sns.Client, topicARN string, opts *TopicOptions) *pubsub.Topic

OpenSNSTopic opens a topic that sends to the SNS topic with the given Amazon Resource Name (ARN), using AWS SDK V2.

Example

Code:play 

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sns"
	"gocloud.dev/pubsub/awssnssqs"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Establish a AWS V2 Config.
	// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ for more info.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Create a *pubsub.Topic.
	const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
	client := sns.NewFromConfig(cfg)
	topic := awssnssqs.OpenSNSTopic(ctx, client, topicARN, nil)
	defer topic.Shutdown(ctx)
}

func OpenSQSTopic

func OpenSQSTopic(ctx context.Context, client *sqs.Client, qURL string, opts *TopicOptions) *pubsub.Topic

OpenSQSTopic opens a topic that sends to the SQS topic with the given SQS queue URL, using AWS SDK V2.

Example

Code:play 

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"gocloud.dev/pubsub/awssnssqs"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Establish a AWS V2 Config.
	// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ for more info.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Create a *pubsub.Topic.
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
	client := sqs.NewFromConfig(cfg)
	topic := awssnssqs.OpenSQSTopic(ctx, client, queueURL, nil)
	defer topic.Shutdown(ctx)
}

func OpenSubscription

func OpenSubscription(ctx context.Context, client *sqs.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription

OpenSubscription opens a subscription based on AWS SQS for the given SQS queue URL, using AWS SDK V2. The queue is assumed to be subscribed to some SNS topic, though there is no check for this.

Example

Code:play 

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"gocloud.dev/pubsub/awssnssqs"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Establish a AWS V2 Config.
	// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ for more info.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a *pubsub.Subscription.
	// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
	client := sqs.NewFromConfig(cfg)
	subscription := awssnssqs.OpenSubscription(ctx, client, queueURL, nil)
	defer subscription.Shutdown(ctx)
}

Types

type BodyBase64Encoding

type BodyBase64Encoding int

BodyBase64Encoding is an enum of strategies for when to base64 message bodies.

const (
	// NonUTF8Only means that message bodies that are valid UTF-8 encodings are
	// sent as-is. Invalid UTF-8 message bodies are base64 encoded, and a
	// MessageAttribute with key "base64encoded" is added to the message.
	// When receiving messages, the "base64encoded" attribute is used to determine
	// whether to base64 decode, and is then filtered out.
	NonUTF8Only BodyBase64Encoding = 0
	// Always means that all message bodies are base64 encoded.
	// A MessageAttribute with key "base64encoded" is added to the message.
	// When receiving messages, the "base64encoded" attribute is used to determine
	// whether to base64 decode, and is then filtered out.
	Always BodyBase64Encoding = 1
	// Never means that message bodies are never base64 encoded. Non-UTF-8
	// bytes in message bodies may be modified by SNS/SQS.
	Never BodyBase64Encoding = 2
)

type SubscriptionOptions

type SubscriptionOptions struct {
	// Raw determines how the Subscription will process message bodies.
	//
	// If the subscription is expected to process messages sent directly to
	// SQS, or messages from SNS topics configured to use "raw" delivery,
	// set this to true. Message bodies will be passed through untouched.
	//
	// If false, the Subscription will use best-effort heuristics to
	// identify whether message bodies are raw or SNS JSON; this may be
	// inefficient for raw messages.
	//
	// See https://aws.amazon.com/sns/faqs/#Raw_message_delivery.
	Raw bool

	// NackLazy determines what Nack does.
	//
	// By default, Nack uses ChangeMessageVisibility to set the VisibilityTimeout
	// for the nacked message to 0, so that it will be redelivered immediately.
	// Set NackLazy to true to bypass this behavior; Nack will do nothing,
	// and the message will be redelivered after the existing VisibilityTimeout
	// expires (defaults to 30s, but can be configured per queue).
	//
	// See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.
	NackLazy bool

	// WaitTime passed to ReceiveMessage to enable long polling.
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling.
	// Note that a non-zero WaitTime can delay delivery of messages
	// by up to that duration.
	WaitTime time.Duration

	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
	ReceiveBatcherOptions batcher.Options

	// AckBatcherOptions adds constraints to the default batching done for acks.
	AckBatcherOptions batcher.Options
}

SubscriptionOptions will contain configuration for subscriptions.

type TopicOptions

type TopicOptions struct {
	// BodyBase64Encoding determines when message bodies are base64 encoded.
	// The default is NonUTF8Only.
	BodyBase64Encoding BodyBase64Encoding

	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}

TopicOptions contains configuration options for topics.

type URLOpener

type URLOpener struct {
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens AWS SNS/SQS URLs like "awssns:///sns-topic-arn" for SNS topics or "awssqs://sqs-queue-url" for SQS topics and subscriptions.

For SNS topics, the URL's host+path is used as the topic Amazon Resource Name (ARN). Since ARNs have ":" in them, and ":" precedes a port in URL hostnames, leave the host blank and put the ARN in the path (e.g., "awssns:///arn:aws:service:region:accountid:resourceType/resourcePath").

For SQS topics and subscriptions, the URL's host+path is prefixed with "https://" to create the queue URL.

See https://pkg.go.dev/gocloud.dev/aws#V2ConfigFromURLParams.

In addition, the following query parameters are supported:

See gocloud.dev/aws/ConfigFromURLParams for other query parameters that affect the default AWS session.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Source Files

awssnssqs.go

Version
v0.42.0
Published
Jun 28, 2025
Platform
linux/amd64
Imports
25 packages
Last checked
2 days ago

Tools for package owners.