diff --git a/example_test.go b/example_test.go index 18be835..5110be1 100644 --- a/example_test.go +++ b/example_test.go @@ -2,9 +2,11 @@ package workgroups_test import ( "context" + "errors" "fmt" "log" "runtime" + "time" "go.xsfx.dev/workgroups" ) @@ -48,3 +50,59 @@ func Example() { // Output: // hello world from work } + +func ExampleRetry() { + d, ctx := workgroups.NewDispatcher( + context.Background(), + runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go. + 10, // Capacity of the queue. + ) + + // Just returning some error. So it can retry. + failFunc := func() error { + fmt.Print("fail ") + + return errors.New("fail") //nolint:goerr113 + } + + work := func(ctx context.Context) error { + // Check if context already expired. + // Return if its the case, else just go forward. + select { + case <-ctx.Done(): + return fmt.Errorf("got error from context: %w", ctx.Err()) + default: + } + + if err := failFunc(); err != nil { + return err + } + + return nil + } + + // Starting up the workers. + d.Start() + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + // Feeding the workers some work. + d.Append( + workgroups.NewJob( + ctx, + workgroups.Retry(ctx, time.Second/2)(work), // This will retry after a half second. + ), + ) + + // Closing the channel for work. + d.Close() + + // Waiting to finnish everything. + if err := d.Wait(); err != nil { + fmt.Print(err) + } + + // Output: + // fail fail error on waiting: got error from context: context deadline exceeded +} diff --git a/workgroups.go b/workgroups.go index 135a109..02d6d83 100644 --- a/workgroups.go +++ b/workgroups.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" @@ -38,7 +39,7 @@ type Work func(ctx context.Context) error // The context is only used as argument for the Work function. // Please use the NewJob function to get around this context in struct shenanigans. type Job struct { - ctx context.Context + ctx context.Context //nolint:containedctx work Work } @@ -120,3 +121,22 @@ func (d *Dispatcher) Wait() error { return nil } + +// Retry is a middleware for doing a retry in executing job work. +func Retry(ctx context.Context, wait time.Duration) func(Work) Work { + return func(next Work) Work { + return func(ctx context.Context) error { + for { + if err := next(ctx); err == nil { + return nil + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout while fetching information (last error: %w)", ctx.Err()) + case <-time.After(wait): + } + } + } + } +} diff --git a/workgroups_test.go b/workgroups_test.go index e434261..f7f74c4 100644 --- a/workgroups_test.go +++ b/workgroups_test.go @@ -3,6 +3,7 @@ package workgroups_test import ( "context" + "errors" "fmt" "runtime" "sync" @@ -105,3 +106,77 @@ func TestDispatcherTimeout(t *testing.T) { err := d.Wait() require.EqualError(err, "error on waiting: got error from context: context deadline exceeded") } + +var errTest = errors.New("just a test") + +type counter struct { + sync.Mutex + count int +} + +func (c *counter) Work(ctx context.Context) error { + select { + case <-ctx.Done(): + return fmt.Errorf("got error from context: %w", ctx.Err()) + default: + } + + c.Lock() + c.count++ + c.Unlock() + + return errTest +} + +func TestRetry(t *testing.T) { + type args struct { + timeout time.Duration + } + + tests := []struct { + name string + args args + greaterThan int + err error + }{ + { + "00", + args{ + time.Second, + }, + 500, + context.DeadlineExceeded, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout) + defer cancel() + + c := counter{} + d, ctx := workgroups.NewDispatcher( + ctx, + 1, + 1, + ) + + d.Start() + d.Append( + workgroups.NewJob( + ctx, + workgroups.Retry( + ctx, + time.Millisecond, + )(c.Work), + ), + ) + d.Close() + err := d.Wait() + + require.ErrorIs(t, err, tt.err) + require.Greater(t, c.count, tt.greaterThan) + }) + } +}