package table

import "github.com/ydb-platform/ydb-go-sdk/v3/table"

Example (BulkUpsert)

Code:

{
	// Example: upload a batch of native Go structs into a YDB table with BulkUpsert.
	ctx := context.TODO()
	db, err := ydb.Open(ctx, "grpcs://localhost:2135/?database=/local")
	if err != nil {
		fmt.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources
	type logMessage struct {
		App       string
		Host      string
		Timestamp time.Time
		HTTPCode  uint32
		Message   string
	}
	// prepare native go data
	const batchSize = 10000
	logs := make([]logMessage, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		message := logMessage{
			App:       fmt.Sprintf("App_%d", i/256),
			Host:      fmt.Sprintf("192.168.0.%d", i%256),
			Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)),
			HTTPCode:  200,
		}
		if i%2 == 0 {
			message.Message = "GET / HTTP/1.1"
		} else {
			message.Message = "GET /images/logo.png HTTP/1.1"
		}
		logs = append(logs, message)
	}
	// convert native go data into ydb values once, outside of the retry operation:
	// Do may call the operation several times, and the rows do not depend on the
	// session, so there is no reason to rebuild them on every attempt
	rows := make([]types.Value, 0, len(logs))
	for _, msg := range logs {
		rows = append(rows, types.StructValue(
			types.StructFieldValue("App", types.UTF8Value(msg.App)),
			types.StructFieldValue("Host", types.UTF8Value(msg.Host)),
			types.StructFieldValue("Timestamp", types.TimestampValueFromTime(msg.Timestamp)),
			types.StructFieldValue("HTTPCode", types.Uint32Value(msg.HTTPCode)),
			types.StructFieldValue("Message", types.UTF8Value(msg.Message)),
		))
	}
	// execute bulk upsert with native ydb data
	err = db.Table().Do( // Do retry operation on errors with best effort
		ctx, // context manage exiting from Do
		func(ctx context.Context, s table.Session) (err error) { // retry operation
			return s.BulkUpsert(ctx, "/local/bulk_upsert_example", types.ListValue(rows...))
		},
	)
	if err != nil {
		fmt.Printf("unexpected error: %v", err)
	}
}
Example (CreateTable)

Code:

