package wire
import "cloud.google.com/go/pubsublite/internal/wire"
Index ¶
- Constants
- Variables
- func LocationToRegion(location string) (string, error)
- func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error)
- func ValidateRegion(input string) error
- type AckConsumer
- type FrameworkType
- type LocationPath
- type MessageMetadata
- type MessageReceiverFunc
- type PartitionSet
- func NewPartitionSet(partitions []int) PartitionSet
- func (ps PartitionSet) Contains(partition int) bool
- func (ps PartitionSet) Ints() (partitions []int)
- func (ps PartitionSet) SortedInts() (partitions []int)
- type PublishResultFunc
- type PublishSettings
- type Publisher
- type ReassignmentHandlerFunc
- type ReceiveSettings
- type ReceivedMessage
- type ReservationPath
- func ParseReservationPath(input string) (ReservationPath, error)
- func (r ReservationPath) Location() LocationPath
- func (r ReservationPath) String() string
- type Subscriber
- type SubscriptionPath
- func ParseSubscriptionPath(input string) (SubscriptionPath, error)
- func (s SubscriptionPath) LocationPath() LocationPath
- func (s SubscriptionPath) String() string
- type TopicPath
Constants ¶
const ( // MaxPublishRequestCount is the maximum number of messages that can be // batched in a single publish request. MaxPublishRequestCount = 1000 // MaxPublishRequestBytes is the maximum allowed serialized size of a single // publish request (containing a batch of messages) in bytes. Must be lower // than the gRPC limit of 4 MiB. MaxPublishRequestBytes int = 3.5 * 1024 * 1024 // MinTimeout is the minimum timeout value that can be set for publisher and // subscriber settings. MinTimeout = 2 * time.Minute )
Variables ¶
var ( // ErrOverflow indicates that the publish buffers have overflowed. See // comments for PublishSettings.BufferedByteLimit. ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed") // ErrOversizedMessage indicates that the user published a message over the // allowed serialized byte size limit. It is wrapped in another error. ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes) // ErrServiceUninitialized indicates that a service (e.g. publisher or // subscriber) cannot perform an operation because it is uninitialized. ErrServiceUninitialized = errors.New("pubsublite: service must be started") // ErrServiceStarting indicates that a service (e.g. publisher or subscriber) // cannot perform an operation because it is starting up. ErrServiceStarting = errors.New("pubsublite: service is starting up") // ErrServiceStopped indicates that a service (e.g. publisher or subscriber) // cannot perform an operation because it has stopped or is in the process of // stopping. ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping") // ErrBackendUnavailable indicates that the backend service has been // unavailable for a period of time. The timeout can be configured using // PublishSettings.Timeout or ReceiveSettings.Timeout. ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable") )
Errors exported from this package.
var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, Timeout: 7 * 24 * time.Hour, BufferedByteLimit: 1 << 30, EnableIdempotence: true, ConfigPollPeriod: 10 * time.Minute, }
DefaultPublishSettings holds the default values for PublishSettings.
var DefaultReceiveSettings = ReceiveSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, Timeout: 7 * 24 * time.Hour, }
DefaultReceiveSettings holds the default values for ReceiveSettings.
Functions ¶
func LocationToRegion ¶
func LocationToRegion(location string) (string, error)
LocationToRegion returns the region that the given location is in.
func NewAdminClient ¶
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error)
NewAdminClient creates a new gapic AdminClient for a region.
func ValidateRegion ¶
func ValidateRegion(input string) error
ValidateRegion verifies that the `input` string has the format of a valid Google Cloud region. An example region is "europe-west1". See https://cloud.google.com/compute/docs/regions-zones for more information.
Types ¶
type AckConsumer ¶
type AckConsumer interface {
Ack()
}
AckConsumer is the interface exported from this package for acking messages.
type FrameworkType ¶
type FrameworkType string
FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"
FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.
type LocationPath ¶
type LocationPath struct { // A Google Cloud project. The project ID (e.g. "my-project") or the project // number (e.g. "987654321") can be provided. Project string // A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1"). Location string }
LocationPath stores a path consisting of a project and zone/region.
func ParseLocationPath ¶
func ParseLocationPath(input string) (LocationPath, error)
ParseLocationPath parses a project/location path.
func (LocationPath) String ¶
func (l LocationPath) String() string
type MessageMetadata ¶
type MessageMetadata struct { // The topic partition the message was published to. Partition int // The offset the message was assigned. Offset int64 }
MessageMetadata holds properties of a message published to the Pub/Sub Lite service.
NOTE: This is duplicated in the pscompat package in order to generate nicer docs and should be kept consistent.
func (*MessageMetadata) String ¶
func (m *MessageMetadata) String() string
type MessageReceiverFunc ¶
type MessageReceiverFunc func(*ReceivedMessage)
MessageReceiverFunc receives a Pub/Sub message from a topic partition.
type PartitionSet ¶
type PartitionSet map[int]struct{}
PartitionSet is a set of partition numbers.
func NewPartitionSet ¶
func NewPartitionSet(partitions []int) PartitionSet
NewPartitionSet creates a partition set initialized from the given partition numbers.
func (PartitionSet) Contains ¶
func (ps PartitionSet) Contains(partition int) bool
Contains returns true if this set contains the specified partition.
func (PartitionSet) Ints ¶
func (ps PartitionSet) Ints() (partitions []int)
Ints returns the partitions contained in this set as an unsorted slice.
func (PartitionSet) SortedInts ¶
func (ps PartitionSet) SortedInts() (partitions []int)
SortedInts returns the partitions contained in this set as a sorted slice.
type PublishResultFunc ¶
type PublishResultFunc func(*MessageMetadata, error)
PublishResultFunc receives the result of a publish.
type PublishSettings ¶
type PublishSettings struct { // Publish a non-empty batch after this delay has passed. Must be > 0. DelayThreshold time.Duration // Publish a batch when it has this many messages. Must be > 0. The maximum is // MaxPublishRequestCount. CountThreshold int // Publish a batch when its size in bytes reaches this value. Must be > 0. The // maximum is MaxPublishRequestBytes. ByteThreshold int // The maximum time that the client will attempt to establish a publish stream // connection to the server. Must be >= 2 minutes. // // If the timeout is exceeded, the publisher will terminate with the last error // that occurred while trying to reconnect. Note that if the timeout duration // is long, ErrOverflow may occur first. Timeout time.Duration // The maximum number of bytes that the publisher will keep in memory before // returning ErrOverflow. Must be > 0. // // Note that Pub/Sub Lite topics are provisioned a publishing throughput // capacity, per partition, shared by all publisher clients. Setting a large // buffer size can mitigate transient publish spikes. However, consistently // attempting to publish messages at a much higher rate than the publishing // throughput capacity can cause the buffers to overflow. For more // information, see https://cloud.google.com/pubsub/lite/docs/topics. BufferedByteLimit int // The polling interval to watch for topic partition count updates. Set to 0 // to disable polling if the number of partitions will never update. ConfigPollPeriod time.Duration // Whether idempotence is enabled, where the server will ensure that unique // messages within a single publisher session are stored only once. EnableIdempotence bool // The user-facing API type. Framework FrameworkType }
PublishSettings control the batching of published messages. These settings apply per partition.
type Publisher ¶
type Publisher interface { Publish(*pb.PubSubMessage, PublishResultFunc) Start() WaitStarted() error Stop() WaitStopped() error Error() error }
Publisher is the client interface exported from this package for publishing messages.
func NewPublisher ¶
func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPath string, opts ...option.ClientOption) (Publisher, error)
NewPublisher creates a new client for publishing messages.
type ReassignmentHandlerFunc ¶
type ReassignmentHandlerFunc func(before, after PartitionSet) error
ReassignmentHandlerFunc receives a partition assignment change.
type ReceiveSettings ¶
type ReceiveSettings struct { // MaxOutstandingMessages is the maximum number of unacknowledged messages. // Must be > 0. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged // messages. Must be > 0. MaxOutstandingBytes int // The maximum time that the client will attempt to establish a subscribe // stream connection to the server. Must be >= 2 minutes. // // If the timeout is exceeded, the subscriber will terminate with the last error // that occurred while trying to reconnect. Timeout time.Duration // The topic partition numbers (zero-indexed) to receive messages from. // Values must be less than the number of partitions for the topic. If not // specified, the client will use the partition assignment service to // determine which partitions it should connect to. Partitions []int // The user-facing API type. Framework FrameworkType }
ReceiveSettings control the receiving of messages. These settings apply per partition.
type ReceivedMessage ¶
type ReceivedMessage struct { Msg *pb.SequencedMessage Ack AckConsumer Partition int }
ReceivedMessage stores a received Pub/Sub message and AckConsumer for acknowledging the message.
type ReservationPath ¶
type ReservationPath struct { // A Google Cloud project. The project ID (e.g. "my-project") or the project // number (e.g. "987654321") can be provided. Project string // A Google Cloud region. An example region is "us-central1". Region string // The ID of the Pub/Sub Lite reservation, for example "my-reservation-name". ReservationID string }
ReservationPath stores the full path of a Pub/Sub Lite reservation.
func ParseReservationPath ¶
func ParseReservationPath(input string) (ReservationPath, error)
ParseReservationPath parses the full path of a Pub/Sub Lite reservation.
func (ReservationPath) Location ¶
func (r ReservationPath) Location() LocationPath
Location returns the reservation's location path.
func (ReservationPath) String ¶
func (r ReservationPath) String() string
type Subscriber ¶
type Subscriber interface { Start() WaitStarted() error Stop() WaitStopped() error Terminate() PartitionActive(int) bool }
Subscriber is the client interface exported from this package for receiving messages.
func NewSubscriber ¶
func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, reassignmentHandler ReassignmentHandlerFunc, region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error)
NewSubscriber creates a new client for receiving messages.
type SubscriptionPath ¶
type SubscriptionPath struct { // A Google Cloud project. The project ID (e.g. "my-project") or the project // number (e.g. "987654321") can be provided. Project string // A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1"). Location string // The ID of the Pub/Sub Lite subscription, for example // "my-subscription-name". SubscriptionID string }
SubscriptionPath stores the full path of a Pub/Sub Lite subscription.
func ParseSubscriptionPath ¶
func ParseSubscriptionPath(input string) (SubscriptionPath, error)
ParseSubscriptionPath parses the full path of a Pub/Sub Lite subscription.
func (SubscriptionPath) LocationPath ¶
func (s SubscriptionPath) LocationPath() LocationPath
LocationPath returns the subscription's location path.
func (SubscriptionPath) String ¶
func (s SubscriptionPath) String() string
type TopicPath ¶
type TopicPath struct { // A Google Cloud project. The project ID (e.g. "my-project") or the project // number (e.g. "987654321") can be provided. Project string // A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1"). Location string // The ID of the Pub/Sub Lite topic, for example "my-topic-name". TopicID string }
TopicPath stores the full path of a Pub/Sub Lite topic.
func ParseTopicPath ¶
func ParseTopicPath(input string) (TopicPath, error)
ParseTopicPath parses the full path of a Pub/Sub Lite topic.
func (TopicPath) LocationPath ¶
func (t TopicPath) LocationPath() LocationPath
LocationPath returns the topic's location path.
func (TopicPath) String ¶
func (t TopicPath) String() string
Source Files ¶
acks.go assigner.go committer.go errors.go flow_control.go message_router.go partition_count.go periodic_task.go publish_batcher.go publisher.go request_timer.go resources.go rpc.go service.go settings.go streams.go subscriber.go version.go
- Version
- v1.8.2 (latest)
- Published
- Jun 5, 2024
- Platform
- linux/amd64
- Imports
- 34 packages
- Last checked
- 6 days ago –
Tools for package owners.