package internal
import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func IsCancelError(err error) bool
- func IsFatalEHError(err error) bool
- func IsNotAllowedError(err error) bool
- func IsOwnershipLostError(err error) bool
- func IsQuickRecoveryError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
- func TransformError(err error) error
- type AMQPLink
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type Closeable
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) Credits() uint32
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) LinkName() string
- func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
- type FakeAMQPSender
- type FakeAMQPSession
- func (sess *FakeAMQPSession) Close(ctx context.Context) error
- func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
- func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
- type FakeNSForPartClient
- func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *FakeNSForPartClient) Recover(ctx context.Context, clientRevision uint64) error
- type LinkRetrier
- func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error
- func (l LinkRetrier[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn RetryCallback[LinkT]) error
- type LinkWithID
- type Links
- func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]
- func (l *Links[LinkT]) Close(ctx context.Context) error
- func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)
- func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[amqpwrap.RPCLink], error)
- func (l *Links[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error
- func (l *Links[LinkT]) RetryManagement(ctx context.Context, eventName azlog.Event, operation string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[amqpwrap.RPCLink]) error) error
- type LinksForPartitionClient
- type Namespace
- func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(ctx context.Context, permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) error
- type NamespaceForAMQPLinks
- type NamespaceForManagementOps
- type NamespaceForProducerOrConsumer
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithCustomEndpoint(customEndpoint string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.WebSocketConnParams) (net.Conn, error)) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NewLinksFn
- type RPCError
- type RPCLinkArgs
- type RPCLinkOption
- type RecoveryKind
- type RetryCallback
Constants ¶
const Version = "v1.3.0"
Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
Functions ¶
func IsCancelError ¶
func IsFatalEHError ¶
func IsNotAllowedError ¶
func IsOwnershipLostError ¶
func IsQuickRecoveryError ¶
func NegotiateClaim ¶
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶
func NewRPCLink ¶
NewRPCLink will build a new request response link
func TransformError ¶
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error which has a 'Code' field that can be used programatically. If it's not actionable or if it's nil it'll just be returned.
Types ¶
type AMQPLink ¶
type AMQPReceiver ¶
type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender ¶
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser ¶
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct { amqpwrap.AMQPReceiverCloser // ActiveCredits are incremented and decremented by IssueCredit and Receive. ActiveCredits int32 // IssuedCredit just accumulates, so we can get an idea of how many credits we issued overall. IssuedCredit []uint32 // CreditsSetFromOptions is similar to issuedCredit, but only tracks credits added in via the LinkOptions.Credit // field (ie, enabling prefetch). CreditsSetFromOptions int32 // ManualCreditsSetFromOptions is the value of the LinkOptions.ManualCredits value. ManualCreditsSetFromOptions bool Messages []*amqp.Message NameForLink string CloseCalled int CloseError error }
func (*FakeAMQPReceiver) Close ¶
func (r *FakeAMQPReceiver) Close(ctx context.Context) error
func (*FakeAMQPReceiver) Credits ¶
func (r *FakeAMQPReceiver) Credits() uint32
func (*FakeAMQPReceiver) IssueCredit ¶
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) LinkName ¶
func (r *FakeAMQPReceiver) LinkName() string
func (*FakeAMQPReceiver) Receive ¶
func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
type FakeAMQPSender ¶
type FakeAMQPSender struct { amqpwrap.AMQPSenderCloser CloseCalled int CloseError error }
func (*FakeAMQPSender) Close ¶
func (s *FakeAMQPSender) Close(ctx context.Context) error
type FakeAMQPSession ¶
type FakeAMQPSession struct { amqpwrap.AMQPSession NS *FakeNSForPartClient CloseCalled int }
func (*FakeAMQPSession) Close ¶
func (sess *FakeAMQPSession) Close(ctx context.Context) error
func (*FakeAMQPSession) NewReceiver ¶
func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
func (*FakeAMQPSession) NewSender ¶
func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
type FakeNSForPartClient ¶
type FakeNSForPartClient struct { NamespaceForAMQPLinks Receiver *FakeAMQPReceiver NewReceiverErr error NewReceiverCalled int Sender *FakeAMQPSender NewSenderErr error NewSenderCalled int RecoverFn func(ctx context.Context, clientRevision uint64) error }
func (*FakeNSForPartClient) NegotiateClaim ¶
func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
func (*FakeNSForPartClient) NewAMQPSession ¶
func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
func (*FakeNSForPartClient) Recover ¶
func (ns *FakeNSForPartClient) Recover(ctx context.Context, clientRevision uint64) error
type LinkRetrier ¶
type LinkRetrier[LinkT AMQPLink] struct { // GetLink is set to [Links.GetLink] GetLink func(ctx context.Context, partitionID string) (LinkWithID[LinkT], error) // CloseLink is set to [Links.closePartitionLinkIfMatch] CloseLink func(ctx context.Context, partitionID string, linkName string) error // NSRecover is set to [Namespace.Recover] NSRecover func(ctx context.Context, connID uint64) error }
func (LinkRetrier[LinkT]) RecoverIfNeeded ¶
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error
RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc..) NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
func (LinkRetrier[LinkT]) Retry ¶
func (l LinkRetrier[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn RetryCallback[LinkT]) error
Retry runs the fn argument in a loop, respecting retry counts. If connection/link failures occur it also takes care of running recovery logic to bring them back, or return an appropriate error if retries are exhausted.
type LinkWithID ¶
type LinkWithID[LinkT AMQPLink] interface { ConnID() uint64 Link() LinkT PartitionID() string Close(ctx context.Context) error String() string }
LinkWithID is a readonly interface over the top of a linkState.
type Links ¶
type Links[LinkT AMQPLink] struct { // contains filtered or unexported fields }
func NewLinks ¶
func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]
func (*Links[LinkT]) Close ¶
func (*Links[LinkT]) GetLink ¶
func (*Links[LinkT]) GetManagementLink ¶
func (*Links[LinkT]) Retry ¶
func (l *Links[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error
func (*Links[LinkT]) RetryManagement ¶
func (l *Links[LinkT]) RetryManagement(ctx context.Context, eventName azlog.Event, operation string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[amqpwrap.RPCLink]) error) error
type LinksForPartitionClient ¶
type LinksForPartitionClient[LinkT AMQPLink] interface { // Retry is [Links.Retry] Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error // Close is [Links.Close] Close(ctx context.Context) error }
LinksForPartitionClient are the functions that the PartitionClient uses within Links[T] (for unit testing only)
type Namespace ¶
type Namespace struct { FQDN string TokenProvider *sbauth.TokenProvider // NOTE: exported only so it can be checked in a test RetryOptions exported.RetryOptions // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client..
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) Check ¶
Check returns an error if the namespace cannot be used (ie, closed permanently), or nil otherwise.
func (*Namespace) Close ¶
Close closes the current cached client.
func (*Namespace) GetAMQPClientImpl ¶
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetTokenForEntity ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
func (*Namespace) Recover ¶
Recover destroys the currently held AMQP connection and recreates it, if needed.
NOTE: cancelling the context only cancels the initialization of a new AMQP connection - the previous connection is always closed.
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error) GetEntityAudience(entityPath string) string // Recover destroys the currently held AMQP connection and recreates it, if needed. // // NOTE: cancelling the context only cancels the initialization of a new AMQP // connection - the previous connection is always closed. Recover(ctx context.Context, clientRevision uint64) error Close(ctx context.Context, permanently bool) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForManagementOps ¶
type NamespaceForManagementOps interface { NamespaceForAMQPLinks GetTokenForEntity(eventHub string) (*auth.Token, error) }
type NamespaceForProducerOrConsumer ¶
type NamespaceForProducerOrConsumer = NamespaceForManagementOps
TODO: might just consolidate.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Event Hub namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Event Hub connection string
func NamespaceWithCustomEndpoint ¶
func NamespaceWithCustomEndpoint(customEndpoint string) NamespaceOption
NamespaceWithCustomEndpoint sets a custom endpoint, useful for when you're connecting through a TCP proxy. When establishing a TCP connection we connect to this address. The audience is extracted from the fullyQualifiedNamespace given to NamespaceWithTokenCredential or the endpoint in the connection string passed to NamespaceWithConnectionString.
func NamespaceWithRetryOptions ¶
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net)
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.WebSocketConnParams) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface { Check() error }
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewLinksFn ¶
type NewLinksFn[LinkT AMQPLink] func(ctx context.Context, session amqpwrap.AMQPSession, entityPath string, partitionID string) (LinkT, error)
type RPCError ¶
type RPCError struct { Resp *amqpwrap.RPCResponse Message string }
RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
func (RPCError) Error ¶
Error is a string representation of the error.
func (RPCError) RPCCode ¶
RPCCode is the code that comes back in the rpc response. This code is intended for programs toreact to programatically.
type RPCLinkArgs ¶
type RPCLinkArgs struct { Client amqpwrap.AMQPClient Address string LogEvent azlog.Event }
type RPCLinkOption ¶
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RecoveryKind ¶
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const ( RecoveryKindNone RecoveryKind = "" RecoveryKindFatal RecoveryKind = "fatal" RecoveryKindLink RecoveryKind = "link" RecoveryKindConn RecoveryKind = "connection" )
func GetRecoveryKind ¶
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
type RetryCallback ¶
type RetryCallback[LinkT AMQPLink] func(ctx context.Context, lwid LinkWithID[LinkT]) error
Source Files ¶
amqpInterfaces.go amqp_fakes.go cbs.go constants.go errors.go links.go links_recover.go namespace.go namespace_eh.go rpc.go
Directories ¶
Path | Synopsis |
---|---|
internal/amqpwrap | Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. |
internal/auth | Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. |
internal/eh | |
internal/eh/stress | |
internal/eh/stress/shared | |
internal/eh/stress/tests | |
internal/exported | |
internal/mock | Package mock is a generated GoMock package. |
internal/sas | |
internal/sbauth | |
internal/test | |
internal/utils |
- Version
- v1.3.0 (latest)
- Published
- Feb 8, 2025
- Platform
- linux/amd64
- Imports
- 21 packages
- Last checked
- 4 months ago –
Tools for package owners.