workgroups/workgroups_test.go
Marvin Preuss 22edc0f9f5 feat: adds retry middleware
it retries work until ctx.Done chan receives something.
2022-03-29 11:42:17 +02:00

183 lines
3.5 KiB
Go

//nolint:paralleltest
package workgroups_test
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"testing"
"time"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/require"
"go.xsfx.dev/workgroups"
)
// TestDispatcher appends ten jobs that each record a single result and checks
// that the dispatcher finishes without an error and that exactly ten results
// were collected.
func TestDispatcher(t *testing.T) {
	// jobs is both the number of jobs appended and the number of results expected.
	const jobs = 10

	assert := require.New(t)

	// results is appended to from inside the work function; every append
	// happens while mu is held.
	var (
		mu      sync.RWMutex
		results = []int{}
	)

	record := func(_ context.Context) error {
		log.Print("printing this from inner work function")

		mu.Lock()
		defer mu.Unlock()

		results = append(results, 1)

		return nil
	}

	// NOTE(review): the two numeric arguments look like worker count and
	// queue size — confirm against workgroups.NewDispatcher.
	dispatcher, jobCtx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0), 20)
	dispatcher.Start()

	for n := 0; n < jobs; n++ {
		dispatcher.Append(workgroups.NewJob(jobCtx, record))
	}

	dispatcher.Close()

	assert.NoError(dispatcher.Wait())
	assert.Equal(jobs, len(results))
}
func TestDispatcherError(t *testing.T) {
require := require.New(t)
errWork := func(ctx context.Context) error {
return fmt.Errorf("this is an error") //nolint:goerr113
}
okWork := func(ctx context.Context) error {
return nil
}
d, ctx := workgroups.NewDispatcher(context.Background(), runtime.GOMAXPROCS(0), 2)
d.Start()
d.Append(workgroups.NewJob(ctx, errWork))
d.Append(workgroups.NewJob(ctx, okWork))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: go error from work function: this is an error")
}
func TestDispatcherErrorOneWorker(t *testing.T) {
require := require.New(t)
errWork := func(ctx context.Context) error {
return fmt.Errorf("this is an error") //nolint:goerr113
}
okWork := func(ctx context.Context) error {
return nil
}
d, ctx := workgroups.NewDispatcher(context.Background(), 1, 1)
d.Start()
d.Append(workgroups.NewJob(ctx, errWork))
d.Append(workgroups.NewJob(ctx, okWork))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: go error from work function: this is an error")
}
func TestDispatcherTimeout(t *testing.T) {
require := require.New(t)
work := func(ctx context.Context) error {
log.Print("sleeping...")
time.Sleep(5 * time.Second)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
d, ctx := workgroups.NewDispatcher(ctx, runtime.GOMAXPROCS(0), 1)
d.Start()
d.Append(workgroups.NewJob(ctx, work))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: got error from context: context deadline exceeded")
}
// errTest is the sentinel error returned by every counted call to counter.Work.
var errTest = errors.New("just a test")

// counter counts how often Work ran. The embedded mutex guards count.
type counter struct {
	sync.Mutex

	count int
}

// Work returns a wrapped context error, without counting, when ctx is already
// done. Otherwise it increments the counter under the lock and returns
// errTest, so a caller always sees a non-nil error.
func (c *counter) Work(ctx context.Context) error {
	// Err is non-nil exactly when the Done channel is closed.
	if cause := ctx.Err(); cause != nil {
		return fmt.Errorf("got error from context: %w", cause)
	}

	c.Lock()
	defer c.Unlock()

	c.count++

	return errTest
}
// TestRetry wraps counter.Work, which fails with errTest on every counted
// call, in workgroups.Retry and runs it on a dispatcher whose context has a
// timeout. It checks that Wait reports the expected context error and that the
// work function ran more often than a minimum number of times.
func TestRetry(t *testing.T) {
	type args struct {
		// timeout bounds the context handed to the dispatcher.
		timeout time.Duration
	}

	tests := []struct {
		name string
		args args
		// greaterThan is the number of Work calls that must be exceeded.
		greaterThan int
		// err is the error the result of Wait must match via errors.Is.
		err error
	}{
		{
			name:        "00",
			args:        args{timeout: time.Second},
			greaterThan: 500,
			err:         context.DeadlineExceeded,
		},
	}

	for _, tt := range tests {
		tt := tt // per-iteration copy for the closure below

		t.Run(tt.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout)
			defer cancel()

			c := counter{}

			d, ctx := workgroups.NewDispatcher(ctx, 1, 1)
			d.Start()
			d.Append(
				workgroups.NewJob(
					ctx,
					workgroups.Retry(ctx, time.Millisecond)(c.Work),
				),
			)
			d.Close()

			err := d.Wait()
			require.ErrorIs(t, err, tt.err)

			// Snapshot the counter under its mutex. Every write in Work holds
			// the lock, and Wait returns because the context expired; nothing
			// in this file shows that the retrying worker has stopped by
			// then, so an unlocked read could be reported by the race detector.
			c.Lock()
			count := c.count
			c.Unlock()

			require.Greater(t, count, tt.greaterThan)
		})
	}
}