diff --git a/workgroups.go b/workgroups.go index 0ba661f..78ac8d4 100644 --- a/workgroups.go +++ b/workgroups.go @@ -84,7 +84,8 @@ func (d *Dispatcher) Start() { err := j.work(j.ctx) select { case <-j.ctx.Done(): - return + log := d.log + log.V(1).Info("received job return after canceled context", "worker", i, "return", err) default: errChan <- err } diff --git a/workgroups_test.go b/workgroups_test.go index 292472e..0660cdc 100644 --- a/workgroups_test.go +++ b/workgroups_test.go @@ -2,6 +2,7 @@ package workgroups_test import ( + "bytes" "context" "errors" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/go-logr/stdr" "github.com/stretchr/testify/require" + "github.com/tonglil/buflogr" "go.xsfx.dev/workgroups" ) @@ -193,3 +195,44 @@ func TestRetry(t *testing.T) { }) } } + +func TestErrChanNotUsed(t *testing.T) { + var buf bytes.Buffer + log := buflogr.NewWithBuffer(&buf) + + require := require.New(t) + + work := func(ctx context.Context) error { + time.Sleep(5 * time.Second) + + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + d, ctx := workgroups.NewDispatcher(ctx, log, runtime.GOMAXPROCS(0), 1) + d.Start() + d.Append(workgroups.NewJob(ctx, work)) + d.Close() + + go func() { + time.Sleep(time.Second / 2) + cancel() + }() + + err := d.Wait() + require.ErrorIs(err, context.Canceled) + time.Sleep(10 * time.Second) + + // Breaking glass! + s := log.GetSink() + + underlier, ok := s.(buflogr.Underlier) + if !ok { + t.FailNow() + } + + bl := underlier.GetUnderlying() + bl.Mutex().Lock() + require.Contains(buf.String(), "received job return after canceled context") + bl.Mutex().Unlock() +}