Start doesnt take context no more. also use NewJob for creating workload

This commit is contained in:
Marvin Preuss 2021-09-27 10:44:58 +02:00
parent 06f1ce010a
commit 15d8f9d5ce
3 changed files with 38 additions and 15 deletions

View File

@ -31,10 +31,10 @@ func Example() {
}
// Starting up the workers.
d.Start(ctx)
d.Start()
// Feeding the workers some work.
d.Append(work)
d.Append(workgroups.NewJob(ctx, work))
// Closing the channel for work.
d.Close()

View File

@ -11,16 +11,38 @@ import (
"golang.org/x/sync/errgroup"
)
// ErrInWorker is an error that gets returned if there is a error
// in the work function.
var ErrInWorker = errors.New("received error in worker")
type Job func(ctx context.Context) error
// Work is a type that defines worker work.
type Work func(ctx context.Context) error
// Job carries a job with everything it needs.
// I know know that contexts shouldnt be stored in a struct.
// Here is an exception, because its a short living object.
// The context is only used as argument for the Work function.
// Please use the NewJob function to get around this context in struct shenanigans.
type Job struct {
ctx context.Context
work Work
}
// NewJob creates a new Job to send down the work queue with context and all that stuff.
func NewJob(ctx context.Context, work Work) Job {
return Job{ctx, work}
}
// Dispatcher carries the job queue, the errgroup and the number of workers
// to start.
type Dispatcher struct {
queue chan Job
eg *errgroup.Group
numWorkers int
}
// NewDispatcher creates a new Dispatcher.
// It takes a context and adds it to the errgroup creation and returns it again.
func NewDispatcher(ctx context.Context, numWorkers int) (*Dispatcher, context.Context) {
eg, ctx := errgroup.WithContext(ctx)
@ -31,7 +53,8 @@ func NewDispatcher(ctx context.Context, numWorkers int) (*Dispatcher, context.Co
}, ctx
}
func (d *Dispatcher) Start(ctx context.Context) {
// Start starts the configured number of workers and waits for jobs.
func (d *Dispatcher) Start() {
for i := 0; i < d.numWorkers; i++ {
logger := log.With().Caller().Int("worker", i).Logger()
logger.Info().Msg("starting worker")
@ -41,12 +64,12 @@ func (d *Dispatcher) Start(ctx context.Context) {
errChan := make(chan error)
go func() {
errChan <- j(ctx)
errChan <- j.work(j.ctx)
}()
select {
case <-ctx.Done():
return fmt.Errorf("got error from context: %w", ctx.Err())
case <-j.ctx.Done():
return fmt.Errorf("got error from context: %w", j.ctx.Err())
case err := <-errChan:
if err != nil {
return fmt.Errorf("go error from work function: %w", err)
@ -61,7 +84,7 @@ func (d *Dispatcher) Start(ctx context.Context) {
}
}
// Append adds a job to the queue.
// Append adds a job to the work queue.
func (d *Dispatcher) Append(job Job) {
log.Debug().Msg("adds job")
d.queue <- job

View File

@ -32,10 +32,10 @@ func TestDispatcher(t *testing.T) {
}
d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0))
d.Start(ctx)
d.Start()
for i := 0; i < 10; i++ {
d.Append(work)
d.Append(workgroups.NewJob(ctx, work))
}
d.Close()
@ -53,8 +53,8 @@ func TestDispatcherError(t *testing.T) {
}
d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0))
d.Start(ctx)
d.Append(work)
d.Start()
d.Append(workgroups.NewJob(ctx, work))
d.Close()
err := d.Wait()
@ -70,12 +70,12 @@ func TestDispatcherTimeout(t *testing.T) {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second/2)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0))
d.Start(ctx)
d.Append(work)
d.Start()
d.Append(workgroups.NewJob(ctx, work))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: got error from context: context deadline exceeded")