package wire

import "cloud.google.com/go/pubsublite/internal/wire"

Index

Constants

// Limits that apply to publish batching and to publisher/subscriber timeouts.
const (
	// MaxPublishRequestCount is the maximum number of messages that can be
	// batched in a single publish request. It is the documented upper bound for
	// PublishSettings.CountThreshold.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the maximum allowed serialized size of a single
	// publish request (containing a batch of messages) in bytes. Must be lower
	// than the gRPC limit of 4 MiB.
	//
	// The value is 3.5 MiB, i.e. 3,670,016 bytes. The constant expression has
	// an exact integer value, which is why it can be declared with type int. It
	// is the documented upper bound for PublishSettings.ByteThreshold.
	MaxPublishRequestBytes int = 3.5 * 1024 * 1024

	// MinTimeout is the minimum timeout value that can be set for publisher and
	// subscriber settings (PublishSettings.Timeout and ReceiveSettings.Timeout).
	MinTimeout = 2 * time.Minute
)

Variables

// Sentinel errors exported from this package. All but ErrOversizedMessage are
// created with errors.New and carry a "pubsublite: " message prefix.
var (
	// ErrOverflow indicates that the publish buffers have overflowed. See
	// comments for PublishSettings.BufferedByteLimit.
	ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

	// ErrOversizedMessage indicates that the user published a message over the
	// allowed serialized byte size limit. It is wrapped in another error.
	//
	// The message text embeds the numeric value of MaxPublishRequestBytes.
	// NOTE(review): unlike the other errors in this block, the message has no
	// "pubsublite: " prefix - presumably the wrapping error supplies the
	// context; confirm. Because it is wrapped, callers presumably need
	// errors.Is rather than == to detect it - confirm how it is wrapped.
	ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)

	// ErrServiceUninitialized indicates that a service (e.g. publisher or
	// subscriber) cannot perform an operation because it is uninitialized.
	ErrServiceUninitialized = errors.New("pubsublite: service must be started")

	// ErrServiceStarting indicates that a service (e.g. publisher or subscriber)
	// cannot perform an operation because it is starting up.
	ErrServiceStarting = errors.New("pubsublite: service is starting up")

	// ErrServiceStopped indicates that a service (e.g. publisher or subscriber)
	// cannot perform an operation because it has stopped or is in the process of
	// stopping.
	ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")

	// ErrBackendUnavailable indicates that the backend service has been
	// unavailable for a period of time. The timeout can be configured using
	// PublishSettings.Timeout or ReceiveSettings.Timeout.
	ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable")
)

Errors exported from this package.

// DefaultPublishSettings holds the default values for PublishSettings.
// The Framework field is not set here, so it keeps its zero value (the empty
// string).
var DefaultPublishSettings = PublishSettings{
	// Batching thresholds: 10 ms delay, 100 messages, 1e6 (one million) bytes.
	// The count and byte thresholds are below MaxPublishRequestCount (1000) and
	// MaxPublishRequestBytes (3,670,016) respectively.
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	// 7 days.
	Timeout:        7 * 24 * time.Hour,

	// 1 << 30 bytes = 1 GiB of buffered messages before ErrOverflow.
	BufferedByteLimit: 1 << 30,
	EnableIdempotence: true,
	// Partition count updates are polled every 10 minutes by default.
	ConfigPollPeriod:  10 * time.Minute,
}

DefaultPublishSettings holds the default values for PublishSettings.

// DefaultReceiveSettings holds the default values for ReceiveSettings.
// Partitions and Framework are not set here and keep their zero values; per
// the ReceiveSettings.Partitions documentation, unspecified partitions mean
// the partition assignment service is used.
var DefaultReceiveSettings = ReceiveSettings{
	MaxOutstandingMessages: 1000,
	// 1e9 (one billion) bytes.
	MaxOutstandingBytes:    1e9,
	// 7 days.
	Timeout:                7 * 24 * time.Hour,
}

