package topicwriter
import "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
Index ¶
- Variables
- type Message
- type PublicInitialInfo
- type TxWriter
- func NewTxWriterInternal(w *topicwriterinternal.WriterWithTransaction) *TxWriter
- func (w *TxWriter) WaitInit(ctx context.Context) (err error)
- func (w *TxWriter) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err error)
- func (w *TxWriter) Write(ctx context.Context, messages ...Message) error
- type WriteOption
- type Writer
- func NewWriter(writer *topicwriterinternal.WriterReconnector) *Writer
- func (w *Writer) Close(ctx context.Context) error
- func (w *Writer) Flush(ctx context.Context) error
- func (w *Writer) WaitInit(ctx context.Context) (err error)
- func (w *Writer) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err error)
- func (w *Writer) Write(ctx context.Context, messages ...Message) error
Variables ¶
var (
	ErrQueueLimitExceed                      = topicwriterinternal.PublicErrQueueIsFull
	ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError
)
Types ¶
type Message ¶
type Message = topicwriterinternal.PublicMessage
type PublicInitialInfo ¶
type PublicInitialInfo struct {
	LastSeqNum int64
}
PublicInitialInfo contains information about the writer after it has been initialized.
type TxWriter ¶
type TxWriter struct {
// contains filtered or unexported fields
}
TxWriter is used to send messages within a transaction.
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func NewTxWriterInternal ¶
func NewTxWriterInternal(w *topicwriterinternal.WriterWithTransaction) *TxWriter
func (*TxWriter) WaitInit ¶
func (w *TxWriter) WaitInit(ctx context.Context) (err error)
WaitInit waits until the writer is initialized or an error occurs.
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (*TxWriter) WaitInitInfo ¶
func (w *TxWriter) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err error)
WaitInitInfo waits until the writer is initialized or an error occurs, then returns PublicInitialInfo and err.
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (*TxWriter) Write ¶
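func (w *TxWriter) Write(ctx context.Context, messages ...Message) error
Write writes messages to the transaction.
It does not retry. If it fails, the whole transaction must be retried, as with any other error when working with tables.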
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
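Because TxWriter.Write does not retry on its own, the retry loop has to wrap the whole transaction body. The following is a minimal sketch of that idea, not the SDK's implementation. It uses local stand-ins so it runs without a YDB server: message, txWriter, retryTx, and flakyWriter are hypothetical names, and in a real program the SDK's own transaction retry helpers would normally play the role of retryTx.

```go
package main

import (
	"context"
	"errors"
)

// message is a local stand-in for topicwriter.Message (illustration only).
type message struct{ payload string }

// txWriter mirrors the shape of TxWriter.Write using the stand-in message type.
type txWriter interface {
	Write(ctx context.Context, messages ...message) error
}

// txBody is one attempt of the transaction. It receives a writer bound to that attempt.
type txBody func(ctx context.Context, w txWriter) error

// retryTx is a hypothetical helper: it reruns the whole body up to maxAttempts
// times and creates a fresh writer for every attempt.
func retryTx(ctx context.Context, maxAttempts int, newWriter func() txWriter, body txBody) error {
	if maxAttempts < 1 {
		return errors.New("maxAttempts must be at least 1")
	}
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		lastErr = body(ctx, newWriter())
		if lastErr == nil {
			return nil
		}
	}
	return lastErr
}

// flakyWriter is a demo fake: it fails while *failures > 0, then records payloads.
type flakyWriter struct {
	failures *int
	written  *[]string
}

func (f flakyWriter) Write(ctx context.Context, messages ...message) error {
	if *f.failures > 0 {
		*f.failures--
		return errors.New("transient write failure")
	}
	for _, m := range messages {
		*f.written = append(*f.written, m.payload)
	}
	return nil
}

// demoRetry runs a two-message transaction against a writer that fails once.
// It returns the number of attempts and the payloads written by the successful attempt.
func demoRetry() (attempts int, written []string, err error) {
	failures := 1
	err = retryTx(context.Background(), 3,
		func() txWriter { return flakyWriter{failures: &failures, written: &written} },
		func(ctx context.Context, w txWriter) error {
			attempts++
			return w.Write(ctx, message{"a"}, message{"b"})
		})
	return attempts, written, err
}
```

The point of the design is that the failed attempt contributes nothing: the second attempt repeats every write from the start instead of resuming after the failed call.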
type WriteOption ¶
type WriteOption interface {
// contains filtered or unexported methods
}
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer represents a write session to a topic. It handles connection problems, reconnects to the server when needed, and resends buffered messages.
func NewWriter ¶
func NewWriter(writer *topicwriterinternal.WriterReconnector) *Writer
NewWriter creates a new writer from the internal type. Used internally only.
func (*Writer) Close ¶
func (w *Writer) Close(ctx context.Context) error
Close flushes the remaining messages from the buffer and closes the writer. You can't write new messages after calling Close.
func (*Writer) Flush ¶
func (w *Writer) Flush(ctx context.Context) error
Flush waits until all in-flight messages are acknowledged.
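Flush and Close both take a context, so a shutdown path can bound how long it waits for acknowledgements. The sketch below assumes nothing beyond the two method signatures listed in the index. The flushCloser interface, the shutdown helper, and recordingWriter are hypothetical names used only to keep the example runnable without a server.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// flushCloser lists the two Writer methods used here, with the signatures from the index.
type flushCloser interface {
	Flush(ctx context.Context) error
	Close(ctx context.Context) error
}

// shutdown is a hypothetical helper: it waits for acknowledgements with Flush,
// then always calls Close, and bounds both calls with a single timeout.
// errors.Join requires Go 1.20 or newer and returns nil when both errors are nil.
func shutdown(parent context.Context, w flushCloser, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	flushErr := w.Flush(ctx)
	closeErr := w.Close(ctx)
	return errors.Join(flushErr, closeErr)
}

// recordingWriter is a demo fake that records the order of calls.
type recordingWriter struct{ calls []string }

func (r *recordingWriter) Flush(ctx context.Context) error {
	r.calls = append(r.calls, "flush")
	return ctx.Err()
}

func (r *recordingWriter) Close(ctx context.Context) error {
	r.calls = append(r.calls, "close")
	return ctx.Err()
}

// demoShutdown returns the call order and the error from shutting down the fake.
func demoShutdown() ([]string, error) {
	w := &recordingWriter{}
	err := shutdown(context.Background(), w, time.Second)
	return w.calls, err
}
```

Since Close is documented to flush the buffer itself, the explicit Flush here mainly serves to report a flush failure separately from a close failure.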
func (*Writer) WaitInit ¶
func (w *Writer) WaitInit(ctx context.Context) (err error)
WaitInit waits until the writer is initialized or an error occurs.
func (*Writer) WaitInitInfo ¶
func (w *Writer) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err error)
WaitInitInfo waits until the writer is initialized or an error occurs, then returns PublicInitialInfo and err.
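One possible use of the LastSeqNum field returned by WaitInitInfo is to skip records that were already written before a restart. The sketch below assumes the application assigns its own increasing sequence numbers and that LastSeqNum is the highest number already stored for this writer. That interpretation, and the names initialInfo, initWaiter, record, pending, and fixedInfo, are assumptions made for illustration and are not part of the package.

```go
package main

import "context"

// initialInfo is a local stand-in with the same field as PublicInitialInfo.
type initialInfo struct{ LastSeqNum int64 }

// initWaiter mirrors the shape of WaitInitInfo using the stand-in type.
type initWaiter interface {
	WaitInitInfo(ctx context.Context) (initialInfo, error)
}

// record is a hypothetical application record carrying its own sequence number.
type record struct {
	SeqNo   int64
	Payload string
}

// pending waits for initialization and returns only the records whose SeqNo
// is greater than the LastSeqNum reported by the writer.
func pending(ctx context.Context, w initWaiter, records []record) ([]record, error) {
	info, err := w.WaitInitInfo(ctx)
	if err != nil {
		return nil, err
	}
	var out []record
	for _, r := range records {
		if r.SeqNo > info.LastSeqNum {
			out = append(out, r)
		}
	}
	return out, nil
}

// fixedInfo is a demo fake that reports a fixed LastSeqNum.
type fixedInfo int64

func (f fixedInfo) WaitInitInfo(ctx context.Context) (initialInfo, error) {
	return initialInfo{LastSeqNum: int64(f)}, nil
}

// demoPending filters three records against a writer that reports LastSeqNum = 2.
func demoPending() ([]record, error) {
	records := []record{{1, "a"}, {2, "b"}, {3, "c"}}
	return pending(context.Background(), fixedInfo(2), records)
}
```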
func (*Writer) Write ¶
func (w *Writer) Write(ctx context.Context, messages ...Message) error
Write sends messages to the topic. It returns after the messages are saved into the buffer in async mode (default), or after the ack from the server in sync mode. See topicoptions.WithSyncWrite.
The method waits for the first initial connection even in async mode, which means the first write may be slower, especially when the connection has problems.
It returns ErrQueueLimitExceed (must be checked with errors.Is) if ctx is cancelled before the messages are put into the internal buffer, or on an attempt to add more messages than the queue can hold.
If err != nil, you can use errors.Is(err, ErrMessagesPutToInternalQueueBeforeError) to check whether the messages were put into the buffer before the error. If so, the messages may still be delivered to the server.
Example ¶
Code:
{
	ctx := context.Background()
	db, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"))
	if err != nil {
		log.Fatalf("failed ydb connection: %v", err)
	}
	writer, err := db.Topic().StartWriter("topicName")
	if err != nil {
		log.Fatalf("failed to create topic writer: %v", err)
	}
	err = writer.Write(ctx,
		topicwriter.Message{Data: strings.NewReader("1")},
		topicwriter.Message{Data: strings.NewReader("2")},
		topicwriter.Message{Data: strings.NewReader("3")},
	)
	// A nil error means the messages were accepted by the writer.
	if err == nil {
		fmt.Println("OK")
	} else {
		log.Fatalf("failed write to stream: %v", err)
	}
}
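The error contract described for Writer.Write can be turned into a small decision function. The sketch below is one way to do it under stated assumptions: errQueueLimitExceed and errMessagesPutBeforeError are local stand-ins for ErrQueueLimitExceed and ErrMessagesPutToInternalQueueBeforeError so the example runs without the SDK, and the outcome names and the order of the checks are illustrative choices, not behavior defined by the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's exported error variables (assumption:
// real code would use topicwriter.ErrQueueLimitExceed and
// topicwriter.ErrMessagesPutToInternalQueueBeforeError instead).
var (
	errQueueLimitExceed       = errors.New("queue limit exceeded")
	errMessagesPutBeforeError = errors.New("messages put to internal queue before error")
)

// outcome is a hypothetical classification of a Write result.
type outcome string

const (
	outcomeOK             outcome = "ok"              // Write returned nil
	outcomeMaybeDelivered outcome = "maybe-delivered" // buffered before the error, may still reach the server
	outcomeNotQueued      outcome = "not-queued"      // rejected by the queue limit, safe to retry later
	outcomeFailed         outcome = "failed"          // any other error
)

// classify inspects an error with errors.Is, so wrapped errors are matched too.
func classify(err error) outcome {
	switch {
	case err == nil:
		return outcomeOK
	case errors.Is(err, errMessagesPutBeforeError):
		return outcomeMaybeDelivered
	case errors.Is(err, errQueueLimitExceed):
		return outcomeNotQueued
	default:
		return outcomeFailed
	}
}

// demoClassify classifies a nil error, two wrapped sentinel errors, and an unrelated error.
func demoClassify() []outcome {
	return []outcome{
		classify(nil),
		classify(fmt.Errorf("write: %w", errQueueLimitExceed)),
		classify(fmt.Errorf("write: %w", errMessagesPutBeforeError)),
		classify(errors.New("other")),
	}
}
```

The "maybe-delivered" case is checked first because resending those messages blindly could produce duplicates, so a caller would likely want to treat it differently from a plain rejection.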
Source Files ¶
topicwriter.go write_options.go
- Version
- v3.96.0
- Published
- Jan 15, 2025
- Platform
- darwin/amd64
- Imports
- 2 packages