package topicwriterinternal

import "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"

Index

Variables

var (
	PublicErrQueueIsFull                           = xerrors.Wrap(errors.New("ydb: queue is full"))
	PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("" /* 131 byte string literal not displayed */))
)

Types

type ConnectFunc

type ConnectFunc func(ctx context.Context, tracer *trace.Topic) (RawTopicWriterStream, error)

type EncoderSelector

type EncoderSelector struct {
	// contains filtered or unexported fields
}

EncoderSelector is not thread-safe.

func NewEncoderSelector

func NewEncoderSelector(
	m *MultiEncoder,
	allowedCodecs rawtopiccommon.SupportedCodecs,
	parallelCompressors int,
	tracer *trace.Topic,
	writerReconnectorID, sessionID string,
) EncoderSelector

func (*EncoderSelector) CompressMessages

func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (rawtopiccommon.Codec, error)

func (*EncoderSelector) ResetAllowedCodecs

func (s *EncoderSelector) ResetAllowedCodecs(allowedCodecs rawtopiccommon.SupportedCodecs)

type InitialInfo

type InitialInfo struct {
	LastSeqNum int64
}

type MessageQueueAckWaiter

type MessageQueueAckWaiter struct {
	// contains filtered or unexported fields
}

func (*MessageQueueAckWaiter) AddWaitIndex

func (m *MessageQueueAckWaiter) AddWaitIndex(index int)

type MultiEncoder

type MultiEncoder struct {
	// contains filtered or unexported fields
}

func NewMultiEncoder

func NewMultiEncoder() *MultiEncoder

func (*MultiEncoder) AddEncoder

func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc)

func (*MultiEncoder) Encode

func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error)

func (*MultiEncoder) EncodeBytes

func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error)

func (*MultiEncoder) GetSupportedCodecs

func (e *MultiEncoder) GetSupportedCodecs() rawtopiccommon.SupportedCodecs

func (*MultiEncoder) IsSupported

func (e *MultiEncoder) IsSupported(codec rawtopiccommon.Codec) bool

type PublicCreateEncoderFunc

type PublicCreateEncoderFunc func(writer io.Writer) (io.WriteCloser, error)

type PublicFuturePartitioning

type PublicFuturePartitioning struct {
	// contains filtered or unexported fields
}

PublicFuturePartitioning will be published in the future, after the server implementation is completed.

func NewPartitioningWithMessageGroupID

func NewPartitioningWithMessageGroupID(id string) PublicFuturePartitioning

func NewPartitioningWithPartitionID

func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning

func (PublicFuturePartitioning) ToRaw

type PublicMessage

type PublicMessage struct {
	SeqNo     int64
	CreatedAt time.Time
	Data      io.Reader
	Metadata  map[string][]byte
	// contains filtered or unexported fields
}

type PublicOnWriterInitResponseCallback

type PublicOnWriterInitResponseCallback func(info PublicWithOnWriterConnectedInfo) error

type PublicResetableWriter

type PublicResetableWriter interface {
	io.WriteCloser
	Reset(wr io.Writer)
}

type PublicWithOnWriterConnectedInfo

type PublicWithOnWriterConnectedInfo struct {
	LastSeqNo        int64
	SessionID        string
	PartitionID      int64
	CodecsFromServer []topictypes.Codec
	AllowedCodecs    []topictypes.Codec // Intersection of the codecs from the server and the codecs supported by the writer
}

type PublicWriterOption

type PublicWriterOption func(cfg *WriterReconnectorConfig)

func WithAddEncoder

func WithAddEncoder(codec rawtopiccommon.Codec, encoderFunc PublicCreateEncoderFunc) PublicWriterOption

func WithAutoCodec

func WithAutoCodec() PublicWriterOption

func WithAutoSetSeqNo

func WithAutoSetSeqNo(val bool) PublicWriterOption

func WithAutosetCreatedTime

func WithAutosetCreatedTime(enable bool) PublicWriterOption

func WithClock

func WithClock(clock clockwork.Clock) PublicWriterOption

WithClock is a private option for tests.

func WithCodec

func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption

func WithCommonConfig

func WithCommonConfig(common config.Common) PublicWriterOption

WithCommonConfig

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func WithCompressorCount

func WithCompressorCount(num int) PublicWriterOption

func WithConnectFunc

func WithConnectFunc(connect ConnectFunc) PublicWriterOption

func WithConnectTimeout

func WithConnectTimeout(timeout time.Duration) PublicWriterOption

func WithCredentials

func WithCredentials(cred credentials.Credentials) PublicWriterOption

WithCredentials is for internal usage only; it is not proxied to the public interface.

func WithMaxGrpcMessageBytes