DefaultReceiveSettings holds the default values for ReceiveSettings.

Functions

func LocationToRegion

func LocationToRegion(location string) (string, error)

LocationToRegion returns the region that the given location is in.

func NewAdminClient

func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error)

NewAdminClient creates a new gapic AdminClient for a region.

func ValidateRegion

func ValidateRegion(input string) error

ValidateRegion verifies that the `input` string has the format of a valid Google Cloud region. An example region is "europe-west1". See https://cloud.google.com/compute/docs/regions-zones for more information.

Types

type AckConsumer

// AckConsumer is the interface exported from this package for acking messages.
// A value of this type is carried in ReceivedMessage.Ack.
type AckConsumer interface {
	// Ack presumably acknowledges the message this consumer was delivered
	// with. NOTE(review): the implementation is not visible here - confirm
	// whether repeated calls are safe.
	Ack()
}

AckConsumer is the interface exported from this package for acking messages.

type FrameworkType

type FrameworkType string

FrameworkType identifies the user-facing API for Cloud Pub/Sub Lite.

const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"

FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.

type LocationPath

// LocationPath stores a path consisting of a project and zone/region.
// Values can be obtained from a string with ParseLocationPath and converted
// back with the String method.
type LocationPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
	Location string
}

LocationPath stores a path consisting of a project and zone/region.

func ParseLocationPath

func ParseLocationPath(input string) (LocationPath, error)

ParseLocationPath parses a project/location path.

func (LocationPath) String

func (l LocationPath) String() string

type MessageMetadata

// MessageMetadata holds properties of a message published to the Pub/Sub Lite
// service. A *MessageMetadata is the first argument passed to a
// PublishResultFunc, and the type has a String method on the pointer receiver.
//
// NOTE: This is duplicated in the pscompat package in order to generate nicer
// docs and should be kept consistent.
type MessageMetadata struct {
	// The topic partition the message was published to.
	Partition int

	// The offset the message was assigned.
	Offset int64
}

MessageMetadata holds properties of a message published to the Pub/Sub Lite service.

NOTE: This is duplicated in the pscompat package in order to generate nicer docs and should be kept consistent.

func (*MessageMetadata) String

func (m *MessageMetadata) String() string

type MessageReceiverFunc

type MessageReceiverFunc func(*ReceivedMessage)

MessageReceiverFunc receives a Pub/Sub message from a topic partition.

type PartitionSet

type PartitionSet map[int]struct{}

PartitionSet is a set of partition numbers.

func NewPartitionSet

func NewPartitionSet(partitions []int) PartitionSet

NewPartitionSet creates a partition set initialized from the given partition numbers.

func (PartitionSet) Contains

func (ps PartitionSet) Contains(partition int) bool

Contains returns true if this set contains the specified partition.

func (PartitionSet) Ints

func (ps PartitionSet) Ints() (partitions []int)

Ints returns the partitions contained in this set as an unsorted slice.

func (PartitionSet) SortedInts

func (ps PartitionSet) SortedInts() (partitions []int)

SortedInts returns the partitions contained in this set as a sorted slice.

type PublishResultFunc

type PublishResultFunc func(*MessageMetadata, error)

PublishResultFunc receives the result of a publish.

type PublishSettings

