package transfermanager
import "cloud.google.com/go/storage/transfermanager"
Package transfermanager provides an easy way to parallelize downloads in Google Cloud Storage.
More information about Google Cloud Storage is available at https://cloud.google.com/storage/docs.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
NOTE: This package is in preview. It is not stable, and is likely to change.
Index ¶
- type DownloadObjectInput
- type DownloadOutput
- type DownloadRange
- type Downloader
- func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error)
- func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error
- func (d *Downloader) WaitAndClose() ([]DownloadOutput, error)
- type Option
Examples ¶
Types ¶
type DownloadObjectInput ¶
type DownloadObjectInput struct { // Required fields Bucket string Object string Destination io.WriterAt // Optional fields Generation *int64 Conditions *storage.Conditions EncryptionKey []byte Range *DownloadRange // if specified, reads only a range // Callback will be run once the object is finished downloading. It must be // set if and only if the [WithCallbacks] option is set; otherwise, it must // not be set. Callback func(*DownloadOutput) // contains filtered or unexported fields }
DownloadObjectInput is the input for a single object to download.
type DownloadOutput ¶
type DownloadOutput struct { Bucket string Object string Range *DownloadRange // requested range, if it was specified Err error // error occurring during download Attrs *storage.ReaderObjectAttrs // attributes of downloaded object, if successful }
DownloadOutput provides output for a single object download, including all errors received while downloading object parts. If the download was successful, Attrs will be populated.
type DownloadRange ¶
type DownloadRange struct { // Offset is the starting offset (inclusive) from with the object is read. // If offset is negative, the object is read abs(offset) bytes from the end, // and length must also be negative to indicate all remaining bytes will be read. Offset int64 // Length is the number of bytes to read. // If length is negative or larger than the object size, the object is read // until the end. Length int64 }
DownloadRange specifies the object range.
type Downloader ¶
type Downloader struct {
// contains filtered or unexported fields
}
Downloader manages a set of parallelized downloads.
Code:play
Code:play
Example (Asynchronous)¶
package main
import (
"context"
"log"
"os"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/transfermanager"
)
func main() {
ctx := context.Background()
// Pass in any client opts or set retry policy here.
client, err := storage.NewClient(ctx) // can also use NewGRPCClient
if err != nil {
// handle error
}
// Create Downloader with callbacks plus any desired options, including
// number of workers, part size, per operation timeout, etc.
d, err := transfermanager.NewDownloader(client, transfermanager.WithCallbacks())
if err != nil {
// handle error
}
defer func() {
if _, err := d.WaitAndClose(); err != nil {
// one or more of the downloads failed
}
}()
// Create local file writer for output.
f, err := os.Create("/path/to/localfile")
if err != nil {
// handle error
}
// Create callback function
callback := func(out *transfermanager.DownloadOutput) {
if out.Err != nil {
log.Printf("download of %v failed with error %v", out.Object, out.Err)
} else {
log.Printf("download of %v succeeded", out.Object)
}
}
// Create download input
in := &transfermanager.DownloadObjectInput{
Bucket: "mybucket",
Object: "myblob",
Destination: f,
// Optionally specify params to apply to download.
EncryptionKey: []byte("mykey"),
// Specify the callback
Callback: callback,
}
// Add to Downloader.
if err := d.DownloadObject(ctx, in); err != nil {
// handle error
}
// Repeat if desired.
}
Example (Synchronous)¶
package main
import (
"context"
"log"
"os"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/transfermanager"
)
func main() {
ctx := context.Background()
// Pass in any client opts or set retry policy here.
client, err := storage.NewClient(ctx) // can also use NewGRPCClient
if err != nil {
// handle error
}
// Create Downloader with desired options, including number of workers,
// part size, per operation timeout, etc.
d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(16))
if err != nil {
// handle error
}
// Create local file writer for output.
f, err := os.Create("/path/to/localfile")
if err != nil {
// handle error
}
// Create download input
in := &transfermanager.DownloadObjectInput{
Bucket: "mybucket",
Object: "myblob",
Destination: f,
// Optionally specify params to apply to download.
EncryptionKey: []byte("mykey"),
}
// Add to Downloader.
if err := d.DownloadObject(ctx, in); err != nil {
// handle error
}
// Repeat if desired.
// Wait for all downloads to complete.
results, err := d.WaitAndClose()
if err != nil {
// handle error
}
// Iterate through completed downloads and process results.
for _, out := range results {
if out.Err != nil {
log.Printf("download of %v failed with error %v", out.Object, out.Err)
} else {
log.Printf("download of %v succeeded", out.Object)
}
}
}
func NewDownloader ¶
func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error)
NewDownloader creates a new Downloader to add operations to. Choice of transport, etc is configured on the client that's passed in. The returned Downloader can be shared across goroutines to initiate downloads.
func (*Downloader) DownloadObject ¶
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error
DownloadObject queues the download of a single object. This will initiate the download but is non-blocking; call Downloader.Results or use the callback to process the result. DownloadObject is thread-safe and can be called simultaneously from different goroutines. The download may not start immediately if all workers are busy, so a deadline set on the ctx may time out before the download even starts. To set a timeout that starts with the download, use the [WithPerOpTimeout()] option.
func (*Downloader) WaitAndClose ¶
func (d *Downloader) WaitAndClose() ([]DownloadOutput, error)
WaitAndClose waits for all outstanding downloads to complete and closes the Downloader. Adding new downloads after this has been called will cause an error.
WaitAndClose returns all the results of the downloads and an error wrapping all errors that were encountered by the Downloader when downloading objects. These errors are also returned in the respective DownloadOutput for the failing download. The results are not guaranteed to be in any order. Results will be empty if using the WithCallbacks option.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
A Option is an option for a transfermanager Downloader or Uploader.
func WithCallbacks ¶
func WithCallbacks() Option
WithCallbacks returns a TransferManagerOption that allows the use of callbacks to process the results. If this option is set, then results will not be returned by Downloader.WaitAndClose and must be processed through the callback.
func WithPerOpTimeout ¶
WithPerOpTimeout returns a TransferManagerOption that sets a timeout on each operation that is performed to download or upload an object. The timeout is set when the operation begins processing, not when it is added. By default, no timeout is set other than an overall timeout as set on the provided context.
func WithWorkers ¶
WithWorkers returns a TransferManagerOption that specifies the maximum number of concurrent goroutines that will be used to download or upload objects. Defaults to runtime.NumCPU()/2.
Source Files ¶
doc.go downloader.go option.go
- Version
- v1.42.0
- Published
- Jun 10, 2024
- Platform
- linux/amd64
- Imports
- 9 packages
- Last checked
- 1 hour ago –
Tools for package owners.