package stream
import "github.com/sourcegraph/conc/stream"
Package stream provides a concurrent, ordered stream implementation.
Index ¶
- type Callback
- type Stream
- func New() *Stream
- func (s *Stream) Go(f Task)
- func (s *Stream) Wait()
- func (s *Stream) WithMaxGoroutines(n int) *Stream
- type Task
Types ¶
type Callback ¶
type Callback func()
Callback is a function that is returned by a Task. Callbacks are called in the same order that tasks are submitted.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is used to execute a stream of tasks concurrently while maintaining the order of the results.
To use a stream, you submit some number of `Task`s, each of which returns a callback. Each task will be executed concurrently in the stream's associated Pool, and the callbacks will be executed sequentially in the order the tasks were submitted.
Once all your tasks have been submitted, Wait() must be called to clean up running goroutines and propagate any panics.
In the case of panic during execution of a task or a callback, all other tasks and callbacks will still execute. The panic will be propagated to the caller when Wait() is called.
A Stream is efficient, but not zero cost. It should not be used for very
short tasks. Startup and teardown add an overhead of a couple of
microseconds, and the overhead for each task is roughly 500ns. It should be
good enough for any task that requires a network call.
Example ¶
Code:
{
	times := []int{20, 52, 16, 45, 4, 80}
	stream := New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()
	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}
Output:
20ms
52ms
16ms
45ms
4ms
80ms
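The ordering guarantee shown in the example can be illustrated with the standard library alone. The following is a minimal sketch, not the package's implementation: `orderedStream`, `submit`, `wait`, and `runOrdered` are hypothetical names, and the design (one result slot per task, drained in submission order by a single goroutine) is an assumption about one way such a guarantee could be achieved.

```go
package main

import (
	"fmt"
	"time"
)

// callback and task mirror the shapes of Callback and Task in this package.
type callback func()
type task func() callback

// orderedStream is a hypothetical, simplified stand-in for Stream.
type orderedStream struct {
	queue chan chan callback // one slot per task, in submission order
	done  chan struct{}      // closed once every callback has run
}

func newOrderedStream() *orderedStream {
	s := &orderedStream{
		queue: make(chan chan callback, 16),
		done:  make(chan struct{}),
	}
	// A single goroutine runs all callbacks, so they never overlap.
	go func() {
		defer close(s.done)
		for slot := range s.queue {
			cb := <-slot // wait for the oldest outstanding task
			cb()
		}
	}()
	return s
}

// submit registers the task's slot first (fixing its position in the
// order), then runs the task itself in its own goroutine.
func (s *orderedStream) submit(t task) {
	slot := make(chan callback, 1)
	s.queue <- slot
	go func() { slot <- t() }()
}

// wait signals that no more tasks are coming and blocks until all
// callbacks have run.
func (s *orderedStream) wait() {
	close(s.queue)
	<-s.done
}

// runOrdered sleeps for each duration concurrently and records the
// values in callback order.
func runOrdered(millis []int) []int {
	var out []int
	s := newOrderedStream()
	for _, m := range millis {
		m := m // per-iteration copy for Go versions before 1.22
		s.submit(func() callback {
			time.Sleep(time.Duration(m) * time.Millisecond)
			return func() { out = append(out, m) }
		})
	}
	s.wait()
	return out
}

func main() {
	fmt.Println(runOrdered([]int{30, 5, 15})) // prints [30 5 15]
}
```

Even though the 5ms task finishes first, its callback waits behind the 30ms task's callback, which is the behavior the example output demonstrates.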
func New ¶
func New() *Stream
New creates a new Stream with default settings.
func (*Stream) Go ¶
func (s *Stream) Go(f Task)
Go schedules a task to be run in the stream's pool. All submitted tasks will be executed concurrently in worker goroutines. Then, the callbacks returned by the tasks will be executed in the order that the tasks were submitted. All callbacks will be executed by the same goroutine, so no synchronization is necessary between callbacks. If all goroutines in the stream's pool are busy, a call to Go() will block until the task can be started.
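The blocking behavior described for Go() when all goroutines are busy can be sketched with a buffered channel used as a semaphore. This is only an illustration under stated assumptions: `limitedRunner`, `run`, and `peakConcurrency` are hypothetical names, and the semaphore approach is a common stand-in technique, not necessarily how the stream's pool enforces its limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limitedRunner is a hypothetical stand-in for a pool with a goroutine cap.
type limitedRunner struct {
	sem chan struct{} // capacity equals the maximum number of running tasks
	wg  sync.WaitGroup
}

func newLimitedRunner(n int) *limitedRunner {
	return &limitedRunner{sem: make(chan struct{}, n)}
}

// run blocks until a slot is free, then starts f in a new goroutine.
func (r *limitedRunner) run(f func()) {
	r.sem <- struct{}{} // blocks while n tasks are already running
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer func() { <-r.sem }() // release the slot when f returns
		f()
	}()
}

func (r *limitedRunner) wait() { r.wg.Wait() }

// peakConcurrency runs `tasks` short tasks under a cap of `limit` and
// reports the highest number observed running at the same time.
func peakConcurrency(limit, tasks int) int {
	var running, peak int64
	r := newLimitedRunner(limit)
	for i := 0; i < tasks; i++ {
		r.run(func() {
			cur := atomic.AddInt64(&running, 1)
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(2 * time.Millisecond)
			atomic.AddInt64(&running, -1)
		})
	}
	r.wait()
	return int(atomic.LoadInt64(&peak))
}

func main() {
	// The reported peak never exceeds the limit of 3.
	fmt.Println("peak:", peakConcurrency(3, 20))
}
```

In this sketch the caller of `run` is the one that waits for a free slot, which matches the documented behavior that submission itself blocks rather than queuing unbounded work.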
func (*Stream) Wait ¶
func (s *Stream) Wait()
Wait signals to the stream that all tasks have been submitted. Wait will not return until all tasks and callbacks have been run.
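According to the Stream description, Wait() is also the point where a panic from a task or callback reaches the caller, after the remaining work has run. A minimal standard-library sketch of that pattern follows. The names `panicGroup`, `run`, `wait`, and `runAll` are hypothetical, and re-raising the first recovered value as-is is an assumption; the package may wrap the value differently.

```go
package main

import (
	"fmt"
	"sync"
)

// panicGroup is a hypothetical helper: it runs functions in goroutines,
// remembers the first panic value, and re-raises it from wait.
type panicGroup struct {
	wg    sync.WaitGroup
	once  sync.Once
	first any
}

func (g *panicGroup) run(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		defer func() {
			if r := recover(); r != nil {
				g.once.Do(func() { g.first = r })
			}
		}()
		f()
	}()
}

// wait blocks until every function has returned, then re-panics with
// the first recovered value, if any.
func (g *panicGroup) wait() {
	g.wg.Wait()
	if g.first != nil {
		panic(g.first)
	}
}

// runAll starts four functions, one of which panics. It reports how many
// finished normally and the value that wait re-raised.
func runAll() (completed int, recovered any) {
	var mu sync.Mutex
	var g panicGroup
	defer func() { recovered = recover() }()
	for i := 0; i < 4; i++ {
		i := i // per-iteration copy for Go versions before 1.22
		g.run(func() {
			if i == 1 {
				panic("task 1 failed")
			}
			mu.Lock()
			completed++
			mu.Unlock()
		})
	}
	g.wait()
	return completed, recovered
}

func main() {
	completed, recovered := runAll()
	fmt.Println(completed, recovered) // prints 3 task 1 failed
}
```

The other three functions still complete, and the caller only sees the panic once it calls `wait`, which mirrors the documented contract.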
func (*Stream) WithMaxGoroutines ¶
func (s *Stream) WithMaxGoroutines(n int) *Stream
type Task ¶
type Task func() Callback
Task is a task that is submitted to the stream. Submitted tasks will be executed concurrently. It returns a callback that will be called after the task has completed.
- Version: v0.3.0 (latest)
- Published: Feb 25, 2023
- Platform: linux/amd64
- Imports: 4 packages