package amqpwrap

import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"

Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.

Index

Variables

var ErrConnResetNeeded = errors.New("connection must be reset, link/connection state may be inconsistent")

Functions

func WrapError

func WrapError(err error, connID uint64, linkName string, partitionID string) error

Types

type AMQPClient

type AMQPClient interface {
	Close() error
	NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)
	ID() uint64
}

type AMQPClientWrapper

type AMQPClientWrapper struct {
	ConnID uint64
	Inner  goamqpConn
}

AMQPClientWrapper is a simple wrapper struct; *AMQPClientWrapper implements the AMQPClient interface. That interface exists only so we can return AMQPSession, which itself only exists so we can return interfaces for AMQPSender and AMQPReceiver from AMQPSession.

func (*AMQPClientWrapper) Close

func (w *AMQPClientWrapper) Close() error

func (*AMQPClientWrapper) ID

func (w *AMQPClientWrapper) ID() uint64

func (*AMQPClientWrapper) NewSession

func (w *AMQPClientWrapper) NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)

type AMQPReceiver

type AMQPReceiver interface {
	IssueCredit(credit uint32) error
	Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
	Prefetched() *amqp.Message

	// settlement functions
	AcceptMessage(ctx context.Context, msg *amqp.Message) error
	RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
	ReleaseMessage(ctx context.Context, msg *amqp.Message) error
	ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error

	LinkName() string
	LinkSourceFilterValue(name string) any

	// Credits returns the # of credits still active on this link.
	Credits() uint32

	ConnID() uint64
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPReceiverCloser

type AMQPReceiverCloser interface {
	AMQPReceiver
	Close(ctx context.Context) error
}

AMQPReceiverCloser is implemented by *amqp.Receiver

type AMQPReceiverWrapper

type AMQPReceiverWrapper struct {
	Inner goamqpReceiver

	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPReceiverWrapper) AcceptMessage

func (rw *AMQPReceiverWrapper) AcceptMessage(ctx context.Context, msg *amqp.Message) error

AcceptMessage is one of the message settlement functions (along with RejectMessage, ReleaseMessage, and ModifyMessage).

func (*AMQPReceiverWrapper) Close

func (rw *AMQPReceiverWrapper) Close(ctx context.Context) error

func (*AMQPReceiverWrapper) ConnID

func (rw *AMQPReceiverWrapper) ConnID() uint64

func (*AMQPReceiverWrapper) Credits

func (rw *AMQPReceiverWrapper) Credits() uint32

func (*AMQPReceiverWrapper) IssueCredit

func (rw *AMQPReceiverWrapper) IssueCredit(credit uint32) error

func (*AMQPReceiverWrapper) LinkName

func (rw *AMQPReceiverWrapper) LinkName() string

func (*AMQPReceiverWrapper) LinkSourceFilterValue

func (rw *AMQPReceiverWrapper) LinkSourceFilterValue(name string) any

func (*AMQPReceiverWrapper) ModifyMessage

func (rw *AMQPReceiverWrapper) ModifyMessage(ctx context.Context, msg *amqp.Message, options *amqp.ModifyMessageOptions) error

func (*AMQPReceiverWrapper) Prefetched

func (rw *AMQPReceiverWrapper) Prefetched() *amqp.Message

func (*AMQPReceiverWrapper) Receive

func (rw *AMQPReceiverWrapper) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)

func (*AMQPReceiverWrapper) RejectMessage

func (rw *AMQPReceiverWrapper) RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error

func (*AMQPReceiverWrapper) ReleaseMessage

func (rw *AMQPReceiverWrapper) ReleaseMessage(ctx context.Context, msg *amqp.Message) error

type AMQPSender

type AMQPSender interface {
	Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
	MaxMessageSize() uint64
	LinkName() string
	ConnID() uint64
}

AMQPSender is implemented by *amqp.Sender

type AMQPSenderCloser

type AMQPSenderCloser interface {
	AMQPSender
	Close(ctx context.Context) error
}

AMQPSenderCloser is implemented by *amqp.Sender

type AMQPSenderWrapper

type AMQPSenderWrapper struct {
	Inner                goamqpSender
	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPSenderWrapper) Close

func (sw *AMQPSenderWrapper) Close(ctx context.Context) error

func (*AMQPSenderWrapper) ConnID

func (sw *AMQPSenderWrapper) ConnID() uint64

func (*AMQPSenderWrapper) LinkName

func (sw *AMQPSenderWrapper) LinkName() string

func (*AMQPSenderWrapper) MaxMessageSize

func (sw *AMQPSenderWrapper) MaxMessageSize() uint64

func (*AMQPSenderWrapper) Send

func (sw *AMQPSenderWrapper) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error

type AMQPSession

type AMQPSession interface {
	Close(ctx context.Context) error
	ConnID() uint64
	NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)
	NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
}

AMQPSession is a simple interface, implemented by *AMQPSessionWrapper. It exists only so we can return AMQPReceiver/AMQPSender interfaces.

type AMQPSessionWrapper

type AMQPSessionWrapper struct {
	Inner                goamqpSession
	ContextWithTimeoutFn ContextWithTimeoutFn
	// contains filtered or unexported fields
}

func (*AMQPSessionWrapper) Close

func (w *AMQPSessionWrapper) Close(ctx context.Context) error

func (*AMQPSessionWrapper) ConnID

func (w *AMQPSessionWrapper) ConnID() uint64

func (*AMQPSessionWrapper) NewReceiver

func (w *AMQPSessionWrapper) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (AMQPReceiverCloser, error)

func (*AMQPSessionWrapper) NewSender

func (w *AMQPSessionWrapper) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)

type ContextWithTimeoutFn

type ContextWithTimeoutFn func(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)

ContextWithTimeoutFn matches the signature for `context.WithTimeout` and is used when we want to stub things out for tests.

type Error

type Error struct {
	ConnID      uint64
	LinkName    string
	PartitionID string
	Err         error
}

Error is a wrapper that has the context of which connection and link the error happened with.

func (Error) As

func (e Error) As(target any) bool

func (Error) Error

func (e Error) Error() string

func (Error) Is

func (e Error) Is(target error) bool

type RPCLink

type RPCLink interface {
	Close(ctx context.Context) error
	ConnID() uint64
	RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
	LinkName() string
}

RPCLink is implemented by *rpc.Link

type RPCResponse

type RPCResponse struct {
	// Code is the response code - these originate from Service Bus. Some
	// common values are called out below, with the RPCResponseCode* constants.
	Code        int
	Description string
	Message     *amqp.Message
}

RPCResponse is the simplified response structure from an RPC-like call.

Source Files

amqpwrap.go error.go rpc.go

Version
v1.3.0 (latest)
Published
Feb 8, 2025
Platform
linux/amd64
Imports
4 packages
Last checked
4 months ago

Tools for package owners.