package amqp
import "github.com/Azure/go-amqp"
Package amqp provides an AMQP 1.0 client implementation.
AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today.
The example below shows how to use this package to connect
to a Microsoft Azure Service Bus queue.
Code:play
Example¶
package main
import (
"context"
"fmt"
"log"
"time"
amqp "github.com/Azure/go-amqp"
)
// main demonstrates a full round trip against a queue: it dials the broker,
// opens a session, sends one message, and then receives and accepts messages
// in an endless loop.
func main() {
	ctx := context.TODO()
	// create connection
	conn, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", &amqp.ConnOptions{
		SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
	})
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	// NOTE: every failure path below uses log.Fatal/log.Fatalf, which exit the
	// process via os.Exit; deferred calls do not run on those paths.
	defer conn.Close()
	// open a session
	session, err := conn.NewSession(ctx, nil)
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}
	// send a message
	{
		// create a sender
		sender, err := session.NewSender(ctx, "/queue-name", nil)
		if err != nil {
			log.Fatal("Creating sender link:", err)
		}
		// this ctx shadows the outer ctx for the rest of this block only, so
		// both Send and Close below share the same 5 second deadline.
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		// send message
		err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")), nil)
		if err != nil {
			log.Fatal("Sending message:", err)
		}
		// the error returned by Close is intentionally not inspected here
		sender.Close(ctx)
		cancel()
	}
	// continuously read messages
	{
		// create a receiver (ctx here is the outer context again, with no deadline)
		receiver, err := session.NewReceiver(ctx, "/queue-name", nil)
		if err != nil {
			log.Fatal("Creating receiver link:", err)
		}
		// close the receiver, waiting at most one second, when main returns.
		// the loop below only leaves via log.Fatal, so in this example the
		// deferred close documents the intended cleanup rather than running.
		defer func() {
			ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
			receiver.Close(ctx)
			cancel()
		}()
		for {
			// receive next message
			msg, err := receiver.Receive(ctx, nil)
			if err != nil {
				log.Fatal("Reading message from AMQP:", err)
			}
			// accept message
			if err = receiver.AcceptMessage(context.TODO(), msg); err != nil {
				log.Fatalf("Failure accepting message: %v", err)
			}
			// GetData returns the first data payload of the message
			fmt.Printf("Message received: %s\n", msg.GetData())
		}
	}
}
Index ¶
- type Annotations
- type Conn
- func Dial(ctx context.Context, addr string, opts *ConnOptions) (*Conn, error)
- func NewConn(ctx context.Context, conn net.Conn, opts *ConnOptions) (*Conn, error)
- func (c *Conn) Close() error
- func (c *Conn) Done() <-chan struct{}
- func (c *Conn) Err() error
- func (c *Conn) NewSession(ctx context.Context, opts *SessionOptions) (*Session, error)
- func (c *Conn) Properties() map[string]any
- type ConnError
- type ConnOptions
- type DeliveryState
- type DrainCreditOptions
- type Durability
- type ErrCond
- type Error
- type ExpiryPolicy
- type LinkError
- type LinkFilter
- func NewLinkFilter(name string, code uint64, value any) LinkFilter
- func NewSelectorFilter(filter string) LinkFilter
- type Message
- func NewMessage(data []byte) *Message
- func (m *Message) GetData() []byte
- func (m *Message) Marshal(wr *buffer.Buffer) error
- func (m *Message) MarshalBinary() ([]byte, error)
- func (m *Message) Unmarshal(r *buffer.Buffer) error
- func (m *Message) UnmarshalBinary(data []byte) error
- type MessageHeader
- func (h *MessageHeader) Marshal(wr *buffer.Buffer) error
- func (h *MessageHeader) Unmarshal(r *buffer.Buffer) error
- type MessageProperties
- func (p *MessageProperties) Marshal(wr *buffer.Buffer) error
- func (p *MessageProperties) Unmarshal(r *buffer.Buffer) error
- type ModifyMessageOptions
- type Null
- type ReceiveOptions
- type Receiver
- func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error
- func (r *Receiver) Address() string
- func (r *Receiver) Close(ctx context.Context) error
- func (r *Receiver) DrainCredit(ctx context.Context, _ *DrainCreditOptions) error
- func (r *Receiver) IssueCredit(credit uint32) error
- func (r *Receiver) LinkName() string
- func (r *Receiver) LinkSourceFilterValue(name string) any
- func (r *Receiver) ModifyMessage(ctx context.Context, msg *Message, options *ModifyMessageOptions) error
- func (r *Receiver) Prefetched() *Message
- func (r *Receiver) Properties() map[string]any
- func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message, error)
- func (r *Receiver) RejectMessage(ctx context.Context, msg *Message, e *Error) error
- func (r *Receiver) ReleaseMessage(ctx context.Context, msg *Message) error
- type ReceiverOptions
- type ReceiverSettleMode
- type SASLType
- func SASLTypeAnonymous() SASLType
- func SASLTypeExternal(resp string) SASLType
- func SASLTypePlain(username, password string) SASLType
- func SASLTypeXOAUTH2(username, bearer string, saslMaxFrameSizeOverride uint32) SASLType
- type SendOptions
- type SendReceipt
- func (s SendReceipt) DeliveryTag() []byte
- func (s *SendReceipt) Wait(ctx context.Context) (DeliveryState, error)
- type SendWithReceiptOptions
- type Sender
- func (s *Sender) Address() string
- func (s *Sender) Close(ctx context.Context) error
- func (s *Sender) LinkName() string
- func (s *Sender) MaxMessageSize() uint64
- func (s *Sender) Properties() map[string]any
- func (s *Sender) Send(ctx context.Context, msg *Message, opts *SendOptions) error
- func (s *Sender) SendWithReceipt(ctx context.Context, msg *Message, opts *SendWithReceiptOptions) (SendReceipt, error)
- type SenderOptions
- type SenderSettleMode
- type Session
- func (s *Session) Close(ctx context.Context) error
- func (s *Session) NewReceiver(ctx context.Context, source string, opts *ReceiverOptions) (*Receiver, error)
- func (s *Session) NewSender(ctx context.Context, target string, opts *SenderOptions) (*Sender, error)
- func (s *Session) Properties() map[string]any
- type SessionError
- type SessionOptions
- type StateAccepted
- type StateModified
- type StateReceived
- type StateRejected
- type StateReleased
- type Symbol
- type UUID
Examples ¶
Types ¶
type Annotations ¶
type Annotations = encoding.Annotations
Annotations keys must be of type string, int, or int64.
String keys are encoded as AMQP Symbols.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn is an AMQP connection.
func Dial ¶
Dial connects to an AMQP broker.
If the addr includes a scheme, it must be "amqp", "amqps", or "amqp+ssl". If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps" or "amqp+ssl".
If the addr contains username and password information, it is used as SASL PLAIN credentials, equivalent to setting ConnOptions.SASLType to SASLTypePlain(username, password).
opts: pass nil to accept the default values.
func NewConn ¶
NewConn establishes a new AMQP client connection over conn. NOTE: Conn takes ownership of the provided net.Conn and will close it as required. opts: pass nil to accept the default values.
func (*Conn) Close ¶
Close closes the connection.
Returns nil if there were no errors during shutdown, or a *ConnError. This error is not actionable and is purely for diagnostic purposes.
The error returned by subsequent calls to Close is idempotent, so the same value will always be returned.
func (*Conn) Done ¶
func (c *Conn) Done() <-chan struct{}
Done returns a channel that's closed when Conn is closed.
Code:play
Example¶
package main
import (
"context"
"errors"
"log"
amqp "github.com/Azure/go-amqp"
)
// main shows how to block until a connection has been closed and then
// inspect the reason it was closed.
func main() {
	ctx := context.TODO()

	// create connection
	conn, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", &amqp.ConnOptions{
		SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
	})
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}

	// when the channel returned by Done is closed, conn has been closed
	<-conn.Done()

	// Err indicates why the connection was closed. a nil error indicates
	// a client-side call to Close and there were no errors during shutdown.
	closedErr := conn.Err()

	// when Err returns a non-nil error, it means that either a client-side
	// call to Close encountered an error during shutdown, a fatal error was
	// encountered that caused the connection to close, or that the peer
	// closed the connection.
	if closedErr != nil {
		// the error returned by Err is always a *ConnError. the result of
		// errors.As is still checked so that connErr is never dereferenced
		// while it is nil.
		var connErr *amqp.ConnError
		if !errors.As(closedErr, &connErr) {
			log.Fatalf("unexpected error type %T", closedErr)
		}

		if connErr.RemoteErr != nil {
			// the peer closed the connection and provided an error explaining why.
			// note that the peer MAY send an error when closing the connection but
			// is not required to.
		} else {
			// the connection encountered a fatal error or there was
			// an error during client-side shutdown. this is for
			// diagnostics, the connection has been closed.
		}
	}
}
func (*Conn) Err ¶
If Done is not yet closed, Err returns nil. If Done is closed, Err returns nil or a *ConnError explaining why. A nil error indicates that [Close] was called and there were no errors during shutdown.
A *ConnError indicates one of three things:
- there was an error during shutdown from a client-side call to [Close]. the error is not actionable and is purely for diagnostic purposes.
- a fatal error was encountered that caused Conn to close
- the peer closed the connection. [ConnError.RemoteErr] MAY contain an error from the peer indicating why it closed the connection
func (*Conn) NewSession ¶
NewSession starts a new session on the connection.
- ctx controls waiting for the peer to acknowledge the session
- opts contains optional values, pass nil to accept the defaults
If the context's deadline expires or is cancelled before the operation completes, an error is returned. If the Session was successfully created, it will be cleaned up in future calls to NewSession.
func (*Conn) Properties ¶
Properties returns the peer's connection properties. Returns nil if the peer didn't send any properties.
type ConnError ¶
type ConnError struct { // RemoteErr contains any error information provided by the peer if the peer closed the AMQP connection. RemoteErr *Error // contains filtered or unexported fields }
ConnError is returned by methods on Conn and propagated to Session and Senders/Receivers
when the connection has been closed.
Code:play
Example¶
package main
import (
"context"
"errors"
"log"
amqp "github.com/Azure/go-amqp"
)
// main shows that once a connection is closed, operations on the connection
// and on every Session, Sender, and Receiver created from it fail with a
// *ConnError.
func main() {
	ctx := context.TODO()

	// requireConnError aborts the program unless err is (or wraps) a *ConnError.
	requireConnError := func(err error) {
		var connErr *amqp.ConnError
		if errors.As(err, &connErr) {
			return
		}
		log.Fatalf("unexpected error type %T", err)
	}

	// create connection
	opts := &amqp.ConnOptions{
		SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
	}
	conn, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", opts)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}

	// open a session
	session, err := conn.NewSession(ctx, nil)
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// create a sender
	sender, err := session.NewSender(ctx, "/queue-name", nil)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	// close the connection before sending the message
	conn.Close()

	// sending on a link whose connection is closed yields a *ConnError
	requireConnError(sender.Send(ctx, amqp.NewMessage([]byte("Hello!")), nil))

	// methods on the session fail in the same way
	_, err = session.NewReceiver(ctx, "/queue-name", nil)
	requireConnError(err)

	// and so do methods on the connection itself
	_, err = conn.NewSession(ctx, nil)
	requireConnError(err)
}
func (*ConnError) Error ¶
Error implements the error interface for ConnError.
func (*ConnError) Unwrap ¶
Unwrap returns the RemoteErr, if any.
type ConnOptions ¶
type ConnOptions struct { // ContainerID sets the container-id to use when opening the connection. // // A container ID will be randomly generated if this option is not used. ContainerID string // HostName sets the hostname sent in the AMQP // Open frame and TLS ServerName (if not otherwise set). HostName string // IdleTimeout specifies the maximum period between // receiving frames from the peer. // // Specify a value less than zero to disable idle timeout. // // Default: 1 minute (60000000000). IdleTimeout time.Duration // MaxFrameSize sets the maximum frame size that // the connection will accept. // // Must be 512 or greater. // // Default: 65536. MaxFrameSize uint32 // MaxSessions sets the maximum number of channels. // The value must be greater than zero. // // Default: 65536. MaxSessions uint16 // Properties sets an entry in the connection properties map sent to the server. Properties map[string]any // SASLType contains the specified SASL authentication mechanism. SASLType SASLType // TLSConfig sets the tls.Config to be used during // TLS negotiation. // // This option is for advanced usage, in most scenarios // providing a URL scheme of "amqps://" is sufficient. TLSConfig *tls.Config // WriteTimeout controls the write deadline when writing AMQP frames to the // underlying net.Conn and no caller provided context.Context is available or // the context contains no deadline (e.g. context.Background()). // The timeout is set per write. // // Setting to a value less than zero means no timeout is set, so writes // defer to the underlying behavior of net.Conn with no write deadline. // // Default: 30s WriteTimeout time.Duration // contains filtered or unexported fields }
ConnOptions contains the optional settings for configuring an AMQP connection.
type DeliveryState ¶
type DeliveryState = encoding.DeliveryState
DeliveryState encapsulates the various concrete delivery states. Use a type switch to determine the concrete delivery state.
- *StateAccepted
- *StateModified
- *StateReceived
- *StateRejected
- *StateReleased
type DrainCreditOptions ¶
type DrainCreditOptions struct { }
DrainCreditOptions contains any optional values for the Receiver.DrainCredit method.
type Durability ¶
type Durability = encoding.Durability
Durability specifies the durability of a link.
const ( // No terminus state is retained durably. DurabilityNone Durability = encoding.DurabilityNone // Only the existence and configuration of the terminus is // retained durably. DurabilityConfiguration Durability = encoding.DurabilityConfiguration // In addition to the existence and configuration of the // terminus, the unsettled state for durable messages is // retained durably. DurabilityUnsettledState Durability = encoding.DurabilityUnsettledState )
Durability Policies
type ErrCond ¶
ErrCond is an AMQP defined error condition. See http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-amqp-error for info on their meaning.
const ( // AMQP Errors ErrCondDecodeError ErrCond = "amqp:decode-error" ErrCondFrameSizeTooSmall ErrCond = "amqp:frame-size-too-small" ErrCondIllegalState ErrCond = "amqp:illegal-state" ErrCondInternalError ErrCond = "amqp:internal-error" ErrCondInvalidField ErrCond = "amqp:invalid-field" ErrCondNotAllowed ErrCond = "amqp:not-allowed" ErrCondNotFound ErrCond = "amqp:not-found" ErrCondNotImplemented ErrCond = "amqp:not-implemented" ErrCondPreconditionFailed ErrCond = "amqp:precondition-failed" ErrCondResourceDeleted ErrCond = "amqp:resource-deleted" ErrCondResourceLimitExceeded ErrCond = "amqp:resource-limit-exceeded" ErrCondResourceLocked ErrCond = "amqp:resource-locked" ErrCondUnauthorizedAccess ErrCond = "amqp:unauthorized-access" // Connection Errors ErrCondConnectionForced ErrCond = "amqp:connection:forced" ErrCondConnectionRedirect ErrCond = "amqp:connection:redirect" ErrCondFramingError ErrCond = "amqp:connection:framing-error" // Session Errors ErrCondErrantLink ErrCond = "amqp:session:errant-link" ErrCondHandleInUse ErrCond = "amqp:session:handle-in-use" ErrCondUnattachedHandle ErrCond = "amqp:session:unattached-handle" ErrCondWindowViolation ErrCond = "amqp:session:window-violation" // Link Errors ErrCondDetachForced ErrCond = "amqp:link:detach-forced" ErrCondLinkRedirect ErrCond = "amqp:link:redirect" ErrCondMessageSizeExceeded ErrCond = "amqp:link:message-size-exceeded" ErrCondStolen ErrCond = "amqp:link:stolen" ErrCondTransferLimitExceeded ErrCond = "amqp:link:transfer-limit-exceeded" )
Error Conditions
type Error ¶
Error is an AMQP error.
type ExpiryPolicy ¶
type ExpiryPolicy = encoding.ExpiryPolicy
ExpiryPolicy specifies when the expiry timer of a terminus starts counting down from the timeout value.
If the link is subsequently re-attached before the terminus is expired, then the count down is aborted. If the conditions for the terminus-expiry-policy are subsequently re-met, the expiry timer restarts from its originally configured timeout value.
const ( // The expiry timer starts when terminus is detached. ExpiryPolicyLinkDetach ExpiryPolicy = encoding.ExpiryLinkDetach // The expiry timer starts when the most recently // associated session is ended. ExpiryPolicySessionEnd ExpiryPolicy = encoding.ExpirySessionEnd // The expiry timer starts when most recently associated // connection is closed. ExpiryPolicyConnectionClose ExpiryPolicy = encoding.ExpiryConnectionClose // The terminus never expires. ExpiryPolicyNever ExpiryPolicy = encoding.ExpiryNever )
Expiry Policies
type LinkError ¶
type LinkError struct { // RemoteErr contains any error information provided by the peer if the peer detached the link. RemoteErr *Error // contains filtered or unexported fields }
LinkError is returned by methods on Sender/Receiver when the link has closed.
Code:play
Example¶
package main
import (
"context"
"errors"
"log"
amqp "github.com/Azure/go-amqp"
)
// main shows that using a Sender after it has been closed fails with a
// *LinkError.
func main() {
	// a *LinkError is returned by methods on Senders/Receivers after Close() has been called.
	// it can also be returned if the peer has closed the link. in this case, the RemoteErr
	// field should contain additional information about why the peer closed the link.
	ctx := context.TODO()

	// create connection
	conn, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", &amqp.ConnOptions{
		SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
	})
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer conn.Close()

	// open a session
	session, err := conn.NewSession(ctx, nil)
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// create a sender
	sender, err := session.NewSender(ctx, "/queue-name", nil)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	// send message
	err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")), nil)
	if err != nil {
		// report the step that actually failed (sending, not session creation)
		log.Fatal("Sending message:", err)
	}

	// now close the sender
	sender.Close(ctx)

	// attempt to send a message after close
	err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")), nil)

	// the send on the closed link must fail with a *LinkError
	var linkErr *amqp.LinkError
	if !errors.As(err, &linkErr) {
		log.Fatalf("unexpected error type %T", err)
	}
}
func (*LinkError) Error ¶
Error implements the error interface for LinkError.
func (*LinkError) Unwrap ¶
Unwrap returns the RemoteErr, if any.
type LinkFilter ¶
LinkFilter is an advanced API for setting non-standard source filters. Please file an issue or open a PR if a standard filter is missing from this library.
The name is the key for the filter map. It will be encoded as an AMQP symbol type.
The code is the descriptor of the described type value. The domain-id and descriptor-id should be concatenated together. If 0 is passed as the code, the name will be used as the descriptor.
The value is the value of the described type. Acceptable types for value are specific to the filter.
Example:
The standard selector-filter is defined as:
<descriptor name="apache.org:selector-filter:string" code="0x0000468C:0x00000004"/>
In this case the name is "apache.org:selector-filter:string" and the code is 0x0000468C00000004.
NewLinkFilter("apache.org:selector-filter:string", 0x0000468C00000004, exampleValue)
References:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-filter-set http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-descriptor-values
func NewLinkFilter ¶
func NewLinkFilter(name string, code uint64, value any) LinkFilter
NewLinkFilter creates a new LinkFilter with the specified values. Any preexisting link filter with the same name will be updated with the new code and value.
func NewSelectorFilter ¶
func NewSelectorFilter(filter string) LinkFilter
NewSelectorFilter creates a new selector filter (apache.org:selector-filter:string) with the specified filter value. Any preexisting selector filter will be updated with the new filter value.
type Message ¶
type Message struct { // Message format code. // // The upper three octets of a message format code identify a particular message // format. The lowest octet indicates the version of said message format. Any // given version of a format is forwards compatible with all higher versions. Format uint32 // The DeliveryTag can be up to 32 octets of binary data. // Note that when mode one is enabled there will be no delivery tag. DeliveryTag []byte // The header section carries standard delivery details about the transfer // of a message through the AMQP network. Header *MessageHeader // The delivery-annotations section is used for delivery-specific non-standard // properties at the head of the message. Delivery annotations convey information // from the sending peer to the receiving peer. DeliveryAnnotations Annotations // The message-annotations section is used for properties of the message which // are aimed at the infrastructure. Annotations Annotations // The properties section is used for a defined set of standard properties of // the message. Properties *MessageProperties // The application-properties section is a part of the bare message used for // structured application data. Intermediaries can use the data within this // structure for the purposes of filtering or routing. ApplicationProperties map[string]any // Data payloads. // A data section contains opaque binary data. Data [][]byte // Value payload. // An amqp-value section contains a single AMQP value. // To send an AMQP null, populate with a [Null]. Value any // Sequence will contain AMQP sequence sections from the body of the message. // An amqp-sequence section contains an AMQP sequence. Sequence [][]any // The footer section is used for details about the message or delivery which // can only be calculated or evaluated once the whole bare message has been // constructed or seen (for example message hashes, HMACs, signatures and // encryption details). 
Footer Annotations // contains filtered or unexported fields }
Message is an AMQP message.
func NewMessage ¶
NewMessage returns a *Message with data as the first payload in the Data field.
This constructor is intended as a helper for basic Messages with a single data payload. It is valid to construct a Message directly for more complex usages.
To create a Message using the Value or Sequence fields, don't use this constructor, create a new Message instead.
func (*Message) GetData ¶
GetData returns the first []byte from the Data field or nil if Data is empty.
func (*Message) Marshal ¶
func (*Message) MarshalBinary ¶
MarshalBinary encodes the message into binary form.
func (*Message) Unmarshal ¶
func (*Message) UnmarshalBinary ¶
UnmarshalBinary decodes the message from binary form.
type MessageHeader ¶
type MessageHeader struct { Durable bool Priority uint8 TTL time.Duration // from milliseconds FirstAcquirer bool DeliveryCount uint32 }
MessageHeader carries standard delivery details about the transfer of a message.
func (*MessageHeader) Marshal ¶
func (h *MessageHeader) Marshal(wr *buffer.Buffer) error
func (*MessageHeader) Unmarshal ¶
func (h *MessageHeader) Unmarshal(r *buffer.Buffer) error
type MessageProperties ¶
type MessageProperties struct { // Message-id, if set, uniquely identifies a message within the message system. // The message producer is usually responsible for setting the message-id in // such a way that it is assured to be globally unique. A broker MAY discard a // message as a duplicate if the value of the message-id matches that of a // previously received message sent to the same node. // // The value is restricted to the following types // - uint64, UUID, []byte, or string MessageID any // The identity of the user responsible for producing the message. // The client sets this value, and it MAY be authenticated by intermediaries. UserID []byte // The to field identifies the node that is the intended destination of the message. // On any given transfer this might not be the node at the receiving end of the link. To *string // A common field for summary information about the message content and purpose. Subject *string // The address of the node to send replies to. ReplyTo *string // This is a client-specific id that can be used to mark or identify messages // between clients. // // The value is restricted to the following types // - uint64, UUID, []byte, or string CorrelationID any // The RFC-2046 [RFC2046] MIME type for the message's application-data section // (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining // the character encoding used: e.g., 'text/plain; charset="utf-8"'. // // For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type // is unknown the content-type SHOULD NOT be set. This allows the recipient the // opportunity to determine the actual type. Where the section is known to be truly // opaque binary data, the content-type SHOULD be set to application/octet-stream. // // When using an application-data section with a section code other than data, // content-type SHOULD NOT be set. ContentType *string // The content-encoding property is used as a modifier to the content-type. 
// When present, its value indicates what additional content encodings have been // applied to the application-data, and thus what decoding mechanisms need to be // applied in order to obtain the media-type referenced by the content-type header // field. // // Content-encoding is primarily used to allow a document to be compressed without // losing the identity of its underlying content type. // // Content-encodings are to be interpreted as per section 3.5 of RFC 2616 [RFC2616]. // Valid content-encodings are registered at IANA [IANAHTTPPARAMS]. // // The content-encoding MUST NOT be set when the application-data section is other // than data. The binary representation of all other application-data section types // is defined completely in terms of the AMQP type system. // // Implementations MUST NOT use the identity encoding. Instead, implementations // SHOULD NOT set this property. Implementations SHOULD NOT use the compress encoding, // except as to remain compatible with messages originally sent with other protocols, // e.g. HTTP or SMTP. // // Implementations SHOULD NOT specify multiple content-encoding values except as to // be compatible with messages originally sent with other protocols, e.g. HTTP or SMTP. ContentEncoding *string // An absolute time when this message is considered to be expired. AbsoluteExpiryTime *time.Time // An absolute time when this message was created. CreationTime *time.Time // Identifies the group the message belongs to. GroupID *string // The relative position of this message within its group. // // The value is defined as a RFC-1982 sequence number GroupSequence *uint32 // This is a client-specific id that is used so that client can send replies to this // message to a specific group. ReplyToGroupID *string }
MessageProperties is the defined set of properties for AMQP messages.
func (*MessageProperties) Marshal ¶
func (p *MessageProperties) Marshal(wr *buffer.Buffer) error
func (*MessageProperties) Unmarshal ¶
func (p *MessageProperties) Unmarshal(r *buffer.Buffer) error
type ModifyMessageOptions ¶
type ModifyMessageOptions struct { // DeliveryFailed indicates that the server must consider this an // unsuccessful delivery attempt and increment the delivery count. DeliveryFailed bool // UndeliverableHere indicates that the server must not redeliver // the message to this link. UndeliverableHere bool // Annotations is an optional annotation map to be merged // with the existing message annotations, overwriting existing keys // if necessary. Annotations Annotations }
ModifyMessageOptions contains the optional parameters to ModifyMessage.
type Null ¶
type Null struct{}
Null is an AMQP null. Typically used in [Message.Value] to send a null.
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-null
func (Null) Marshal ¶
type ReceiveOptions ¶
type ReceiveOptions struct { }
ReceiveOptions contains any optional values for the Receiver.Receive method.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver receives messages on a single AMQP link.
func (*Receiver) AcceptMessage ¶
AcceptMessage notifies the server that the message has been accepted and does not require redelivery.
- ctx controls waiting for the peer to acknowledge the disposition
- msg is the message to accept
If the context's deadline expires or is cancelled before the operation completes, the message's disposition is in an unknown state.
func (*Receiver) Address ¶
Address returns the link's address.
func (*Receiver) Close ¶
Close closes the Receiver and AMQP link.
- ctx controls waiting for the peer to acknowledge the close
If the context's deadline expires or is cancelled before the operation completes, an error is returned. However, the operation will continue to execute in the background. Subsequent calls will return a *LinkError that contains the context's error message.
func (*Receiver) DrainCredit ¶
func (r *Receiver) DrainCredit(ctx context.Context, _ *DrainCreditOptions) error
DrainCredit sets the drain flag on the next outbound FLOW frame and blocks until the corresponding FLOW frame is received. While a drain is in progress, messages can continue to arrive. After a drain completes, the Receiver will have zero active credits. To begin receiving again, call IssueCredit() to add active credits to your Receiver.
You may only have a single drain operation active at a time.
If the context passed to DrainCredit expires or is cancelled then the receiver's issued credits should be considered ambiguous.
Returns nil if the drain has completed, error otherwise.
NOTE: The behavior of drain is optional, as per the AMQP spec. Check with your individual broker's documentation for implementation details.
func (*Receiver) IssueCredit ¶
IssueCredit adds credits to be requested in the next flow request. Attempting to issue more credit than the receiver's max credit as specified in ReceiverOptions.MaxCredit will result in an error.
func (*Receiver) LinkName ¶
LinkName returns associated link name or an empty string if link is not defined.
func (*Receiver) LinkSourceFilterValue ¶
LinkSourceFilterValue retrieves the specified link source filter value or nil if it doesn't exist.
func (*Receiver) ModifyMessage ¶
func (r *Receiver) ModifyMessage(ctx context.Context, msg *Message, options *ModifyMessageOptions) error
ModifyMessage notifies the server that the message was not acted upon and should be modified.
- ctx controls waiting for the peer to acknowledge the disposition
- msg is the message to modify
- options contains the optional settings to modify
If the context's deadline expires or is cancelled before the operation completes, the message's disposition is in an unknown state.
func (*Receiver) Prefetched ¶
Prefetched returns the next message that is stored in the Receiver's prefetch cache. It does NOT wait for the remote sender to send messages and returns immediately if the prefetch cache is empty. To receive from the prefetch and wait for messages from the remote Sender use `Receive`.
Once a message is received, and if the sender is configured in any mode other than SenderSettleModeSettled, you *must* take an action on the message by calling one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage.
func (*Receiver) Properties ¶
Properties returns the peer's link properties. Returns nil if the peer didn't send any properties.
func (*Receiver) Receive ¶
Receive returns the next message from the sender. Blocks until a message is received, ctx completes, or an error occurs.
Once a message is received, and if the sender is configured in any mode other than SenderSettleModeSettled, you *must* take an action on the message by calling one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage.
func (*Receiver) RejectMessage ¶
RejectMessage notifies the server that the message is invalid.
- ctx controls waiting for the peer to acknowledge the disposition
- msg is the message to reject
- e is an optional rejection error
If the context's deadline expires or is cancelled before the operation completes, the message's disposition is in an unknown state.
func (*Receiver) ReleaseMessage ¶
ReleaseMessage releases the message back to the server. The message may be redelivered to this or another consumer.
- ctx controls waiting for the peer to acknowledge the disposition
- msg is the message to release
If the context's deadline expires or is cancelled before the operation completes, the message's disposition is in an unknown state.
type ReceiverOptions ¶
type ReceiverOptions struct { // Capabilities is the list of extension capabilities the receiver supports. Capabilities []string // Credit specifies the maximum number of unacknowledged messages // the sender can transmit. Once this limit is reached, no more messages // will arrive until messages are acknowledged and settled. // // As messages are settled, any available credit will automatically be issued. // // Setting this to -1 requires manual management of link credit. // Credits can be added with IssueCredit(), and links can also be // drained with DrainCredit(). // This should only be enabled when complete control of the link's // flow control is required. // // Default: 1. Credit int32 // DesiredCapabilities maps to the desired-capabilities of an ATTACH frame. DesiredCapabilities []string // Durability indicates what state of the receiver will be retained durably. // // Default: DurabilityNone. Durability Durability // DynamicAddress indicates a dynamic address is to be used. // Any specified address will be ignored. // // Default: false. DynamicAddress bool // ExpiryPolicy determines when the expiry timer of the sender starts counting // down from the timeout value. If the link is subsequently re-attached before // the timeout is reached, the count down is aborted. // // Default: ExpirySessionEnd. ExpiryPolicy ExpiryPolicy // ExpiryTimeout is the duration in seconds that the sender will be retained. // // Default: 0. ExpiryTimeout uint32 // Filters contains the desired filters for this receiver. // If the peer cannot fulfill the filters the link will be detached. Filters []LinkFilter // MaxMessageSize sets the maximum message size that can // be received on the link. // // A size of zero indicates no limit. // // Default: 0. MaxMessageSize uint64 // Name sets the name of the link. // // Link names must be unique per-connection and direction. // // Default: randomly generated. 
Name string // Properties sets an entry in the link properties map sent to the server. Properties map[string]any // RequestedSenderSettleMode sets the requested sender settlement mode. // // If a settlement mode is explicitly set and the server does not // honor it an error will be returned during link attachment. // // Default: Accept the settlement mode set by the server, commonly ModeMixed. RequestedSenderSettleMode *SenderSettleMode // SettlementMode sets the settlement mode in use by this receiver. // // Default: ModeFirst. SettlementMode *ReceiverSettleMode // TargetAddress specifies the target address for this receiver. TargetAddress string // SourceCapabilities is the list of extension capabilities the receiver desires. SourceCapabilities []string // SourceDurability indicates what state of the peer will be retained durably. // // Default: DurabilityNone. SourceDurability Durability // SourceExpiryPolicy determines when the expiry timer of the peer starts counting // down from the timeout value. If the link is subsequently re-attached before // the timeout is reached, the count down is aborted. // // Default: ExpirySessionEnd. SourceExpiryPolicy ExpiryPolicy // SourceExpiryTimeout is the duration in seconds that the peer will be retained. // // Default: 0. SourceExpiryTimeout uint32 }
type ReceiverSettleMode ¶
type ReceiverSettleMode = encoding.ReceiverSettleMode
ReceiverSettleMode specifies how the receiver will settle messages.
const ( // Receiver is the first to consider the message as settled. // Once the corresponding disposition frame is sent, the message // is considered to be settled. ReceiverSettleModeFirst ReceiverSettleMode = encoding.ReceiverSettleModeFirst // Receiver is the second to consider the message as settled. // Once the corresponding disposition frame is sent, the settlement // is considered in-flight and the message will not be considered as // settled until the sender replies acknowledging the settlement. ReceiverSettleModeSecond ReceiverSettleMode = encoding.ReceiverSettleModeSecond )
Receiver Settlement Modes
type SASLType ¶
SASLType represents a SASL configuration to use during authentication.
func SASLTypeAnonymous ¶
func SASLTypeAnonymous() SASLType
SASLTypeAnonymous enables SASL ANONYMOUS authentication for the connection.
func SASLTypeExternal ¶
SASLTypeExternal enables SASL EXTERNAL authentication for the connection. The value for resp is dependent on the type of authentication (empty string is common for TLS). See https://datatracker.ietf.org/doc/html/rfc4422#appendix-A for additional info.
func SASLTypePlain ¶
SASLTypePlain enables SASL PLAIN authentication for the connection.
SASL PLAIN transmits credentials in plain text and should only be used on a TLS/SSL-enabled connection.
func SASLTypeXOAUTH2 ¶
SASLTypeXOAUTH2 enables SASL XOAUTH2 authentication for the connection.
The saslMaxFrameSizeOverride parameter allows the limit that governs the maximum frame size this client will allow itself to generate to be raised for the sasl-init frame only. Set this when the size of the SASL XOAUTH2 initial client response (which contains the username and bearer token) would otherwise breach the 512 byte min-max-frame-size (http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#definition-MIN-MAX-FRAME-SIZE). Pass -1 to keep the default.
SASL XOAUTH2 transmits the bearer token in plain text and should only be used on a TLS/SSL-enabled connection.
type SendOptions ¶
// SendOptions holds the optional settings accepted by Sender.Send.
type SendOptions struct {
	// Settled indicates the message is to be sent as settled when settlement mode is SenderSettleModeMixed.
	// If the settlement mode is SenderSettleModeUnsettled and Settled is true, an error is returned.
	Settled bool
}
SendOptions contains any optional values for the Sender.Send method.
type SendReceipt ¶
type SendReceipt struct {
// contains filtered or unexported fields
}
SendReceipt is returned by Sender.SendWithReceipt and is used to defer the confirmation of settlement of a Message.
func (SendReceipt) DeliveryTag ¶
func (s SendReceipt) DeliveryTag() []byte
DeliveryTag returns the message's delivery tag that's associated with this receipt.
Can be used to correlate a message with its receipt.
func (*SendReceipt) Wait ¶
func (s *SendReceipt) Wait(ctx context.Context) (DeliveryState, error)
Wait blocks until the peer confirms message settlement or an error occurs. If the peer is configured for receiver settlement mode second, the call also blocks until the confirmation of settlement is sent.
Upon completion, one of the possible DeliveryState concrete types is returned, else nil and an error are returned.
If the context's deadline expires or is cancelled before the operation completes, the message is in an unknown state of settlement.
type SendWithReceiptOptions ¶
type SendWithReceiptOptions struct { }
SendWithReceiptOptions contains any optional values for the Sender.SendWithReceipt method.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender sends messages on a single AMQP link.
func (*Sender) Address ¶
Address returns the link's address.
func (*Sender) Close ¶
Close closes the Sender and AMQP link.
- ctx controls waiting for the peer to acknowledge the close
If the context's deadline expires or is cancelled before the operation completes, an error is returned. However, the operation will continue to execute in the background. Subsequent calls will return a *LinkError that contains the context's error message.
func (*Sender) LinkName ¶
LinkName() is the name of the link used for this Sender.
func (*Sender) MaxMessageSize ¶
MaxMessageSize is the maximum size of a single message.
func (*Sender) Properties ¶
Properties returns the peer's link properties. Returns nil if the peer didn't send any properties.
func (*Sender) Send ¶
Send sends a Message.
Blocks until the message is sent or an error occurs. If the peer is configured for receiver settlement mode second, the call also blocks until the peer confirms message settlement.
- ctx controls waiting for the message to be sent and possibly confirmed
- msg is the message to send
- opts contains optional values, pass nil to accept the defaults
If the context's deadline expires or is cancelled before the operation completes, the message is in an unknown state of transmission.
If the peer rejects the message, an error is returned.
Send is safe for concurrent use. Since only a single message can be sent on a link at a time, this is most useful when settlement confirmation has been requested (receiver settle mode is second). In this case, additional messages can be sent while the current goroutine is waiting for the confirmation.
func (*Sender) SendWithReceipt ¶
func (s *Sender) SendWithReceipt(ctx context.Context, msg *Message, opts *SendWithReceiptOptions) (SendReceipt, error)
SendWithReceipt sends a Message and returns a SendReceipt which is used to defer confirmation of settlement until SendReceipt.Wait is called. The SendReceipt can also be used to obtain the DeliveryState of the sent message. Call this method in order to rapidly send messages on a link without waiting for confirmation of settlement.
- ctx controls waiting for the message to be sent
- msg is the message to send
- opts contains optional values, pass nil to accept the defaults
If the context's deadline expires or is cancelled before the operation completes, the message is in an unknown state of transmission.
If the Sender has been configured with SenderSettleModeSettled an error is returned.
SendWithReceipt is safe for concurrent use.
Code:play
Example¶
package main
import (
"context"
"log"
amqp "github.com/Azure/go-amqp"
)
// main shows how to send a message with SendWithReceipt and then inspect
// the delivery state reported once the receipt's Wait call returns.
func main() {
	ctx := context.TODO()

	// dial the broker using SASL PLAIN credentials
	connOpts := &amqp.ConnOptions{
		SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
	}
	client, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", connOpts)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// open a session on the connection
	sess, err := client.NewSession(ctx, nil)
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// attach a sending link to the queue
	snd, err := sess.NewSender(ctx, "/queue-name", nil)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	// send the message; settlement is confirmed later through the receipt
	payload := amqp.NewMessage([]byte("Hello!"))
	rcpt, err := snd.SendWithReceipt(ctx, payload, nil)
	if err != nil {
		log.Fatal("Sending message:", err)
	}

	// block until the peer confirms settlement
	outcome, err := rcpt.Wait(ctx)
	if err != nil {
		log.Fatal("Wait on receipt:", err)
	}

	// branch on the concrete delivery state returned by the peer
	switch s := outcome.(type) {
	case *amqp.StateAccepted:
		// accepted: nothing more to do
	case *amqp.StateModified:
		// the message has to be modified and resent before it can be
		// processed; the fields of s describe what to change.
	case *amqp.StateReceived:
		// consult the fields of [StateReceived] to decide how to
		// handle this delivery state.
	case *amqp.StateRejected:
		// the peer rejected the message
		if s.Error != nil {
			// the error explains why the message was rejected.
			// the peer is not required to supply one.
		}
	case *amqp.StateReleased:
		// the message was not and will not be acted upon
	}
}
type SenderOptions ¶
type SenderOptions struct { // Capabilities is the list of extension capabilities the sender supports. Capabilities []string // Durability indicates what state of the sender will be retained durably. // // Default: DurabilityNone. Durability Durability // DynamicAddress indicates a dynamic address is to be used. // Any specified address will be ignored. // // Default: false. DynamicAddress bool // DesiredCapabilities maps to the desired-capabilities of an ATTACH frame. DesiredCapabilities []string // ExpiryPolicy determines when the expiry timer of the sender starts counting // down from the timeout value. If the link is subsequently re-attached before // the timeout is reached, the count down is aborted. // // Default: ExpirySessionEnd. ExpiryPolicy ExpiryPolicy // ExpiryTimeout is the duration in seconds that the sender will be retained. // // Default: 0. ExpiryTimeout uint32 // Name sets the name of the link. // // Link names must be unique per-connection and direction. // // Default: randomly generated. Name string // Properties sets an entry in the link properties map sent to the server. Properties map[string]any // RequestedReceiverSettleMode sets the requested receiver settlement mode. // // If a settlement mode is explicitly set and the server does not // honor it an error will be returned during link attachment. // // Default: Accept the settlement mode set by the server, commonly ModeFirst. RequestedReceiverSettleMode *ReceiverSettleMode // SettlementMode sets the settlement mode in use by this sender. // // Default: ModeMixed. SettlementMode *SenderSettleMode // SourceAddress specifies the source address for this sender. SourceAddress string // TargetCapabilities is the list of extension capabilities the sender desires. TargetCapabilities []string // TargetDurability indicates what state of the peer will be retained durably. // // Default: DurabilityNone. 
TargetDurability Durability // TargetExpiryPolicy determines when the expiry timer of the peer starts counting // down from the timeout value. If the link is subsequently re-attached before // the timeout is reached, the count down is aborted. // // Default: ExpirySessionEnd. TargetExpiryPolicy ExpiryPolicy // TargetExpiryTimeout is the duration in seconds that the peer will be retained. // // Default: 0. TargetExpiryTimeout uint32 }
type SenderSettleMode ¶
type SenderSettleMode = encoding.SenderSettleMode
SenderSettleMode specifies how the sender will settle messages.
const ( // Sender will send all deliveries initially unsettled to the receiver. SenderSettleModeUnsettled SenderSettleMode = encoding.SenderSettleModeUnsettled // Sender will send all deliveries settled to the receiver. SenderSettleModeSettled SenderSettleMode = encoding.SenderSettleModeSettled // Sender MAY send a mixture of settled and unsettled deliveries to the receiver. SenderSettleModeMixed SenderSettleMode = encoding.SenderSettleModeMixed )
Sender Settlement Modes
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is an AMQP session.
A session multiplexes Senders and Receivers.
func (*Session) Close ¶
Close closes the session.
- ctx controls waiting for the peer to acknowledge the session is closed
If the context's deadline expires or is cancelled before the operation completes, an error is returned. However, the operation will continue to execute in the background. Subsequent calls will return a *SessionError that contains the context's error message.
func (*Session) NewReceiver ¶
func (s *Session) NewReceiver(ctx context.Context, source string, opts *ReceiverOptions) (*Receiver, error)
NewReceiver opens a new receiver link on the session.
- ctx controls waiting for the peer to create a sending terminus
- source is the name of the peer's sending terminus
- opts contains optional values, pass nil to accept the defaults
If the context's deadline expires or is cancelled before the operation completes, an error is returned. If the Receiver was successfully created, it will be cleaned up in future calls to NewReceiver.
func (*Session) NewSender ¶
func (s *Session) NewSender(ctx context.Context, target string, opts *SenderOptions) (*Sender, error)
NewSender opens a new sender link on the session.
- ctx controls waiting for the peer to create a receiver terminus
- target is the name of the peer's receiver terminus
- opts contains optional values, pass nil to accept the defaults
If the context's deadline expires or is cancelled before the operation completes, an error is returned. If the Sender was successfully created, it will be cleaned up in future calls to NewSender.
func (*Session) Properties ¶
Properties returns the peer's session properties. Returns nil if the peer didn't send any properties.
type SessionError ¶
type SessionError struct { // RemoteErr contains any error information provided by the peer if the peer closed the session. RemoteErr *Error // contains filtered or unexported fields }
SessionError is returned by methods on Session and propagated to Senders/Receivers
when the session has been closed.
Code:play
Example¶
package main
import (
"context"
"errors"
"log"
amqp "github.com/Azure/go-amqp"
)
// main shows how a *SessionError surfaces once a session has been closed:
// both the child Sender and the Session itself are expected to report it.
func main() {
	// a *SessionError is returned after a session has been closed, and it
	// is propagated to every child Sender and Receiver.
	ctx := context.TODO()

	// dial the broker using SASL PLAIN credentials
	connOpts := &amqp.ConnOptions{
		SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
	}
	client, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", connOpts)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// open a session on the connection
	sess, err := client.NewSession(ctx, nil)
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// attach a sending link to the queue
	snd, err := sess.NewSender(ctx, "/queue-name", nil)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	// close the session first so the send below runs against a closed session
	sess.Close(ctx)

	// sending on the closed session should yield a *SessionError
	var closedErr *amqp.SessionError
	payload := amqp.NewMessage([]byte("Hello!"))
	err = snd.Send(ctx, payload, nil)
	if !errors.As(err, &closedErr) {
		log.Fatalf("unexpected error type %T", err)
	}

	// methods on the session itself fail in the same way
	_, err = sess.NewReceiver(ctx, "/queue-name", nil)
	if !errors.As(err, &closedErr) {
		log.Fatalf("unexpected error type %T", err)
	}
}
func (*SessionError) Error ¶
func (e *SessionError) Error() string
Error implements the error interface for SessionError.
func (*SessionError) Unwrap ¶
func (e *SessionError) Unwrap() error
Unwrap returns the RemoteErr, if any.
type SessionOptions ¶
// SessionOptions holds the optional settings used to configure an AMQP session.
type SessionOptions struct {
	// MaxLinks sets the maximum number of links (Senders/Receivers)
	// allowed on the session.
	//
	// Minimum: 1.
	// Default: 4294967295.
	MaxLinks uint32
}
SessionOptions contains the optional settings for configuring an AMQP session.
type StateAccepted ¶
type StateAccepted = encoding.StateAccepted
StateAccepted indicates that an incoming message has been successfully processed, and that the receiver of the message is expecting the sender to transition the delivery to the accepted state at the source.
type StateModified ¶
type StateModified = encoding.StateModified
StateModified indicates that a given transfer was not and will not be acted upon, and that the message SHOULD be modified in the specified ways at the node.
type StateReceived ¶
type StateReceived = encoding.StateReceived
StateReceived indicates the furthest point in the payload of the message which the target will not need to have resent if the link is resumed.
type StateRejected ¶
type StateRejected = encoding.StateRejected
StateRejected indicates that an incoming message is invalid and therefore unprocessable. The rejected outcome when applied to a message will cause the delivery-count to be incremented in the header of the rejected message.
type StateReleased ¶
type StateReleased = encoding.StateReleased
StateReleased indicates that a given transfer was not and will not be acted upon.
type Symbol ¶
Symbol is an AMQP symbolic string.
type UUID ¶
UUID is a 128 bit identifier as defined in RFC 4122.
Source Files ¶
conn.go const.go creditor.go delivery_state.go doc.go errors.go link.go link_options.go message.go receiver.go sasl.go sender.go session.go
Directories ¶
Path | Synopsis |
---|---|
internal |
- Version
- v1.4.0 (latest)
- Published
- Feb 19, 2025
- Platform
- linux/amd64
- Imports
- 19 packages
- Last checked
- 5 days ago –
Tools for package owners.