package test

import "cloud.google.com/go/pubsublite/internal/test"

Index

Functions

func ErrorEqual

func ErrorEqual(got, want error) bool

ErrorEqual compares two errors for equivalence.

func ErrorHasCode

func ErrorHasCode(got error, wantCode codes.Code) bool

ErrorHasCode returns true if an error has the desired canonical code.

func ErrorHasMsg

func ErrorHasMsg(got error, wantStr string) bool

ErrorHasMsg returns true if an error message contains the desired substring.

func MakeAny

func MakeAny(msg proto.Message) *anypb.Any

MakeAny packs a message into an Any proto.

func RandomLiteZone

func RandomLiteZone() string

RandomLiteZone chooses a random Pub/Sub Lite zone for integration tests.

Types

type Barrier

type Barrier struct {
	// contains filtered or unexported fields
}

Barrier is used to perform two-way synchronization betwen the server and client (test) to ensure tests are deterministic.

func (*Barrier) Release

func (b *Barrier) Release()

Release should be called by the test.

func (*Barrier) ReleaseAfter

func (b *Barrier) ReleaseAfter(f func())

ReleaseAfter releases the barrier, after invoking f provided by the test.

type Condition

type Condition struct {
	// contains filtered or unexported fields
}

Condition allows tests to wait for some event to occur, or check that it has not occurred.

func NewCondition

func NewCondition(name string) *Condition

NewCondition creates a new condition.

func (*Condition) SetDone

func (c *Condition) SetDone()

SetDone marks the condition as done.

func (*Condition) VerifyNotDone

func (c *Condition) VerifyNotDone(t *testing.T)

VerifyNotDone checks that the condition is not done.

func (*Condition) WaitUntilDone

func (c *Condition) WaitUntilDone(t *testing.T, duration time.Duration)

WaitUntilDone waits up to the specified duration for the condition to be marked done.

type DuplicateMsgDetector

type DuplicateMsgDetector struct {
	// contains filtered or unexported fields
}

DuplicateMsgDetector can be used to detect duplicate messages, either due to duplicate publishes or receives.

func NewDuplicateMsgDetector

func NewDuplicateMsgDetector() *DuplicateMsgDetector

NewDuplicateMsgDetector creates a new DuplicateMsgDetector.

func (*DuplicateMsgDetector) HasPublishDuplicates

func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool

HasPublishDuplicates returns true if duplicate published messages were detected.

func (*DuplicateMsgDetector) HasReceiveDuplicates

func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool

HasReceiveDuplicates returns true if duplicate received messages were detected.

func (*DuplicateMsgDetector) Receive

func (dm *DuplicateMsgDetector) Receive(data string, offset int64)

Receive checks the given message data and offset.

func (*DuplicateMsgDetector) Status

func (dm *DuplicateMsgDetector) Status() string

Status returns a non-empty status string if there were duplicates detected.

type FakeSource

type FakeSource struct {
	Ret int64
}

FakeSource is a fake source that returns a configurable constant.

func (*FakeSource) Int63

func (f *FakeSource) Int63() int64

Int63 returns the configured fake random number.

func (*FakeSource) Seed

func (f *FakeSource) Seed(seed int64)

Seed is unimplemented.

type MockServer

type MockServer interface {
	// OnTestStart must be called at the start of each test to clear any existing
	// state and set the test verifiers.
	OnTestStart(*Verifiers)
	// OnTestEnd should be called at the end of each test to flush the verifiers
	// (i.e. check whether any expected requests were not sent to the server).
	OnTestEnd()
}

MockServer is an in-memory mock implementation of a Pub/Sub Lite service, which allows unit tests to inspect requests received by the server and send fake responses. This is the interface that should be used by tests.

type MsgTracker

type MsgTracker struct {
	// contains filtered or unexported fields
}

MsgTracker is a helper for checking whether a set of messages make a full round trip from publisher to subscriber.

Add() registers published messages. Remove() should be called when messages are received by subscribers. Call Wait() to block until all tracked messages are received. The same MsgTracker instance can be reused to repeat this sequence for multiple test cycles.

Add() and Remove() calls should not be interleaved.

func NewMsgTracker

func NewMsgTracker() *MsgTracker

NewMsgTracker creates a new message tracker.

func (*MsgTracker) Add

func (mt *MsgTracker) Add(msgs ...string)

Add a set of tracked messages.

func (*MsgTracker) Empty

func (mt *MsgTracker) Empty() bool

Empty returns true if there are no tracked messages remaining.

func (*MsgTracker) Remove

func (mt *MsgTracker) Remove(msg string) bool

Remove and return true if `msg` is tracked. Signals the `done` channel once all messages have been received.

func (*MsgTracker) Status

func (mt *MsgTracker) Status() error

Status returns an error if there are tracked messages remaining.

