package dbutil
import "go.mau.fi/util/dbutil"
Index ¶
- Constants
- Variables
- func ConvertedPtr[Input Zeroable, Output any](val Input, converter func(Input) Output) *Output
- func DangerousInternalUpgradeVersionTable(ctx context.Context, db *Database) error
- func NumPtr[T constraints.Integer | constraints.Float](val T) *T
- func RowIterAsMap[T any, Key comparable, Value any](ri RowIter[T], getKeyValue func(T) (Key, Value)) (map[Key]Value, error)
- func ScanDataStruct[T NewableDataStruct[T]](rows Scannable) (T, error)
- func ScanSingleColumn[T any](rows Scannable) (val T, err error)
- func StrPtr[T ~string](val T) *string
- func UnixMilliPtr(val time.Time) *int64
- func UnixPtr(val time.Time) *int64
- func UntypedNil[T any](val *T) any
- func ValueOrErr[T any](val *T, err error) (*T, error)
- type Array
- type Config
- type Conn
- type ConvertRowFn
- type DataStruct
- type Database
- func NewFromConfig(owner string, cfg Config, logger DatabaseLogger) (*Database, error)
- func NewWithDB(db *sql.DB, rawDialect string) (*Database, error)
- func NewWithDialect(uri, rawDialect string) (*Database, error)
- func (db *Database) AcquireConn(ctx context.Context) (Conn, error)
- func (db *Database) BeginTx(ctx context.Context, opts *TxnOptions) (*LoggingTxn, error)
- func (db *Database) Child(versionTable string, upgradeTable UpgradeTable, log DatabaseLogger) *Database
- func (db *Database) Close() error
- func (db *Database) ColumnExists(ctx context.Context, table, column string) (exists bool, err error)
- func (db *Database) Configure(cfg Config) error
- func (db *Database) DoSQLiteTransactionWithoutForeignKeys(ctx context.Context, doUpgrade func(context.Context) error) error
- func (db *Database) DoTxn(ctx context.Context, opts *TxnOptions, fn func(ctx context.Context) error) error
- func (db *Database) Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (db *Database) Execable(ctx context.Context) Execable
- func (db *Database) Internals() *publishDatabaseInternals
- func (db *Database) Query(ctx context.Context, query string, args ...any) (Rows, error)
- func (db *Database) QueryRow(ctx context.Context, query string, args ...any) *sql.Row
- func (db *Database) TableExists(ctx context.Context, table string) (exists bool, err error)
- func (db *Database) Upgrade(ctx context.Context) error
- type DatabaseLogger
- func ZeroLogger(log zerolog.Logger, cfg ...ZeroLogSettings) DatabaseLogger
- func ZeroLoggerPtr(log *zerolog.Logger, cfg ...ZeroLogSettings) DatabaseLogger
- type Dialect
- type Execable
- type JSON
- func JSONPtr[T any](val *T) JSON
- func (j JSON) Scan(i any) error
- func (j JSON) Value() (driver.Value, error)
- type LoggingExecable
- func (le *LoggingExecable) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (le *LoggingExecable) QueryContext(ctx context.Context, query string, args ...any) (Rows, error)
- func (le *LoggingExecable) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type LoggingRows
- func (lrs *LoggingRows) Close() error
- func (lrs *LoggingRows) ColumnTypes() ([]*sql.ColumnType, error)
- func (lrs *LoggingRows) Columns() ([]string, error)
- func (lrs *LoggingRows) Err() error
- func (lrs *LoggingRows) Next() bool
- func (lrs *LoggingRows) NextResultSet() bool
- func (lrs *LoggingRows) Scan(dest ...any) error
- type LoggingTxn
- type MassInsertBuilder
- func NewMassInsertBuilder[Item MassInsertable[DynamicParams], StaticParams Array, DynamicParams Array]( singleInsertQuery, placeholderTemplate string, ) *MassInsertBuilder[Item, StaticParams, DynamicParams]
- func (mib *MassInsertBuilder[Item, StaticParams, DynamicParams]) Build(static StaticParams, data []Item) (query string, params []any)
- type MassInsertable
- type NewableDataStruct
- type PQErrorWithLine
- type PoolConfig
- type QueryHelper
- func MakeQueryHelper[T DataStruct[T]](db *Database, new func(qh *QueryHelper[T]) T) *QueryHelper[T]
- func (qh *QueryHelper[T]) Exec(ctx context.Context, query string, args ...any) error
- func (qh *QueryHelper[T]) GetDB() *Database
- func (qh *QueryHelper[T]) New() T
- func (qh *QueryHelper[T]) QueryMany(ctx context.Context, query string, args ...any) ([]T, error)
- func (qh *QueryHelper[T]) QueryManyIter(ctx context.Context, query string, args ...any) RowIter[T]
- func (qh *QueryHelper[T]) QueryOne(ctx context.Context, query string, args ...any) (val T, err error)
- type RowIter
- func NewRowIter[T any](rows Rows, convertFn ConvertRowFn[T]) RowIter[T]
- func NewRowIterWithError[T any](rows Rows, convertFn ConvertRowFn[T], err error) RowIter[T]
- func NewSimpleReflectRowIter[T any](rows Rows, err error) RowIter[*T]
- func NewSliceIter[T any](items []T) RowIter[T]
- func NewSliceIterWithError[T any](items []T, err error) RowIter[T]
- type Rows
- type Scannable
- type Transaction
- type TxnMode
- type TxnOptions
- type UnderlyingExecable
- type UnderlyingExecutableWithTx
- type UpgradeTable
- func (ut *UpgradeTable) Register(from, to, compat int, message string, txn TxnMode, fn upgradeFunc)
- func (ut *UpgradeTable) RegisterFS(fs fullFS)
- func (ut *UpgradeTable) RegisterFSPath(fs fullFS, dir string)
- type ZeroLogSettings
- type Zeroable
Constants ¶
const (
ContextKeyDoTxnCallerSkip contextKey = 1
)
Variables ¶
var ( ErrTxn = errors.New("transaction") ErrTxnBegin = fmt.Errorf("%w: begin", ErrTxn) ErrTxnCommit = fmt.Errorf("%w: commit", ErrTxn) )
var ErrAcquireDeadlock = errors.New("attempt to acquire connection without context in goroutine with transaction")
var ErrTransactionDeadlock = errors.New("attempt to start new transaction in goroutine with transaction")
var ForceDeadlockDetection bool
Functions ¶
func ConvertedPtr ¶
ConvertedPtr returns a pointer to the converted version of the given value, or nil if the input is zero.
This is primarily meant for time.Time, but it can be used with any type that has implements `IsZero() bool`.
yourTime := time.Now() unixMSPtr := dbutil.TimePtr(yourTime, time.Time.UnixMilli)
func DangerousInternalUpgradeVersionTable ¶
func NumPtr ¶
func NumPtr[T constraints.Integer | constraints.Float](val T) *T
NumPtr returns a pointer to the given number, or nil if the number is zero.
func RowIterAsMap ¶
func RowIterAsMap[T any, Key comparable, Value any](ri RowIter[T], getKeyValue func(T) (Key, Value)) (map[Key]Value, error)
func ScanDataStruct ¶
func ScanDataStruct[T NewableDataStruct[T]](rows Scannable) (T, error)
func ScanSingleColumn ¶
func StrPtr ¶
StrPtr returns a pointer to the given string, or nil if the string is empty.
func UnixMilliPtr ¶
UnixMilliPtr returns a pointer to the given time as unix milliseconds, or nil if the time is zero.
func UnixPtr ¶
UnixPtr returns a pointer to the given time as unix seconds, or nil if the time is zero.
func UntypedNil ¶
func ValueOrErr ¶
ValueOrErr is a helper function that returns the value if err is nil, or returns nil and the error if err is not nil. It can be used to avoid `if err != nil { return nil, err }` boilerplate in certain cases like DataStruct.Scan implementations.
Types ¶
type Array ¶
type Array interface { [0]any | [1]any | [2]any | [3]any | [4]any | [5]any | [6]any | [7]any | [8]any | [9]any | [10]any | [11]any | [12]any | [13]any | [14]any | [15]any | [16]any | [17]any | [18]any | [19]any | [20]any | [21]any | [22]any | [23]any | [24]any | [25]any | [26]any | [27]any | [28]any | [29]any }
Array is an interface for small fixed-size arrays. It exists because generics can't specify array sizes: https://github.com/golang/go/issues/44253
type Config ¶
type Config struct { PoolConfig `yaml:",inline"` ReadOnlyPool PoolConfig `yaml:"ro_pool"` DeadlockDetection bool `yaml:"deadlock_detection"` }
type Conn ¶
type Conn interface { Execable // contains filtered or unexported methods }
type ConvertRowFn ¶
func (ConvertRowFn[T]) NewRowIter ¶
func (crf ConvertRowFn[T]) NewRowIter(rows Rows, err error) RowIter[T]
NewRowIter is a proxy for NewRowIterWithError for more convenient usage.
For example:
func exampleConvertRowFn(rows Scannable) (*YourType, error) { ... } func exampleFunction() { iter := dbutil.ConvertRowFn[*YourType](exampleConvertRowFn).NewRowIter( db.Query("SELECT ..."), ) }
type DataStruct ¶
DataStruct is an interface for structs that represent a single database row.
type Database ¶
type Database struct { LoggingDB loggingDB RawDB *sql.DB ReadOnlyDB *sql.DB Owner string VersionTable string Log DatabaseLogger Dialect Dialect UpgradeTable UpgradeTable IgnoreForeignTables bool IgnoreUnsupportedDatabase bool DeadlockDetection bool // contains filtered or unexported fields }
func NewFromConfig ¶
func NewFromConfig(owner string, cfg Config, logger DatabaseLogger) (*Database, error)
func NewWithDB ¶
func NewWithDialect ¶
func (*Database) AcquireConn ¶
func (*Database) BeginTx ¶
func (db *Database) BeginTx(ctx context.Context, opts *TxnOptions) (*LoggingTxn, error)
func (*Database) Child ¶
func (db *Database) Child(versionTable string, upgradeTable UpgradeTable, log DatabaseLogger) *Database
func (*Database) Close ¶
func (*Database) ColumnExists ¶
func (db *Database) ColumnExists(ctx context.Context, table, column string) (exists bool, err error)
func (*Database) Configure ¶
func (*Database) DoSQLiteTransactionWithoutForeignKeys ¶
func (db *Database) DoSQLiteTransactionWithoutForeignKeys(ctx context.Context, doUpgrade func(context.Context) error) error
func (*Database) DoTxn ¶
func (db *Database) DoTxn(ctx context.Context, opts *TxnOptions, fn func(ctx context.Context) error) error
func (*Database) Exec ¶
func (*Database) Execable ¶
func (*Database) Internals ¶
func (db *Database) Internals() *publishDatabaseInternals
func (*Database) Query ¶
func (*Database) QueryRow ¶
func (*Database) TableExists ¶
func (*Database) Upgrade ¶
type DatabaseLogger ¶
type DatabaseLogger interface { QueryTiming(ctx context.Context, method, query string, args []any, nrows int, duration time.Duration, err error) WarnUnsupportedVersion(current, compat, latest int) PrepareUpgrade(current, compat, latest int) DoUpgrade(from, to int, message string, txn TxnMode) // Deprecated: legacy warning method, return errors instead Warn(msg string, args ...any) }
var NoopLogger DatabaseLogger = &noopLogger{}
func ZeroLogger ¶
func ZeroLogger(log zerolog.Logger, cfg ...ZeroLogSettings) DatabaseLogger
func ZeroLoggerPtr ¶
func ZeroLoggerPtr(log *zerolog.Logger, cfg ...ZeroLogSettings) DatabaseLogger
type Dialect ¶
type Dialect int
func ParseDialect ¶
func (Dialect) String ¶
type Execable ¶
type Execable interface { ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row }
type JSON ¶
type JSON struct { Data any }
JSON is a utility type for using arbitrary JSON data as values in database Exec and Scan calls.
func JSONPtr ¶
JSONPtr is a convenience function for wrapping a pointer to a value in the JSON utility, but removing typed nils (i.e. preventing nils from turning into the string "null" in the database).
func (JSON) Scan ¶
func (JSON) Value ¶
type LoggingExecable ¶
type LoggingExecable struct { UnderlyingExecable UnderlyingExecable // contains filtered or unexported fields }
LoggingExecable is a wrapper for anything with database Exec methods (i.e. sql.Conn, sql.DB and sql.Tx) that can preprocess queries (e.g. replacing $ with ? on SQLite) and log query durations.
func (*LoggingExecable) ExecContext ¶
func (le *LoggingExecable) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
func (*LoggingExecable) QueryContext ¶
func (le *LoggingExecable) QueryContext(ctx context.Context, query string, args ...any) (Rows, error)
func (*LoggingExecable) QueryRowContext ¶
type LoggingRows ¶
type LoggingRows struct {
// contains filtered or unexported fields
}
func (*LoggingRows) Close ¶
func (lrs *LoggingRows) Close() error
func (*LoggingRows) ColumnTypes ¶
func (lrs *LoggingRows) ColumnTypes() ([]*sql.ColumnType, error)
func (*LoggingRows) Columns ¶
func (lrs *LoggingRows) Columns() ([]string, error)
func (*LoggingRows) Err ¶
func (lrs *LoggingRows) Err() error
func (*LoggingRows) Next ¶
func (lrs *LoggingRows) Next() bool
func (*LoggingRows) NextResultSet ¶
func (lrs *LoggingRows) NextResultSet() bool
func (*LoggingRows) Scan ¶
func (lrs *LoggingRows) Scan(dest ...any) error
type LoggingTxn ¶
type LoggingTxn struct { LoggingExecable UnderlyingTx *sql.Tx StartTime time.Time EndTime time.Time // contains filtered or unexported fields }
func (*LoggingTxn) Commit ¶
func (lt *LoggingTxn) Commit() error
func (*LoggingTxn) Rollback ¶
func (lt *LoggingTxn) Rollback() error
type MassInsertBuilder ¶
type MassInsertBuilder[Item MassInsertable[DynamicParams], StaticParams Array, DynamicParams Array] struct { // contains filtered or unexported fields }
MassInsertBuilder contains pre-validated templates for building mass insert SQL queries.
func NewMassInsertBuilder ¶
func NewMassInsertBuilder[Item MassInsertable[DynamicParams], StaticParams Array, DynamicParams Array]( singleInsertQuery, placeholderTemplate string, ) *MassInsertBuilder[Item, StaticParams, DynamicParams]
NewMassInsertBuilder creates a new MassInsertBuilder that can build mass insert database queries.
Parameters in mass insert queries are split into two types: static parameters and dynamic parameters. Static parameters are the same for all items being inserted, while dynamic parameters are different for each item.
The given query should be a normal INSERT query for a single row. It can also have ON CONFLICT clauses, as long as the clause uses `excluded` instead of positional parameters.
The placeholder template is used to replace the `VALUES` part of the given query. It should contain a positional placeholder ($1, $2, ...) for each static placeholder, and a fmt directive (`$%d`) for each dynamic placeholder.
The given query and placeholder template are validated here and the function will panic if they're invalid (e.g. if the `VALUES` part of the insert query can't be found, or if the placeholder template doesn't have the right things). The idea is to use this function to populate a global variable with the mass insert builder, so the panic will happen at startup if the query or placeholder template are invalid (instead of returning an error when trying to use the query later).
Example:
type Message struct { ChatID int RemoteID string MXID id.EventID Timestamp time.Time } func (msg *Message) GetMassInsertValues() [3]any { return [3]any{msg.RemoteID, msg.MXID, msg.Timestamp.UnixMilli()} } const insertMessageQuery = `INSERT INTO message (chat_id, remote_id, mxid, timestamp) VALUES ($1, $2, $3, $4)` var massInsertMessageBuilder = dbutil.NewMassInsertBuilder[Message, [2]any](insertMessageQuery, "($1, $%d, $%d, $%d, $%d)") func DoMassInsert(ctx context.Context, messages []*Message) error { query, params := massInsertMessageBuilder.Build([1]any{messages[0].ChatID}, messages) return db.Exec(ctx, query, params...) }
func (*MassInsertBuilder[Item, StaticParams, DynamicParams]) Build ¶
func (mib *MassInsertBuilder[Item, StaticParams, DynamicParams]) Build(static StaticParams, data []Item) (query string, params []any)
Build constructs a ready-to-use mass insert SQL query using the prepared templates in this builder.
This method always only produces one query. If there are lots of items, chunking them beforehand may be required to avoid query parameter limits. For example, SQLite (3.32+) has a limit of 32766 parameters by default, while Postgres allows up to 65535. To find out if there are too many items, divide the maximum number of parameters by the number of dynamic columns in your data and subtract the number of static columns.
Example of chunking input data:
var mib dbutil.MassInsertBuilder var db *dbutil.Database func MassInsert(ctx context.Context, ..., data []T) error { return db.DoTxn(ctx, nil, func(ctx context.Context) error { for _, chunk := range exslices.Chunk(data, 100) { query, params := mib.Build(staticParams) _, err := db.Exec(ctx, query, params...) if err != nil { return err } } return nil } }
type MassInsertable ¶
type MassInsertable[T Array] interface { GetMassInsertValues() T }
MassInsertable represents a struct that contains dynamic values for a mass insert query.
type NewableDataStruct ¶
type NewableDataStruct[T any] interface { DataStruct[T] New() T }
type PQErrorWithLine ¶
func (*PQErrorWithLine) Error ¶
func (pqe *PQErrorWithLine) Error() string
func (*PQErrorWithLine) Unwrap ¶
func (pqe *PQErrorWithLine) Unwrap() error
type PoolConfig ¶
type PoolConfig struct { Type string `yaml:"type"` URI string `yaml:"uri"` MaxOpenConns int `yaml:"max_open_conns"` MaxIdleConns int `yaml:"max_idle_conns"` ConnMaxIdleTime string `yaml:"conn_max_idle_time"` ConnMaxLifetime string `yaml:"conn_max_lifetime"` }
type QueryHelper ¶
type QueryHelper[T DataStruct[T]] struct { // contains filtered or unexported fields }
QueryHelper is a generic helper struct for SQL query execution boilerplate.
After implementing the Scan and Init methods in a data struct, the query helper allows writing query functions in a single line.
func MakeQueryHelper ¶
func MakeQueryHelper[T DataStruct[T]](db *Database, new func(qh *QueryHelper[T]) T) *QueryHelper[T]
func (*QueryHelper[T]) Exec ¶
Exec executes a query with ExecContext and returns the error.
It omits the sql.Result return value, as it is rarely used. When the result is wanted, use `qh.GetDB().Exec(...)` instead, which is otherwise equivalent.
func (*QueryHelper[T]) GetDB ¶
func (qh *QueryHelper[T]) GetDB() *Database
func (*QueryHelper[T]) New ¶
func (qh *QueryHelper[T]) New() T
func (*QueryHelper[T]) QueryMany ¶
QueryMany executes a query with QueryContext, uses the associated DataStruct to scan each row, and returns the values. If the query returns no rows, it returns a non-nil zero-length slice and no error.
func (*QueryHelper[T]) QueryManyIter ¶
QueryManyIter executes a query with QueryContext and returns a RowIter that will use the associated DataStruct to scan each row.
func (*QueryHelper[T]) QueryOne ¶
func (qh *QueryHelper[T]) QueryOne(ctx context.Context, query string, args ...any) (val T, err error)
QueryOne executes a query with QueryRowContext, uses the associated DataStruct to scan it, and returns the value. If the query returns no rows, it returns nil and no error.
type RowIter ¶
type RowIter[T any] interface { // Iter iterates over the rows and calls the given function for each row. // // If the function returns false, the iteration is stopped. // If the function returns an error, the iteration is stopped and the error is // returned. Iter(func(T) (bool, error)) error // AsList collects all rows into a slice. AsList() ([]T, error) }
RowIter is a wrapper for Rows that allows conveniently iterating over rows with a predefined scanner function.
func NewRowIter ¶
func NewRowIter[T any](rows Rows, convertFn ConvertRowFn[T]) RowIter[T]
NewRowIter creates a new RowIter from the given Rows and scanner function.
Deprecated: use NewRowIterWithError instead to avoid an unnecessary separate error check on the Query result.
Instead of
func DoQuery(...) (dbutil.RowIter, error) { rows, err := db.Query(...) if err != nil { return nil, err } return dbutil.NewRowIter(rows, convertFn), nil }
you should use
func DoQuery(...) dbutil.RowIter { rows, err := db.Query(...) return dbutil.NewRowIterWithError(rows, convertFn, err) }
or alternatively pre-wrap the convertFn
var converter = dbutil.ConvertRowFn(convertFn) func DoQuery(...) dbutil.RowIter { return converter.NewRowIter(db.Query(...)) }
Embedding the error in the iterator allows the caller to do only one error check instead of two:
iter, err := DoQuery(...) if err != nil { ... } result, err := iter.Iter(...) if err != nil { ... }
vs
result, err := DoQuery(...).Iter(...) if err != nil { ... }
func NewRowIterWithError ¶
func NewRowIterWithError[T any](rows Rows, convertFn ConvertRowFn[T], err error) RowIter[T]
NewRowIterWithError creates a new RowIter from the given Rows and scanner function with default error. If not nil, it will be returned without calling iterator function.
func NewSimpleReflectRowIter ¶
NewSimpleReflectRowIter creates a new RowIter that uses reflection to scan rows into the given type.
This is a simplified implementation that always scans to all struct fields. It does not support any kind of struct tags.
func NewSliceIter ¶
func NewSliceIterWithError ¶
type Rows ¶
type Rows interface { Close() error ColumnTypes() ([]*sql.ColumnType, error) Columns() ([]string, error) Err() error Next() bool NextResultSet() bool Scan(...any) error }
type Scannable ¶
type Transaction ¶
type TxnMode ¶
type TxnMode string
const ( TxnModeOn TxnMode = "on" TxnModeOff TxnMode = "off" TxnModeSQLiteForeignKeysOff TxnMode = "sqlite-fkey-off" )
type TxnOptions ¶
type TxnOptions struct { Isolation sql.IsolationLevel ReadOnly bool Conn Conn RetryBegin func(error, int) bool }
type UnderlyingExecable ¶
type UnderlyingExecable interface { ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row }
type UnderlyingExecutableWithTx ¶
type UnderlyingExecutableWithTx interface { UnderlyingExecable BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) }
type UpgradeTable ¶
type UpgradeTable []upgrade
func (*UpgradeTable) Register ¶
func (ut *UpgradeTable) Register(from, to, compat int, message string, txn TxnMode, fn upgradeFunc)
func (*UpgradeTable) RegisterFS ¶
func (ut *UpgradeTable) RegisterFS(fs fullFS)
func (*UpgradeTable) RegisterFSPath ¶
func (ut *UpgradeTable) RegisterFSPath(fs fullFS, dir string)
type ZeroLogSettings ¶
type ZeroLogSettings struct { CallerSkipFrame int Caller bool // TraceLogAllQueries specifies whether or not all queries should be logged // at the TRACE level. TraceLogAllQueries bool }
type Zeroable ¶
type Zeroable interface { IsZero() bool }
Source Files ¶
connlog.go database.go iter.go json.go log.go massinsert.go queryhelper.go reflectscan.go transaction.go upgrades.go upgradetable.go
Directories ¶
Path | Synopsis |
---|---|
dbutil/litestream |
- Version
- v0.8.6 (latest)
- Published
- Mar 16, 2025
- Platform
- linux/amd64
- Imports
- 25 packages
- Last checked
- 1 week ago –
Tools for package owners.