package awssnssqs
import "gocloud.dev/pubsub/awssnssqs"
Package awssnssqs provides two implementations of pubsub.Topic, one that sends messages to AWS SNS (Simple Notification Service), and one that sends messages to SQS (Simple Queuing Service). It also provides an implementation of pubsub.Subscription that receives messages from SQS.
URLs
For pubsub.OpenTopic, awssnssqs registers for the scheme "awssns" for an SNS topic, and "awssqs" for an SQS topic. For pubsub.OpenSubscription, it registers for the scheme "awssqs".
The default URL opener will use an AWS session with the default credentials and configuration.
To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
Message Delivery Semantics
AWS SQS supports at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
Escaping
Go CDK supports all UTF-8 strings; to make this work with services lacking full UTF-8 support, strings must be escaped (during writes) and unescaped (during reads). The following escapes are required for awssnssqs:
- Metadata keys: Characters other than "a-zA-z0-9_-.", and additionally "." when it's at the start of the key or the previous character was ".", are escaped using "__0x<hex>__". These characters were determined by experimentation.
- Metadata values: Escaped using URL encoding.
- Message body: AWS SNS/SQS only supports UTF-8 strings. See the BodyBase64Encoding enum in TopicOptions for strategies on how to send non-UTF-8 message bodies. By default, non-UTF-8 message bodies are base64 encoded.
As
awssnssqs exposes the following types for As:
- Topic: *sns.Client for OpenSNSTopic, *sqs.Client for OpenSQSTopic
- Subscription: *sqs.Client
- Message: sqstypes.Message
- Message.BeforeSend: *sns.PublishBatchRequestEntry or *sns.PublishInput (deprecated) for OpenSNSTopic, *sqstypes.SendMessageBatchRequestEntry for OpenSQSTopic
- Message.AfterSend: snstypes.PublishBatchResultEntry or *sns.PublishOutput (deprecated) for OpenSNSTopic, sqstypes.SendMessageBatchResultEntry for OpenSQSTopic
- Error: any error type returned by the service, notably smithy.APIError
Example (OpenSNSTopicFromURL)¶
Code:play
package main import ( "context" "log" "gocloud.dev/pubsub" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs" // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic" // Note the 3 slashes; ARNs have multiple colons and therefore aren't valid // as hostnames in the URL. topic, err := pubsub.OpenTopic(ctx, "awssns:///"+topicARN+"?region=us-east-2") if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Example (OpenSQSTopicFromURL)¶
Code:play
package main import ( "context" "log" "gocloud.dev/pubsub" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs" // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() // https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html const queueURL = "sqs.us-east-2.amazonaws.com/123456789012/myqueue" topic, err := pubsub.OpenTopic(ctx, "awssqs://"+queueURL+"?region=us-east-2") if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Example (OpenSubscriptionFromURL)¶
Code:play
package main import ( "context" "log" "gocloud.dev/pubsub" ) func main() { // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs" // PRAGMA: On gocloud.dev, hide lines until the next blank line. ctx := context.Background() // pubsub.OpenSubscription creates a *pubsub.Subscription from a URL. // This URL will open the subscription with the URL // "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue". subscription, err := pubsub.OpenSubscription(ctx, "awssqs://sqs.us-east-2.amazonaws.com/123456789012/"+ "myqueue?region=us-east-2") if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Index ¶
- Constants
- Variables
- func DialSNS(cfg aws.Config) *sns.Client
- func DialSQS(cfg aws.Config) *sqs.Client
- func OpenSNSTopic(ctx context.Context, client *sns.Client, topicARN string, opts *TopicOptions) *pubsub.Topic
- func OpenSQSTopic(ctx context.Context, client *sqs.Client, qURL string, opts *TopicOptions) *pubsub.Topic
- func OpenSubscription(ctx context.Context, client *sqs.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription
- type BodyBase64Encoding
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Examples ¶
- package (OpenSNSTopicFromURL)
- package (OpenSQSTopicFromURL)
- package (OpenSubscriptionFromURL)
- OpenSNSTopic
- OpenSQSTopic
- OpenSubscription
Constants ¶
const ( MetadataKeyDeduplicationID = "DeduplicationId" MetadataKeyMessageGroupID = "MessageGroupId" )
Defines values for Metadata keys used by the driver for setting message attributes on SNS (sns.PublishBatchRequestEntry/snstypes.PublishBatchRequestEntry) and SQS (sqs.SendMessageBatchRequestEntry/sqstypes.SendMessageBatchRequestEntry) messages.
For example, to set a deduplication ID and message group ID on a message:
import ( "gocloud.dev/pubsub" "gocloud.dev/pubsub/awssnssqs" ) message := pubsub.Message{ Body: []byte("Hello, World!"), Metadata: map[string]string{ awssnssqs.MetadataKeyDeduplicationID: "my-dedup-id", awssnssqs.MetadataKeyMessageGroupID: "my-group-id", }, }
const SNSScheme = "awssns"
SNSScheme is the URL scheme for pubsub.OpenTopic (for an SNS topic) that awssnssqs registers its URLOpeners under on pubsub.DefaultMux.
const SQSScheme = "awssqs"
SQSScheme is the URL scheme for pubsub.OpenTopic (for an SQS topic) and for pubsub.OpenSubscription that awssnssqs registers its URLOpeners under on pubsub.DefaultMux.
Variables ¶
var OpenSNSTopicV2 = OpenSNSTopic
var OpenSQSTopicV2 = OpenSQSTopic
var OpenSubscriptionV2 = OpenSubscription
Set holds Wire providers for this package.
Functions ¶
func DialSNS ¶
DialSNS gets an AWS SNS service client using the AWS SDK V2.
func DialSQS ¶
DialSQS gets an AWS SQS service client using the AWS SDK V2.
func OpenSNSTopic ¶
func OpenSNSTopic(ctx context.Context, client *sns.Client, topicARN string, opts *TopicOptions) *pubsub.Topic
OpenSNSTopic opens a topic that sends to the SNS topic with the given Amazon
Resource Name (ARN), using AWS SDK V2.
Code:play
Example¶
package main
import (
"context"
"log"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"gocloud.dev/pubsub/awssnssqs"
)
func main() {
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
// Establish a AWS V2 Config.
// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ for more info.
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// Create a *pubsub.Topic.
const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
client := sns.NewFromConfig(cfg)
topic := awssnssqs.OpenSNSTopic(ctx, client, topicARN, nil)
defer topic.Shutdown(ctx)
}
func OpenSQSTopic ¶
func OpenSQSTopic(ctx context.Context, client *sqs.Client, qURL string, opts *TopicOptions) *pubsub.Topic
OpenSQSTopic opens a topic that sends to the SQS topic with the given SQS
queue URL, using AWS SDK V2.
Code:play
Example¶
package main
import (
"context"
"log"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"gocloud.dev/pubsub/awssnssqs"
)
func main() {
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
// Establish a AWS V2 Config.
// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ for more info.
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// Create a *pubsub.Topic.
const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
client := sqs.NewFromConfig(cfg)
topic := awssnssqs.OpenSQSTopic(ctx, client, queueURL, nil)
defer topic.Shutdown(ctx)
}
func OpenSubscription ¶
func OpenSubscription(ctx context.Context, client *sqs.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription
OpenSubscription opens a subscription based on AWS SQS for the given SQS
queue URL, using AWS SDK V2. The queue is assumed to be subscribed to some SNS topic, though
there is no check for this.
Code:play
Example¶
package main
import (
"context"
"log"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"gocloud.dev/pubsub/awssnssqs"
)
func main() {
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
// Establish a AWS V2 Config.
// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ for more info.
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// Construct a *pubsub.Subscription.
// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
client := sqs.NewFromConfig(cfg)
subscription := awssnssqs.OpenSubscription(ctx, client, queueURL, nil)
defer subscription.Shutdown(ctx)
}
Types ¶
type BodyBase64Encoding ¶
type BodyBase64Encoding int
BodyBase64Encoding is an enum of strategies for when to base64 message bodies.
const ( // NonUTF8Only means that message bodies that are valid UTF-8 encodings are // sent as-is. Invalid UTF-8 message bodies are base64 encoded, and a // MessageAttribute with key "base64encoded" is added to the message. // When receiving messages, the "base64encoded" attribute is used to determine // whether to base64 decode, and is then filtered out. NonUTF8Only BodyBase64Encoding = 0 // Always means that all message bodies are base64 encoded. // A MessageAttribute with key "base64encoded" is added to the message. // When receiving messages, the "base64encoded" attribute is used to determine // whether to base64 decode, and is then filtered out. Always BodyBase64Encoding = 1 // Never means that message bodies are never base64 encoded. Non-UTF-8 // bytes in message bodies may be modified by SNS/SQS. Never BodyBase64Encoding = 2 )
type SubscriptionOptions ¶
type SubscriptionOptions struct { // Raw determines how the Subscription will process message bodies. // // If the subscription is expected to process messages sent directly to // SQS, or messages from SNS topics configured to use "raw" delivery, // set this to true. Message bodies will be passed through untouched. // // If false, the Subscription will use best-effort heuristics to // identify whether message bodies are raw or SNS JSON; this may be // inefficient for raw messages. // // See https://aws.amazon.com/sns/faqs/#Raw_message_delivery. Raw bool // NackLazy determines what Nack does. // // By default, Nack uses ChangeMessageVisibility to set the VisibilityTimeout // for the nacked message to 0, so that it will be redelivered immediately. // Set NackLazy to true to bypass this behavior; Nack will do nothing, // and the message will be redelivered after the existing VisibilityTimeout // expires (defaults to 30s, but can be configured per queue). // // See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html. NackLazy bool // WaitTime passed to ReceiveMessage to enable long polling. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling. // Note that a non-zero WaitTime can delay delivery of messages // by up to that duration. WaitTime time.Duration // ReceiveBatcherOptions adds constraints to the default batching done for receives. ReceiveBatcherOptions batcher.Options // AckBatcherOptions adds constraints to the default batching done for acks. AckBatcherOptions batcher.Options }
SubscriptionOptions will contain configuration for subscriptions.
type TopicOptions ¶
type TopicOptions struct { // BodyBase64Encoding determines when message bodies are base64 encoded. // The default is NonUTF8Only. BodyBase64Encoding BodyBase64Encoding // BatcherOptions adds constraints to the default batching done for sends. BatcherOptions batcher.Options }
TopicOptions contains configuration options for topics.
type URLOpener ¶
type URLOpener struct { // TopicOptions specifies the options to pass to OpenTopic. TopicOptions TopicOptions // SubscriptionOptions specifies the options to pass to OpenSubscription. SubscriptionOptions SubscriptionOptions }
URLOpener opens AWS SNS/SQS URLs like "awssns:///sns-topic-arn" for SNS topics or "awssqs://sqs-queue-url" for SQS topics and subscriptions.
For SNS topics, the URL's host+path is used as the topic Amazon Resource Name (ARN). Since ARNs have ":" in them, and ":" precedes a port in URL hostnames, leave the host blank and put the ARN in the path (e.g., "awssns:///arn:aws:service:region:accountid:resourceType/resourcePath").
For SQS topics and subscriptions, the URL's host+path is prefixed with "https://" to create the queue URL.
See https://pkg.go.dev/gocloud.dev/aws#V2ConfigFromURLParams.
In addition, the following query parameters are supported:
- raw (for "awssqs" Subscriptions only): sets SubscriberOptions.Raw. The value must be parseable by `strconv.ParseBool`.
- nacklazy (for "awssqs" Subscriptions only): sets SubscriberOptions.NackLazy. The value must be parseable by `strconv.ParseBool`.
- waittime: sets SubscriberOptions.WaitTime, in time.ParseDuration formats.
See gocloud.dev/aws/ConfigFromURLParams for other query parameters that affect the default AWS session.
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (*URLOpener) OpenTopicURL ¶
OpenTopicURL opens a pubsub.Topic based on u.
Source Files ¶
awssnssqs.go
- Version
- v0.42.0
- Published
- Jun 28, 2025
- Platform
- linux/amd64
- Imports
- 25 packages
- Last checked
- 2 days ago –
Tools for package owners.