package shared

import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"

Index

Constants

// Attribute key names. Presumably these are used as property keys on reported
// telemetry - confirm against callers.
const (
	AttrAMQPDeliveryState string = "amqp.delivery_state"
	AttrAMQPStatusCode    string = "amqp.status_code"

	// TODO: this attribute name was invented for this package; it does not
	// come from an established naming convention.
	AttrMessageCount string = "amqp.message_count"
)
// NOTE(review): unlike the other Metric* constants in this package, which are
// declared with the Metric type, this one is an untyped string constant.
// Confirm whether that is intentional.
const (
	MetricStressSuccessfulCancels = "stress.cancels"
)

These metrics are specific to stress tests and wouldn't be in customer code.

Functions

func AddAuthFlags

func AddAuthFlags(fs *flag.FlagSet) func() (*azservicebus.Client, *admin.Client, error)

AddAuthFlags adds the flags needed for authenticating to Service Bus. It returns a function that can be called after the flags have been parsed, which will create the *azservicebus.Client and *admin.Client.

func ConstantlyUpdateQueue

func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, updateInterval time.Duration) error

ConstantlyUpdateQueue updates the queue, toggling the MaxDeliveryCount property between 11 and 10, every `updateInterval`. This will cause Service Bus to issue force-detaches to our links, allowing us to exercise our recovery logic.

func ForceQueueDetach

func ForceQueueDetach(ctx context.Context, adminClient *admin.Client, queue string) error

func LoadEnvironment

func LoadEnvironment() error

LoadEnvironment loads an .env file. If the env var `ENV_FILE` exists, we assume the value is a path to an .env file. Otherwise, we fall back to loading from the current directory.

func MustCreateAutoDeletingQueue

func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client

MustCreateAutoDeletingQueue creates a queue that will auto-delete 10 minutes after activity has ceased.

func MustCreateSubscriptions

func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func()

func MustGenerateMessages

func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)

func NewCtrlCContext

func NewCtrlCContext() (context.Context, context.CancelFunc)

NewCtrlCContext creates a context that cancels if the user hits ctrl+c.

func TrackDuration

func TrackDuration(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], name Metric) func(map[string]string)

TrackDuration tracks durations (as a metric), using the initial call to TrackDuration as the start. The duration is ended when you call the returned function. TrackDuration respects any included baggage in the context.

func TrackError

func TrackError(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], err error)

TrackError tracks an error (using the AppInsights exceptions table). TrackError respects any included baggage in the context.

NOTE: this function does not consider context cancellations/deadlines as errors.

func TrackMetric

func TrackMetric(ctx context.Context, tc *TelemetryClientWrapper[Metric, Event], name Metric, value float64, attrs map[string]string)

TrackMetric tracks a metric and respects any included baggage in the context.

func UpdateBaggage

func UpdateBaggage(ctx context.Context, baggage map[string]string) map[string]string

func WithBaggage

func WithBaggage(ctx context.Context, baggage map[string]string) context.Context

Types

type Event

// Event is the string type used for telemetry event names.
type Event string

// Event names defined by this package.
const (
	EventEnd   Event = "end"
	EventStart Event = "start"
)

type Metric

// Metric is the string type used for metric names passed to the tracking
// helpers (for example TrackMetric and TrackDuration).
type Metric string