// PublishSettings control the batching of published messages. These settings
// apply per partition. See DefaultPublishSettings for the default values.
type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed. Must be > 0.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. Must be > 0. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value. Must be > 0. The
	// maximum is MaxPublishRequestBytes.
	ByteThreshold int

	// The maximum time that the client will attempt to establish a publish stream
	// connection to the server. Must be >= 2 minutes (MinTimeout).
	//
	// If the timeout is exceeded, the publisher will terminate with the last
	// error that occurred while trying to reconnect. Note that if the timeout
	// duration is long, ErrOverflow may occur first.
	Timeout time.Duration

	// The maximum number of bytes that the publisher will keep in memory before
	// returning ErrOverflow. Must be > 0.
	//
	// Note that Pub/Sub Lite topics are provisioned a publishing throughput
	// capacity, per partition, shared by all publisher clients. Setting a large
	// buffer size can mitigate transient publish spikes. However, consistently
	// attempting to publish messages at a much higher rate than the publishing
	// throughput capacity can cause the buffers to overflow. For more
	// information, see https://cloud.google.com/pubsub/lite/docs/topics.
	BufferedByteLimit int

	// The polling interval to watch for topic partition count updates. Set to 0
	// to disable polling if the number of partitions will never update.
	ConfigPollPeriod time.Duration

	// Whether idempotence is enabled, where the server will ensure that unique
	// messages within a single publisher session are stored only once.
	EnableIdempotence bool

	// The user-facing API type (e.g. FrameworkCloudPubSubShim).
	Framework FrameworkType
}

PublishSettings control the batching of published messages. These settings apply per partition.

type Publisher

// Publisher is the client interface exported from this package for publishing
// messages. Instances are created with NewPublisher.
type Publisher interface {
	// Publish submits a message for publishing. The PublishResultFunc receives
	// the result of the publish, as a *MessageMetadata and an error.
	// NOTE(review): whether the callback is invoked synchronously or from
	// another goroutine is not visible here - confirm.
	Publish(*pb.PubSubMessage, PublishResultFunc)

	// Service lifecycle methods.
	// NOTE(review): the semantics below are inferred from the method names and
	// the ErrService* errors; confirm against the implementation.
	//   - Start presumably begins starting the publisher without blocking.
	//   - WaitStarted presumably blocks until startup completes or fails.
	//   - Stop presumably begins shutting the publisher down.
	//   - WaitStopped presumably blocks until shutdown completes.
	//   - Error presumably reports the error that terminated the publisher.
	Start()
	WaitStarted() error
	Stop()
	WaitStopped() error
	Error() error
}

Publisher is the client interface exported from this package for publishing messages.

func NewPublisher

func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPath string, opts ...option.ClientOption) (Publisher, error)

NewPublisher creates a new client for publishing messages.

type ReassignmentHandlerFunc

type ReassignmentHandlerFunc func(before, after PartitionSet) error

ReassignmentHandlerFunc receives a partition assignment change.

type ReceiveSettings

// ReceiveSettings control the receiving of messages. These settings apply per
// partition. See DefaultReceiveSettings for the default values.
type ReceiveSettings struct {
	// MaxOutstandingMessages is the maximum number of unacknowledged messages.
	// Must be > 0.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
	// messages. Must be > 0.
	MaxOutstandingBytes int

	// The maximum time that the client will attempt to establish a subscribe
	// stream connection to the server. Must be >= 2 minutes (MinTimeout).
	//
	// If the timeout is exceeded, the subscriber will terminate with the last
	// error that occurred while trying to reconnect.
	Timeout time.Duration

	// The topic partition numbers (zero-indexed) to receive messages from.
	// Values must be less than the number of partitions for the topic. If not
	// specified, the client will use the partition assignment service to
	// determine which partitions it should connect to.
	Partitions []int

	// The user-facing API type (e.g. FrameworkCloudPubSubShim).
	Framework FrameworkType
}

ReceiveSettings control the receiving of messages. These settings apply per partition.

type ReceivedMessage

// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message. A *ReceivedMessage is the argument passed to a
// MessageReceiverFunc.
type ReceivedMessage struct {
	// Msg is the received message.
	Msg       *pb.SequencedMessage
	// Ack is the AckConsumer used to acknowledge Msg.
	Ack       AckConsumer
	// Partition is presumably the topic partition Msg was received from -
	// TODO confirm against the subscriber implementation.
	Partition int
}

ReceivedMessage stores a received Pub/Sub message and AckConsumer for acknowledging the message.

