workgroups.NewDispatcher takes a queue size
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing

This commit is contained in:
Marvin Preuss 2021-10-07 13:55:22 +02:00
parent 649a97a97c
commit b1d12f53b0
4 changed files with 34 additions and 7 deletions

View File

@ -91,6 +91,7 @@ func main() {
d, ctx := workgroups.NewDispatcher( d, ctx := workgroups.NewDispatcher(
context.Background(), context.Background(),
runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go. runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
10, // Capacity of the queue.
) )
work := func(ctx context.Context) error { work := func(ctx context.Context) error {

View File

@ -13,6 +13,7 @@ func Example() {
d, ctx := workgroups.NewDispatcher( d, ctx := workgroups.NewDispatcher(
context.Background(), context.Background(),
runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go. runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
10, // Capacity of the queue.
) )
work := func(ctx context.Context) error { work := func(ctx context.Context) error {

View File

@ -57,11 +57,11 @@ type Dispatcher struct {
// NewDispatcher creates a new Dispatcher. // NewDispatcher creates a new Dispatcher.
// It takes a context and adds it to the errgroup creation and returns it again. // It takes a context and adds it to the errgroup creation and returns it again.
func NewDispatcher(ctx context.Context, numWorkers int) (*Dispatcher, context.Context) { func NewDispatcher(ctx context.Context, numWorkers, workLength int) (*Dispatcher, context.Context) {
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
return &Dispatcher{ return &Dispatcher{
queue: make(chan Job, numWorkers), queue: make(chan Job, workLength),
eg: eg, eg: eg,
numWorkers: numWorkers, numWorkers: numWorkers,
}, ctx }, ctx

View File

@ -31,7 +31,7 @@ func TestDispatcher(t *testing.T) {
return nil return nil
} }
d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0)) d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0), 20)
d.Start() d.Start()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@ -48,13 +48,38 @@ func TestDispatcher(t *testing.T) {
func TestDispatcherError(t *testing.T) { func TestDispatcherError(t *testing.T) {
require := require.New(t) require := require.New(t)
work := func(ctx context.Context) error { errWork := func(ctx context.Context) error {
return fmt.Errorf("this is an error") //nolint:goerr113 return fmt.Errorf("this is an error") //nolint:goerr113
} }
d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0)) okWork := func(ctx context.Context) error {
return nil
}
d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0), 2)
d.Start() d.Start()
d.Append(workgroups.NewJob(ctx, work)) d.Append(workgroups.NewJob(ctx, errWork))
d.Append(workgroups.NewJob(ctx, okWork))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: go error from work function: this is an error")
}
func TestDispatcherErrorOneWorker(t *testing.T) {
require := require.New(t)
errWork := func(ctx context.Context) error {
return fmt.Errorf("this is an error") //nolint:goerr113
}
okWork := func(ctx context.Context) error {
return nil
}
d, ctx := workgroups.NewDispatcher(context.Background(), 1, 1)
d.Start()
d.Append(workgroups.NewJob(ctx, errWork))
d.Append(workgroups.NewJob(ctx, okWork))
d.Close() d.Close()
err := d.Wait() err := d.Wait()
@ -73,7 +98,7 @@ func TestDispatcherTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel() defer cancel()
d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0)) d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0), 1)
d.Start() d.Start()
d.Append(workgroups.NewJob(ctx, work)) d.Append(workgroups.NewJob(ctx, work))
d.Close() d.Close()