// Metric names defined by this package.
const (
	MetricConnectionLost Metric = "messaging.servicebus.connectionlost"
	MetricMessagesSent   Metric = "messaging.servicebus.messages.sent"

	// metrics related to Service Bus sessions (NOT amqp sessions)
	// NOTE(review): MetricSessionTimeoutMS carries a unit suffix in its Go
	// name, but the metric string itself has no unit - confirm the reported
	// value really is in milliseconds.
	MetricSessionAccept    Metric = "messaging.servicebus.session.accept"
	MetricSessionTimeoutMS Metric = "messaging.servicebus.session.timeout"

	MetricSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration"
	MetricReceiveLag                Metric = "messaging.servicebus.receiver.lag"

	MetricAMQPSendDuration Metric = "messaging.az.amqp.producer.send.duration"

	MetricAMQPMgmtRequestDuration Metric = "messaging.az.amqp.management.request.duration"

	// NOTE(review): MetricAMQPSettlementRequestDuration has exactly the same
	// string value as MetricSettlementRequestDuration above, so the two
	// constants are indistinguishable once reported. Confirm whether one of
	// them is redundant or whether this one should use a distinct name.
	MetricAMQPSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration"
	MetricAMQPSettlementSequenceNum     Metric = "messaging.servicebus.settlement.sequence_number"

	// TODO: the names below were invented for this package; they are not
	// taken from an established naming convention.
	MetricMessageReceived Metric = "messaging.servicebus.messages.received"
	MetricMessagePeeked   Metric = "messaging.servicebus.messages.peeked"
	MetricCloseDuration   Metric = "messaging.servicebus.close.duration"
	MetricLockRenew       Metric = "messaging.servicebus.lockrenew.duration" // TODO: separate for session vs message lock?
)

These names are modeled off of the metrics from Java https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusMeter.java

and from our standard for attributes: https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92

type MustCreateSubscriptionsOptions

// MustCreateSubscriptionsOptions is the options type accepted by
// MustCreateSubscriptions.
type MustCreateSubscriptionsOptions struct {
	// Topic and Subscription hold admin-client creation options; presumably
	// they are forwarded when the topic and its subscriptions are created -
	// confirm against MustCreateSubscriptions.
	Topic        *admin.CreateTopicOptions
	Subscription *admin.CreateSubscriptionOptions
}

type StreamingMessageBatch

// StreamingMessageBatch accumulates messages into a batch: Add appends to the
// current batch (sending it when full) and Close sends whatever is still held.
type StreamingMessageBatch struct {
	// contains filtered or unexported fields
}

func NewStreamingMessageBatch

func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error)

func (*StreamingMessageBatch) Add

Add appends to the current batch. If the batch is full, it sends it and allocates a new one.

func (*StreamingMessageBatch) Close

func (sb *StreamingMessageBatch) Close(ctx context.Context) error

Close sends any messages currently held in our batch.

type StressContext

// StressContext holds common state shared by a stress test run.
type StressContext struct {
	// TC is the telemetry client. Context represents the lifetime of the test
	// itself and is cancelled if the user quits out of the stress test.
	TC      *TelemetryClientWrapper[Metric, Event]
	Context context.Context

	// TestRunID represents the test run and can be used to tie into other container metrics generated within the test cluster.
	TestRunID string

	// Nano is the nanoseconds start time for the stress test run
	Nano string

	// Endpoint is the value from SERVICEBUS_ENDPOINT
	Endpoint string

	// Cred is a token credential; presumably the one used to authenticate
	// against Endpoint - confirm against MustCreateStressContext.
	Cred azcore.TokenCredential
	// contains filtered or unexported fields
}

StressContext holds onto some common useful state for stress tests, including some simple stats tracking, a telemetry client and a context that represents the lifetime of the test itself (and will be cancelled if the user quits out of the stress test).

func MustCreateStressContext

func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext

func (*StressContext) Assert

func (tracker *StressContext) Assert(condition bool, message string)

func (*StressContext) End

func (sc *StressContext) End()

func (*StressContext) Equal

func (tracker *StressContext) Equal(val1 any, val2 any)

func (*StressContext) Failf

func (tracker *StressContext) Failf(format string, args ...any)

func (*StressContext) LogIfFailed

func (sc *StressContext) LogIfFailed(message string, err error)

func (*StressContext) Nil

func (tracker *StressContext) Nil(val1 any)

func (*StressContext) NoError

func (tracker *StressContext) NoError(err error)

func (*StressContext) NoErrorf

func (tracker *StressContext) NoErrorf(err error, format string, args ...any)

func (*StressContext) PanicOnError

func (tracker *StressContext) PanicOnError(message string, err error)

PanicOnError logs, sends telemetry and then closes on error

func (*StressContext) Start

func (sc *StressContext) Start(entityName string, attributes map[string]string)

type StressContextOptions

