package jobs
import "github.com/go-arrower/arrower/jobs"
Package jobs provides an easy way to run workloads in the background.
To do so, create a Queue and customise it with different QueueOptions. Jobs can be any struct with an arbitrary payload, and they can be enqueued to run once or scheduled to run repeatedly.
Index ¶
- Constants
- Variables
- func FromContext(ctx context.Context) (string, bool)
- func NewNoop() *noopQueue
- type Enqueuer
- type Job
- type JobFunc
- type JobOption
- type JobType
- type MemoryQueue
- func NewMemoryQueue() *MemoryQueue
- func (q *MemoryQueue) Enqueue(_ context.Context, job Job, _ ...JobOption) error
- func (q *MemoryQueue) RegisterJobFunc(jf JobFunc) error
- func (q *MemoryQueue) Schedule(spec string, job Job) error
- func (q *MemoryQueue) Shutdown(_ context.Context) error
- type PersistenceCTXPayload
- type PersistencePayload
- type PollStrategy
- type PostgresJobsHandler
- func NewPostgresJobs(logger alog.Logger, meterProvider metric.MeterProvider, traceProvider trace.TracerProvider, pgxPool *pgxpool.Pool, opts ...QueueOption) (*PostgresJobsHandler, error)
- func (h *PostgresJobsHandler) Enqueue(ctx context.Context, job Job, opts ...JobOption) error
- func (h *PostgresJobsHandler) RegisterJobFunc(jf JobFunc) error
- func (h *PostgresJobsHandler) Schedule(spec string, job Job) error
- func (h *PostgresJobsHandler) Shutdown(ctx context.Context) error
- type Queue
- type QueueOption
- func WithPollInterval(d time.Duration) QueueOption
- func WithPollStrategy(s PollStrategy) QueueOption
- func WithPoolName(n string) QueueOption
- func WithPoolSize(n int) QueueOption
- func WithQueue(queue string) QueueOption
- type Scheduler
- type TestAssertions
- func (a *TestAssertions) Contains(job Job, msgAndArgs ...any) bool
- func (a *TestAssertions) Empty(msgAndArgs ...any) bool
- func (a *TestAssertions) NotContains(job Job, msgAndArgs ...any) bool
- func (a *TestAssertions) NotEmpty(msgAndArgs ...any) bool
- func (a *TestAssertions) Total(total int, msgAndArgs ...any) bool
- type TestQueue
Constants ¶
CTXJobID contains the current job ID.
Variables ¶
var (
	ErrRegisterJobFuncFailed = errors.New("register JobFunc failed")
	ErrInvalidJobFunc        = fmt.Errorf("%w: invalid JobFunc func signature", ErrRegisterJobFuncFailed)
	ErrEnqueueFailed         = errors.New("enqueue failed")
	ErrScheduleFailed        = errors.New("schedule failed")
	ErrInvalidJobType        = fmt.Errorf("%w: invalid job type", ErrEnqueueFailed)
	ErrInvalidJobOpt         = fmt.Errorf("%w: invalid job option", ErrEnqueueFailed)
	ErrInvalidQueueOpt       = errors.New("todo")
	ErrJobFuncFailed         = errors.New("arrower: job failed")
)
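Several of these errors wrap a more general sentinel via `%w`, so callers can match either the specific or the general error with `errors.Is`. The following is a minimal sketch that copies three of the declarations into a standalone program; the helper `isRegistrationError` and the wrapping message in `main` are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of three sentinel errors from the var block above.
var (
	ErrRegisterJobFuncFailed = errors.New("register JobFunc failed")
	ErrInvalidJobFunc        = fmt.Errorf("%w: invalid JobFunc func signature", ErrRegisterJobFuncFailed)
	ErrEnqueueFailed         = errors.New("enqueue failed")
)

// isRegistrationError is a hypothetical helper: it reports whether err
// belongs to the registration family of errors.
func isRegistrationError(err error) bool {
	return errors.Is(err, ErrRegisterJobFuncFailed)
}

func main() {
	// Application code might add its own context on top.
	err := fmt.Errorf("setup workers: %w", ErrInvalidJobFunc)

	fmt.Println(isRegistrationError(err))          // true
	fmt.Println(errors.Is(err, ErrInvalidJobFunc)) // true
	fmt.Println(errors.Is(err, ErrEnqueueFailed))  // false
}
```

Matching on the general sentinel keeps calling code stable if more specific registration errors are added later.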
Functions ¶
func FromContext ¶
FromContext returns the CTXJobID.
func NewNoop ¶
func NewNoop() *noopQueue
NewNoop returns an implementation of Queue that performs no operations. Ideal as a dependency in tests.
Types ¶
type Enqueuer ¶
type Enqueuer interface {
	// Enqueue schedules new Jobs. Use the JobOptions to configure the Jobs scheduled.
	// You can schedule an individual job or multiple jobs at the same time.
	// If ctx has a postgres.CtxTX present, that transaction is used to persist the new job(s).
	Enqueue(ctx context.Context, job Job, jobOptions ...JobOption) error
}
Enqueuer is an interface that allows new Jobs to be enqueued.
type Job ¶
type Job any
Job has two purposes:
- Carry the payload passed between job creator and worker. The type of Job has to be a named struct that optionally implements JobType.
- Is a placeholder for any concrete implementation of the JobType interface. Its purpose is that it can be used as a type for the JobType functions.
Job can be a single struct, a slice of structs, or an arbitrary slice of structs, each element representing a job to be scheduled.
type JobFunc ¶
type JobFunc any // func(ctx context.Context, job Job) error
JobFunc is the subscriber's handler and must have the signature: func(ctx context.Context, job Job) error {}.
JobFunc is declared as any because a stricter signature does not compile for typed handlers. With a signature for Register like: Register(f func(ctx context.Context, job any) error) error, the compiler reports: `cannot use func(ctx context.Context, data []byte) error {…} (value of type func(ctx context.Context, data []byte) error) as func(ctx context.Context, job any) error value in argument to Register.`
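Because JobFunc is declared as `any`, the signature can only be checked at runtime. The sketch below shows one way such a check could be written with the `reflect` package; it is an assumption about the approach, not the package's actual implementation. The names `validateJobFunc`, `errInvalidJobFunc`, `sendMail`, `handleSendMail`, and `badHandler` are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// errInvalidJobFunc stands in for the package's ErrInvalidJobFunc.
var errInvalidJobFunc = errors.New("invalid JobFunc func signature")

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// validateJobFunc checks that jf has the shape func(context.Context, T) error.
func validateJobFunc(jf any) error {
	t := reflect.TypeOf(jf)
	if t == nil || t.Kind() != reflect.Func {
		return errInvalidJobFunc
	}
	if t.NumIn() != 2 || t.NumOut() != 1 {
		return errInvalidJobFunc
	}
	if t.In(0) != ctxType || t.Out(0) != errType {
		return errInvalidJobFunc
	}
	return nil
}

// sendMail is a hypothetical job payload.
type sendMail struct{ To string }

// handleSendMail has the documented handler shape.
func handleSendMail(ctx context.Context, job sendMail) error { return nil }

// badHandler lacks the context parameter.
func badHandler(job sendMail) error { return nil }

func main() {
	fmt.Println(validateJobFunc(handleSendMail)) // <nil>
	fmt.Println(validateJobFunc(badHandler))     // invalid JobFunc func signature
	fmt.Println(validateJobFunc(42))             // invalid JobFunc func signature
}
```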
type JobOption ¶
JobOptions are functions that change the behaviour of a Job, e.g. set a priority or a time at which the job should run.
func WithPriority ¶
WithPriority changes the priority of a Job. The default priority is 0, and a lower number means a higher priority.
func WithRunAt ¶
WithRunAt defines the time at which a Job should run. Use it to schedule the Job into the future. This does not guarantee that the Job will be executed at the exact runAt time, only that it will not be processed earlier. If your queue is full, or you have too few workers, it might be picked up later.
type JobType ¶
type JobType interface { JobType() string }
JobType returns the Job's type. It is optional and does not have to be implemented by each Job. If it's not implemented the struct type is used as JobType instead.
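The fallback described above can be sketched with a type assertion followed by reflection. This is an illustration under assumptions: the helper `jobTypeOf` and both job structs are hypothetical, and the real package may derive the fallback name differently (for example including the package path).

```go
package main

import (
	"fmt"
	"reflect"
)

// JobType mirrors the optional interface documented above.
type JobType interface {
	JobType() string
}

// sendMail does not implement JobType, so its struct name is used.
type sendMail struct{ To string }

// resizeImage implements JobType and chooses its own name.
type resizeImage struct{ Path string }

func (resizeImage) JobType() string { return "images.resize" }

// jobTypeOf is a hypothetical helper that resolves the type name of a job.
func jobTypeOf(job any) string {
	if jt, ok := job.(JobType); ok {
		return jt.JobType()
	}
	t := reflect.TypeOf(job)
	if t == nil {
		return ""
	}
	return t.Name()
}

func main() {
	fmt.Println(jobTypeOf(sendMail{}))    // sendMail
	fmt.Println(jobTypeOf(resizeImage{})) // images.resize
}
```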
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
func NewMemoryQueue ¶
func NewMemoryQueue() *MemoryQueue
NewMemoryQueue is an in memory implementation of the Queue. No Jobs are persisted! Recommended use for local development and demos only.
func (*MemoryQueue) Enqueue ¶
func (*MemoryQueue) RegisterJobFunc ¶
func (q *MemoryQueue) RegisterJobFunc(jf JobFunc) error
func (*MemoryQueue) Schedule ¶
func (q *MemoryQueue) Schedule(spec string, job Job) error
func (*MemoryQueue) Shutdown ¶
func (q *MemoryQueue) Shutdown(_ context.Context) error
type PersistenceCTXPayload ¶
type PersistenceCTXPayload struct {
	UserID string `json:"userId"`
	// Carrier contains the otel tracing information.
	Carrier propagation.MapCarrier `json:"carrier"`
}
PersistenceCTXPayload contains a selected subset of the values stored in the request ctx. These values are partially handed down to the job worker's ctx.
Note: slog.Attr saves the value as a pointer and does not persist when marshalled to json.
type PersistencePayload ¶
type PersistencePayload struct {
	// JobData is the actual data as string instead of []byte,
	// so that it is easier to read when accessing it via psql directly.
	JobData interface{} `json:"jobData"`
	// JobStructPath is the full path of the struct / Job payload.
	// It will be the type's PkgPath.Name with the prefix of the executing module (your app) removed.
	JobStructPath string `json:"jobStructPath"`
	// GitHashEnqueued is the version of the source code used that got the Job enqueued.
	GitHashEnqueued string `json:"gitHashEnqueued"`
	// GitHashProcessed is the version the Job got processed with by a JobFunc.
	// It is only set when the PersistencePayload is stored in the history.
	GitHashProcessed string `json:"gitHashProcessed"`
	// Ctx persists some, NOT ALL, information stored in the context.
	Ctx PersistenceCTXPayload `json:"ctx"`
}
PersistencePayload is the structure of how a Job is saved by each Queue implementation.
The JSON of this struct is shown in the admin UI, so keep the order of the fields (especially JobData at the top).
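The field order matters because `encoding/json` emits struct fields in declaration order. The sketch below marshals a local copy of the two structs to show that `jobData` comes first. As an assumption for self-containment, `Carrier` is typed as a plain `map[string]string` instead of the otel `propagation.MapCarrier`; the `sendMail` payload, the helper `encode`, and all sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented structs. Carrier uses map[string]string
// here as a stand-in for propagation.MapCarrier.
type PersistenceCTXPayload struct {
	UserID  string            `json:"userId"`
	Carrier map[string]string `json:"carrier"`
}

type PersistencePayload struct {
	JobData          interface{}           `json:"jobData"`
	JobStructPath    string                `json:"jobStructPath"`
	GitHashEnqueued  string                `json:"gitHashEnqueued"`
	GitHashProcessed string                `json:"gitHashProcessed"`
	Ctx              PersistenceCTXPayload `json:"ctx"`
}

// sendMail is a hypothetical job payload.
type sendMail struct {
	To string `json:"to"`
}

// encode marshals p and returns the JSON, or "" on error.
func encode(p PersistencePayload) string {
	b, err := json.Marshal(p)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	p := PersistencePayload{
		JobData:         sendMail{To: "a@example.com"},
		JobStructPath:   "mailer.sendMail",
		GitHashEnqueued: "abc123",
		Ctx:             PersistenceCTXPayload{UserID: "42"},
	}
	// The output starts with the jobData key, followed by the metadata fields.
	fmt.Println(encode(p))
}
```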
type PollStrategy ¶
type PollStrategy int
const (
	// PriorityPollStrategy cares about the priority first to lock top priority Jobs first even if there are available
	// ones that should be executed earlier but with lower priority.
	PriorityPollStrategy PollStrategy = iota
	// RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there
	// are ones with a higher priority scheduled to a later time but already eligible for execution.
	RunAtPollStrategy
)
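The difference between the two strategies can be illustrated as a sort order over the eligible jobs. The sketch below is a model of the described behaviour, not the package's implementation (which polls the database): `pendingJob`, `next`, `demo`, and the sample jobs are hypothetical. It assumes, as documented for WithPriority and WithRunAt, that a lower number means a higher priority and that a job is never picked up before its runAt time.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type PollStrategy int

const (
	PriorityPollStrategy PollStrategy = iota
	RunAtPollStrategy
)

// pendingJob is a hypothetical in-memory view of a queued job.
type pendingJob struct {
	Name     string
	Priority int // lower number means higher priority
	RunAt    time.Time
}

// next returns the name of the job a worker would lock first at time now,
// or "" if no job is eligible yet.
func next(jobs []pendingJob, s PollStrategy, now time.Time) string {
	eligible := make([]pendingJob, 0, len(jobs))
	for _, j := range jobs {
		if !j.RunAt.After(now) { // never earlier than RunAt
			eligible = append(eligible, j)
		}
	}
	if len(eligible) == 0 {
		return ""
	}
	sort.SliceStable(eligible, func(a, b int) bool {
		x, y := eligible[a], eligible[b]
		if s == RunAtPollStrategy {
			if !x.RunAt.Equal(y.RunAt) {
				return x.RunAt.Before(y.RunAt)
			}
			return x.Priority < y.Priority
		}
		if x.Priority != y.Priority {
			return x.Priority < y.Priority
		}
		return x.RunAt.Before(y.RunAt)
	})
	return eligible[0].Name
}

// demo runs next against a fixed set of sample jobs.
func demo(s PollStrategy) string {
	now := time.Date(2025, 3, 11, 12, 0, 0, 0, time.UTC)
	jobs := []pendingJob{
		{Name: "old", Priority: 0, RunAt: now.Add(-10 * time.Minute)},
		{Name: "urgent", Priority: -5, RunAt: now.Add(-1 * time.Minute)},
		{Name: "future", Priority: -10, RunAt: now.Add(time.Hour)},
	}
	return next(jobs, s, now)
}

func main() {
	fmt.Println(demo(PriorityPollStrategy)) // urgent
	fmt.Println(demo(RunAtPollStrategy))    // old
}
```

The "future" job has the highest priority but is skipped by both strategies because its runAt time has not been reached.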
type PostgresJobsHandler ¶
type PostgresJobsHandler struct {
// contains filtered or unexported fields
}
PostgresJobsHandler is the main jobs' abstraction.
func NewPostgresJobs ¶
func NewPostgresJobs(
	logger alog.Logger,
	meterProvider metric.MeterProvider,
	traceProvider trace.TracerProvider,
	pgxPool *pgxpool.Pool,
	opts ...QueueOption,
) (*PostgresJobsHandler, error)
NewPostgresJobs returns an initialised PostgresJobsHandler. Each Worker in the pool defaults to a poll interval of 5 seconds, which can be overridden with the WithPollInterval option. The default queue is the nameless queue "", which can be overridden with the WithQueue option.
func (*PostgresJobsHandler) Enqueue ¶
func (*PostgresJobsHandler) RegisterJobFunc ¶
func (h *PostgresJobsHandler) RegisterJobFunc(jf JobFunc) error
RegisterJobFunc registers new worker functions for a given JobType.
func (*PostgresJobsHandler) Schedule ¶
func (h *PostgresJobsHandler) Schedule(spec string, job Job) error
func (*PostgresJobsHandler) Shutdown ¶
func (h *PostgresJobsHandler) Shutdown(ctx context.Context) error
type Queue ¶
type Queue interface {
	Enqueuer
	Scheduler

	// RegisterJobFunc registers a new JobFunc in the Queue. The name of the Job struct of JobFunc is used
	// as the job type, unless Job implements the JobType interface, in which case that value is used as the job type.
	//
	// The queue starts processing Jobs automatically after the given poll interval via WithPollInterval (default 5 sec),
	// as a waiting time for more JobFuncs to be registered. Consecutive calls to RegisterJobFunc reset the interval.
	// Subsequent calls to RegisterJobFunc will restart the queue, as the underlying library gue
	// requires all workers to be known before start.
	RegisterJobFunc(jobFunc JobFunc) error

	// Shutdown blocks and waits until all started jobs are finished.
	// Timeout does not work currently.
	Shutdown(vtc context.Context) error
}
type QueueOption ¶
type QueueOption func(*queueOpt)
QueueOption are functions that allow different behaviour of a Queue.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) QueueOption
WithPollInterval sets the duration in which to check the database for new Jobs.
func WithPollStrategy ¶
func WithPollStrategy(s PollStrategy) QueueOption
WithPollStrategy overrides default poll strategy with given value.
func WithPoolName ¶
func WithPoolName(n string) QueueOption
WithPoolName sets the name of the worker pool.
func WithPoolSize ¶
func WithPoolSize(n int) QueueOption
WithPoolSize sets the number of workers used to poll from the queue.
func WithQueue ¶
func WithQueue(queue string) QueueOption
WithQueue sets the name of the queue used for all Jobs.
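QueueOption follows the functional options pattern: each option is a closure that mutates an options struct before the queue is built. The sketch below shows the mechanics with a local `queueOpt`. The field names, the constructor `newQueueOpt`, and the default pool size of 1 are assumptions for illustration; only the 5 second poll interval and the nameless default queue are taken from the NewPostgresJobs documentation.

```go
package main

import (
	"fmt"
	"time"
)

// queueOpt is a hypothetical layout of the unexported options struct.
type queueOpt struct {
	queue        string
	poolSize     int
	pollInterval time.Duration
}

type QueueOption func(*queueOpt)

func WithPollInterval(d time.Duration) QueueOption {
	return func(o *queueOpt) { o.pollInterval = d }
}

func WithPoolSize(n int) QueueOption {
	return func(o *queueOpt) { o.poolSize = n }
}

func WithQueue(queue string) QueueOption {
	return func(o *queueOpt) { o.queue = queue }
}

// newQueueOpt applies opts on top of the defaults.
func newQueueOpt(opts ...QueueOption) queueOpt {
	o := queueOpt{
		queue:        "",              // documented default: the nameless queue
		poolSize:     1,               // assumed default, not documented
		pollInterval: 5 * time.Second, // documented default
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newQueueOpt(WithQueue("emails"), WithPoolSize(4))
	fmt.Println(o.queue, o.poolSize, o.pollInterval) // emails 4 5s
}
```

Options that are not passed keep their defaults, which is why the poll interval above stays at 5 seconds.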
type Scheduler ¶
type Scheduler interface {
	// Schedule schedules a Job to run repeatedly. Spec uses the crontab format with some additional nonstandard definitions.
	//
	// ┌───────────── minute (0 - 59)
	// │ ┌───────────── hour (0 - 23)
	// │ │ ┌───────────── day of the month (1 - 31)
	// │ │ │ ┌───────────── month (1 - 12)
	// │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
	// │ │ │ │ │                                   7 is also Sunday on some systems)
	// │ │ │ │ │
	// │ │ │ │ │
	// * * * * *
	//
	// `@yearly` (or `@annually`) => `0 0 1 1 *`
	// `@monthly` => `0 0 1 * *`
	// `@weekly` => `0 0 * * 0`
	// `@daily` (or `@midnight`) => `0 0 * * *`
	// `@hourly` => `0 * * * *`
	// `@every [interval]` where interval is the duration string that can be parsed by time.ParseDuration.
	Schedule(spec string, job Job) error
}
Scheduler is an interface that allows Jobs to be regularly scheduled.
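To make the nonstandard definitions concrete, the sketch below expands the documented macros into their five-field equivalents and parses the interval of an `@every` spec with `time.ParseDuration`. It only illustrates the spec format; the helpers `expandSpec` and `everyInterval` are hypothetical and the package most likely delegates parsing to a cron library.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// macros maps the documented shortcuts to their crontab equivalents.
var macros = map[string]string{
	"@yearly":   "0 0 1 1 *",
	"@annually": "0 0 1 1 *",
	"@monthly":  "0 0 1 * *",
	"@weekly":   "0 0 * * 0",
	"@daily":    "0 0 * * *",
	"@midnight": "0 0 * * *",
	"@hourly":   "0 * * * *",
}

// expandSpec returns the five-field form of a macro, or spec unchanged.
func expandSpec(spec string) string {
	if expanded, ok := macros[spec]; ok {
		return expanded
	}
	return spec
}

// everyInterval extracts the interval of an "@every <duration>" spec.
func everyInterval(spec string) (time.Duration, bool) {
	const prefix = "@every "
	if !strings.HasPrefix(spec, prefix) {
		return 0, false
	}
	d, err := time.ParseDuration(strings.TrimSpace(strings.TrimPrefix(spec, prefix)))
	if err != nil {
		return 0, false
	}
	return d, true
}

func main() {
	fmt.Println(expandSpec("@daily"))       // 0 0 * * *
	fmt.Println(expandSpec("*/5 * * * *"))  // */5 * * * *
	fmt.Println(everyInterval("@every 1h30m")) // 1h30m0s true
}
```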
type TestAssertions ¶
type TestAssertions struct {
// contains filtered or unexported fields
}
TestAssertions are assertions that work on a Queue, to make testing easier and more convenient. The interface follows stretchr/testify as closely as possible.
- Every assert func returns a bool indicating whether the assertion was successful or not. This is useful if you want to go on making further assertions under certain conditions.
func (*TestAssertions) Contains ¶
func (a *TestAssertions) Contains(job Job, msgAndArgs ...any) bool
Contains asserts that the queue contains at least one Job of the same type as job.
func (*TestAssertions) Empty ¶
func (a *TestAssertions) Empty(msgAndArgs ...any) bool
Empty asserts that the queue has no pending Jobs.
func (*TestAssertions) NotContains ¶
func (a *TestAssertions) NotContains(job Job, msgAndArgs ...any) bool
NotContains asserts that the queue does not contain any Job of the same type as job.
func (*TestAssertions) NotEmpty ¶
func (a *TestAssertions) NotEmpty(msgAndArgs ...any) bool
NotEmpty asserts that the queue has at least one pending Job.
func (*TestAssertions) Total ¶
func (a *TestAssertions) Total(total int, msgAndArgs ...any) bool
Total asserts that the queue has exactly total number of jobs pending.
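The bool return values allow follow-up assertions to be chained conditionally, in the style of stretchr/testify. The sketch below models that pattern with local stand-ins so it runs without the package: `assertions`, `failer`, `recorder`, and `sendMail` are hypothetical, and the comparison by `reflect.TypeOf` is an assumption about how "same type" might be decided. In a real test the failure sink would be `*testing.T`.

```go
package main

import (
	"fmt"
	"reflect"
)

// failer is the subset of *testing.T this sketch needs.
type failer interface {
	Errorf(format string, args ...any)
}

// recorder counts failures instead of failing a real test.
type recorder struct{ failures int }

func (r *recorder) Errorf(string, ...any) { r.failures++ }

// assertions is a hypothetical stand-in for TestAssertions.
type assertions struct {
	t    failer
	jobs []any
}

// Contains reports whether a job of the same type as job is pending.
func (a *assertions) Contains(job any) bool {
	want := reflect.TypeOf(job)
	for _, j := range a.jobs {
		if reflect.TypeOf(j) == want {
			return true
		}
	}
	a.t.Errorf("queue does not contain a job of type %v", want)
	return false
}

// Total reports whether exactly total jobs are pending.
func (a *assertions) Total(total int) bool {
	if len(a.jobs) == total {
		return true
	}
	a.t.Errorf("expected %d jobs, got %d", total, len(a.jobs))
	return false
}

// sendMail is a hypothetical job payload.
type sendMail struct{ To string }

func main() {
	rec := &recorder{}
	a := &assertions{t: rec, jobs: []any{sendMail{To: "a@example.com"}}}

	// Only check the total once the expected job type is present.
	if a.Contains(sendMail{}) {
		a.Total(1)
	}
	fmt.Println(rec.failures) // 0
}
```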
type TestQueue ¶
type TestQueue struct {
	*MemoryQueue
	*TestAssertions
}
TestQueue is a special Queue for unit testing. It exposes all methods of Queue and can be injected as a dependency in any application. Additionally, TestQueue exposes a set of assertions TestAssertions on all the jobs stored in the Queue.
func Test ¶
Test returns a TestQueue tuned for unit testing.
func (*TestQueue) Get ¶
Get returns the pos'th Job in the queue or nil if the queue is empty. The Job stays in the queue.
func (*TestQueue) GetFirst ¶
GetFirst returns the first Job in the queue or nil if the queue is empty. The Job stays in the queue.
func (*TestQueue) GetFirstOf ¶
GetFirstOf returns the first Job of the same type as the given job or nil if the queue is empty. The Job stays in the queue.
func (*TestQueue) GetOf ¶
GetOf returns the pos'th Job of the same type as the given job or nil if the queue is empty. The Job stays in the queue.
func (*TestQueue) Jobs ¶
Source Files ¶
gue-logger-adapter.go inmemory.go jobs.go noop.go postgres.go testing.go
Directories ¶
Path | Synopsis |
---|---|
jobs/models |
Version: v0.0.0-20250311203644-ab26c1152cb4 (latest)
Published: Mar 11, 2025
Platform: linux/amd64
Imports: 32 packages