package topiclistenerinternal

import "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topiclistenerinternal"

Index

Variables

var (
	ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
)

Types

type EventHandler

type EventHandler interface {
	// OnStartPartitionSessionRequest called when server send start partition session request method.
	// You can use it to store read progress on your own side.
	// You must call event.Confirm(...) for start to receive messages from the partition.
	// You can set topiclistener.StartPartitionSessionConfirm for change default settings.
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	OnStartPartitionSessionRequest(ctx context.Context, event *PublicEventStartPartitionSession) error

	// OnReadMessages called with batch of messages. Max count of messages limited by internal buffer size
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	OnReadMessages(ctx context.Context, event *PublicReadMessages) error

	// OnStopPartitionSessionRequest called when the server send stop partition message.
	// It means that no more OnReadMessages calls for the partition session.
	// You must call event.Confirm() for allow the server to stop the partition session (if event.Graceful=true).
	// Confirm is optional for event.Graceful=false
	// The method can be called twice: with event.Graceful=true, then event.Graceful=false.
	// It is guaranteed about the method will be called least once.
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	OnStopPartitionSessionRequest(ctx context.Context, event *PublicEventStopPartitionSession) error
}

type PublicEventStartPartitionSession

type PublicEventStartPartitionSession struct {
	PartitionSession topicreadercommon.PublicPartitionSession
	CommittedOffset  int64
	PartitionOffsets PublicOffsetsRange
	// contains filtered or unexported fields
}

PublicEventStartPartitionSession

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewPublicStartPartitionSessionEvent

func NewPublicStartPartitionSessionEvent(
	session topicreadercommon.PublicPartitionSession,
	committedOffset int64,
	partitionOffsets PublicOffsetsRange,
) *PublicEventStartPartitionSession

func (*PublicEventStartPartitionSession) Confirm

func (e *PublicEventStartPartitionSession) Confirm()

Confirm

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*PublicEventStartPartitionSession) ConfirmWithParams

type PublicEventStopPartitionSession

type PublicEventStopPartitionSession struct {
	PartitionSession topicreadercommon.PublicPartitionSession

	// Graceful mean about server is waiting for client finish work with the partition and confirm stop the work
	// if the field is false it mean about server stop lease the partition to the client and can assignee the partition
	// to other read session (on this or other connection).
	Graceful        bool
	CommittedOffset int64
	// contains filtered or unexported fields
}

PublicEventStopPartitionSession

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewPublicStopPartitionSessionEvent

func NewPublicStopPartitionSessionEvent(
	partitionSession topicreadercommon.PublicPartitionSession,
	graceful bool,
	committedOffset int64,
) *PublicEventStopPartitionSession

func (*PublicEventStopPartitionSession) Confirm

func (e *PublicEventStopPartitionSession) Confirm()

Confirm

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicOffsetsRange

type PublicOffsetsRange struct {
	Start int64
	End   int64
}

PublicOffsetsRange

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicReadMessages

type PublicReadMessages struct {
	PartitionSession topicreadercommon.PublicPartitionSession
	Batch            *topicreadercommon.PublicBatch
	// contains filtered or unexported fields
}

PublicReadMessages

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewPublicReadMessages

func NewPublicReadMessages(
	session topicreadercommon.PublicPartitionSession,
	batch *topicreadercommon.PublicBatch,
	listener *streamListener,
) *PublicReadMessages

func (*PublicReadMessages) Confirm

func (e *PublicReadMessages) Confirm()

Confirm of the process messages from the batch. Send commit message the server in background. The method returns fast, without wait commits ack.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*PublicReadMessages) ConfirmWithAck

func (e *PublicReadMessages) ConfirmWithAck(ctx context.Context) error

ConfirmWithAck commit the batch and wait ack from the server. The method will be blocked until receive ack, error or expire ctx.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicStartPartitionSessionConfirm

type PublicStartPartitionSessionConfirm struct {
	CommitOffset *int64 ``
	// contains filtered or unexported fields
}

PublicStartPartitionSessionConfirm

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (PublicStartPartitionSessionConfirm) WithCommitOffset

WithCommitOffset

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (PublicStartPartitionSessionConfirm) WithReadOffet

WithReadOffet

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type StreamListenerConfig

type StreamListenerConfig struct {
	BufferSize             int
	Decoders               topicreadercommon.DecoderMap
	Selectors              []*topicreadercommon.PublicReadSelector
	Consumer               string
	ConnectWithoutConsumer bool
	// contains filtered or unexported fields
}

func NewStreamListenerConfig

func NewStreamListenerConfig() StreamListenerConfig

func (*StreamListenerConfig) Validate

func (cfg *StreamListenerConfig) Validate() error

type TopicClient

type TopicClient interface {
	StreamRead(ctxStreamLifeTime context.Context, readerID int64, tracer *trace.Topic) (rawtopicreader.StreamReader, error)
}

type TopicListenerReconnector

type TopicListenerReconnector struct {
	// contains filtered or unexported fields
}

func NewTopicListenerReconnector

func NewTopicListenerReconnector(
	client TopicClient,
	streamConfig *StreamListenerConfig,
	handler EventHandler,
) (*TopicListenerReconnector, error)

func (*TopicListenerReconnector) Close

func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error

func (*TopicListenerReconnector) WaitInit

func (lr *TopicListenerReconnector) WaitInit(ctx context.Context) error

func (*TopicListenerReconnector) WaitStop

func (lr *TopicListenerReconnector) WaitStop(ctx context.Context) error

Source Files

event_handler.go listener_config.go stream_listener.go topic_client.go topic_listener_reconnector.go

Version
v3.108.0 (latest)
Published
Apr 21, 2025
Platform
linux/amd64
Imports
17 packages
Last checked
2 minutes ago

Tools for package owners.