// StressContextOptions is the options type accepted by MustCreateStressContext.
type StressContextOptions struct {
	// Duration is the amount of time the stress test should run before
	// the StressContext.Context expires.
	Duration time.Duration

	// CommonBaggage will be added as part of the telemetry client, and will be included in each
	// metric/event/error that's reported.
	CommonBaggage map[string]string

	// EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry.
	EmitStartEvent bool
}

type TelemetryClientWrapper

type TelemetryClientWrapper[MetricT ~string, EventT ~string] struct {
	// contains filtered or unexported fields
}

TelemetryClientWrapper is a wrapper for the telemetry client, once we get that phased back in.

func NewTelemetryClientWrapper

func NewTelemetryClientWrapper[MetricT ~string, EventT ~string]() *TelemetryClientWrapper[MetricT, EventT]

func (*TelemetryClientWrapper[MetricT, EventT]) Context

func (tc *TelemetryClientWrapper[MetricT, EventT]) Context() *TelemetryClientWrapperContext

Context returns the context that is included for each reported event or metric.

func (*TelemetryClientWrapper[MetricT, EventT]) Flush

func (tc *TelemetryClientWrapper[MetricT, EventT]) Flush()

func (*TelemetryClientWrapper[MetricT, EventT]) TrackEvent

func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackEvent(name EventT)

func (*TelemetryClientWrapper[MetricT, EventT]) TrackEventWithProps

func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackEventWithProps(name EventT, properties map[string]string)

func (*TelemetryClientWrapper[MetricT, EventT]) TrackException

func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackException(err error)

func (*TelemetryClientWrapper[MetricT, EventT]) TrackExceptionWithProps

func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackExceptionWithProps(err error, properties map[string]string)

func (*TelemetryClientWrapper[MetricT, EventT]) TrackMetricWithProps

func (tc *TelemetryClientWrapper[MetricT, EventT]) TrackMetricWithProps(name MetricT, value float64, properties map[string]string)

type TelemetryClientWrapperContext

// TelemetryClientWrapperContext is the type returned by
// TelemetryClientWrapper.Context.
type TelemetryClientWrapperContext struct {
	// CommonProperties is a string-to-string property map; presumably these
	// are the properties included with each reported event or metric -
	// confirm against the wrapper's Track* methods.
	CommonProperties map[string]string
}

type TestContext

// TestContext embeds a *StressContext (promoting its fields and methods) and
// adds a Service Bus client.
type TestContext struct {
	*StressContext
	Client *azservicebus.Client
}

type TrackingReceiver

type TrackingReceiver struct {
	// contains filtered or unexported fields
}

TrackingReceiver reports metrics and errors automatically for its methods.

func NewTrackingReceiverForQueue

func NewTrackingReceiverForQueue(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, queueName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)

func NewTrackingReceiverForSubscription

func NewTrackingReceiverForSubscription(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, topicName string, subscriptionName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)

func (*TrackingReceiver) AbandonMessage

func (*TrackingReceiver) Close

func (tr *TrackingReceiver) Close(ctx context.Context) error

func (*TrackingReceiver) CompleteMessage

func (*TrackingReceiver) PeekMessages

func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *azservicebus.PeekMessagesOptions) ([]*azservicebus.ReceivedMessage, error)

func (*TrackingReceiver) ReceiveMessages

func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)

func (*TrackingReceiver) RenewMessageLock

type TrackingSender

type TrackingSender struct {
	// contains filtered or unexported fields
}

TrackingSender reports metrics and errors automatically for its methods.

func NewTrackingSender

func NewTrackingSender(tc *TelemetryClientWrapper[Metric, Event], client *azservicebus.Client, queueOrTopic string, options *azservicebus.NewSenderOptions) (*TrackingSender, error)

func (*TrackingSender) Close

func (ts *TrackingSender) Close(ctx context.Context) error

func (*TrackingSender) NewMessageBatch

func (*TrackingSender) SendMessage

func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error

func (*TrackingSender) SendMessageBatch

Source Files

metric_names.go streaming_batch.go stress_context.go telemetry_client_wrapper.go tracking_receiver.go tracking_sender.go utils.go

Version
v1.8.0 (latest)
Published
Feb 8, 2025
Platform
linux/amd64
Imports
16 packages
Last checked
4 months ago

Tools for package owners.