{
	// Example: create the "series" table under the database root.
	ctx := context.TODO()
	db, err := ydb.Open(ctx, "grpcs://localhost:2135/?database=/local")
	if err != nil {
		fmt.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources

	// createSeries is the operation handed to Do; it may be invoked more than once
	createSeries := func(ctx context.Context, s table.Session) error {
		return s.CreateTable(
			ctx,
			path.Join(db.Name(), "series"),
			options.WithColumn("series_id", types.Optional(types.TypeUint64)),
			options.WithColumn("title", types.Optional(types.TypeUTF8)),
			options.WithColumn("series_info", types.Optional(types.TypeUTF8)),
			options.WithColumn("release_date", types.Optional(types.TypeDate)),
			options.WithColumn("comment", types.Optional(types.TypeUTF8)),
			options.WithPrimaryKeyColumn("series_id"),
		)
	}
	if err = db.Table().Do(ctx, createSeries); err != nil {
		fmt.Printf("unexpected error: %v", err)
	}
}
Example (Select)

Code:

{
	// Example: run a simple SELECT and scan the named columns into Go variables.
	ctx := context.TODO()
	db, err := ydb.Open(ctx, "grpcs://localhost:2135/?database=/local")
	if err != nil {
		fmt.Printf("failed connect: %v", err)
		return
	}
	defer db.Close(ctx) // cleanup resources

	const query = `SELECT 42 as id, "my string" as myStr`
	var id int32     // required value
	var myStr string // optional value

	// selectRow is the retry operation: any error it returns is handed back to Do
	selectRow := func(ctx context.Context, s table.Session) error {
		_, rows, err := s.Execute(ctx, table.DefaultTxControl(), query, nil)
		if err != nil {
			return err // for auto-retry with driver
		}
		defer rows.Close() // cleanup resources

		// check single result set and switch to it
		if err := rows.NextResultSetErr(ctx); err != nil {
			return err // for auto-retry with driver
		}
		// iterate over rows
		for rows.NextRow() {
			if err := rows.ScanNamed(
				named.Required("id", &id),
				named.OptionalWithDefault("myStr", &myStr),
			); err != nil {
				return err // generally scan error not retryable, return it for driver check error
			}
			fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
		}
		return rows.Err() // return finally result error for auto-retry with driver
	}
	// Do retries the operation on errors with best effort; ctx controls exiting from Do
	if err = db.Table().Do(ctx, selectRow); err != nil {
		fmt.Printf("unexpected error: %v", err)
	}
}

Index

Examples

Types

type Client

// Client is the entry point for working with table sessions: it embeds
// closer.Closer and declares manual session creation (CreateSession) plus two
// retrying helpers (Do and DoTx).
type Client interface {
	closer.Closer

	// CreateSession returns a session (or an error) for manual control of the session lifecycle.
	// CreateSession does not provide a retry loop for failed create-session requests.
	// A best-effort policy may be implemented with an outer retry loop that includes the CreateSession call.
	CreateSession(ctx context.Context, opts ...Option) (s ClosableSession, err error)

	// Do provides best-effort execution of the operation.
	// Do implements an internal busy loop that runs until one of the following conditions is met:
	// - the context was canceled or its deadline was exceeded
	// - the retry operation returned nil as error
	// Warning: if the context has no deadline or cancellation func, then Do can run indefinitely.
	Do(ctx context.Context, op Operation, opts ...Option) error

	// DoTx provides best-effort execution of the transaction.
	// DoTx implements an internal busy loop that runs until one of the following conditions is met:
	// - the context was canceled or its deadline was exceeded
	// - the retry operation returned nil as error
	// DoTx automatically begins, commits and rolls back the transaction:
	// if op TxOperation returns nil, the transaction will be committed;
	// if op TxOperation returns a non-nil error, the transaction will be rolled back.
	// Warning: if the context has no deadline or cancellation func, then DoTx can run indefinitely.
	DoTx(ctx context.Context, op TxOperation, opts ...Option) error
}

type ClosableSession

// ClosableSession is a Session that also embeds closer.Closer.
// It is the session type returned by Client.CreateSession, where the caller
// controls the session lifecycle manually.
type ClosableSession interface {
	closer.Closer
	Session
}

type DataQuery

// DataQuery exposes three string accessors for a data query: String, ID and YQL.
// According to the package documentation it is intended for tracers only.
type DataQuery interface {
	String() string
	ID() string  // presumably the query identifier - confirm against the implementation
	YQL() string // presumably the YQL text of the query - confirm against the implementation
}

DataQuery is intended for use by tracers only.

type DataQueryExplanation

// DataQueryExplanation is the explanation type returned by Session.Explain.
// It embeds Explanation (which carries Plan) and adds the AST field.
type DataQueryExplanation struct {
	Explanation

	AST string // presumably a textual form of the query's syntax tree - confirm
}

DataQueryExplanation is a result of ExplainDataQuery call.

type Explanation

// Explanation is the part shared by explanation results: it is embedded in
// both DataQueryExplanation and ScriptingYQLExplanation.
type Explanation struct {
	Plan string // presumably a textual query plan - confirm the exact format
}

Explanation is a result of Explain calls.

type Operation

type Operation func(ctx context.Context, s Session) error

Operation is a function type that holds an operation for retry. If Operation returns a non-nil error, the operation will be retried; if Operation returns nil, the retry loop will break.

type Option

type Option func(o *Options)

func WithIdempotent

func WithIdempotent() Option

func WithTrace

func WithTrace(t trace.Table) Option

func WithTxCommitOptions

func WithTxCommitOptions(opts ...options.CommitTransactionOption) Option

func WithTxSettings

func WithTxSettings(tx *TransactionSettings) Option

type Options

// Options is the settings struct that Option functions (func(o *Options)) mutate.
// Option values are accepted by Client.CreateSession, Client.Do and Client.DoTx.
type Options struct {
	Idempotent      bool                              // presumably set by WithIdempotent - confirm
	TxSettings      *TransactionSettings              // presumably set by WithTxSettings - confirm
	TxCommitOptions []options.CommitTransactionOption // presumably set by WithTxCommitOptions - confirm
	FastBackoff     backoff.Backoff                   // NOTE(review): no With* setter is visible on this page
	SlowBackoff     backoff.Backoff                   // NOTE(review): no With* setter is visible on this page
	Trace           trace.Table                       // presumably set by WithTrace - confirm
}

type ParameterOption

type ParameterOption func(queryParams)

func ValueParam

func ValueParam(name string, v types.Value) ParameterOption

type QueryParameters

type QueryParameters struct {
	// contains filtered or unexported fields
}

func NewQueryParameters

func NewQueryParameters(opts ...ParameterOption) *QueryParameters

func (*QueryParameters) Add

func (q *QueryParameters) Add(opts ...ParameterOption)

func (*QueryParameters) Each

func (q *QueryParameters) Each(it func(name string, v types.Value))

func (*QueryParameters) Params

func (q *QueryParameters) Params() queryParams

func (*QueryParameters) String

func (q *QueryParameters) String() string

type ScriptingYQLExplanation

// ScriptingYQLExplanation embeds Explanation (which carries Plan) and adds a
// map from string keys to types.Type values.
type ScriptingYQLExplanation struct {
	Explanation

	ParameterTypes map[string]types.Type // presumably keyed by query parameter name - confirm
}

ScriptingYQLExplanation is a result of Explain calls.

type Session

// Session is the set of operations available on a single table session.
// It embeds SessionInfo and declares table scheme operations (create, describe,
// drop, alter, copy), query explanation, preparation and execution, streaming
// reads, bulk upsert, transaction begin and keep-alive.
//
// A Session is what Client.Do passes to an Operation.
type Session interface {
	SessionInfo

	// CreateTable takes the table path and a variadic list of create-table options.
	CreateTable(
		ctx context.Context,
		path string,
		opts ...options.CreateTableOption,
	) (err error)

	// DescribeTable returns an options.Description for the table at path.
	DescribeTable(
		ctx context.Context,
		path string,
		opts ...options.DescribeTableOption,
	) (desc options.Description, err error)

	// DropTable takes the table path and a variadic list of drop-table options.
	DropTable(
		ctx context.Context,
		path string,
		opts ...options.DropTableOption,
	) (err error)

	// AlterTable takes the table path and a variadic list of alter-table options.
	AlterTable(
		ctx context.Context,
		path string,
		opts ...options.AlterTableOption,
	) (err error)

	// CopyTable takes destination and source paths, in that order (dst, src).
	CopyTable(
		ctx context.Context,
		dst, src string,
		opts ...options.CopyTableOption,
	) (err error)

	// Explain returns a DataQueryExplanation for the given query text.
	Explain(
		ctx context.Context,
		query string,
	) (exp DataQueryExplanation, err error)

	// Prepare prepares query for executing in the future
	//
	// Deprecated: use Execute with KeepInCache policy option
	Prepare(
		ctx context.Context,
		query string,
	) (stmt Statement, err error)

	// Execute runs query under the given transaction control with optional
	// parameters (the package examples pass nil params) and returns both a
	// Transaction and a result.Result.
	Execute(
		ctx context.Context,
		tx *TransactionControl,
		query string,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (txr Transaction, r result.Result, err error)

	// ExecuteSchemeQuery runs a query and returns only an error; unlike Execute
	// it takes no transaction control and no parameters.
	ExecuteSchemeQuery(
		ctx context.Context,
		query string,
		opts ...options.ExecuteSchemeQueryOption,
	) (err error)

	// DescribeTableOptions returns an options.TableOptionsDescription.
	DescribeTableOptions(
		ctx context.Context,
	) (desc options.TableOptionsDescription, err error)

	// StreamReadTable returns a result.StreamResult for the table at path.
	StreamReadTable(
		ctx context.Context,
		path string,
		opts ...options.ReadTableOption,
	) (r result.StreamResult, err error)

	// StreamExecuteScanQuery runs query with optional parameters and returns a
	// result.StreamResult; it takes no transaction control.
	StreamExecuteScanQuery(
		ctx context.Context,
		query string,
		params *QueryParameters,
		opts ...options.ExecuteScanQueryOption,
	) (_ result.StreamResult, err error)

	// BulkUpsert takes a table path and the rows as a single types.Value
	// (the package example passes a types.ListValue of types.StructValue rows).
	BulkUpsert(
		ctx context.Context,
		table string,
		rows types.Value,
	) (err error)

	// BeginTransaction returns a Transaction created with the given settings.
	BeginTransaction(
		ctx context.Context,
		tx *TransactionSettings,
	) (x Transaction, err error)

	// KeepAlive takes only a context and returns an error.
	// NOTE(review): presumably refreshes the session on the server side - confirm.
	KeepAlive(
		ctx context.Context,
	) error
}

type SessionInfo

// SessionInfo is the read-only part of a session; it is embedded in Session.
type SessionInfo interface {
	ID() string // presumably the session identifier - confirm
}

type Statement

// Statement is the value returned by Session.Prepare (which is marked deprecated).
// It can be executed directly via Execute, or passed to
// TransactionActor.ExecuteStatement.
type Statement interface {
	// Execute has the same shape as Session.Execute minus the query text.
	Execute(
		ctx context.Context,
		tx *TransactionControl,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (txr Transaction, r result.Result, err error)
	NumInput() int // presumably the number of query parameters - confirm
	Text() string  // presumably the query text the statement was prepared from - confirm
}

type Transaction

// Transaction extends TransactionActor with explicit completion methods.
// Values of this type are returned by Session.BeginTransaction, Session.Execute
// and Statement.Execute.
type Transaction interface {
	TransactionActor

	// CommitTx takes optional commit options and returns a result.Result.
	CommitTx(
		ctx context.Context,
		opts ...options.CommitTransactionOption,
	) (r result.Result, err error)
	// Rollback takes only a context and returns an error.
	Rollback(
		ctx context.Context,
	) (err error)
}

type TransactionActor

// TransactionActor is the part of a transaction that can run queries but has no
// commit or rollback methods. It is the type passed to a TxOperation by
// Client.DoTx, which performs begin, commit and rollback itself.
type TransactionActor interface {
	TransactionIdentifier

	// Execute runs a query given as text, with optional parameters.
	Execute(
		ctx context.Context,
		query string,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (result.Result, error)
	// ExecuteStatement runs a previously obtained Statement, with optional parameters.
	ExecuteStatement(
		ctx context.Context,
		stmt Statement,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (result.Result, error)
}

type TransactionControl

type TransactionControl struct {
	// contains filtered or unexported fields
}

func DefaultTxControl

func DefaultTxControl() *TransactionControl

DefaultTxControl returns default transaction control with serializable read-write isolation mode and auto-commit

func TxControl

func TxControl(opts ...TxControlOption) *TransactionControl

TxControl makes transaction control from given options

func (*TransactionControl) Desc

type TransactionIdentifier

// TransactionIdentifier is the read-only part of a transaction; it is embedded
// in TransactionActor.
type TransactionIdentifier interface {
	ID() string // presumably the transaction identifier - confirm
}

type TransactionSettings

type TransactionSettings struct {
	// contains filtered or unexported fields
}

func TxSettings

func TxSettings(opts ...TxOption) *TransactionSettings

TxSettings returns transaction settings

func (*TransactionSettings) Settings

type TxControlOption

type TxControlOption func(*txControlDesc)

func BeginTx

func BeginTx(opts ...TxOption) TxControlOption

BeginTx returns begin transaction control option

func CommitTx

func CommitTx() TxControlOption

CommitTx returns commit transaction control option

func WithTx

func WithTx(t Transaction) TxControlOption

type TxOnlineReadOnlyOption

type TxOnlineReadOnlyOption func(*txOnlineReadOnly)

func WithInconsistentReads

func WithInconsistentReads() TxOnlineReadOnlyOption

type TxOperation

type TxOperation func(ctx context.Context, tx TransactionActor) error

TxOperation is a function type that holds a transactional operation for retry. If TxOperation returns a non-nil error, the operation will be retried; if TxOperation returns nil, the retry loop will break.

type TxOption

type TxOption func(*txDesc)

Transaction control options

func WithOnlineReadOnly

func WithOnlineReadOnly(opts ...TxOnlineReadOnlyOption) TxOption

func WithSerializableReadWrite

func WithSerializableReadWrite() TxOption

func WithStaleReadOnly

func WithStaleReadOnly() TxOption

Source Files

table.go

Directories

PathSynopsis
table/options
table/result
table/result/indexed
table/result/named
table/stats
table/types
Version
v3.26.1
Published
May 22, 2022
Platform
linux/amd64
Imports
11 packages
Last checked
30 minutes ago

Tools for package owners.