package test
import "cloud.google.com/go/pubsublite/internal/test"
Index ¶
- func ErrorEqual(got, want error) bool
- func ErrorHasCode(got error, wantCode codes.Code) bool
- func ErrorHasMsg(got error, wantStr string) bool
- func MakeAny(msg proto.Message) *anypb.Any
- func RandomLiteZone() string
- type Barrier
- type Condition
- func NewCondition(name string) *Condition
- func (c *Condition) SetDone()
- func (c *Condition) VerifyNotDone(t *testing.T)
- func (c *Condition) WaitUntilDone(t *testing.T, duration time.Duration)
- type DuplicateMsgDetector
- func NewDuplicateMsgDetector() *DuplicateMsgDetector
- func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool
- func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool
- func (dm *DuplicateMsgDetector) Receive(data string, offset int64)
- func (dm *DuplicateMsgDetector) Status() string
- type FakeSource
- type MockServer
- type MsgTracker
- func NewMsgTracker() *MsgTracker
- func (mt *MsgTracker) Add(msgs ...string)
- func (mt *MsgTracker) Empty() bool
- func (mt *MsgTracker) Remove(msg string) bool
- func (mt *MsgTracker) Status() error
- func (mt *MsgTracker) Wait(timeout time.Duration) error
- type OrderingReceiver
- func NewOrderingReceiver() *OrderingReceiver
- func (or *OrderingReceiver) Receive(data, key string) error
- type OrderingSender
- type RPCVerifier
- func NewRPCVerifier(t *testing.T) *RPCVerifier
- func (v *RPCVerifier) Flush()
- func (v *RPCVerifier) Pop(gotRequest interface{}) (interface{}, error)
- func (v *RPCVerifier) Push(wantRequest interface{}, retResponse interface{}, retErr error)
- func (v *RPCVerifier) PushWithBarrier(wantRequest interface{}, retResponse interface{}, retErr error) *Barrier
- func (v *RPCVerifier) TryPop() (bool, interface{}, error)
- type Server
- func NewServer() (*Server, error)
- func (s *Server) ClientConn() option.ClientOption
- func (s *Server) Close()
- type Verifiers
- func NewVerifiers(t *testing.T) *Verifiers
- func (tv *Verifiers) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
- func (tv *Verifiers) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
- func (tv *Verifiers) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
- func (tv *Verifiers) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
Functions ¶
func ErrorEqual ¶
ErrorEqual compares two errors for equivalence.
func ErrorHasCode ¶
ErrorHasCode returns true if an error has the desired canonical code.
func ErrorHasMsg ¶
ErrorHasMsg returns true if an error message contains the desired substring.
func MakeAny ¶
MakeAny packs a message into an Any proto.
func RandomLiteZone ¶
func RandomLiteZone() string
RandomLiteZone chooses a random Pub/Sub Lite zone for integration tests.
Types ¶
type Barrier ¶
type Barrier struct {
// contains filtered or unexported fields
}
Barrier is used to perform two-way synchronization betwen the server and client (test) to ensure tests are deterministic.
func (*Barrier) Release ¶
func (b *Barrier) Release()
Release should be called by the test.
func (*Barrier) ReleaseAfter ¶
func (b *Barrier) ReleaseAfter(f func())
ReleaseAfter releases the barrier, after invoking f provided by the test.
type Condition ¶
type Condition struct {
// contains filtered or unexported fields
}
Condition allows tests to wait for some event to occur, or check that it has not occurred.
func NewCondition ¶
NewCondition creates a new condition.
func (*Condition) SetDone ¶
func (c *Condition) SetDone()
SetDone marks the condition as done.
func (*Condition) VerifyNotDone ¶
VerifyNotDone checks that the condition is not done.
func (*Condition) WaitUntilDone ¶
WaitUntilDone waits up to the specified duration for the condition to be marked done.
type DuplicateMsgDetector ¶
type DuplicateMsgDetector struct {
// contains filtered or unexported fields
}
DuplicateMsgDetector can be used to detect duplicate messages, either due to duplicate publishes or receives.
func NewDuplicateMsgDetector ¶
func NewDuplicateMsgDetector() *DuplicateMsgDetector
NewDuplicateMsgDetector creates a new DuplicateMsgDetector.
func (*DuplicateMsgDetector) HasPublishDuplicates ¶
func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool
HasPublishDuplicates returns true if duplicate published messages were detected.
func (*DuplicateMsgDetector) HasReceiveDuplicates ¶
func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool
HasReceiveDuplicates returns true if duplicate received messages were detected.
func (*DuplicateMsgDetector) Receive ¶
func (dm *DuplicateMsgDetector) Receive(data string, offset int64)
Receive checks the given message data and offset.
func (*DuplicateMsgDetector) Status ¶
func (dm *DuplicateMsgDetector) Status() string
Status returns a non-empty status string if there were duplicates detected.
type FakeSource ¶
type FakeSource struct { Ret int64 }
FakeSource is a fake source that returns a configurable constant.
func (*FakeSource) Int63 ¶
func (f *FakeSource) Int63() int64
Int63 returns the configured fake random number.
func (*FakeSource) Seed ¶
func (f *FakeSource) Seed(seed int64)
Seed is unimplemented.
type MockServer ¶
type MockServer interface { // OnTestStart must be called at the start of each test to clear any existing // state and set the test verifiers. OnTestStart(*Verifiers) // OnTestEnd should be called at the end of each test to flush the verifiers // (i.e. check whether any expected requests were not sent to the server). OnTestEnd() }
MockServer is an in-memory mock implementation of a Pub/Sub Lite service, which allows unit tests to inspect requests received by the server and send fake responses. This is the interface that should be used by tests.
type MsgTracker ¶
type MsgTracker struct {
// contains filtered or unexported fields
}
MsgTracker is a helper for checking whether a set of messages make a full round trip from publisher to subscriber.
Add() registers published messages. Remove() should be called when messages are received by subscribers. Call Wait() to block until all tracked messages are received. The same MsgTracker instance can be reused to repeat this sequence for multiple test cycles.
Add() and Remove() calls should not be interleaved.
func NewMsgTracker ¶
func NewMsgTracker() *MsgTracker
NewMsgTracker creates a new message tracker.
func (*MsgTracker) Add ¶
func (mt *MsgTracker) Add(msgs ...string)
Add a set of tracked messages.
func (*MsgTracker) Empty ¶
func (mt *MsgTracker) Empty() bool
Empty returns true if there are no tracked messages remaining.
func (*MsgTracker) Remove ¶
func (mt *MsgTracker) Remove(msg string) bool
Remove and return true if `msg` is tracked. Signals the `done` channel once all messages have been received.
func (*MsgTracker) Status ¶
func (mt *MsgTracker) Status() error
Status returns an error if there are tracked messages remaining.
func (*MsgTracker) Wait ¶
func (mt *MsgTracker) Wait(timeout time.Duration) error
Wait up to `timeout` to receive all tracked messages.
type OrderingReceiver ¶
type OrderingReceiver struct {
// contains filtered or unexported fields
}
OrderingReceiver consumes a message string generated by OrderingSender and verifies that messages in a partition are ordered. It is used in conjunction with Subscribers.
func NewOrderingReceiver ¶
func NewOrderingReceiver() *OrderingReceiver
NewOrderingReceiver creates a new OrderingReceiver.
func (*OrderingReceiver) Receive ¶
func (or *OrderingReceiver) Receive(data, key string) error
Receive checks the given message data and key and returns an error if unordered messages are detected.
Note: a normal scenario resulting in unordered messages is when the Publish stream breaks while there are in-flight batches, which are resent upon stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a test.
type OrderingSender ¶
type OrderingSender struct { TotalMsgCount int64 }
OrderingSender generates strings containing a message index to use for verifying message ordering. It is used on conjunction with Publishers.
func NewOrderingSender ¶
func NewOrderingSender() *OrderingSender
NewOrderingSender creats a new OrderingSender.
func (*OrderingSender) Next ¶
func (os *OrderingSender) Next(prefix string) string
Next generates the next string to publish.
type RPCVerifier ¶
type RPCVerifier struct {
// contains filtered or unexported fields
}
RPCVerifier stores an queue of requests expected from the client, and the corresponding response or error to return.
func NewRPCVerifier ¶
func NewRPCVerifier(t *testing.T) *RPCVerifier
NewRPCVerifier creates a new verifier for requests received by the server.
func (*RPCVerifier) Flush ¶
func (v *RPCVerifier) Flush()
Flush logs an error for any remaining {request, response, error} tuples, in case the client terminated early.
func (*RPCVerifier) Pop ¶
func (v *RPCVerifier) Pop(gotRequest interface{}) (interface{}, error)
Pop validates the received request with the next {request, response, error} tuple.
func (*RPCVerifier) Push ¶
func (v *RPCVerifier) Push(wantRequest interface{}, retResponse interface{}, retErr error)
Push appends a new {request, response, error} tuple.
Valid combinations for unary and streaming RPCs: - {request, response, nil} - {request, nil, error}
Additional combinations for streams only: - {nil, response, nil}: send a response without a request (e.g. messages). - {nil, nil, error}: break the stream without a request. - {request, nil, nil}: expect a request, but don't send any response.
func (*RPCVerifier) PushWithBarrier ¶
func (v *RPCVerifier) PushWithBarrier(wantRequest interface{}, retResponse interface{}, retErr error) *Barrier
PushWithBarrier is like Push, but returns a barrier that the test should call Release when it would like the response to be sent to the client. This is useful for synchronizing with work that needs to be done on the client.
func (*RPCVerifier) TryPop ¶
func (v *RPCVerifier) TryPop() (bool, interface{}, error)
TryPop should be used only for streams. It checks whether the request in the next tuple is nil, in which case the response or error should be returned to the client without waiting for a request. Useful for streams where the server continuously sends data (e.g. subscribe stream).
type Server ¶
type Server struct { LiteServer MockServer // contains filtered or unexported fields }
Server is a mock Pub/Sub Lite server that can be used for unit testing.
func NewServer ¶
NewServer creates a new mock Pub/Sub Lite server.
func (*Server) ClientConn ¶
func (s *Server) ClientConn() option.ClientOption
ClientConn creates a client connection to the gRPC test server.
func (*Server) Close ¶
func (s *Server) Close()
Close shuts down the server and releases all resources.
type Verifiers ¶
type Verifiers struct { // Global list of verifiers for all unary RPCs. GlobalVerifier *RPCVerifier // contains filtered or unexported fields }
Verifiers contains RPCVerifiers for unary RPCs and streaming RPCs.
func NewVerifiers ¶
NewVerifiers creates a new instance of Verifiers for a test.
func (*Verifiers) AddAssignmentStream ¶
func (tv *Verifiers) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
AddAssignmentStream adds verifiers for an assignment stream.
func (*Verifiers) AddCommitStream ¶
func (tv *Verifiers) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
AddCommitStream adds verifiers for a commit stream.
func (*Verifiers) AddPublishStream ¶
func (tv *Verifiers) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
AddPublishStream adds verifiers for a publish stream.
func (*Verifiers) AddSubscribeStream ¶
func (tv *Verifiers) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
AddSubscribeStream adds verifiers for a subscribe stream.
Source Files ¶
condition.go mock.go msg_tracker.go msg_validators.go util.go verifier.go zones.go
- Version
- v1.8.2 (latest)
- Published
- Jun 5, 2024
- Platform
- linux/amd64
- Imports
- 27 packages
- Last checked
- 6 days ago –
Tools for package owners.