package topicreader
import "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
Example (EffectiveUnmarshalMessageContentToJSONStruct)¶
Code:play
package main import ( "context" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" ) func main() { ctx := context.TODO() reader := readerConnect() type S struct { MyField int `json:"my_field"` } var v S msg, _ := reader.ReadMessage(ctx) _ = topicsugar.JSONUnmarshal(msg, &v) } func readerConnect() *topicreader.Reader { panic("example stub") }
Example (EffectiveUnmarshalMessageContentToOwnType)¶
Code:play
package main import ( "context" "encoding/binary" "errors" ) type MyMessage struct { ID byte ChangeType byte Delta uint32 } func (m *MyMessage) UnmarshalYDBTopicMessage(data []byte) error { if len(data) != 6 { return errors.New("bad data len") } m.ID = data[0] m.ChangeType = data[1] m.Delta = binary.BigEndian.Uint32(data[2:]) return nil } func main() { ctx := context.TODO() reader := readerConnect() var v MyMessage mess, _ := reader.ReadMessage(ctx) _ = mess.UnmarshalTo(&v) }
Example (HandlePartitionHardOff_NeedRare)¶
Code:play
package main import ( "bytes" "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func main() { ctx := context.TODO() reader := readerConnect() batch, _ := reader.ReadMessageBatch(ctx) if len(batch.Messages) == 0 { return } batchContext := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke buf := &bytes.Buffer{} for _, msg := range batch.Messages { if batchContext.Err() != nil { // if batch context cancelled - it mean client need to stop process messages from batch // next messages will send to other reader return } _, _ = buf.ReadFrom(msg) writeBatchToDB(ctx, batch.Messages[0].WrittenAt, buf.Bytes()) } } func writeBatchToDB(ctx context.Context, t time.Time, data []byte) { } func readerConnect() *topicreader.Reader { panic("example stub") }
Example (HandlePartitionSoftOff_NeedRare)¶
Code:
{
ctx := context.TODO()
db := dbConnect()
reader, _ := db.Topic().StartReader("consumer", nil,
topicoptions.WithBatchReadMinCount(1000),
)
for {
batch, _ := reader.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000
processBatch(batch)
_ = reader.Commit(batch.Context(), batch)
}
}
Example (ReadAndCommitEveryMessage)¶
Code:play
package main import ( "context" "io/ioutil" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func main() { ctx := context.TODO() reader := readerConnect() for { msg, _ := reader.ReadMessage(ctx) processMessage(msg) _ = reader.Commit(msg.Context(), msg) } } func processMessage(m *topicreader.Message) { body, _ := ioutil.ReadAll(m) writeToDB( m.Context(), m.SeqNo, body) } func writeToDB(ctx context.Context, id int64, body []byte) { } func readerConnect() *topicreader.Reader { panic("example stub") }
Example (ReadBatchWithMessageCommits)¶
Code:play
package main import ( "context" "io/ioutil" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func main() { ctx := context.TODO() reader := readerConnect() for { batch, _ := reader.ReadMessageBatch(ctx) for _, msg := range batch.Messages { processMessage(msg) _ = reader.Commit(msg.Context(), batch) } } } func processMessage(m *topicreader.Message) { body, _ := ioutil.ReadAll(m) writeToDB( m.Context(), m.SeqNo, body) } func writeToDB(ctx context.Context, id int64, body []byte) { } func readerConnect() *topicreader.Reader { panic("example stub") }
Example (ReadBatchesWithBatchCommit)¶
Code:play
package main import ( "bytes" "context" "io" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func main() { ctx := context.TODO() reader := readerConnect() for { batch, _ := reader.ReadMessageBatch(ctx) processBatch(batch) _ = reader.Commit(batch.Context(), batch) } } func processBatch(batch *topicreader.Batch) { ctx := batch.Context() if len(batch.Messages) == 0 { return } buf := &bytes.Buffer{} for _, msg := range batch.Messages { buf.Reset() _, _ = buf.ReadFrom(msg) _, _ = io.Copy(buf, msg) writeMessagesToDB(ctx, buf.Bytes()) } } func writeMessagesToDB(ctx context.Context, data []byte) {} func readerConnect() *topicreader.Reader { panic("example stub") }
Example (ReadMessagesWithAsyncBufferedCommit)¶
Code:
{ ctx := context.TODO() db := dbConnect() reader, _ := db.Topic().StartReader("consumer", nil, topicoptions.WithCommitMode(topicoptions.CommitModeAsync), topicoptions.WithCommitCountTrigger(1000), ) defer func() { _ = reader.Close(ctx) // wait until flush buffered commits }() for { msg, _ := reader.ReadMessage(ctx) processMessage(msg) _ = reader.Commit(ctx, msg) // will fast - in async mode commit will append to internal buffer only } }
Example (ReadMessagesWithCustomBatching)¶
Code:
{ ctx := context.TODO() db := dbConnect() reader, _ := db.Topic().StartReader("consumer", nil, topicoptions.WithBatchReadMinCount(1000), ) for { batch, _ := reader.ReadMessageBatch(ctx) processBatch(batch) _ = reader.Commit(batch.Context(), batch) } }
Example (ReadWithExplicitPartitionStartStopHandler)¶
Code:
{ ctx := context.TODO() db := dbConnect() readContext, stopReader := context.WithCancel(context.Background()) defer stopReader() reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"), topicoptions.WithTracer( trace.Topic{ OnPartitionReadStart: func(info trace.OnPartitionReadStartInfo) { err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID) if err != nil { stopReader() } }, OnPartitionReadStop: func(info trace.OnPartitionReadStopInfo) { if info.Graceful { err := externalSystemUnlock(ctx, info.Topic, info.PartitionID) if err != nil { stopReader() } } }, }, ), ) go func() { <-readContext.Done() _ = reader.Close(ctx) }() for { batch, _ := reader.ReadMessageBatch(readContext) processBatch(batch) _ = externalSystemCommit( batch.Context(), batch.Topic(), batch.PartitionID(), getEndOffset(batch), ) } }
Example (ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage)¶
Code:
{
ctx := context.TODO()
db := dbConnect()
readContext, stopReader := context.WithCancel(context.Background())
defer stopReader()
readStartPosition := func(
ctx context.Context,
req topicoptions.GetPartitionStartOffsetRequest,
) (res topicoptions.GetPartitionStartOffsetResponse, err error) {
offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
res.StartFrom(offset)
// Reader will stop if return err != nil
return res, err
}
onPartitionStart := func(info trace.OnPartitionReadStartInfo) {
err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
if err != nil {
stopReader()
}
}
onPartitionStop := func(info trace.OnPartitionReadStopInfo) {
if info.Graceful {
err := externalSystemUnlock(ctx, info.Topic, info.PartitionID)
if err != nil {
stopReader()
}
}
}
r, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
topicoptions.WithGetPartitionStartOffset(readStartPosition),
topicoptions.WithTracer(
trace.Topic{
OnPartitionReadStart: onPartitionStart,
OnPartitionReadStop: onPartitionStop,
},
),
)
go func() {
<-readContext.Done()
_ = r.Close(ctx)
}()
for {
batch, _ := r.ReadMessageBatch(readContext)
processBatch(batch)
_ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), getEndOffset(batch))
}
}
Example (ReadWithOwnReadProgressStorage)¶
Code:
{
ctx := context.TODO()
db := dbConnect()
reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
topicoptions.WithGetPartitionStartOffset(
func(
ctx context.Context,
req topicoptions.GetPartitionStartOffsetRequest,
) (
res topicoptions.GetPartitionStartOffsetResponse,
err error,
) {
offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
res.StartFrom(offset)
// Reader will stop if return err != nil
return res, err
}),
)
for {
batch, _ := reader.ReadMessageBatch(ctx)
processBatch(batch)
_ = externalSystemCommit(
batch.Context(),
batch.Topic(),
batch.PartitionID(),
getEndOffset(batch),
)
}
}
Example (ReceiveCommitNotify)¶
Code:
{
ctx := context.TODO()
db := dbConnect()
reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
topicoptions.WithTracer(trace.Topic{
OnPartitionCommittedNotify: func(info trace.OnPartitionCommittedInfo) {
// called when receive commit notify from server
fmt.Println(info.Topic, info.PartitionID, info.CommittedOffset)
},
},
),
)
for {
msg, _ := reader.ReadMessage(ctx)
processMessage(msg)
}
}
Example (SimplePrintMessageContent)¶
Code:play
package main import ( "context" "fmt" "io/ioutil" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func main() { ctx := context.TODO() reader := readerConnect() for { msg, _ := reader.ReadMessage(ctx) content, _ := ioutil.ReadAll(msg) fmt.Println(string(content)) } } func readerConnect() *topicreader.Reader { panic("example stub") }
Example (SimpleReadMessagesWithErrorHandle)¶
Code:play
package main import ( "context" "io/ioutil" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func main() { ctx := context.TODO() reader := readerConnect() for { msg, _ := reader.ReadMessage(ctx) processMessage(msg) } } func processMessage(m *topicreader.Message) { body, _ := ioutil.ReadAll(m) writeToDB( m.Context(), m.SeqNo, body) } func writeToDB(ctx context.Context, id int64, body []byte) { } func readerConnect() *topicreader.Reader { panic("example stub") }
Index ¶
- Variables
- type Batch
- type CommitRangeGetter
- type Message
- type MessageContentUnmarshaler
- type ReadBatchOption
- type Reader
- func NewReader(internalReader topicreaderinternal.Reader) *Reader
- func (r *Reader) Close(ctx context.Context) error
- func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error
- func (r *Reader) ReadMessage(ctx context.Context) (*Message, error)
- func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)
- type WithBatchMaxCount
- type WithBatchPreferMinCount
Examples ¶
- package (EffectiveUnmarshalMessageContentToJSONStruct)
- package (EffectiveUnmarshalMessageContentToOwnType)
- package (HandlePartitionHardOff_NeedRare)
- package (HandlePartitionSoftOff_NeedRare)
- package (ReadAndCommitEveryMessage)
- package (ReadBatchWithMessageCommits)
- package (ReadBatchesWithBatchCommit)
- package (ReadMessagesWithAsyncBufferedCommit)
- package (ReadMessagesWithCustomBatching)
- package (ReadWithExplicitPartitionStartStopHandler)
- package (ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage)
- package (ReadWithOwnReadProgressStorage)
- package (ReceiveCommitNotify)
- package (SimplePrintMessageContent)
- package (SimpleReadMessagesWithErrorHandle)
Variables ¶
ErrConcurrencyCall return if method on reader called in concurrency
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
var ErrUnexpectedCodec = topicreaderinternal.PublicErrUnexpectedCodec
ErrUnexpectedCodec will return if topicreader receive message with unknown codec. client side must check error with errors.Is
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
Types ¶
type Batch ¶
type Batch = topicreaderinternal.PublicBatch
Batch is group of ordered messages from one partition
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type CommitRangeGetter ¶
type CommitRangeGetter = topicreaderinternal.PublicCommitRangeGetter
CommitRangeGetter
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type Message ¶
type Message = topicreaderinternal.PublicMessage
Message
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type MessageContentUnmarshaler ¶
type MessageContentUnmarshaler = topicreaderinternal.PublicMessageContentUnmarshaler
MessageContentUnmarshaler
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type ReadBatchOption ¶
type ReadBatchOption = topicreaderinternal.PublicReadBatchOption
ReadBatchOption
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader allow to read message from YDB topics reader methods must not call concurrency
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func NewReader ¶
func NewReader(internalReader topicreaderinternal.Reader) *Reader
NewReader
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (*Reader) Close ¶
Close stop work with reader return when reader complete internal works, flush commit buffer, ets or when ctx cancelled
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (*Reader) Commit ¶
func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error
Commit receive Message, Batch of single offset
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (*Reader) ReadMessage ¶
ReadMessage read exactly one message exactly one of message, error is nil
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (*Reader) ReadMessageBatch ¶
ReadMessageBatch read batch of messages Batch is ordered message group from one partition exactly one of Batch, err is nil if Batch is not nil - reader guarantee about all Batch.Messages are not nil
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type WithBatchMaxCount ¶
type WithBatchMaxCount int
WithBatchMaxCount
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (WithBatchMaxCount) Apply ¶
func (count WithBatchMaxCount) Apply( options topicreaderinternal.ReadMessageBatchOptions, ) topicreaderinternal.ReadMessageBatchOptions
Apply
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type WithBatchPreferMinCount ¶
type WithBatchPreferMinCount int
WithBatchPreferMinCount set prefer min count for batch size. Sometime result batch can be less then count for example if internal buffer full and can't receive more messages or server stop send messages in partition
count must be 1 or greater it will panic if count < 1
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (WithBatchPreferMinCount) Apply ¶
func (count WithBatchPreferMinCount) Apply( options topicreaderinternal.ReadMessageBatchOptions, ) topicreaderinternal.ReadMessageBatchOptions
Apply
Experimental
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
Source Files ¶
batch_options.go errors.go reader.go
- Version
- v3.31.0
- Published
- Aug 1, 2022
- Platform
- darwin/amd64
- Imports
- 5 packages
- Last checked
- 36 seconds ago –
Tools for package owners.