package topic
import "github.com/ydb-platform/ydb-go-sdk/v3/topic"
Example (AlterTopic)¶
Code:play
package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// main connects to YDB and alters the topic at "topic-path" by adding a
// consumer named "new-consumer".
func main() {
	ctx := context.TODO()

	// Use the connection string from the environment, falling back to a
	// local endpoint when the variable is unset or empty.
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		connectionString = "grpc://localhost:2136/local"
	}

	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources

	err = db.Topic().Alter(ctx, "topic-path",
		topicoptions.AlterWithAddConsumers(topictypes.Consumer{
			Name:            "new-consumer",
			SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional
		}),
	)
	if err != nil {
		log.Printf("failed alter topic: %v", err)
		return
	}
}
Example (CreateTopic)¶
Code:play
package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// main connects to YDB and creates a topic at "topic-path" with two
// optional settings: supported codecs and a minimum of 3 active partitions.
func main() {
	ctx := context.TODO()

	// Use the connection string from the environment, falling back to a
	// local endpoint when the variable is unset or empty.
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		connectionString = "grpc://localhost:2136/local"
	}

	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources

	err = db.Topic().Create(ctx, "topic-path",
		// optional
		topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),

		// optional
		topicoptions.CreateWithMinActivePartitions(3),
	)
	if err != nil {
		log.Printf("failed create topic: %v", err)
		return
	}
}
Example (DescribeTopic)¶
Code:play
package main
import (
"context"
"fmt"
"log"
"os"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)
// main connects to YDB, describes the topic at "topic-path" and prints the
// returned description in Go-syntax form.
func main() {
	ctx := context.TODO()

	// Use the connection string from the environment, falling back to a
	// local endpoint when the variable is unset or empty.
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		connectionString = "grpc://localhost:2136/local"
	}

	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources

	descResult, err := db.Topic().Describe(ctx, "topic-path")
	if err != nil {
		// The message names the operation that actually failed (describe, not drop).
		log.Printf("failed describe topic: %v", err)
		return
	}
	fmt.Printf("describe: %#v\n", descResult)
}
Example (DropTopic)¶
Code:play
package main
import (
"context"
"log"
"os"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)
// main opens a YDB connection and drops the topic at "topic-path",
// logging any failure.
func main() {
	ctx := context.TODO()

	// Prefer the endpoint from the environment; otherwise use the local default.
	dsn := os.Getenv("YDB_CONNECTION_STRING")
	if len(dsn) == 0 {
		dsn = "grpc://localhost:2136/local"
	}

	driver, openErr := ydb.Open(ctx, dsn)
	if openErr != nil {
		log.Printf("failed connect: %v", openErr)
		return
	}
	defer driver.Close(ctx) // cleanup resources

	if dropErr := driver.Topic().Drop(ctx, "topic-path"); dropErr != nil {
		log.Printf("failed drop topic: %v", dropErr)
	}
}
Example (ReadMessage)¶
Code:play
package main
import (
"context"
"fmt"
"io"
"log"
"os"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
)
// main connects to YDB, starts a reader for consumer "consumer" on the topic
// "/topic/path" and prints the content of every message it reads. The loop
// only ends when starting the reader, reading a message, or reading its
// content fails.
func main() {
	ctx := context.TODO()

	// Use the connection string from the environment, falling back to a
	// local endpoint when the variable is unset or empty.
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		connectionString = "grpc://localhost:2136/local"
	}

	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources

	reader, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("/topic/path"))
	if err != nil {
		log.Printf("failed start reader: %v", err)
		return
	}

	for {
		mess, err := reader.ReadMessage(ctx)
		if err != nil {
			// Report the step that failed rather than repeating the start-reader message.
			log.Printf("failed read message: %v", err)
			return
		}

		// The message is passed to io.ReadAll to obtain its full content.
		content, err := io.ReadAll(mess)
		if err != nil {
			log.Printf("failed read message content: %v", err)
			return
		}
		fmt.Println(string(content))
	}
}
Index ¶
Examples ¶
- package (AlterTopic)
- package (CreateTopic)
- package (DescribeTopic)
- package (DropTopic)
- package (ReadMessage)
Types ¶
type Client ¶
type Client interface { // Alter change topic options Alter(ctx context.Context, path string, opts ...topicoptions.AlterOption) error // Create topic Create(ctx context.Context, path string, opts ...topicoptions.CreateOption) error // Describe topic Describe(ctx context.Context, path string, opts ...topicoptions.DescribeOption) (topictypes.TopicDescription, error) // Drop topic Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error // StartListener starts read listen topic with the handler // it is fast non block call, connection starts in background // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental StartListener( consumer string, handler topiclistener.EventHandler, readSelectors topicoptions.ReadSelectors, opts ...topicoptions.ListenerOption, ) (*topiclistener.TopicListener, error) // StartReader start read messages from topic // it is fast non block call, connection starts in background StartReader( consumer string, readSelectors topicoptions.ReadSelectors, opts ...topicoptions.ReaderOption, ) (*topicreader.Reader, error) // StartWriter start write session to topic // it is fast non block call, connection starts in background StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error) // StartTransactionalWriter start writer for write messages within transaction // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental StartTransactionalWriter( tx tx.Identifier, topicpath string, opts ...topicoptions.WriterOption, ) (*topicwriter.TxWriter, error) }
Client is the interface for the topic client. Attention: the interface may be extended in the future.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
topic/topiclistener | |
topic/topicoptions | |
topic/topicreader | Package topicreader provides Reader to receive messages from YDB topics. More examples are in the examples repository. |
topic/topicsugar | |
topic/topictypes | |
topic/topicwriter |
- Version
- v3.81.4
- Published
- Oct 2, 2024
- Platform
- darwin/amd64
- Imports
- 7 packages
- Last checked
- 1 second ago –
Tools for package owners.