package amqpwrap
import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
Index ¶
- Variables
- func WrapError(err error, connID uint64, linkName string, partitionID string) error
- type AMQPClient
- type AMQPClientWrapper
- func (w *AMQPClientWrapper) Close() error
- func (w *AMQPClientWrapper) ID() uint64
- func (w *AMQPClientWrapper) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPReceiverWrapper
- func (rw *AMQPReceiverWrapper) AcceptMessage(ctx context.Context, msg *amqp.Message) error
- func (rw *AMQPReceiverWrapper) Close(ctx context.Context) error
- func (rw *AMQPReceiverWrapper) ConnID() uint64
- func (rw *AMQPReceiverWrapper) Credits() uint32
- func (rw *AMQPReceiverWrapper) IssueCredit(credit uint32) error
- func (rw *AMQPReceiverWrapper) LinkName() string
- func (rw *AMQPReceiverWrapper) LinkSourceFilterValue(name string) any
- func (rw *AMQPReceiverWrapper) ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error
- func (rw *AMQPReceiverWrapper) Prefetched() *amqp.Message
- func (rw *AMQPReceiverWrapper) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
- func (rw *AMQPReceiverWrapper) RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
- func (rw *AMQPReceiverWrapper) ReleaseMessage(ctx context.Context, msg *amqp.Message) error
- type AMQPSender
- type AMQPSenderCloser
- type AMQPSenderWrapper
- func (sw *AMQPSenderWrapper) Close(ctx context.Context) error
- func (sw *AMQPSenderWrapper) ConnID() uint64
- func (sw *AMQPSenderWrapper) LinkName() string
- func (sw *AMQPSenderWrapper) MaxMessageSize() uint64
- func (sw *AMQPSenderWrapper) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
- type AMQPSession
- type AMQPSessionWrapper
- func (w *AMQPSessionWrapper) Close(ctx context.Context) error
- func (w *AMQPSessionWrapper) ConnID() uint64
- func (w *AMQPSessionWrapper) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)
- func (w *AMQPSessionWrapper) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
- type ContextWithTimeoutFn
- type Error
- func (e Error) As(target any) bool
- func (e Error) Error() string
- func (e Error) Is(target error) bool
- type RPCLink
- type RPCResponse
Variables ¶
var ErrConnResetNeeded = errors.New("connection must be reset, link/connection state may be inconsistent")
Functions ¶
func WrapError ¶
Types ¶
type AMQPClient ¶
type AMQPClient interface { Close() error NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error) ID() uint64 }
type AMQPClientWrapper ¶
type AMQPClientWrapper struct { ConnID uint64 Inner goamqpConn }
AMQPClientWrapper is a simple interface, implemented by *AMQPClientWrapper It exists only so we can return AMQPSession, which itself only exists so we can return interfaces for AMQPSender and AMQPReceiver from AMQPSession.
func (*AMQPClientWrapper) Close ¶
func (w *AMQPClientWrapper) Close() error
func (*AMQPClientWrapper) ID ¶
func (w *AMQPClientWrapper) ID() uint64
func (*AMQPClientWrapper) NewSession ¶
func (w *AMQPClientWrapper) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)
type AMQPReceiver ¶
type AMQPReceiver interface { IssueCredit(credit uint32) error Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) Prefetched() *amqp.Message // settlement functions AcceptMessage(ctx context.Context, msg *amqp.Message) error RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error ReleaseMessage(ctx context.Context, msg *amqp.Message) error ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error LinkName() string LinkSourceFilterValue(name string) any // Credits returns the # of credits still active on this link. Credits() uint32 ConnID() uint64 }
AMQPReceiver is implemented by *amqp.Receiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser interface { AMQPReceiver Close(ctx context.Context) error }
AMQPReceiverCloser is implemented by *amqp.Receiver
type AMQPReceiverWrapper ¶
type AMQPReceiverWrapper struct { Inner goamqpReceiver ContextWithTimeoutFn ContextWithTimeoutFn // contains filtered or unexported fields }
func (*AMQPReceiverWrapper) AcceptMessage ¶
func (rw *AMQPReceiverWrapper) AcceptMessage(ctx context.Context, msg *amqp.Message) error
settlement functions
func (*AMQPReceiverWrapper) Close ¶
func (rw *AMQPReceiverWrapper) Close(ctx context.Context) error
func (*AMQPReceiverWrapper) ConnID ¶
func (rw *AMQPReceiverWrapper) ConnID() uint64
func (*AMQPReceiverWrapper) Credits ¶
func (rw *AMQPReceiverWrapper) Credits() uint32
func (*AMQPReceiverWrapper) IssueCredit ¶
func (rw *AMQPReceiverWrapper) IssueCredit(credit uint32) error
func (*AMQPReceiverWrapper) LinkName ¶
func (rw *AMQPReceiverWrapper) LinkName() string
func (*AMQPReceiverWrapper) LinkSourceFilterValue ¶
func (rw *AMQPReceiverWrapper) LinkSourceFilterValue(name string) any
func (*AMQPReceiverWrapper) ModifyMessage ¶
func (rw *AMQPReceiverWrapper) ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error
func (*AMQPReceiverWrapper) Prefetched ¶
func (rw *AMQPReceiverWrapper) Prefetched() *amqp.Message
func (*AMQPReceiverWrapper) Receive ¶
func (rw *AMQPReceiverWrapper) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
func (*AMQPReceiverWrapper) RejectMessage ¶
func (rw *AMQPReceiverWrapper) RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
func (*AMQPReceiverWrapper) ReleaseMessage ¶
func (rw *AMQPReceiverWrapper) ReleaseMessage(ctx context.Context, msg *amqp.Message) error
type AMQPSender ¶
type AMQPSender interface { Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error MaxMessageSize() uint64 LinkName() string ConnID() uint64 }
AMQPSender is implemented by *amqp.Sender
type AMQPSenderCloser ¶
type AMQPSenderCloser interface { AMQPSender Close(ctx context.Context) error }
AMQPSenderCloser is implemented by *amqp.Sender
type AMQPSenderWrapper ¶
type AMQPSenderWrapper struct { Inner goamqpSender ContextWithTimeoutFn ContextWithTimeoutFn // contains filtered or unexported fields }
func (*AMQPSenderWrapper) Close ¶
func (sw *AMQPSenderWrapper) Close(ctx context.Context) error
func (*AMQPSenderWrapper) ConnID ¶
func (sw *AMQPSenderWrapper) ConnID() uint64
func (*AMQPSenderWrapper) LinkName ¶
func (sw *AMQPSenderWrapper) LinkName() string
func (*AMQPSenderWrapper) MaxMessageSize ¶
func (sw *AMQPSenderWrapper) MaxMessageSize() uint64
func (*AMQPSenderWrapper) Send ¶
func (sw *AMQPSenderWrapper) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
type AMQPSession ¶
type AMQPSession interface { Close(ctx context.Context) error ConnID() uint64 NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error) }
AMQPSession is a simple interface, implemented by *AMQPSessionWrapper. It exists only so we can return AMQPReceiver/AMQPSender interfaces.
type AMQPSessionWrapper ¶
type AMQPSessionWrapper struct { Inner goamqpSession ContextWithTimeoutFn ContextWithTimeoutFn // contains filtered or unexported fields }
func (*AMQPSessionWrapper) Close ¶
func (w *AMQPSessionWrapper) Close(ctx context.Context) error
func (*AMQPSessionWrapper) ConnID ¶
func (w *AMQPSessionWrapper) ConnID() uint64
func (*AMQPSessionWrapper) NewReceiver ¶
func (w *AMQPSessionWrapper) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)
func (*AMQPSessionWrapper) NewSender ¶
func (w *AMQPSessionWrapper) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
type ContextWithTimeoutFn ¶
type ContextWithTimeoutFn func(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
ContextWithTimeoutFn matches the signature for `context.WithTimeout` and is used when we want to stub things out for tests.
type Error ¶
Error is a wrapper that has the context of which connection and link the error happened with.
func (Error) As ¶
func (Error) Error ¶
func (Error) Is ¶
type RPCLink ¶
type RPCLink interface { Close(ctx context.Context) error ConnID() uint64 RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error) LinkName() string }
RPCLink is implemented by *rpc.Link
type RPCResponse ¶
type RPCResponse struct { // Code is the response code - these originate from Service Bus. Some // common values are called out below, with the RPCResponseCode* constants. Code int Description string Message *amqp.Message }
RPCResponse is the simplified response structure from an RPC like call
Source Files ¶
- Version
- v1.3.0 (latest)
- Published
- Feb 8, 2025
- Platform
- linux/amd64
- Imports
- 4 packages
- Last checked
- 4 months ago –
Tools for package owners.