From b1d12f53b0cbd4125da66ec18f9f53c7660a892a Mon Sep 17 00:00:00 2001 From: Marvin Preuss Date: Thu, 7 Oct 2021 13:55:22 +0200 Subject: [PATCH] workgroups.NewDispatcher takes a queue size --- README.md | 1 + example_test.go | 1 + workgroups.go | 4 ++-- workgroups_test.go | 35 ++++++++++++++++++++++++++++++----- 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 141cb07..7b2a2ad 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ func main() { d, ctx := workgroups.NewDispatcher( context.Background(), runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go. + 10, // Capacity of the queue. ) work := func(ctx context.Context) error { diff --git a/example_test.go b/example_test.go index 55bdf4a..18be835 100644 --- a/example_test.go +++ b/example_test.go @@ -13,6 +13,7 @@ func Example() { d, ctx := workgroups.NewDispatcher( context.Background(), runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go. + 10, // Capacity of the queue. ) work := func(ctx context.Context) error { diff --git a/workgroups.go b/workgroups.go index 6840171..6720a09 100644 --- a/workgroups.go +++ b/workgroups.go @@ -57,11 +57,11 @@ type Dispatcher struct { // NewDispatcher creates a new Dispatcher. // It takes a context and adds it to the errgroup creation and returns it again. -func NewDispatcher(ctx context.Context, numWorkers int) (*Dispatcher, context.Context) { +func NewDispatcher(ctx context.Context, numWorkers, workLength int) (*Dispatcher, context.Context) { eg, ctx := errgroup.WithContext(ctx) return &Dispatcher{ - queue: make(chan Job, numWorkers), + queue: make(chan Job, workLength), eg: eg, numWorkers: numWorkers, }, ctx diff --git a/workgroups_test.go b/workgroups_test.go index 918aa5e..e434261 100644 --- a/workgroups_test.go +++ b/workgroups_test.go @@ -31,7 +31,7 @@ func TestDispatcher(t *testing.T) { return nil } - d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0)) + d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0), 20) d.Start() for i := 0; i < 10; i++ { @@ -48,13 +48,38 @@ func TestDispatcher(t *testing.T) { func TestDispatcherError(t *testing.T) { require := require.New(t) - work := func(ctx context.Context) error { + errWork := func(ctx context.Context) error { return fmt.Errorf("this is an error") //nolint:goerr113 } - d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0)) + okWork := func(ctx context.Context) error { + return nil + } + + d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0), 2) d.Start() - d.Append(workgroups.NewJob(ctx, work)) + d.Append(workgroups.NewJob(ctx, errWork)) + d.Append(workgroups.NewJob(ctx, okWork)) + d.Close() + err := d.Wait() + + require.EqualError(err, "error on waiting: go error from work function: this is an error") +} + +func TestDispatcherErrorOneWorker(t *testing.T) { + require := require.New(t) + errWork := func(ctx context.Context) error { + return fmt.Errorf("this is an error") //nolint:goerr113 + } + + okWork := func(ctx context.Context) error { + return nil + } + + d, ctx := workgroups.NewDispatcher(context.Background(), 1, 1) + d.Start() + d.Append(workgroups.NewJob(ctx, errWork)) + d.Append(workgroups.NewJob(ctx, okWork)) d.Close() err := d.Wait() @@ -73,7 +98,7 @@ func TestDispatcherTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0)) + d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0), 1) d.Start() d.Append(workgroups.NewJob(ctx, work)) d.Close()