package shared
import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
Index ¶
- Constants
- func AddAuthFlags(fs *flag.FlagSet) func() (*azservicebus.Client, *admin.Client, error)
- func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, updateInterval time.Duration) error
- func ForceQueueDetach(ctx context.Context, adminClient *admin.Client, queue string) error
- func LoadEnvironment() error
- func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client
- func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func()
- func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)
- func NewCtrlCContext() (context.Context, context.CancelFunc)
- func TrackDuration(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], name Metric) func(map[string]string)
- func TrackError(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], err error)
- func TrackMetric(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], name Metric, value float64, attrs map[string]string)
- func UpdateBaggage(ctx context.Context, baggage map[string]string) map[string]string
- func WithBaggage(ctx context.Context, baggage map[string]string) context.Context
- type Event
- type Metric
- type MustCreateSubscriptionsOptions
- type StreamingMessageBatch
- func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error)
- func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Message, options *azservicebus.AddMessageOptions) error
- func (sb *StreamingMessageBatch) Close(ctx context.Context) error
- type StressContext
- func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext
- func (tracker *StressContext) Assert(condition bool, message string)
- func (sc *StressContext) End()
- func (tracker *StressContext) Equal(val1 any, val2 any)
- func (tracker *StressContext) Failf(format string, args ...any)
- func (sc *StressContext) LogIfFailed(message string, err error)
- func (tracker *StressContext) Nil(val1 any)
- func (tracker *StressContext) NoError(err error)
- func (tracker *StressContext) NoErrorf(err error, format string, args ...any)
- func (tracker *StressContext) PanicOnError(message string, err error)
- func (sc *StressContext) Start(entityName string, attributes map[string]string)
- type StressContextOptions
- type TelemetryClientWrapper
- func NewTelemetryClientWrapper[MetricT ~string, EventT ~string]() *TelemetryClientWrapper[MetricT, EventT]
- func (tc *TelemetryClientWrapper[MetricT, EventT]) Context() *TelemetryClientWrapperContext
- func (tc *TelemetryClientWrapper[MetricT, EventT]) Flush()
- func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackEvent(name EventT)
- func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackEventWithProps(name EventT, properties map[string]string)
- func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackException(err error)
- func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackExceptionWithProps(err error, properties map[string]string)
- func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackMetricWithProps(name MetricT, value float64, properties map[string]string)
- type TelemetryClientWrapperContext
- type TestContext
- type TrackingReceiver
- func NewTrackingReceiverForQueue(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, queueName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)
- func NewTrackingReceiverForSubscription(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, topicName string, subscriptionName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)
- func (tr *TrackingReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error
- func (tr *TrackingReceiver) Close(ctx context.Context) error
- func (tr *TrackingReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error
- func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *azservicebus.PeekMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
- func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
- func (tr *TrackingReceiver) RenewMessageLock(ctx context.Context, msg *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
- type TrackingSender
- func NewTrackingSender(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, queueOrTopic string, options *azservicebus.NewSenderOptions) (*TrackingSender, error)
- func (ts *TrackingSender) Close(ctx context.Context) error
- func (ts *TrackingSender) NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
- func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
- func (ts *TrackingSender) SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error
Constants ¶
const ( AttrAMQPDeliveryState string = "amqp.delivery_state" AttrAMQPStatusCode string = "amqp.status_code" // TODO: I made these up entirely AttrMessageCount string = "amqp.message_count" )
const (
MetricStressSuccessfulCancels = "stress.cancels"
)
These metrics are specific to stress tests and wouldn't be in customer code.
Functions ¶
func AddAuthFlags ¶
AddAuthFlags adds the flags needed for authenticating to Service Bus. Returns a function that can be called after the flags have been parsed, which will create the *azservicebus.Client and the *admin.Client.
func ConstantlyUpdateQueue ¶
func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, updateInterval time.Duration) error
ConstantlyUpdateQueue updates queue, changing the MaxDeliveryCount property between 11 and 10, every `updateInterval`. This will cause Service Bus to issue force-detaches to our links, allowing us to exercise our recovery logic.
func ForceQueueDetach ¶
func LoadEnvironment ¶
func LoadEnvironment() error
LoadEnvironment loads an .env file. If the env var `ENV_FILE` exists, we assume the value is a path to an .env file. Otherwise we fall back to loading from the current directory.
func MustCreateAutoDeletingQueue ¶
func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client
MustCreateAutoDeletingQueue creates a queue that will auto-delete 10 minutes after activity has ceased.
func MustCreateSubscriptions ¶
func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func()
func MustGenerateMessages ¶
func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)
func NewCtrlCContext ¶
func NewCtrlCContext() (context.Context, context.CancelFunc)
NewCtrlCContext creates a context that cancels if the user hits ctrl+c.
func TrackDuration ¶
func TrackDuration(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], name Metric) func(map[string]string)
TrackDuration tracks durations (as a metric), using the initial call to TrackDuration as the start. The duration is ended when you call the returned function. TrackDuration respects any included baggage in the context.
func TrackError ¶
TrackError tracks an error (using the AppInsights exceptions table). TrackError respects any included baggage in the context.
NOTE: this function does not consider context cancellations/deadlines as errors.
func TrackMetric ¶
func TrackMetric(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], name Metric, value float64, attrs map[string]string)
TrackMetric tracks a metric and respects any included baggage in the context.
func UpdateBaggage ¶
func WithBaggage ¶
Types ¶
type Event ¶
type Event string
type Metric ¶
type Metric string
const ( MetricConnectionLost Metric = "messaging.servicebus.connectionlost" MetricMessagesSent Metric = "messaging.servicebus.messages.sent" // metrics related to Service Bus sessions (NOT amqp sessions) MetricSessionAccept Metric = "messaging.servicebus.session.accept" MetricSessionTimeoutMS Metric = "messaging.servicebus.session.timeout" MetricSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration" MetricReceiveLag Metric = "messaging.servicebus.receiver.lag" MetricAMQPSendDuration Metric = "messaging.az.amqp.producer.send.duration" MetricAMQPMgmtRequestDuration Metric = "messaging.az.amqp.management.request.duration" MetricAMQPSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration" MetricAMQPSettlementSequenceNum Metric = "messaging.servicebus.settlement.sequence_number" // TODO: I've made these up entirely. MetricMessageReceived Metric = "messaging.servicebus.messages.received" MetricMessagePeeked Metric = "messaging.servicebus.messages.peeked" MetricCloseDuration Metric = "messaging.servicebus.close.duration" MetricLockRenew Metric = "messaging.servicebus.lockrenew.duration" // TODO: separate for session vs message lock? )
These names are modeled off of the metrics from Java https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusMeter.java
and from our standard for attributes: https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92
type MustCreateSubscriptionsOptions ¶
type MustCreateSubscriptionsOptions struct { Topic *admin.CreateTopicOptions Subscription *admin.CreateSubscriptionOptions }
type StreamingMessageBatch ¶
type StreamingMessageBatch struct {
// contains filtered or unexported fields
}
func NewStreamingMessageBatch ¶
func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error)
func (*StreamingMessageBatch) Add ¶
func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Message, options *azservicebus.AddMessageOptions) error
Add appends to the current batch. If the batch is full, it is sent and a new one is allocated.
func (*StreamingMessageBatch) Close ¶
func (sb *StreamingMessageBatch) Close(ctx context.Context) error
Close sends any messages currently held in our batch.
type StressContext ¶
type StressContext struct { TC *TelemetryClientWrapper[Metric, Event] Context context.Context // TestRunID represents the test run and can be used to tie into other container metrics generated within the test cluster. TestRunID string // Nano is the nanoseconds start time for the stress test run Nano string // Endpoint is the value from SERVICEBUS_ENDPOINT Endpoint string Cred azcore.TokenCredential // contains filtered or unexported fields }
StressContext holds onto some common useful state for stress tests, including some simple stats tracking, a telemetry client and a context that represents the lifetime of the test itself (and will be cancelled if the user quits out of the stress test).
func MustCreateStressContext ¶
func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext
func (*StressContext) Assert ¶
func (tracker *StressContext) Assert(condition bool, message string)
func (*StressContext) End ¶
func (sc *StressContext) End()
func (*StressContext) Equal ¶
func (tracker *StressContext) Equal(val1 any, val2 any)
func (*StressContext) Failf ¶
func (tracker *StressContext) Failf(format string, args ...any)
func (*StressContext) LogIfFailed ¶
func (sc *StressContext) LogIfFailed(message string, err error)
func (*StressContext) Nil ¶
func (tracker *StressContext) Nil(val1 any)
func (*StressContext) NoError ¶
func (tracker *StressContext) NoError(err error)
func (*StressContext) NoErrorf ¶
func (tracker *StressContext) NoErrorf(err error, format string, args ...any)
func (*StressContext) PanicOnError ¶
func (tracker *StressContext) PanicOnError(message string, err error)
PanicOnError logs, sends telemetry and then closes on error
func (*StressContext) Start ¶
func (sc *StressContext) Start(entityName string, attributes map[string]string)
type StressContextOptions ¶
type StressContextOptions struct { // Duration is the amount of time the stress test should run before // the StressContext.Context expires. Duration time.Duration // CommonBaggage will be added as part of the telemetry client, and will be included in each // metric/event/error that's reported. CommonBaggage map[string]string // EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry. EmitStartEvent bool }
type TelemetryClientWrapper ¶
type TelemetryClientWrapper[MetricT ~string, EventT ~string] struct { // contains filtered or unexported fields }
TelemetryClientWrapper is a wrapper for telemetry client, once we get that phased back in.
func NewTelemetryClientWrapper ¶
func NewTelemetryClientWrapper[MetricT ~string, EventT ~string]() *TelemetryClientWrapper[MetricT, EventT]
func (*TelemetryClientWrapper[MetricT, EventT]) Context ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) Context() *TelemetryClientWrapperContext
Context returns the context that is included for each reported event or metric.
func (*TelemetryClientWrapper[MetricT, EventT]) Flush ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) Flush()
func (*TelemetryClientWrapper[MetricT, EventT]) TrackEvent ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackEvent(name EventT)
func (*TelemetryClientWrapper[MetricT, EventT]) TrackEventWithProps ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackEventWithProps(name EventT, properties map[string]string)
func (*TelemetryClientWrapper[MetricT, EventT]) TrackException ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackException(err error)
func (*TelemetryClientWrapper[MetricT, EventT]) TrackExceptionWithProps ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackExceptionWithProps(err error, properties map[string]string)
func (*TelemetryClientWrapper[MetricT, EventT]) TrackMetricWithProps ¶
func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackMetricWithProps(name MetricT, value float64, properties map[string]string)
type TelemetryClientWrapperContext ¶
type TestContext ¶
type TestContext struct { *StressContext Client *azservicebus.Client }
type TrackingReceiver ¶
type TrackingReceiver struct {
// contains filtered or unexported fields
}
TrackingReceiver reports metrics and errors automatically for its methods.
func NewTrackingReceiverForQueue ¶
func NewTrackingReceiverForQueue(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, queueName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)
func NewTrackingReceiverForSubscription ¶
func NewTrackingReceiverForSubscription(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, topicName string, subscriptionName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)
func (*TrackingReceiver) AbandonMessage ¶
func (tr *TrackingReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error
func (*TrackingReceiver) Close ¶
func (tr *TrackingReceiver) Close(ctx context.Context) error
func (*TrackingReceiver) CompleteMessage ¶
func (tr *TrackingReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error
func (*TrackingReceiver) PeekMessages ¶
func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *azservicebus.PeekMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
func (*TrackingReceiver) ReceiveMessages ¶
func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
func (*TrackingReceiver) RenewMessageLock ¶
func (tr *TrackingReceiver) RenewMessageLock(ctx context.Context, msg *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
type TrackingSender ¶
type TrackingSender struct {
// contains filtered or unexported fields
}
TrackingSender reports metrics and errors automatically for its methods.
func NewTrackingSender ¶
func NewTrackingSender(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, queueOrTopic string, options *azservicebus.NewSenderOptions) (*TrackingSender, error)
func (*TrackingSender) Close ¶
func (ts *TrackingSender) Close(ctx context.Context) error
func (*TrackingSender) NewMessageBatch ¶
func (ts *TrackingSender) NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
func (*TrackingSender) SendMessage ¶
func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
func (*TrackingSender) SendMessageBatch ¶
func (ts *TrackingSender) SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error
Source Files ¶
metric_names.go streaming_batch.go stress_context.go telemetry_client_wrapper.go tracking_receiver.go tracking_sender.go utils.go
- Version
- v1.8.0 (latest)
- Published
- Feb 8, 2025
- Platform
- linux/amd64
- Imports
- 16 packages
- Last checked
- 4 months ago –
Tools for package owners.