package topiclistenerinternal

import "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topiclistenerinternal"

Index

Variables

var (
	ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
)

Types

type CommitHandler

type CommitHandler interface {
	// contains filtered or unexported methods
}

CommitHandler interface for PublicReadMessages commit operations

type EventHandler

type EventHandler interface {
	// OnStartPartitionSessionRequest called when server send start partition session request method.
	// You can use it to store read progress on your own side.
	// You must call event.Confirm(...) for start to receive messages from the partition.
	// You can set topiclistener.StartPartitionSessionConfirm for change default settings.
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	OnStartPartitionSessionRequest(ctx context.Context, event *PublicEventStartPartitionSession) error

	// OnReadMessages called with batch of messages. Max count of messages limited by internal buffer size
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	OnReadMessages(ctx context.Context, event *PublicReadMessages) error

	// OnStopPartitionSessionRequest called when the server send stop partition message.
	// It means that no more OnReadMessages calls for the partition session.
	// You must call event.Confirm() for allow the server to stop the partition session (if event.Graceful=true).
	// Confirm is optional for event.Graceful=false
	// The method can be called twice: with event.Graceful=true, then event.Graceful=false.
	// It is guaranteed about the method will be called least once.
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	OnStopPartitionSessionRequest(ctx context.Context, event *PublicEventStopPartitionSession) error
}

type MessageSender

type MessageSender interface {
	SendRaw(msg rawtopicreader.ClientMessage)
}

MessageSender sends messages back to server

type PartitionWorker

type PartitionWorker struct {
	// contains filtered or unexported fields
}

PartitionWorker processes messages for a single partition

func NewPartitionWorker

func NewPartitionWorker(
	sessionID rawtopicreader.PartitionSessionID,
	session *topicreadercommon.PartitionSession,
	messageSender MessageSender,
	userHandler EventHandler,
	onStopped WorkerStoppedCallback,
	tracer *trace.Topic,
	listenerID string,
) *PartitionWorker

NewPartitionWorker creates a new PartitionWorker instance

func (*PartitionWorker) AddMessagesBatch

func (w *PartitionWorker) AddMessagesBatch(
	metadata rawtopiccommon.ServerMessageMetadata,
	batch *topicreadercommon.PublicBatch,
)

AddMessagesBatch sends a ready batch message

func (*PartitionWorker) AddRawServerMessage

func (w *PartitionWorker) AddRawServerMessage(msg rawtopicreader.ServerMessage)

AddRawServerMessage sends a raw server message

func (*PartitionWorker) AddUnifiedMessage

func (w *PartitionWorker) AddUnifiedMessage(msg unifiedMessage)

AddUnifiedMessage adds a unified message to the processing queue

func (*PartitionWorker) Close

func (w *PartitionWorker) Close(ctx context.Context, reason error) error

Close stops the worker gracefully

func (*PartitionWorker) Start

func (w *PartitionWorker) Start(ctx context.Context)

Start begins processing messages for this partition

type PublicEventStartPartitionSession

type PublicEventStartPartitionSession struct {
	PartitionSession topicreadercommon.PublicPartitionSession
	CommittedOffset  int64
	PartitionOffsets PublicOffsetsRange
	// contains filtered or unexported fields
}

PublicEventStartPartitionSession

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewPublicStartPartitionSessionEvent

func NewPublicStartPartitionSessionEvent(
	session topicreadercommon.PublicPartitionSession,
	committedOffset int64,
	partitionOffsets PublicOffsetsRange,
) *PublicEventStartPartitionSession

func (*PublicEventStartPartitionSession) Confirm

func (e *PublicEventStartPartitionSession) Confirm()

Confirm

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*PublicEventStartPartitionSession) ConfirmWithParams

type PublicEventStopPartitionSession

type PublicEventStopPartitionSession struct {
	PartitionSession topicreadercommon.PublicPartitionSession

	// Graceful mean about server is waiting for client finish work with the partition and confirm stop the work
	// if the field is false it mean about server stop lease the partition to the client and can assignee the partition
	// to other read session (on this or other connection).
	Graceful        bool
	CommittedOffset int64
	// contains filtered or unexported fields
}

