package work
import "git.sr.ht/~sircmpwn/dowork"
dowork is a generic task queueing system for Go programs. It queues, executes, and reschedules tasks in goroutines, in-process.
A global task queue is provided for simple use-cases. To use it:
import (
	"context"

	work "git.sr.ht/~sircmpwn/dowork"
)

work.Submit(func(ctx context.Context) error {
	// ...do work...
	return nil
})
This task will be executed in the background. The first time a task is submitted to the global queue, the queue is initialized and starts running in the background. By default, the global queue has a buffer size of 64K tasks and is served by a single worker goroutine. You can tune these defaults before submitting your first task:
// Run one worker goroutine per CPU with a buffer size of 256 tasks.
work.Start(256, runtime.NumCPU())
To customize options such as the maximum number of retries and timeouts, build a task with work.NewTask and pass it to work.Enqueue:
task := work.NewTask(func(ctx context.Context) error {
	// ...
}).
	Retries(5).                   // Maximum number of attempts
	MaxTimeout(10 * time.Minute). // Maximum timeout between attempts
	Within(10 * time.Second).     // Deadline for each attempt
	After(func(ctx context.Context, task *work.Task) {
		// Executed once the task completes, successful or not
	})
work.Enqueue(task)
Retries are conducted with an exponential backoff.
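To make the backoff behaviour concrete, here is a minimal standard-library sketch of a doubling delay with an optional cap, in the spirit of MaxTimeout. It is not dowork's implementation: the function names, the one-second base delay, and the doubling factor are assumptions chosen for illustration, and jitter is left out.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before retry number attempt (1-based).
// The delay starts at base and doubles for each earlier attempt. A positive
// limit caps the delay; a limit of zero means the delay grows without a cap.
// The base and the doubling factor are illustrative assumptions.
func backoff(base, limit time.Duration, attempt int) time.Duration {
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if limit > 0 && d >= limit {
			return limit
		}
	}
	if limit > 0 && d > limit {
		return limit
	}
	return d
}

// backoffSeconds is a convenience wrapper that works in whole seconds.
func backoffSeconds(baseSec, limitSec, attempt int) int {
	d := backoff(time.Duration(baseSec)*time.Second, time.Duration(limitSec)*time.Second, attempt)
	return int(d / time.Second)
}

func main() {
	// With a 1s base and a 10s cap the delays are 1, 2, 4, 8, 10, 10 seconds.
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("retry %d: wait %ds\n", attempt, backoffSeconds(1, 10, attempt))
	}
}
```

The cap keeps a long-failing task from drifting to very long waits, which is the role the overview assigns to MaxTimeout.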
You may also manage your own work queues. Use NewQueue() to obtain a queue, then call (*Queue).Run() to run a worker that will dispatch tasks in its own goroutine, or (*Queue).Start() to spin up goroutines and start dispatching tasks automatically.
Use work.Shutdown() or (*Queue).Shutdown() to perform a soft shutdown of the queue, which will stop accepting new tasks and block until all already-queued tasks complete.
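The soft-shutdown semantics described above (stop accepting new tasks, then wait for queued tasks to finish) can be pictured with a plain channel and a sync.WaitGroup. This is a sketch under stated assumptions, not the library's code: drainOnShutdown and its parameters are hypothetical names, and closing the channel stands in for "stop accepting new tasks".

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drainOnShutdown queues n tasks on a buffered channel, starts the given
// number of workers, and then performs a soft shutdown: it closes the channel
// so nothing new can be queued and waits until the workers have drained the
// tasks that were already queued. It returns how many tasks ran.
func drainOnShutdown(n, workers int) int {
	tasks := make(chan func(), n)
	var ran int64
	for i := 0; i < n; i++ {
		tasks <- func() { atomic.AddInt64(&ran, 1) }
	}

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits once the channel is closed and empty.
			for task := range tasks {
				task()
			}
		}()
	}

	close(tasks) // no new tasks are accepted from here on
	wg.Wait()    // block until every already-queued task has completed
	return int(atomic.LoadInt64(&ran))
}

func main() {
	fmt.Println(drainOnShutdown(100, 4)) // prints 100
}
```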
Index
- Variables
- func Enqueue(t *Task)
- func Join(queues ...*Queue)
- func Shutdown()
- func Start(size int, workers int)
- type Queue
- func NewQueue(name string, size int) *Queue
- func (q *Queue) Enqueue(t *Task) error
- func (q *Queue) Name() string
- func (q *Queue) Run(ctx context.Context)
- func (q *Queue) Shutdown()
- func (q *Queue) Size() int
- func (q *Queue) Start(ctx context.Context, workers int)
- func (q *Queue) Submit(fn TaskFunc) (*Task, error)
- func (q *Queue) TryEnqueue(t *Task) error
- func (q *Queue) TrySubmit(fn TaskFunc) (*Task, error)
- type Task
- func NewTask(fn TaskFunc) *Task
- func Submit(fn func(ctx context.Context) error) (*Task, error)
- func (t *Task) After(fn func(ctx context.Context, task *Task)) *Task
- func (t *Task) Attempt(ctx context.Context) (time.Time, error)
- func (t *Task) Attempts() int
- func (t *Task) Before(fn func(ctx context.Context, task *Task)) *Task
- func (t *Task) Done() bool
- func (t *Task) MaxTimeout(d time.Duration) *Task
- func (t *Task) NextAttempt() time.Time
- func (t *Task) NoJitter() *Task
- func (t *Task) NotBefore(date time.Time) *Task
- func (t *Task) Result() error
- func (t *Task) Retries(n int) *Task
- func (t *Task) Within(deadline time.Duration) *Task
- type TaskFunc
Variables
var (
	ErrQueueShuttingDown = errors.New("Queue is shutting down; new tasks are not being accepted")
	ErrQueueFull         = errors.New("Queue is full")
	ErrQueueEmpty        = errors.New("Queue is empty")
)
var (
	// Returned when a task is attempted which was already successfully completed.
	ErrAlreadyComplete = errors.New("This task was already successfully completed once")

	// If this is returned from a task function, the task shall not be re-attempted.
	ErrDoNotReattempt = errors.New("This task should not be re-attempted")

	// This task has been attempted too many times.
	ErrMaxRetriesExceeded = errors.New("The maximum retries for this task has been exceeded")

	// Set this function to influence the clock that will be used for
	// scheduling re-attempts.
	Now = func() time.Time {
		return time.Now().UTC()
	}
)
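A replaceable clock function such as Now is mostly useful in tests, where scheduling needs to be deterministic. The following standard-library sketch shows the pattern with a local stand-in variable; now, nextAttempt, and nextAttemptWithFixedClock are hypothetical names, and how dowork itself consumes Now is not shown in this document.

```go
package main

import (
	"fmt"
	"time"
)

// now is a local stand-in for a package-level clock variable like Now.
var now = func() time.Time {
	return time.Now().UTC()
}

// nextAttempt schedules a retry relative to whatever clock is installed.
func nextAttempt(delay time.Duration) time.Time {
	return now().Add(delay)
}

// nextAttemptWithFixedClock swaps in a fixed clock, computes a schedule,
// and restores the original clock before returning.
func nextAttemptWithFixedClock() string {
	saved := now
	defer func() { now = saved }()

	fixed := time.Date(2024, 12, 19, 12, 0, 0, 0, time.UTC)
	now = func() time.Time { return fixed }

	return nextAttempt(30 * time.Second).Format(time.RFC3339)
}

func main() {
	fmt.Println(nextAttemptWithFixedClock()) // prints 2024-12-19T12:00:30Z
}
```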
Functions
func Enqueue
func Enqueue(t *Task)
Enqueues a task in the global queue. Will block if the queue is full until a worker can pick up the task.
func Join
func Join(queues ...*Queue)
Shuts down any number of work queues in parallel and blocks until they're all finished.
func Shutdown
func Shutdown()
Stops accepting new tasks and blocks until all queued tasks are completed.
func Start
func Start(size int, workers int)
Ensures that the global queue is started, using the given buffer size and number of worker goroutines.
Types
type Queue
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue
func NewQueue(name string, size int) *Queue
Creates a new multi-producer, multi-consumer task queue. The name of the task queue is used in Prometheus label names and must match [a-zA-Z0-9:_] (snake case is used by convention).
The queue is bound to a maximum buffer size. Enqueue() may block if the buffer is full until a worker picks up a task. The buffer size may be set to 0, in which case every Enqueue() call will block until a worker handles the task.
func (*Queue) Enqueue
func (q *Queue) Enqueue(t *Task) error
Enqueues a task. Will block if the queue is full until workers pick up tasks. Can be safely called from multiple goroutines.
An error will only be returned if the queue has been shut down.
ATTENTION: calling Enqueue() on an unbuffered queue (size 0) before it is started will cause a panic().
func (*Queue) Name
func (q *Queue) Name() string
Name returns the name of the given Queue.
func (*Queue) Run
func (q *Queue) Run(ctx context.Context)
Processes tasks from the queue until it is stopped. Can be safely called from multiple goroutines. Blocks until the queue is shut down and all tasks are done, or until the passed context is cancelled.
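A worker loop with these two exit conditions (queue closed and drained, or context cancelled) is commonly written as a select over both channels. The sketch below uses only the standard library; run, runUntilClosed, and runCancelled are hypothetical names, and a closed channel stands in for a queue that has been shut down. It does not reproduce dowork's internals.

```go
package main

import (
	"context"
	"fmt"
)

// run executes tasks until the channel is closed and drained or until ctx is
// cancelled, whichever comes first. It returns the number of tasks executed.
func run(ctx context.Context, tasks <-chan func()) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case task, ok := <-tasks:
			if !ok {
				return n // channel closed: the queue was shut down
			}
			task()
			n++
		}
	}
}

// runUntilClosed queues k no-op tasks, closes the channel, and runs a worker
// with a context that is never cancelled.
func runUntilClosed(k int) int {
	tasks := make(chan func(), k)
	for i := 0; i < k; i++ {
		tasks <- func() {}
	}
	close(tasks)
	return run(context.Background(), tasks)
}

// runCancelled runs a worker on an empty queue with an already-cancelled
// context, so the worker returns without executing anything.
func runCancelled() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return run(ctx, make(chan func()))
}

func main() {
	fmt.Println(runUntilClosed(3)) // prints 3
	fmt.Println(runCancelled())    // prints 0
}
```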
func (*Queue) Shutdown
func (q *Queue) Shutdown()
Stop accepting new tasks and block until all already-queued tasks are complete. All started workers will be stopped. Will panic() if called multiple times.
Failed tasks that are waiting for retry will be reattempted immediately.
func (*Queue) Size
func (q *Queue) Size() int
Size returns the size of the queue's buffer.
func (*Queue) Start
func (q *Queue) Start(ctx context.Context, workers int)
Starts processing tasks on a given number of workers. Each worker will be started with the same context. Will return once all workers are started.
func (*Queue) Submit
func (q *Queue) Submit(fn TaskFunc) (*Task, error)
Creates and enqueues a new task, returning the new task. Will block if the queue is full until workers pick up tasks.
Note that the caller cannot customize settings on the returned task without creating a race condition, so attempting to do so will panic. See NewTask and (*Queue).Enqueue to create tasks with customized options.
An error will only be returned if the queue has been shut down.
ATTENTION: calling Submit() on an unbuffered queue (size 0) before it is started will cause a panic().
func (*Queue) TryEnqueue
func (q *Queue) TryEnqueue(t *Task) error
Tries to enqueue a task without blocking. Can be safely called from multiple goroutines.
ErrQueueShuttingDown is returned if the queue has been shut down, and ErrQueueFull is returned if the queue is full.
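The difference between a blocking enqueue and a "try" enqueue maps naturally onto Go channel sends. The following sketch shows a non-blocking send with select and default; it is a stand-alone illustration using the standard library, and errFull, tryEnqueue, and acceptedWithoutConsumer are hypothetical names rather than parts of dowork.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull is a local stand-in for a "queue is full" sentinel error.
var errFull = errors.New("queue is full")

// tryEnqueue attempts a non-blocking send. It returns errFull instead of
// blocking when the buffer has no free slot and no receiver is waiting.
func tryEnqueue(q chan int, v int) error {
	select {
	case q <- v:
		return nil
	default:
		return errFull
	}
}

// acceptedWithoutConsumer reports how many of n try-enqueues succeed on a
// queue with the given buffer size when nothing is consuming from it.
func acceptedWithoutConsumer(size, n int) int {
	q := make(chan int, size)
	accepted := 0
	for i := 0; i < n; i++ {
		if tryEnqueue(q, i) == nil {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(acceptedWithoutConsumer(2, 5)) // prints 2
	fmt.Println(acceptedWithoutConsumer(0, 5)) // prints 0
}
```

With a buffer size of 0 every try fails unless a consumer is already waiting, which mirrors why an unbuffered queue needs running workers before anything can be handed over.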
func (*Queue) TrySubmit
func (q *Queue) TrySubmit(fn TaskFunc) (*Task, error)
Creates and tries to enqueue a new task, returning the new task. Note that the caller cannot customize settings on the returned task without creating a race condition, so attempting to do so will panic. See NewTask and (*Queue).Enqueue to create tasks with customized options.
ErrQueueShuttingDown is returned if the queue has been shut down, and ErrQueueFull is returned if the queue is full.
type Task
type Task struct {
	Metadata map[string]interface{}
	// contains filtered or unexported fields
}
Stores state for a task which shall be or has been executed. Each task may only be executed successfully once.
func NewTask
func NewTask(fn TaskFunc) *Task
Creates a new task for a given function.
func Submit
func Submit(fn func(ctx context.Context) error) (*Task, error)
Submits a task to the global queue. See (*Queue).Submit.
func (*Task) After
func (t *Task) After(fn func(ctx context.Context, task *Task)) *Task
Sets a function which will be executed once the task is completed, successfully or not. The final result (nil or an error) is passed to the callee.
func (*Task) Attempt
func (t *Task) Attempt(ctx context.Context) (time.Time, error)
Attempts to execute this task.
If successful, the zero time and nil are returned.
Otherwise, the error returned from the task function is returned to the caller. If an error is returned for which errors.Is(err, ErrDoNotReattempt) is true, the caller should not call Attempt again.
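Because the check is made with errors.Is, a task function can wrap the sentinel with extra context and still be recognised as a permanent failure. The sketch below demonstrates that pattern with a local stand-in sentinel; errDoNotReattempt, classify, permanentFailure, and transientFailure are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errDoNotReattempt is a local stand-in for a "do not retry" sentinel error.
var errDoNotReattempt = errors.New("this task should not be re-attempted")

// permanentFailure wraps the sentinel with %w so that errors.Is still matches
// while the message carries extra context.
func permanentFailure() error {
	return fmt.Errorf("invalid input: %w", errDoNotReattempt)
}

// transientFailure returns an ordinary error that is worth retrying.
func transientFailure() error {
	return errors.New("connection reset")
}

// classify decides what a caller should do with the result of an attempt.
func classify(err error) string {
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, errDoNotReattempt):
		return "give up"
	default:
		return "retry"
	}
}

func main() {
	fmt.Println(classify(nil))                // prints done
	fmt.Println(classify(permanentFailure())) // prints give up
	fmt.Println(classify(transientFailure())) // prints retry
}
```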
func (*Task) Attempts
func (t *Task) Attempts() int
Returns the number of times this task has been attempted.
func (*Task) Before
func (t *Task) Before(fn func(ctx context.Context, task *Task)) *Task
Sets a function which will be executed before the task is attempted. It is called before every attempt, including retries.
func (*Task) Done
func (t *Task) Done() bool
Returns true if this task was completed, successfully or not.
func (*Task) MaxTimeout
func (t *Task) MaxTimeout(d time.Duration) *Task
Sets the maximum timeout between retries, or zero to exponentially increase the timeout indefinitely. Defaults to 30 minutes.
func (*Task) NextAttempt
func (t *Task) NextAttempt() time.Time
Returns the time the next attempt is scheduled for, or the zero value if it has not been attempted before.
func (*Task) NoJitter
func (t *Task) NoJitter() *Task
Specifies that randomness should not be introduced into the exponential backoff algorithm.
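For readers unfamiliar with jitter: it spreads retries out so that many tasks failing at the same moment do not all retry at the same moment. The sketch below shows one common variant, "equal jitter", which picks a random delay between half of the computed backoff and the full backoff. The formula is an assumption for illustration; this document does not say how dowork randomizes its delays, and withJitter and jitterStaysInRange are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// withJitter returns a random delay in the range [d/2, d]. The delay d must
// not be negative. This "equal jitter" formula is an illustrative assumption.
func withJitter(d time.Duration, r *rand.Rand) time.Duration {
	half := d / 2
	return half + time.Duration(r.Int63n(int64(half)+1))
}

// jitterStaysInRange draws 1000 jittered delays from a seeded generator and
// reports whether all of them fall inside [d/2, d].
func jitterStaysInRange(seed int64, ms int) bool {
	r := rand.New(rand.NewSource(seed))
	d := time.Duration(ms) * time.Millisecond
	for i := 0; i < 1000; i++ {
		j := withJitter(d, r)
		if j < d/2 || j > d {
			return false
		}
	}
	return true
}

func main() {
	r := rand.New(rand.NewSource(1))
	fmt.Println(withJitter(8*time.Second, r)) // some delay between 4s and 8s
	fmt.Println(jitterStaysInRange(1, 8000))  // prints true
}
```

Disabling jitter, as NoJitter does, makes retry times predictable, which can be convenient in tests.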
func (*Task) NotBefore
func (t *Task) NotBefore(date time.Time) *Task
Specifies the earliest possible time of the first execution.
func (*Task) Result
func (t *Task) Result() error
Returns the result of the task. The task must have been completed for this to be valid.
func (*Task) Retries
func (t *Task) Retries(n int) *Task
Sets the maximum number of retries on failure, or -1 to attempt indefinitely. By default, tasks are not retried on failure.
func (*Task) Within
func (t *Task) Within(deadline time.Duration) *Task
Specifies an upper limit for the duration of each attempt.
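A per-attempt time limit only helps if the task function watches its context. The following sketch shows the usual standard-library pattern: derive a context with context.WithTimeout for each attempt and have the function return when that context ends. Whether dowork enforces Within exactly this way is an assumption; attemptWithin, waitForCancel, timedOut, and fastOK are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// attemptWithin runs fn once with a context that expires after deadline.
func attemptWithin(parent context.Context, deadline time.Duration, fn func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, deadline)
	defer cancel()
	return fn(ctx)
}

// waitForCancel simulates slow work that only stops when its context ends.
func waitForCancel(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// timedOut reports whether a slow attempt is cut off by the deadline.
func timedOut(ms int) bool {
	err := attemptWithin(context.Background(), time.Duration(ms)*time.Millisecond, waitForCancel)
	return errors.Is(err, context.DeadlineExceeded)
}

// fastOK reports whether an attempt that finishes at once succeeds.
func fastOK() bool {
	err := attemptWithin(context.Background(), time.Second, func(ctx context.Context) error {
		return nil
	})
	return err == nil
}

func main() {
	fmt.Println(timedOut(10)) // prints true
	fmt.Println(fastOK())     // prints true
}
```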
type TaskFunc
type TaskFunc func(ctx context.Context) error
Source Files
doc.go global.go queue.go task.go
Directories
Path | Synopsis |
---|---|
examples | |
- Version: v0.0.0-20241219115748-a3928eb084db (latest)
- Published: Dec 19, 2024
- Platform: linux/amd64
- Imports: 11 packages