func WithMaxGrpcMessageBytes(num int) PublicWriterOption

func WithMaxQueueLen

func WithMaxQueueLen(num int) PublicWriterOption

func WithPartitioning

func WithPartitioning(partitioning PublicFuturePartitioning) PublicWriterOption

func WithProducerID

func WithProducerID(producerID string) PublicWriterOption

func WithSessionMeta

func WithSessionMeta(meta map[string]string) PublicWriterOption

func WithStartTimeout

func WithStartTimeout(timeout time.Duration) PublicWriterOption

func WithTokenUpdateInterval

func WithTokenUpdateInterval(interval time.Duration) PublicWriterOption

func WithTopic

func WithTopic(topic string) PublicWriterOption

func WithTrace

func WithTrace(tracer *trace.Topic) PublicWriterOption

func WithWaitAckOnWrite

func WithWaitAckOnWrite(val bool) PublicWriterOption

type RawTopicWriterStream

type RawTopicWriterStream interface {
	Recv() (rawtopicwriter.ServerMessage, error)
	Send(mess rawtopicwriter.ClientMessage) error
	CloseSend() error
}

type SingleStreamWriter

type SingleStreamWriter struct {
	Encoder EncoderSelector

	CodecsFromServer rawtopiccommon.SupportedCodecs

	SessionID string

	ReceivedLastSeqNum int64
	PartitionID        int64

	LastSeqNumRequested bool
	// contains filtered or unexported fields
}

func NewSingleStreamWriter

func NewSingleStreamWriter(
	ctxForPProfLabelsOnly context.Context,
	cfg SingleStreamWriterConfig,
) (*SingleStreamWriter, error)

func (*SingleStreamWriter) WaitClose

func (w *SingleStreamWriter) WaitClose(ctx context.Context) error

type SingleStreamWriterConfig

type SingleStreamWriterConfig struct {
	WritersCommonConfig
	// contains filtered or unexported fields
}

type StreamWriter

type StreamWriter interface {
	Write(ctx context.Context, messages []PublicMessage) error
	WaitInit(ctx context.Context) (info InitialInfo, err error)
	Close(ctx context.Context) error
	Flush(ctx context.Context) error
}

type WriterReconnector

type WriterReconnector struct {
	// contains filtered or unexported fields
}

func NewWriterReconnector

func NewWriterReconnector(
	cfg WriterReconnectorConfig,
) (*WriterReconnector, error)

func (*WriterReconnector) Close

func (w *WriterReconnector) Close(ctx context.Context) error

func (*WriterReconnector) Flush

func (w *WriterReconnector) Flush(ctx context.Context) error

func (*WriterReconnector) GetSessionID

func (w *WriterReconnector) GetSessionID() (sessionID string)

func (*WriterReconnector) WaitInit

func (w *WriterReconnector) WaitInit(ctx context.Context) (info InitialInfo, err error)

func (*WriterReconnector) Write

func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) (resErr error)

type WriterReconnectorConfig

type WriterReconnectorConfig struct {
	WritersCommonConfig

	MaxMessageSize               int
	MaxQueueLen                  int
	Common                       config.Common
	AdditionalEncoders           map[rawtopiccommon.Codec]PublicCreateEncoderFunc
	Connect                      ConnectFunc
	WaitServerAck                bool
	AutoSetSeqNo                 bool
	AutoSetCreatedTime           bool
	OnWriterInitResponseCallback PublicOnWriterInitResponseCallback
	RetrySettings                topic.RetrySettings
	// contains filtered or unexported fields
}

func NewWriterReconnectorConfig

func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnectorConfig

type WriterWithTransaction

type WriterWithTransaction struct {
	// contains filtered or unexported fields
}

func NewTopicWriterTransaction

func NewTopicWriterTransaction(w *WriterReconnector, tx tx.Transaction, tracer *trace.Topic) *WriterWithTransaction

func (*WriterWithTransaction) Close

func (*WriterWithTransaction) WaitInit

func (w *WriterWithTransaction) WaitInit(ctx context.Context) (info InitialInfo, err error)

func (*WriterWithTransaction) Write

func (w *WriterWithTransaction) Write(ctx context.Context, messages ...PublicMessage) error

type WritersCommonConfig

type WritersCommonConfig struct {
	Tracer *trace.Topic
	// contains filtered or unexported fields
}

Source Files

encoders.go message.go public_callbacks.go queue.go writer_config.go writer_options.go writer_reconnector.go writer_single_stream.go writer_stream_interface.go writer_transaction.go

Version
v3.103.0
Published
Mar 14, 2025
Platform
darwin/amd64
Imports
32 packages
Last checked
4 minutes ago

Tools for package owners.