package topicwriterinternal

import "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"

Index

Variables

var (
	PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
)

Types

type ConnectFunc

type ConnectFunc func(ctx context.Context) (RawTopicWriterStream, error)

type EncoderMap

type EncoderMap struct {
	// contains filtered or unexported fields
}

func NewEncoderMap

func NewEncoderMap() *EncoderMap

func (*EncoderMap) AddEncoder

func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc)

func (*EncoderMap) CreateLazyEncodeWriter

func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error)

func (*EncoderMap) GetSupportedCodecs

func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs

func (*EncoderMap) IsSupported

func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool

type EncoderSelector

type EncoderSelector struct {
	// contains filtered or unexported fields
}

EncoderSelector is not thread-safe.

func NewEncoderSelector

func NewEncoderSelector(
	m *EncoderMap,
	allowedCodecs rawtopiccommon.SupportedCodecs,
	parallelCompressors int,
	tracer trace.Topic,
	writerReconnectorID, sessionID string,
) EncoderSelector

func (*EncoderSelector) CompressMessages

func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (rawtopiccommon.Codec, error)

func (*EncoderSelector) ResetAllowedCodecs

func (s *EncoderSelector) ResetAllowedCodecs(allowedCodecs rawtopiccommon.SupportedCodecs)

type Message

type Message struct {
	SeqNo     int64
	CreatedAt time.Time
	Data      io.Reader
	// contains filtered or unexported fields
}

type MessageQueueAckWaiter

type MessageQueueAckWaiter struct {
	// contains filtered or unexported fields
}

func (*MessageQueueAckWaiter) AddWaitIndex

func (m *MessageQueueAckWaiter) AddWaitIndex(index int)

type PublicCreateEncoderFunc

type PublicCreateEncoderFunc func(writer io.Writer) (io.WriteCloser, error)

type PublicFuturePartitioning

type PublicFuturePartitioning struct {
	// contains filtered or unexported fields
}

PublicFuturePartitioning will be published in the future, after the server implementation is completed.

func NewPartitioningWithMessageGroupID

func NewPartitioningWithMessageGroupID(id string) PublicFuturePartitioning

func NewPartitioningWithPartitionID

func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning

func (PublicFuturePartitioning) ToRaw

type PublicOnWriterInitResponseCallback

type PublicOnWriterInitResponseCallback func(info PublicWithOnWriterConnectedInfo) error

type PublicWithOnWriterConnectedInfo

type PublicWithOnWriterConnectedInfo struct {
	LastSeqNo        int64
	SessionID        string
	PartitionID      int64
	CodecsFromServer []topictypes.Codec
	AllowedCodecs    []topictypes.Codec // Intersection of the codecs from the server and the codecs supported by the writer
}

type PublicWriterOption

type PublicWriterOption func(cfg *WriterReconnectorConfig)

func WithAddEncoder

func WithAddEncoder(codec rawtopiccommon.Codec, encoderFunc PublicCreateEncoderFunc) PublicWriterOption

func WithAutoCodec

func WithAutoCodec() PublicWriterOption

func WithAutoSetSeqNo

func WithAutoSetSeqNo(val bool) PublicWriterOption

func WithAutosetCreatedTime

func WithAutosetCreatedTime(enable bool) PublicWriterOption

func WithCodec

func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption

func WithCommonConfig

func WithCommonConfig(common config.Common) PublicWriterOption

WithCommonConfig

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func WithCompressorCount

func WithCompressorCount(num int) PublicWriterOption

func WithConnectFunc

func WithConnectFunc(connect ConnectFunc) PublicWriterOption

func WithConnectTimeout

func WithConnectTimeout(timeout time.Duration) PublicWriterOption

func WithCredentials

func WithCredentials(cred credentials.Credentials) PublicWriterOption

WithCredentials is for internal usage only: no proxy to the public interface.

func WithMaxQueueLen

func WithMaxQueueLen(num int) PublicWriterOption

func WithPartitioning

func WithPartitioning(partitioning PublicFuturePartitioning) PublicWriterOption

func WithProducerID

func WithProducerID(producerID string) PublicWriterOption

func WithSessionMeta

func WithSessionMeta(meta map[string]string) PublicWriterOption

func WithStartTimeout

func WithStartTimeout(timeout time.Duration) PublicWriterOption

func WithTokenUpdateInterval

func WithTokenUpdateInterval(interval time.Duration) PublicWriterOption

func WithTopic

func WithTopic(topic string) PublicWriterOption

func WithTrace

func WithTrace(tracer trace.Topic) PublicWriterOption

func WithWaitAckOnWrite

func WithWaitAckOnWrite(val bool) PublicWriterOption

type RawTopicWriterStream

type RawTopicWriterStream interface {
	Recv() (rawtopicwriter.ServerMessage, error)
	Send(mess rawtopicwriter.ClientMessage) error
	CloseSend() error
}

type SingleStreamWriter

type SingleStreamWriter struct {
	ReceivedLastSeqNum int64
	SessionID          string
	PartitionID        int64
	CodecsFromServer   rawtopiccommon.SupportedCodecs
	Encoder            EncoderSelector
	// contains filtered or unexported fields
}

func NewSingleStreamWriter

func NewSingleStreamWriter(
	ctxForPProfLabelsOnly context.Context,
	cfg SingleStreamWriterConfig,
) (*SingleStreamWriter, error)

func (*SingleStreamWriter) WaitClose

func (w *SingleStreamWriter) WaitClose(ctx context.Context) error

type SingleStreamWriterConfig

type SingleStreamWriterConfig struct {
	WritersCommonConfig
	// contains filtered or unexported fields
}

type StreamWriter

type StreamWriter interface {
	Write(ctx context.Context, messages []Message) error
	Close(ctx context.Context) error
}

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(cred credentials.Credentials, options []PublicWriterOption) (*Writer, error)

func (*Writer) Close

func (w *Writer) Close(ctx context.Context) error

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, messages ...Message) error

type WriterReconnector

type WriterReconnector struct {
	// contains filtered or unexported fields
}

func (*WriterReconnector) Close

func (w *WriterReconnector) Close(ctx context.Context) error

func (*WriterReconnector) Write

func (w *WriterReconnector) Write(ctx context.Context, messages []Message) error

type WriterReconnectorConfig

type WriterReconnectorConfig struct {
	WritersCommonConfig

	MaxMessageSize               int
	MaxQueueLen                  int
	Common                       config.Common
	AdditionalEncoders           map[rawtopiccommon.Codec]PublicCreateEncoderFunc
	Connect                      ConnectFunc
	WaitServerAck                bool
	AutoSetSeqNo                 bool
	AutoSetCreatedTime           bool
	OnWriterInitResponseCallback PublicOnWriterInitResponseCallback
	RetrySettings                topic.RetrySettings
	// contains filtered or unexported fields
}

type WritersCommonConfig

type WritersCommonConfig struct {
	// contains filtered or unexported fields
}

Source Files

encoders.go message.go public_callbacks.go queue.go writer.go writer_config.go writer_options.go writer_reconnector.go writer_single_stream.go writer_stream_interface.go

Version
v3.42.12
Published
Mar 3, 2023
Platform
linux/amd64
Imports
30 packages
Last checked
10 minutes ago

Tools for package owners.