diff --git a/example_test.go b/example_test.go index 56ca424..55bdf4a 100644 --- a/example_test.go +++ b/example_test.go @@ -31,10 +31,10 @@ func Example() { } // Starting up the workers. - d.Start(ctx) + d.Start() // Feeding the workers some work. - d.Append(work) + d.Append(workgroups.NewJob(ctx, work)) // Closing the channel for work. d.Close() diff --git a/workgroups.go b/workgroups.go index 198dfe8..4b964ce 100644 --- a/workgroups.go +++ b/workgroups.go @@ -11,16 +11,38 @@ import ( "golang.org/x/sync/errgroup" ) +// ErrInWorker is an error that gets returned if there is a error +// in the work function. var ErrInWorker = errors.New("received error in worker") -type Job func(ctx context.Context) error +// Work is a type that defines worker work. +type Work func(ctx context.Context) error +// Job carries a job with everything it needs. +// I know know that contexts shouldnt be stored in a struct. +// Here is an exception, because its a short living object. +// The context is only used as argument for the Work function. +// Please use the NewJob function to get around this context in struct shenanigans. +type Job struct { + ctx context.Context + work Work +} + +// NewJob creates a new Job to send down the work queue with context and all that stuff. +func NewJob(ctx context.Context, work Work) Job { + return Job{ctx, work} +} + +// Dispatcher carries the job queue, the errgroup and the number of workers +// to start. type Dispatcher struct { queue chan Job eg *errgroup.Group numWorkers int } +// NewDispatcher creates a new Dispatcher. +// It takes a context and adds it to the errgroup creation and returns it again. func NewDispatcher(ctx context.Context, numWorkers int) (*Dispatcher, context.Context) { eg, ctx := errgroup.WithContext(ctx) @@ -31,7 +53,8 @@ func NewDispatcher(ctx context.Context, numWorkers int) (*Dispatcher, context.Co }, ctx } -func (d *Dispatcher) Start(ctx context.Context) { +// Start starts the configured number of workers and waits for jobs. +func (d *Dispatcher) Start() { for i := 0; i < d.numWorkers; i++ { logger := log.With().Caller().Int("worker", i).Logger() logger.Info().Msg("starting worker") @@ -41,12 +64,12 @@ func (d *Dispatcher) Start(ctx context.Context) { errChan := make(chan error) go func() { - errChan <- j(ctx) + errChan <- j.work(j.ctx) }() select { - case <-ctx.Done(): - return fmt.Errorf("got error from context: %w", ctx.Err()) + case <-j.ctx.Done(): + return fmt.Errorf("got error from context: %w", j.ctx.Err()) case err := <-errChan: if err != nil { return fmt.Errorf("go error from work function: %w", err) @@ -61,7 +84,7 @@ func (d *Dispatcher) Start(ctx context.Context) { } } -// Append adds a job to the queue. +// Append adds a job to the work queue. func (d *Dispatcher) Append(job Job) { log.Debug().Msg("adds job") d.queue <- job diff --git a/workgroups_test.go b/workgroups_test.go index ad06013..918aa5e 100644 --- a/workgroups_test.go +++ b/workgroups_test.go @@ -32,10 +32,10 @@ func TestDispatcher(t *testing.T) { } d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0)) - d.Start(ctx) + d.Start() for i := 0; i < 10; i++ { - d.Append(work) + d.Append(workgroups.NewJob(ctx, work)) } d.Close() @@ -53,8 +53,8 @@ func TestDispatcherError(t *testing.T) { } d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0)) - d.Start(ctx) - d.Append(work) + d.Start() + d.Append(workgroups.NewJob(ctx, work)) d.Close() err := d.Wait() @@ -70,12 +70,12 @@ func TestDispatcherTimeout(t *testing.T) { return nil } - ctx, cancel := context.WithTimeout(context.Background(), time.Second/2) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0)) - d.Start(ctx) - d.Append(work) + d.Start() + d.Append(workgroups.NewJob(ctx, work)) d.Close() err := d.Wait() require.EqualError(err, "error on waiting: got error from context: context deadline exceeded")