func (*MsgTracker) Wait

func (mt *MsgTracker) Wait(timeout time.Duration) error

Wait up to `timeout` to receive all tracked messages.

type OrderingReceiver

type OrderingReceiver struct {
	// contains filtered or unexported fields
}

OrderingReceiver consumes a message string generated by OrderingSender and verifies that messages in a partition are ordered. It is used in conjunction with Subscribers.

func NewOrderingReceiver

func NewOrderingReceiver() *OrderingReceiver

NewOrderingReceiver creates a new OrderingReceiver.

func (*OrderingReceiver) Receive

func (or *OrderingReceiver) Receive(data, key string) error

Receive checks the given message data and key and returns an error if unordered messages are detected.

Note: a normal scenario resulting in unordered messages is when the Publish stream breaks while there are in-flight batches, which are resent upon stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a test.

type OrderingSender

type OrderingSender struct {
	TotalMsgCount int64
}

OrderingSender generates strings containing a message index to use for verifying message ordering. It is used on conjunction with Publishers.

func NewOrderingSender

func NewOrderingSender() *OrderingSender

NewOrderingSender creats a new OrderingSender.

func (*OrderingSender) Next

func (os *OrderingSender) Next(prefix string) string

Next generates the next string to publish.

type RPCVerifier

type RPCVerifier struct {
	// contains filtered or unexported fields
}

RPCVerifier stores an queue of requests expected from the client, and the corresponding response or error to return.

func NewRPCVerifier

func NewRPCVerifier(t *testing.T) *RPCVerifier

NewRPCVerifier creates a new verifier for requests received by the server.

func (*RPCVerifier) Flush

func (v *RPCVerifier) Flush()

Flush logs an error for any remaining {request, response, error} tuples, in case the client terminated early.

func (*RPCVerifier) Pop

func (v *RPCVerifier) Pop(gotRequest interface{}) (interface{}, error)

Pop validates the received request with the next {request, response, error} tuple.

func (*RPCVerifier) Push

func (v *RPCVerifier) Push(wantRequest interface{}, retResponse interface{}, retErr error)

Push appends a new {request, response, error} tuple.

Valid combinations for unary and streaming RPCs: - {request, response, nil} - {request, nil, error}

Additional combinations for streams only: - {nil, response, nil}: send a response without a request (e.g. messages). - {nil, nil, error}: break the stream without a request. - {request, nil, nil}: expect a request, but don't send any response.

func (*RPCVerifier) PushWithBarrier

func (v *RPCVerifier) PushWithBarrier(wantRequest interface{}, retResponse interface{}, retErr error) *Barrier

PushWithBarrier is like Push, but returns a barrier that the test should call Release when it would like the response to be sent to the client. This is useful for synchronizing with work that needs to be done on the client.

func (*RPCVerifier) TryPop

func (v *RPCVerifier) TryPop() (bool, interface{}, error)

TryPop should be used only for streams. It checks whether the request in the next tuple is nil, in which case the response or error should be returned to the client without waiting for a request. Useful for streams where the server continuously sends data (e.g. subscribe stream).

type Server

type Server struct {
	LiteServer MockServer
	// contains filtered or unexported fields
}

Server is a mock Pub/Sub Lite server that can be used for unit testing.

func NewServer

func NewServer() (*Server, error)

NewServer creates a new mock Pub/Sub Lite server.

func (*Server) ClientConn

func (s *Server) ClientConn() option.ClientOption

ClientConn creates a client connection to the gRPC test server.

func (*Server) Close

func (s *Server) Close()

Close shuts down the server and releases all resources.

type Verifiers

type Verifiers struct {

	// Global list of verifiers for all unary RPCs.
	GlobalVerifier *RPCVerifier
	// contains filtered or unexported fields
}

Verifiers contains RPCVerifiers for unary RPCs and streaming RPCs.

func NewVerifiers

func NewVerifiers(t *testing.T) *Verifiers

NewVerifiers creates a new instance of Verifiers for a test.

func (*Verifiers) AddAssignmentStream

func (tv *Verifiers) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)

AddAssignmentStream adds verifiers for an assignment stream.

func (*Verifiers) AddCommitStream

func (tv *Verifiers) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)

AddCommitStream adds verifiers for a commit stream.

func (*Verifiers) AddPublishStream

func (tv *Verifiers) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)

AddPublishStream adds verifiers for a publish stream.

func (*Verifiers) AddSubscribeStream

func (tv *Verifiers) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)

AddSubscribeStream adds verifiers for a subscribe stream.

Source Files

condition.go mock.go msg_tracker.go msg_validators.go util.go verifier.go zones.go

Version
v1.8.2 (latest)
Published
Jun 5, 2024
Platform
linux/amd64
Imports
27 packages
Last checked
6 days ago

Tools for package owners.