type ReservationPath

// ReservationPath stores the full path of a Pub/Sub Lite reservation.
// Values can be obtained from a string with ParseReservationPath. The Location
// method returns the reservation's LocationPath and String converts the path
// back to a string.
type ReservationPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud region. An example region is "us-central1".
	Region string

	// The ID of the Pub/Sub Lite reservation, for example "my-reservation-name".
	ReservationID string
}

ReservationPath stores the full path of a Pub/Sub Lite reservation.

func ParseReservationPath

func ParseReservationPath(input string) (ReservationPath, error)

ParseReservationPath parses the full path of a Pub/Sub Lite reservation.

func (ReservationPath) Location

func (r ReservationPath) Location() LocationPath

Location returns the reservation's location path.

func (ReservationPath) String

func (r ReservationPath) String() string

type Subscriber

// Subscriber is the client interface exported from this package for receiving
// messages. Instances are created with NewSubscriber.
//
// NOTE(review): the method semantics below are inferred from the method names
// and the ErrService* errors; confirm against the implementation.
//   - Start presumably begins starting the subscriber without blocking.
//   - WaitStarted presumably blocks until startup completes or fails.
//   - Stop presumably begins shutting the subscriber down.
//   - WaitStopped presumably blocks until shutdown completes.
//   - Terminate presumably shuts down more abruptly than Stop; the exact
//     difference is not visible here.
//   - PartitionActive presumably reports whether the given partition number
//     is currently being received from.
type Subscriber interface {
	Start()
	WaitStarted() error
	Stop()
	WaitStopped() error
	Terminate()
	PartitionActive(int) bool
}

Subscriber is the client interface exported from this package for receiving messages.

func NewSubscriber

func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, reassignmentHandler ReassignmentHandlerFunc,
	region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error)

NewSubscriber creates a new client for receiving messages.

type SubscriptionPath

// SubscriptionPath stores the full path of a Pub/Sub Lite subscription.
// Values can be obtained from a string with ParseSubscriptionPath. The
// LocationPath method returns the subscription's location path and String
// converts the path back to a string.
type SubscriptionPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
	Location string

	// The ID of the Pub/Sub Lite subscription, for example
	// "my-subscription-name".
	SubscriptionID string
}

SubscriptionPath stores the full path of a Pub/Sub Lite subscription.

func ParseSubscriptionPath

func ParseSubscriptionPath(input string) (SubscriptionPath, error)

ParseSubscriptionPath parses the full path of a Pub/Sub Lite subscription.

func (SubscriptionPath) LocationPath

func (s SubscriptionPath) LocationPath() LocationPath

LocationPath returns the subscription's location path.

func (SubscriptionPath) String

func (s SubscriptionPath) String() string

type TopicPath

// TopicPath stores the full path of a Pub/Sub Lite topic.
// Values can be obtained from a string with ParseTopicPath. The LocationPath
// method returns the topic's location path and String converts the path back
// to a string.
type TopicPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
	Location string

	// The ID of the Pub/Sub Lite topic, for example "my-topic-name".
	TopicID string
}

TopicPath stores the full path of a Pub/Sub Lite topic.

func ParseTopicPath

func ParseTopicPath(input string) (TopicPath, error)

ParseTopicPath parses the full path of a Pub/Sub Lite topic.

func (TopicPath) LocationPath

func (t TopicPath) LocationPath() LocationPath

LocationPath returns the topic's location path.

func (TopicPath) String

func (t TopicPath) String() string

Source Files

acks.go assigner.go committer.go errors.go flow_control.go message_router.go partition_count.go periodic_task.go publish_batcher.go publisher.go request_timer.go resources.go rpc.go service.go settings.go streams.go subscriber.go version.go

Version
v1.8.2 (latest)
Published
Jun 5, 2024
Platform
linux/amd64
Imports
34 packages
Last checked
6 days ago

Tools for package owners.