feat: adds retry middleware

it retries work until ctx.Done chan receives something.
This commit is contained in:
Marvin Preuss 2022-03-29 11:42:17 +02:00
parent bcf20fb800
commit 22edc0f9f5
3 changed files with 154 additions and 1 deletions

View File

@ -2,9 +2,11 @@ package workgroups_test
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"log" "log"
"runtime" "runtime"
"time"
"go.xsfx.dev/workgroups" "go.xsfx.dev/workgroups"
) )
@ -48,3 +50,59 @@ func Example() {
// Output: // Output:
// hello world from work // hello world from work
} }
func ExampleRetry() {
d, ctx := workgroups.NewDispatcher(
context.Background(),
runtime.GOMAXPROCS(0), // This starts as much worker as maximal processes are allowed for go.
10, // Capacity of the queue.
)
// Just returning some error. So it can retry.
failFunc := func() error {
fmt.Print("fail ")
return errors.New("fail") //nolint:goerr113
}
work := func(ctx context.Context) error {
// Check if context already expired.
// Return if its the case, else just go forward.
select {
case <-ctx.Done():
return fmt.Errorf("got error from context: %w", ctx.Err())
default:
}
if err := failFunc(); err != nil {
return err
}
return nil
}
// Starting up the workers.
d.Start()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
// Feeding the workers some work.
d.Append(
workgroups.NewJob(
ctx,
workgroups.Retry(ctx, time.Second/2)(work), // This will retry after a half second.
),
)
// Closing the channel for work.
d.Close()
// Waiting to finnish everything.
if err := d.Wait(); err != nil {
fmt.Print(err)
}
// Output:
// fail fail error on waiting: got error from context: context deadline exceeded
}

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -38,7 +39,7 @@ type Work func(ctx context.Context) error
// The context is only used as argument for the Work function. // The context is only used as argument for the Work function.
// Please use the NewJob function to get around this context in struct shenanigans. // Please use the NewJob function to get around this context in struct shenanigans.
type Job struct { type Job struct {
ctx context.Context ctx context.Context //nolint:containedctx
work Work work Work
} }
@ -120,3 +121,22 @@ func (d *Dispatcher) Wait() error {
return nil return nil
} }
// Retry is a middleware for doing a retry in executing job work.
func Retry(ctx context.Context, wait time.Duration) func(Work) Work {
return func(next Work) Work {
return func(ctx context.Context) error {
for {
if err := next(ctx); err == nil {
return nil
}
select {
case <-ctx.Done():
return fmt.Errorf("timeout while fetching information (last error: %w)", ctx.Err())
case <-time.After(wait):
}
}
}
}
}

View File

@ -3,6 +3,7 @@ package workgroups_test
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"runtime" "runtime"
"sync" "sync"
@ -105,3 +106,77 @@ func TestDispatcherTimeout(t *testing.T) {
err := d.Wait() err := d.Wait()
require.EqualError(err, "error on waiting: got error from context: context deadline exceeded") require.EqualError(err, "error on waiting: got error from context: context deadline exceeded")
} }
var errTest = errors.New("just a test")
type counter struct {
sync.Mutex
count int
}
func (c *counter) Work(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("got error from context: %w", ctx.Err())
default:
}
c.Lock()
c.count++
c.Unlock()
return errTest
}
func TestRetry(t *testing.T) {
type args struct {
timeout time.Duration
}
tests := []struct {
name string
args args
greaterThan int
err error
}{
{
"00",
args{
time.Second,
},
500,
context.DeadlineExceeded,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout)
defer cancel()
c := counter{}
d, ctx := workgroups.NewDispatcher(
ctx,
1,
1,
)
d.Start()
d.Append(
workgroups.NewJob(
ctx,
workgroups.Retry(
ctx,
time.Millisecond,
)(c.Work),
),
)
d.Close()
err := d.Wait()
require.ErrorIs(t, err, tt.err)
require.Greater(t, c.count, tt.greaterThan)
})
}
}