package jobs

import "github.com/go-arrower/arrower/jobs"

Package jobs provides an easy way to run workloads in the background.

To do so, create a Queue and customise it with QueueOptions. A Job can be any struct with an arbitrary payload; it can be enqueued to run once or scheduled to run repeatedly.

Constants

const CTXJobID ctx2.CTXKey = "arrower.jobs"

CTXJobID contains the current job ID.

Variables

var (
	ErrRegisterJobFuncFailed = errors.New("register JobFunc failed")
	ErrInvalidJobFunc        = fmt.Errorf("%w: invalid JobFunc func signature", ErrRegisterJobFuncFailed)
	ErrEnqueueFailed         = errors.New("enqueue failed")
	ErrScheduleFailed        = errors.New("schedule failed")
	ErrInvalidJobType        = fmt.Errorf("%w: invalid job type", ErrEnqueueFailed)
	ErrInvalidJobOpt         = fmt.Errorf("%w: invalid job option", ErrEnqueueFailed)
	ErrInvalidQueueOpt       = errors.New("todo")
	ErrJobFuncFailed         = errors.New("arrower: job failed")
)

Functions

func FromContext

func FromContext(ctx context.Context) (string, bool)

FromContext returns the job ID stored in ctx under CTXJobID and reports whether it was present.

func NewNoop

func NewNoop() *noopQueue

NewNoop returns an implementation of Queue that performs no operations. Ideal as a dependency in tests.

Types

type Enqueuer

type Enqueuer interface {
	// Enqueue schedules new Jobs. Use the JobOpts to configure the Jobs scheduled.
	// You can schedule an individual or multiple jobs at the same time.
	// If ctx has a postgres.CtxTX present, that transaction is used to persist the new job(s).
	Enqueue(ctx context.Context, job Job, jobOptions ...JobOption) error
}

Enqueuer is an interface that allows new Jobs to be enqueued.

type Job

type Job any

Job has two purposes:

  1. Carry the payload passed between job creator and worker. The type of Job has to be a named struct that optionally implements JobType.
  2. Act as a placeholder for any concrete implementation of the JobType interface, so that it can be used as the parameter type in the JobType functions.

Job can be a single struct, a slice of structs, or an arbitrary slice of structs, each element representing a job to be scheduled.
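To illustrate how a single value and a slice can both be accepted through the same `any` parameter, here is a minimal sketch using reflection. The helper flattenJobs and the payload types sendEmail and resizeImage are hypothetical; how the package itself unpacks a Job is not shown in this documentation.

```go
package main

import (
	"fmt"
	"reflect"
)

// sendEmail and resizeImage are hypothetical job payloads.
type sendEmail struct{ To string }
type resizeImage struct{ Path string }

// flattenJobs returns one entry per job: a slice is expanded element by
// element, anything else is treated as a single job.
func flattenJobs(job any) []any {
	v := reflect.ValueOf(job)
	if v.Kind() != reflect.Slice {
		return []any{job}
	}

	jobs := make([]any, 0, v.Len())
	for i := 0; i < v.Len(); i++ {
		jobs = append(jobs, v.Index(i).Interface())
	}

	return jobs
}

func main() {
	fmt.Println(len(flattenJobs(sendEmail{To: "alice"})))
	fmt.Println(len(flattenJobs([]sendEmail{{To: "alice"}, {To: "bob"}})))
	fmt.Println(len(flattenJobs([]any{sendEmail{To: "alice"}, resizeImage{Path: "a.png"}})))
}
```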

type JobFunc

type JobFunc any // func(ctx context.Context, job Job) error

JobFunc is the subscriber's handler and must have the signature: func(ctx context.Context, job Job) error {}.

If using a signature for Register like: Register(f func(ctx context.Context, job any) error) error, the compiler throws: `cannot use func(ctx context.Context, data []byte) error {…} (value of type func(ctx context.Context, data []byte) error) as func(ctx context.Context, job any) error value in argument to Register.`

type JobOption

type JobOption func(p Job) error

JobOptions are functions that change the behaviour of a Job, e.g. set a priority or the time at which the Job should run.

func WithPriority

func WithPriority(priority int16) JobOption

WithPriority changes the priority of a Job. The default priority is 0, and a lower number means a higher priority.

func WithRunAt

func WithRunAt(runAt time.Time) JobOption

WithRunAt defines the earliest time at which a Job should run. Use it to schedule the Job into the future. It does not guarantee that the Job is executed at exactly runAt, only that it is not processed earlier. If your queue is full, or you have too few workers, the Job might be picked up later.

type JobType

type JobType interface {
	JobType() string
}

JobType returns the Job's type. It is optional and does not have to be implemented by each Job. If it is not implemented, the struct type is used as the JobType instead.
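The fallback rule can be sketched with a type assertion and reflection. jobTyper, jobTypeOf, sendEmail, and resizeImage are hypothetical names, and using the bare struct name (without a package path) as the fallback is an assumption; the exact format the package uses is not documented here.

```go
package main

import (
	"fmt"
	"reflect"
)

// jobTyper is a local copy of the JobType interface.
type jobTyper interface {
	JobType() string
}

// sendEmail does not implement jobTyper, so its struct name is used.
type sendEmail struct{}

// resizeImage overrides its job type explicitly.
type resizeImage struct{}

func (resizeImage) JobType() string { return "images.resize" }

// jobTypeOf prefers an explicit JobType and falls back to the type name.
func jobTypeOf(job any) string {
	if job == nil {
		return ""
	}
	if jt, ok := job.(jobTyper); ok {
		return jt.JobType()
	}

	return reflect.TypeOf(job).Name()
}

func main() {
	fmt.Println(jobTypeOf(sendEmail{}))   // sendEmail
	fmt.Println(jobTypeOf(resizeImage{})) // images.resize
}
```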

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

NewMemoryQueue returns an in-memory implementation of Queue. No Jobs are persisted! Recommended for local development and demos only.

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(_ context.Context, job Job, _ ...JobOption) error

func (*MemoryQueue) RegisterJobFunc

func (q *MemoryQueue) RegisterJobFunc(jf JobFunc) error

func (*MemoryQueue) Schedule

func (q *MemoryQueue) Schedule(spec string, job Job) error

func (*MemoryQueue) Shutdown

func (q *MemoryQueue) Shutdown(_ context.Context) error

type PersistenceCTXPayload

type PersistenceCTXPayload struct {
	UserID string `json:"userId"`

	// Carrier contains the otel tracing information.
	Carrier propagation.MapCarrier `json:"carrier"`
}

PersistenceCTXPayload contains a selected subset of the values stored in the request ctx. These values are partially handed down to the job worker's ctx.

Note: slog.Attr saves the value as a pointer and does not persist when marshalled to json.

type PersistencePayload

type PersistencePayload struct {
	// JobData is the actual data as string instead of []byte,
	// so that it is easier to read when accessing it via psql directly.
	JobData interface{} `json:"jobData"`

	// JobStructPath is the full path of the struct / Job payload.
	// It will be the type's PkgPath.Name with the prefix of the executing module (your app) removed.
	JobStructPath string `json:"jobStructPath"`

	// GitHashEnqueued is the version of the source code that enqueued the Job.
	GitHashEnqueued string `json:"gitHashEnqueued"`
	// GitHashProcessed is the version the Job got processed with by a JobFunc.
	// It is only set when the PersistencePayload is stored in the history.
	GitHashProcessed string `json:"gitHashProcessed"`

	// Ctx persists some NOT ALL information stored in the context.
	Ctx PersistenceCTXPayload `json:"ctx"`
}

PersistencePayload is the structure of how a Job is saved by each Queue implementation.

The JSON of this struct is shown in the admin UI, so keep the order of the fields (especially JobData at the top).
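The field order matters because encoding/json writes struct fields in declaration order. The sketch below uses local copies of the two structs, with map[string]string standing in for propagation.MapCarrier; sendEmail, encode, and all field values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ctxPayload is a local copy of PersistenceCTXPayload.
type ctxPayload struct {
	UserID  string            `json:"userId"`
	Carrier map[string]string `json:"carrier"`
}

// payload is a local copy of PersistencePayload.
type payload struct {
	JobData          any        `json:"jobData"`
	JobStructPath    string     `json:"jobStructPath"`
	GitHashEnqueued  string     `json:"gitHashEnqueued"`
	GitHashProcessed string     `json:"gitHashProcessed"`
	Ctx              ctxPayload `json:"ctx"`
}

// sendEmail is a hypothetical job payload.
type sendEmail struct {
	To string `json:"to"`
}

// encode marshals p; the keys appear in struct declaration order.
func encode(p payload) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", fmt.Errorf("marshal payload: %w", err)
	}

	return string(b), nil
}

func main() {
	out, err := encode(payload{
		JobData:         sendEmail{To: "alice"},
		JobStructPath:   "app/emails.sendEmail", // hypothetical path
		GitHashEnqueued: "abc123",               // hypothetical hash
		Ctx:             ctxPayload{UserID: "42", Carrier: map[string]string{}},
	})
	fmt.Println(out, err)
}
```

Moving JobData further down in the struct would move it further down in the JSON shown in the admin UI as well.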

type PollStrategy

type PollStrategy int

const (
	// PriorityPollStrategy cares about the priority first to lock top priority Jobs first even if there are available
	// ones that should be executed earlier but with lower priority.
	PriorityPollStrategy PollStrategy = iota

	// RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there
	// are ones with a higher priority scheduled to a later time but already eligible for execution.
	RunAtPollStrategy
)

type PostgresJobsHandler

type PostgresJobsHandler struct {
	// contains filtered or unexported fields
}

PostgresJobsHandler is the main jobs' abstraction.

func NewPostgresJobs

func NewPostgresJobs(
	logger alog.Logger,
	meterProvider metric.MeterProvider,
	traceProvider trace.TracerProvider,
	pgxPool *pgxpool.Pool,
	opts ...QueueOption,
) (*PostgresJobsHandler, error)

NewPostgresJobs returns an initialised PostgresJobsHandler. Each worker in the pool defaults to a poll interval of 5 seconds, which can be overridden with the WithPollInterval option. The default queue is the nameless queue "", which can be overridden with the WithQueue option.

func (*PostgresJobsHandler) Enqueue

func (h *PostgresJobsHandler) Enqueue(ctx context.Context, job Job, opts ...JobOption) error

func (*PostgresJobsHandler) RegisterJobFunc

func (h *PostgresJobsHandler) RegisterJobFunc(jf JobFunc) error

RegisterJobFunc registers new worker functions for a given JobType.

func (*PostgresJobsHandler) Schedule

func (h *PostgresJobsHandler) Schedule(spec string, job Job) error

func (*PostgresJobsHandler) Shutdown

func (h *PostgresJobsHandler) Shutdown(ctx context.Context) error

type Queue

type Queue interface {
	Enqueuer
	Scheduler

	// RegisterJobFunc registers a new JobFunc in the Queue. The name of the Job struct of JobFunc is used
	// as the job type, unless Job implements the JobType interface, in which case JobType() is used instead.
	//
	// The queue starts processing Jobs automatically after the given poll interval via WithPollInterval (default 5 sec),
	// as a waiting time for more JobFuncs to be registered. Consecutive calls to RegisterJobFunc reset the interval.
	// Subsequent calls to RegisterJobFunc will restart the queue, as the underlying library gue
	// requires all workers to be known before start.
	RegisterJobFunc(jobFunc JobFunc) error

	// Shutdown blocks and waits until all started jobs are finished.
	// Timeout does not work currently.
	Shutdown(vtc context.Context) error
}

type QueueOption

type QueueOption func(*queueOpt)

QueueOptions are functions that configure the behaviour of a Queue.

func WithPollInterval

func WithPollInterval(d time.Duration) QueueOption

WithPollInterval sets the interval at which the database is checked for new Jobs.

func WithPollStrategy

func WithPollStrategy(s PollStrategy) QueueOption

WithPollStrategy overrides the default poll strategy with the given value.

func WithPoolName

func WithPoolName(n string) QueueOption

WithPoolName sets the name of the worker pool.

func WithPoolSize

func WithPoolSize(n int) QueueOption

WithPoolSize sets the number of workers used to poll from the queue.

func WithQueue

func WithQueue(queue string) QueueOption

WithQueue sets the name of the queue used for all Jobs.

type Scheduler

type Scheduler interface {
	// Schedule schedules a Job repeatedly. Spec is the crontab format with some additional nonstandard definitions.
	//
	// ┌───────────── minute (0 - 59)
	// │ ┌───────────── hour (0 - 23)
	// │ │ ┌───────────── day of the month (1 - 31)
	// │ │ │ ┌───────────── month (1 - 12)
	// │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
	// │ │ │ │ │                                   7 is also Sunday on some systems)
	// │ │ │ │ │
	// │ │ │ │ │
	// * * * * *
	//
	// `@yearly` (or `@annually`)   => `0 0 1 1 *`
	// `@monthly`                   => `0 0 1 * *`
	// `@weekly`                    => `0 0 * * 0`
	// `@daily` (or `@midnight`)    => `0 0 * * *`
	// `@hourly`                    => `0 * * * *`
	// `@every [interval]` where interval is the duration string that can be parsed by time.ParseDuration.
	Schedule(spec string, job Job) error
}

Scheduler is an interface that allows Jobs to be regularly scheduled.

type TestAssertions

type TestAssertions struct {
	// contains filtered or unexported fields
}

TestAssertions are assertions that work on a Queue to make testing easier and more convenient. The interface follows stretchr/testify as closely as possible.

func (*TestAssertions) Contains

func (a *TestAssertions) Contains(job Job, msgAndArgs ...any) bool

Contains asserts that the queue contains at least one Job of the same type as job.

func (*TestAssertions) Empty

func (a *TestAssertions) Empty(msgAndArgs ...any) bool

Empty asserts that the queue has no pending Jobs.

func (*TestAssertions) NotContains

func (a *TestAssertions) NotContains(job Job, msgAndArgs ...any) bool

NotContains asserts that the queue does not contain any Job of the same type as job.

func (*TestAssertions) NotEmpty

func (a *TestAssertions) NotEmpty(msgAndArgs ...any) bool

NotEmpty asserts that the queue has at least one pending Job.

func (*TestAssertions) Total

func (a *TestAssertions) Total(total int, msgAndArgs ...any) bool

Total asserts that the queue has exactly total number of jobs pending.

type TestQueue

type TestQueue struct {
	*MemoryQueue
	*TestAssertions
}

TestQueue is a special Queue for unit testing. It exposes all methods of Queue and can be injected as a dependency in any application. Additionally, TestQueue exposes a set of assertions TestAssertions on all the jobs stored in the Queue.

func Test

func Test(t *testing.T) *TestQueue

Test returns a TestQueue tuned for unit testing.

func (*TestQueue) Get

func (q *TestQueue) Get(pos int) Job

Get returns the pos'th Job in the queue or nil if the queue is empty. The Job stays in the queue.

func (*TestQueue) GetFirst

func (q *TestQueue) GetFirst() Job

GetFirst returns the first Job in the queue or nil if the queue is empty. The Job stays in the queue.

func (*TestQueue) GetFirstOf

func (q *TestQueue) GetFirstOf(job Job) Job

GetFirstOf returns the first Job of the same type as the given job or nil if the queue is empty. The Job stays in the queue.

func (*TestQueue) GetOf

func (q *TestQueue) GetOf(job Job, pos int) Job

GetOf returns the pos'th Job of the same type as the given job or nil if the queue is empty. The Job stays in the queue.
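Lookups "of the same type as the given job" can be sketched by comparing reflect types. The helpers getOf and getFirstOf below operate on a plain slice and are hypothetical; treating pos as zero-based is an assumption, since the documentation does not state where counting starts.

```go
package main

import (
	"fmt"
	"reflect"
)

// sendEmail and resizeImage are hypothetical job payloads.
type sendEmail struct{ To string }
type resizeImage struct{ Path string }

// getOf returns the pos'th (zero-based, assumed) job in queue that has the
// same type as job, or nil if there is no such job. The queue is not modified.
func getOf(queue []any, job any, pos int) any {
	want := reflect.TypeOf(job)
	seen := 0

	for _, j := range queue {
		if reflect.TypeOf(j) != want {
			continue
		}
		if seen == pos {
			return j
		}
		seen++
	}

	return nil
}

// getFirstOf returns the first job of the same type as job, or nil.
func getFirstOf(queue []any, job any) any {
	return getOf(queue, job, 0)
}

func main() {
	queue := []any{
		sendEmail{To: "alice"},
		resizeImage{Path: "a.png"},
		sendEmail{To: "bob"},
	}

	fmt.Println(getFirstOf(queue, resizeImage{}))
	fmt.Println(getOf(queue, sendEmail{}, 1))
	fmt.Println(getOf(queue, sendEmail{}, 2))
}
```

In a test, the returned Job would typically be type-asserted back to its concrete struct before its fields are checked.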

func (*TestQueue) Jobs

func (q *TestQueue) Jobs() []Job

Source Files

gue-logger-adapter.go inmemory.go jobs.go noop.go postgres.go testing.go

Directories

Path         Synopsis
jobs/models

Version: v0.0.0-20250311203644-ab26c1152cb4 (latest)
Published: Mar 11, 2025
Platform: linux/amd64
Imports: 32 packages