package topic

import "github.com/ydb-platform/ydb-go-sdk/v3/topic"

Example (AlterTopic)

Code:play 

package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// main connects to YDB and adds a consumer named "new-consumer" to the topic
// at "topic-path". Any failure is logged and the program returns.
func main() {
	ctx := context.TODO()

	// Fall back to the local endpoint when YDB_CONNECTION_STRING is unset or empty.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	driver, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer driver.Close(ctx) // release driver resources on exit

	// SupportedCodecs is optional.
	consumer := topictypes.Consumer{
		Name: "new-consumer",
		SupportedCodecs: []topictypes.Codec{
			topictypes.CodecRaw,
			topictypes.CodecGzip,
		},
	}

	alterErr := driver.Topic().Alter(ctx, "topic-path", topicoptions.AlterWithAddConsumers(consumer))
	if alterErr != nil {
		log.Printf("failed alter topic: %v", alterErr)
	}
}
Example (CreateTopic)

Code:play 

package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// main connects to YDB and creates the topic "topic-path" with a set of
// supported codecs and a minimum number of active partitions. Any failure is
// logged and the program returns.
func main() {
	ctx := context.TODO()

	// Fall back to the local endpoint when YDB_CONNECTION_STRING is unset or empty.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	driver, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer driver.Close(ctx) // release driver resources on exit

	// Both options are optional.
	createOpts := []topicoptions.CreateOption{
		topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
		topicoptions.CreateWithMinActivePartitions(3),
	}

	if createErr := driver.Topic().Create(ctx, "topic-path", createOpts...); createErr != nil {
		log.Printf("failed create topic: %v", createErr)
	}
}
Example (DescribeTopic)

Code:play 

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)

// main connects to YDB, requests the description of the topic "topic-path"
// and prints it in Go-syntax form. Any failure is logged and the program
// returns.
func main() {
	ctx := context.TODO()
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		// Default to the local endpoint when no connection string is configured.
		connectionString = "grpc://localhost:2136/local"
	}
	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	descResult, err := db.Topic().Describe(ctx, "topic-path")
	if err != nil {
		// The message names the operation that actually failed (describe, not drop).
		log.Printf("failed describe topic: %v", err)

		return
	}
	fmt.Printf("describe: %#v\n", descResult)
}
Example (DropTopic)

Code:play 

package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)

// main connects to YDB and drops the topic "topic-path". Any failure is
// logged and the program returns.
func main() {
	ctx := context.TODO()

	// Fall back to the local endpoint when YDB_CONNECTION_STRING is unset or empty.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	driver, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer driver.Close(ctx) // release driver resources on exit

	if dropErr := driver.Topic().Drop(ctx, "topic-path"); dropErr != nil {
		log.Printf("failed drop topic: %v", dropErr)
	}
}
Example (ReadMessage)

Code:play 

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
)

// main connects to YDB, starts a reader on "/topic/path" for the consumer
// "consumer" and prints the content of each message it reads. The loop runs
// until reading a message or its content fails; every failure is logged with
// a message naming the step that failed.
func main() {
	ctx := context.TODO()
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		// Default to the local endpoint when no connection string is configured.
		connectionString = "grpc://localhost:2136/local"
	}
	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	reader, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("/topic/path"))
	if err != nil {
		log.Printf("failed start reader: %v", err)

		return
	}
	// Deferred calls run in reverse order, so the reader is closed before the
	// driver it was created from.
	defer func() { _ = reader.Close(ctx) }() // best-effort cleanup on exit

	for {
		mess, err := reader.ReadMessage(ctx)
		if err != nil {
			log.Printf("failed read message: %v", err)

			return
		}

		content, err := io.ReadAll(mess)
		if err != nil {
			log.Printf("failed read message content: %v", err)

			return
		}
		fmt.Println(string(content))
	}
}

Index

Examples

Types

type Client

// Client is the interface of the topic client.
//
// Attention: the interface may be extended in the future.
type Client interface {
	// Alter changes the options of the topic at path.
	Alter(ctx context.Context, path string, opts ...topicoptions.AlterOption) error

	// Create creates a topic at path.
	Create(ctx context.Context, path string, opts ...topicoptions.CreateOption) error

	// Describe returns the description of the topic at path.
	Describe(ctx context.Context, path string, opts ...topicoptions.DescribeOption) (topictypes.TopicDescription, error)

	// Drop drops the topic at path.
	Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error

	// StartReader starts reading messages from a topic.
	// It is a fast, non-blocking call: the connection starts in the background.
	StartReader(
		consumer string,
		readSelectors topicoptions.ReadSelectors,
		opts ...topicoptions.ReaderOption,
	) (*topicreader.Reader, error)

	// StartWriter starts a write session to the topic at topicPath.
	// It is a fast, non-blocking call: the connection starts in the background.
	StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error)
}

Client is the interface for the topic client. Attention: the interface may be extended in the future.

Source Files

client.go

Directories

Path | Synopsis
topic/topicoptions
topic/topicreader | Package topicreader provides a Reader to receive messages from YDB topics. More examples are in the examples repository.
topic/topicsugar
topic/topictypes
topic/topicwriter
Version
v3.57.4
Published
Mar 12, 2024
Platform
windows/amd64
Imports
5 packages
Last checked
38 seconds ago

Tools for package owners.