lineworker – github.com/codesoap/lineworker Index | Examples | Files

package lineworker

import "github.com/codesoap/lineworker"

package lineworker provides a worker pool with a fixed amount of workers. It outputs work results in the order the work was given. The package is designed for serial data input and output; the functions Process and Next must never be called in parallel.

Each worker caches at most one result, so that no new work is processed, if as many results are waiting to be consumed as there are workers.

Index

Examples

Variables

var EOS = fmt.Errorf("no more results available")

EOS is the error returned by Next when no more results are available.

Types

type WorkFunc

type WorkFunc[IN, OUT any] func(in IN) (OUT, error)

type WorkerPool

type WorkerPool[IN, OUT any] struct {
	// contains filtered or unexported fields
}
Example

Code:play 

package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"time"

	"github.com/codesoap/lineworker"
)

func main() {
	slowSprint := func(a int) (string, error) {
		delay := rand.Int()
		time.Sleep(time.Duration(delay%6) * time.Millisecond)
		return fmt.Sprint(a), nil
	}
	pool := lineworker.NewWorkerPool(runtime.NumCPU(), slowSprint)
	go func() {
		for i := 0; i < 10; i++ {
			workAccepted := pool.Process(i)
			if !workAccepted {
				// Cannot happen in this example, because pool.Stop is not called
				// outside this goroutine, but is handled for demonstration
				// purposes.
				return
			}
		}
		pool.Stop()
	}()
	for {
		res, err := pool.Next()
		if err == lineworker.EOS {
			break
		} else if err != nil {
			panic(err)
		}
		fmt.Println(res)
	}
}

Output:

0
1
2
3
4
5
6
7
8
9

func NewWorkerPool

func NewWorkerPool[IN, OUT any](workerCount int, f WorkFunc[IN, OUT]) *WorkerPool[IN, OUT]

NewWorkerPool creates a new worker pool with workerCount workers waiting to process data of type IN to results of type OUT via f.

func (*WorkerPool[IN, OUT]) DiscardWork

func (w *WorkerPool[IN, OUT]) DiscardWork()

DiscardWork recieves and discards all pending work results, so that workers can quit after Stop has been called. It will block until all workers have quit.

DiscardWork must only be called after Stop has been called.

func (*WorkerPool[IN, OUT]) Next

func (w *WorkerPool[IN, OUT]) Next() (OUT, error)

Next will return the next result with its error. If the next result is not yet ready, it will block. If no more results are available, the EOS error will be returned.

func (*WorkerPool[IN, OUT]) Process

func (w *WorkerPool[IN, OUT]) Process(input IN) bool

Process queues a new input for processing. If all workers are currently busy, Process will block.

Process will return true if the input has been accepted. If Stop has been called previously, Process will discard the given input and return false.

func (*WorkerPool[IN, OUT]) Stop

func (w *WorkerPool[IN, OUT]) Stop()

Stop should be called after all calls to Process have been made. It stops the workers from accepting new work and allows their resources to be released after all results have been consumed via Next or discarded with DiscardWork.

Further calls to Stop after the first call will do nothing.

Source Files

lineworker.go

Version
v0.2.1 (latest)
Published
Nov 1, 2024
Platform
linux/amd64
Imports
2 packages
Last checked
1 month ago

Tools for package owners.