PublicEventStopPartitionSession

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewPublicStopPartitionSessionEvent

func NewPublicStopPartitionSessionEvent(
	partitionSession topicreadercommon.PublicPartitionSession,
	graceful bool,
	committedOffset int64,
) *PublicEventStopPartitionSession

func (*PublicEventStopPartitionSession) Confirm

func (e *PublicEventStopPartitionSession) Confirm()

Confirm

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicOffsetsRange

type PublicOffsetsRange struct {
	Start int64
	End   int64
}

PublicOffsetsRange

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicReadMessages

type PublicReadMessages struct {
	PartitionSession topicreadercommon.PublicPartitionSession
	Batch            *topicreadercommon.PublicBatch
	// contains filtered or unexported fields
}

PublicReadMessages

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewPublicReadMessages

func NewPublicReadMessages(
	session topicreadercommon.PublicPartitionSession,
	batch *topicreadercommon.PublicBatch,
	commitHandler CommitHandler,
) *PublicReadMessages

func (*PublicReadMessages) Confirm

func (e *PublicReadMessages) Confirm()

Confirm of the process messages from the batch. Send commit message the server in background. The method returns fast, without wait commits ack.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*PublicReadMessages) ConfirmWithAck

func (e *PublicReadMessages) ConfirmWithAck(ctx context.Context) error

ConfirmWithAck commit the batch and wait ack from the server. The method will be blocked until receive ack, error or expire ctx.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type PublicStartPartitionSessionConfirm

type PublicStartPartitionSessionConfirm struct {
	CommitOffset *int64 ``
	// contains filtered or unexported fields
}

PublicStartPartitionSessionConfirm

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (PublicStartPartitionSessionConfirm) WithCommitOffset

WithCommitOffset

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (PublicStartPartitionSessionConfirm) WithReadOffet

WithReadOffet

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type StreamListenerConfig

type StreamListenerConfig struct {
	BufferSize             int
	Decoders               topicreadercommon.DecoderMap
	Selectors              []*topicreadercommon.PublicReadSelector
	Consumer               string
	ConnectWithoutConsumer bool

	Tracer *trace.Topic
	// contains filtered or unexported fields
}

func NewStreamListenerConfig

func NewStreamListenerConfig() StreamListenerConfig

func (*StreamListenerConfig) Validate

func (cfg *StreamListenerConfig) Validate() error

type SyncCommitter

type SyncCommitter interface {
	Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
}

SyncCommitter interface for ConfirmWithAck support

type TopicClient

type TopicClient interface {
	StreamRead(ctxStreamLifeTime context.Context, readerID int64, tracer *trace.Topic) (rawtopicreader.StreamReader, error)
}

type TopicListenerReconnector

type TopicListenerReconnector struct {
	// contains filtered or unexported fields
}

func NewTopicListenerReconnector

func NewTopicListenerReconnector(
	client TopicClient,
	streamConfig *StreamListenerConfig,
	handler EventHandler,
) (*TopicListenerReconnector, error)

func (*TopicListenerReconnector) Close

func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error

func (*TopicListenerReconnector) WaitInit

func (lr *TopicListenerReconnector) WaitInit(ctx context.Context) error

func (*TopicListenerReconnector) WaitStop

func (lr *TopicListenerReconnector) WaitStop(ctx context.Context) error

type WorkerStoppedCallback

type WorkerStoppedCallback func(sessionID rawtopicreader.PartitionSessionID, reason error)

WorkerStoppedCallback notifies when worker is stopped

Source Files

event_handler.go listener_config.go partition_worker.go stream_listener.go topic_client.go topic_listener_reconnector.go

Version
v3.110.0 (latest)
Published
Jun 10, 2025
Platform
linux/amd64
Imports
19 packages
Last checked
1 hour ago

Tools for package owners.