workgroups/workgroups_test.go
Marvin Preuss 83d1e50fd1
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
feat: hello again old logging friend
found a way to log late work error without race condition.
2022-05-17 13:43:58 +02:00

239 lines
4.6 KiB
Go

//nolint:paralleltest
package workgroups_test
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"os"
"runtime"
"sync"
"testing"
"time"
"github.com/go-logr/stdr"
"github.com/stretchr/testify/require"
"github.com/tonglil/buflogr"
"go.xsfx.dev/workgroups"
)
func TestDispatcher(t *testing.T) {
require := require.New(t)
collector := struct {
sync.RWMutex
results []int
}{results: []int{}}
work := func(ctx context.Context) error {
log.Print("printing this from inner work function")
collector.Lock()
collector.results = append(collector.results, 1)
collector.Unlock()
return nil
}
d, ctx := workgroups.NewDispatcher(
context.Background(),
stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
runtime.GOMAXPROCS(0),
20,
)
d.Start()
for i := 0; i < 10; i++ {
d.Append(workgroups.NewJob(ctx, work))
}
d.Close()
err := d.Wait()
require.NoError(err)
require.Equal(10, len(collector.results))
}
func TestDispatcherError(t *testing.T) {
require := require.New(t)
errWork := func(ctx context.Context) error {
return fmt.Errorf("this is an error") //nolint:goerr113
}
okWork := func(ctx context.Context) error {
return nil
}
d, ctx := workgroups.NewDispatcher(
context.Background(),
stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
runtime.GOMAXPROCS(0),
2,
)
d.Start()
d.Append(workgroups.NewJob(ctx, errWork))
d.Append(workgroups.NewJob(ctx, okWork))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: go error from work function: this is an error")
}
func TestDispatcherErrorOneWorker(t *testing.T) {
require := require.New(t)
errWork := func(ctx context.Context) error {
return fmt.Errorf("this is an error") //nolint:goerr113
}
okWork := func(ctx context.Context) error {
return nil
}
d, ctx := workgroups.NewDispatcher(context.Background(), stdr.New(log.New(os.Stderr, "", log.Lshortfile)), 1, 1)
d.Start()
d.Append(workgroups.NewJob(ctx, errWork))
d.Append(workgroups.NewJob(ctx, okWork))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: go error from work function: this is an error")
}
func TestDispatcherTimeout(t *testing.T) {
require := require.New(t)
work := func(ctx context.Context) error {
log.Print("sleeping...")
time.Sleep(5 * time.Second)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
d, ctx := workgroups.NewDispatcher(ctx, stdr.New(log.New(os.Stderr, "", log.Lshortfile)), runtime.GOMAXPROCS(0), 1)
d.Start()
d.Append(workgroups.NewJob(ctx, work))
d.Close()
err := d.Wait()
require.EqualError(err, "error on waiting: got error from context: context deadline exceeded")
}
var errTest = errors.New("just a test")
type counter struct {
sync.Mutex
count int
}
func (c *counter) Work(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("got error from context: %w", ctx.Err())
default:
}
c.Lock()
c.count++
c.Unlock()
return errTest
}
func TestRetry(t *testing.T) {
type args struct {
timeout time.Duration
}
tests := []struct {
name string
args args
greaterThan int
err error
}{
{
"00",
args{
time.Second,
},
500,
context.DeadlineExceeded,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout)
defer cancel()
c := counter{}
d, ctx := workgroups.NewDispatcher(
ctx,
stdr.New(log.New(os.Stderr, "", log.Lshortfile)),
1,
1,
)
d.Start()
d.Append(
workgroups.NewJob(
ctx,
workgroups.Retry(
ctx,
time.Millisecond,
)(c.Work),
),
)
d.Close()
err := d.Wait()
require.ErrorIs(t, err, tt.err)
require.Greater(t, c.count, tt.greaterThan)
})
}
}
func TestErrChanNotUsed(t *testing.T) {
var buf bytes.Buffer
log := buflogr.NewWithBuffer(&buf)
require := require.New(t)
work := func(ctx context.Context) error {
time.Sleep(5 * time.Second)
return nil
}
ctx, cancel := context.WithCancel(context.Background())
d, ctx := workgroups.NewDispatcher(ctx, log, runtime.GOMAXPROCS(0), 1)
d.Start()
d.Append(workgroups.NewJob(ctx, work))
d.Close()
go func() {
time.Sleep(time.Second / 2)
cancel()
}()
err := d.Wait()
require.ErrorIs(err, context.Canceled)
time.Sleep(10 * time.Second)
// Breaking glass!
s := log.GetSink()
underlier, ok := s.(buflogr.Underlier)
if !ok {
t.FailNow()
}
bl := underlier.GetUnderlying()
bl.Mutex().Lock()
require.Contains(buf.String(), "received job return after canceled context")
bl.Mutex().Unlock()
}