mirror of
https://github.com/coder/coder.git
synced 2025-07-12 00:14:10 +00:00
feat: add cleanup strategy to loadtest (#4991)
This commit is contained in:
@ -135,8 +135,9 @@ func loadtest() *cobra.Command {
|
||||
client.PropagateTracing = tracePropagate
|
||||
|
||||
// Prepare the test.
|
||||
strategy := config.Strategy.ExecutionStrategy()
|
||||
th := harness.NewTestHarness(strategy)
|
||||
runStrategy := config.Strategy.ExecutionStrategy()
|
||||
cleanupStrategy := config.CleanupStrategy.ExecutionStrategy()
|
||||
th := harness.NewTestHarness(runStrategy, cleanupStrategy)
|
||||
|
||||
for i, t := range config.Tests {
|
||||
name := fmt.Sprintf("%s-%d", t.Type, i)
|
||||
|
@ -38,6 +38,9 @@ func TestLoadTest(t *testing.T) {
|
||||
Strategy: cli.LoadTestStrategy{
|
||||
Type: cli.LoadTestStrategyTypeLinear,
|
||||
},
|
||||
CleanupStrategy: cli.LoadTestStrategy{
|
||||
Type: cli.LoadTestStrategyTypeLinear,
|
||||
},
|
||||
Tests: []cli.LoadTest{
|
||||
{
|
||||
Type: cli.LoadTestTypePlacebo,
|
||||
@ -89,6 +92,10 @@ func TestLoadTest(t *testing.T) {
|
||||
Type: cli.LoadTestStrategyTypeConcurrent,
|
||||
ConcurrencyLimit: 2,
|
||||
},
|
||||
CleanupStrategy: cli.LoadTestStrategy{
|
||||
Type: cli.LoadTestStrategyTypeConcurrent,
|
||||
ConcurrencyLimit: 2,
|
||||
},
|
||||
Tests: []cli.LoadTest{
|
||||
{
|
||||
Type: cli.LoadTestTypeWorkspaceBuild,
|
||||
@ -210,6 +217,9 @@ func TestLoadTest(t *testing.T) {
|
||||
Strategy: cli.LoadTestStrategy{
|
||||
Type: cli.LoadTestStrategyTypeLinear,
|
||||
},
|
||||
CleanupStrategy: cli.LoadTestStrategy{
|
||||
Type: cli.LoadTestStrategyTypeLinear,
|
||||
},
|
||||
Tests: []cli.LoadTest{
|
||||
{
|
||||
Type: cli.LoadTestTypePlacebo,
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
// LoadTestConfig is the overall configuration for a call to `coder loadtest`.
|
||||
type LoadTestConfig struct {
|
||||
Strategy LoadTestStrategy `json:"strategy"`
|
||||
CleanupStrategy LoadTestStrategy `json:"cleanup_strategy"`
|
||||
Tests []LoadTest `json:"tests"`
|
||||
// Timeout sets a timeout for the entire test run, to control the timeout
|
||||
// for each individual run use strategy.timeout.
|
||||
@ -134,6 +135,10 @@ func (c *LoadTestConfig) Validate() error {
|
||||
if err != nil {
|
||||
return xerrors.Errorf("validate strategy: %w", err)
|
||||
}
|
||||
err = c.CleanupStrategy.Validate()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("validate cleanup_strategy: %w", err)
|
||||
}
|
||||
|
||||
for i, test := range c.Tests {
|
||||
err := test.Validate()
|
||||
|
@ -11,19 +11,11 @@ import (
|
||||
"github.com/coder/coder/coderd/tracing"
|
||||
)
|
||||
|
||||
// ExecutionStrategy defines how a TestHarness should execute a set of runs. It
|
||||
// essentially defines the concurrency model for a given testing session.
|
||||
type ExecutionStrategy interface {
|
||||
// Execute runs the given runs in whatever way the strategy wants. An error
|
||||
// may only be returned if the strategy has a failure itself, not if any of
|
||||
// the runs fail.
|
||||
Execute(ctx context.Context, runs []*TestRun) error
|
||||
}
|
||||
|
||||
// TestHarness runs a bunch of registered test runs using the given
|
||||
// ExecutionStrategy.
|
||||
// TestHarness runs a bunch of registered test runs using the given execution
|
||||
// strategies.
|
||||
type TestHarness struct {
|
||||
strategy ExecutionStrategy
|
||||
runStrategy ExecutionStrategy
|
||||
cleanupStrategy ExecutionStrategy
|
||||
|
||||
mut *sync.Mutex
|
||||
runIDs map[string]struct{}
|
||||
@ -33,10 +25,11 @@ type TestHarness struct {
|
||||
elapsed time.Duration
|
||||
}
|
||||
|
||||
// NewTestHarness creates a new TestHarness with the given ExecutionStrategy.
|
||||
func NewTestHarness(strategy ExecutionStrategy) *TestHarness {
|
||||
// NewTestHarness creates a new TestHarness with the given execution strategies.
|
||||
func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy) *TestHarness {
|
||||
return &TestHarness{
|
||||
strategy: strategy,
|
||||
runStrategy: runStrategy,
|
||||
cleanupStrategy: cleanupStrategy,
|
||||
mut: new(sync.Mutex),
|
||||
runIDs: map[string]struct{}{},
|
||||
runs: []*TestRun{},
|
||||
@ -62,11 +55,16 @@ func (h *TestHarness) Run(ctx context.Context) (err error) {
|
||||
h.started = true
|
||||
h.mut.Unlock()
|
||||
|
||||
runFns := make([]TestFn, len(h.runs))
|
||||
for i, run := range h.runs {
|
||||
runFns[i] = run.Run
|
||||
}
|
||||
|
||||
defer close(h.done)
|
||||
defer func() {
|
||||
e := recover()
|
||||
if e != nil {
|
||||
err = xerrors.Errorf("execution strategy panicked: %w", e)
|
||||
err = xerrors.Errorf("panic in harness.Run: %+v", e)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -77,7 +75,9 @@ func (h *TestHarness) Run(ctx context.Context) (err error) {
|
||||
h.elapsed = time.Since(start)
|
||||
}()
|
||||
|
||||
err = h.strategy.Execute(ctx, h.runs)
|
||||
// We don't care about test failures here since they already get recorded
|
||||
// by the *TestRun.
|
||||
_, err = h.runStrategy.Run(ctx, runFns)
|
||||
//nolint:revive // we use named returns because we mutate it in a defer
|
||||
return
|
||||
}
|
||||
@ -96,20 +96,34 @@ func (h *TestHarness) Cleanup(ctx context.Context) (err error) {
|
||||
panic("harness has not finished")
|
||||
}
|
||||
|
||||
cleanupFns := make([]TestFn, len(h.runs))
|
||||
for i, run := range h.runs {
|
||||
cleanupFns[i] = run.Cleanup
|
||||
}
|
||||
|
||||
defer func() {
|
||||
e := recover()
|
||||
if e != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("panic in cleanup: %w", e))
|
||||
err = xerrors.Errorf("panic in harness.Cleanup: %+v", e)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, run := range h.runs {
|
||||
e := run.Cleanup(ctx)
|
||||
if e != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("cleanup for %s failed: %w", run.FullID(), e))
|
||||
}
|
||||
}
|
||||
|
||||
var cleanupErrs []error
|
||||
cleanupErrs, err = h.cleanupStrategy.Run(ctx, cleanupFns)
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("cleanup strategy error: %w", err)
|
||||
//nolint:revive // we use named returns because we mutate it in a defer
|
||||
return
|
||||
}
|
||||
|
||||
var merr error
|
||||
for _, cleanupErr := range cleanupErrs {
|
||||
if cleanupErr != nil {
|
||||
merr = multierror.Append(merr, cleanupErr)
|
||||
}
|
||||
}
|
||||
|
||||
err = merr
|
||||
//nolint:revive // we use named returns because we mutate it in a defer
|
||||
return
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ type panickingExecutionStrategy struct{}
|
||||
|
||||
var _ harness.ExecutionStrategy = panickingExecutionStrategy{}
|
||||
|
||||
func (panickingExecutionStrategy) Execute(_ context.Context, _ []*harness.TestRun) error {
|
||||
func (panickingExecutionStrategy) Run(_ context.Context, _ []harness.TestFn) ([]error, error) {
|
||||
panic(testPanicMessage)
|
||||
}
|
||||
|
||||
@ -28,8 +28,8 @@ type erroringExecutionStrategy struct {
|
||||
|
||||
var _ harness.ExecutionStrategy = erroringExecutionStrategy{}
|
||||
|
||||
func (e erroringExecutionStrategy) Execute(_ context.Context, _ []*harness.TestRun) error {
|
||||
return e.err
|
||||
func (e erroringExecutionStrategy) Run(_ context.Context, _ []harness.TestFn) ([]error, error) {
|
||||
return []error{}, e.err
|
||||
}
|
||||
|
||||
func Test_TestHarness(t *testing.T) {
|
||||
@ -40,7 +40,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
|
||||
expectedErr := xerrors.New("expected error")
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
r1 := h.AddRun("test", "1", fakeTestFns(nil, nil))
|
||||
r2 := h.AddRun("test", "2", fakeTestFns(expectedErr, nil))
|
||||
|
||||
@ -65,7 +65,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
|
||||
expectedErr := xerrors.New("expected error")
|
||||
|
||||
h := harness.NewTestHarness(erroringExecutionStrategy{err: expectedErr})
|
||||
h := harness.NewTestHarness(erroringExecutionStrategy{err: expectedErr}, harness.LinearExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", fakeTestFns(nil, nil))
|
||||
|
||||
err := h.Run(context.Background())
|
||||
@ -76,7 +76,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("CatchesExecutionPanic", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(panickingExecutionStrategy{})
|
||||
h := harness.NewTestHarness(panickingExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", fakeTestFns(nil, nil))
|
||||
|
||||
err := h.Run(context.Background())
|
||||
@ -93,7 +93,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
|
||||
expectedErr := xerrors.New("expected error")
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", fakeTestFns(nil, expectedErr))
|
||||
|
||||
err := h.Run(context.Background())
|
||||
@ -107,7 +107,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("Panic", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", testFns{
|
||||
RunFn: func(_ context.Context, _ string, _ io.Writer) error {
|
||||
return nil
|
||||
@ -125,6 +125,44 @@ func Test_TestHarness(t *testing.T) {
|
||||
require.ErrorContains(t, err, "panic")
|
||||
require.ErrorContains(t, err, testPanicMessage)
|
||||
})
|
||||
|
||||
t.Run("CatchesExecutionError", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
expectedErr := xerrors.New("expected error")
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, erroringExecutionStrategy{err: expectedErr})
|
||||
_ = h.AddRun("test", "1", fakeTestFns(nil, nil))
|
||||
|
||||
err := h.Run(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.Cleanup(context.Background())
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, expectedErr)
|
||||
})
|
||||
|
||||
t.Run("CatchesExecutionPanic", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, panickingExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", testFns{
|
||||
RunFn: func(_ context.Context, _ string, _ io.Writer) error {
|
||||
return nil
|
||||
},
|
||||
CleanupFn: func(_ context.Context, _ string) error {
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
err := h.Run(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.Cleanup(context.Background())
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "panic")
|
||||
require.ErrorContains(t, err, testPanicMessage)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Panics", func(t *testing.T) {
|
||||
@ -133,7 +171,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("RegisterAfterStart", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
_ = h.Run(context.Background())
|
||||
|
||||
require.Panics(t, func() {
|
||||
@ -144,7 +182,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("DuplicateTestID", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
|
||||
name, id := "test", "1"
|
||||
_ = h.AddRun(name, id, fakeTestFns(nil, nil))
|
||||
@ -157,7 +195,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("StartedTwice", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
h.Run(context.Background())
|
||||
|
||||
require.Panics(t, func() {
|
||||
@ -168,7 +206,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("ResultsBeforeStart", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
|
||||
require.Panics(t, func() {
|
||||
h.Results()
|
||||
@ -183,7 +221,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
endRun = make(chan struct{})
|
||||
testsEnded = make(chan struct{})
|
||||
)
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", testFns{
|
||||
RunFn: func(_ context.Context, _ string, _ io.Writer) error {
|
||||
close(started)
|
||||
@ -210,14 +248,14 @@ func Test_TestHarness(t *testing.T) {
|
||||
t.Run("CleanupBeforeStart", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
|
||||
require.Panics(t, func() {
|
||||
h.Cleanup(context.Background())
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("ClenaupBeforeFinish", func(t *testing.T) {
|
||||
t.Run("CleanupBeforeFinish", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
@ -225,7 +263,7 @@ func Test_TestHarness(t *testing.T) {
|
||||
endRun = make(chan struct{})
|
||||
testsEnded = make(chan struct{})
|
||||
)
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{})
|
||||
h := harness.NewTestHarness(harness.LinearExecutionStrategy{}, harness.LinearExecutionStrategy{})
|
||||
_ = h.AddRun("test", "1", testFns{
|
||||
RunFn: func(_ context.Context, _ string, _ io.Writer) error {
|
||||
close(started)
|
||||
|
70
loadtest/harness/results_test.go
Normal file
70
loadtest/harness/results_test.go
Normal file
@ -0,0 +1,70 @@
|
||||
package harness_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/coder/coder/coderd/httpapi"
|
||||
"github.com/coder/coder/loadtest/harness"
|
||||
)
|
||||
|
||||
func Test_Results(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
results := harness.Results{
|
||||
TotalRuns: 10,
|
||||
TotalPass: 8,
|
||||
TotalFail: 2,
|
||||
Runs: map[string]harness.RunResult{
|
||||
"test-0/0": {
|
||||
FullID: "test-0/0",
|
||||
TestName: "test-0",
|
||||
ID: "0",
|
||||
Logs: "test-0/0 log line 1\ntest-0/0 log line 2",
|
||||
Error: xerrors.New("test-0/0 error"),
|
||||
StartedAt: time.Now(),
|
||||
Duration: httpapi.Duration(time.Second),
|
||||
DurationMS: 1000,
|
||||
},
|
||||
"test-0/1": {
|
||||
FullID: "test-0/1",
|
||||
TestName: "test-0",
|
||||
ID: "1",
|
||||
Logs: "test-0/1 log line 1\ntest-0/1 log line 2",
|
||||
Error: nil,
|
||||
StartedAt: time.Now(),
|
||||
Duration: httpapi.Duration(time.Second),
|
||||
DurationMS: 1000,
|
||||
},
|
||||
},
|
||||
Elapsed: httpapi.Duration(time.Second),
|
||||
ElapsedMS: 1000,
|
||||
}
|
||||
|
||||
expected := `
|
||||
== FAIL: test-0/0
|
||||
|
||||
Error: test-0/0 error
|
||||
|
||||
Log:
|
||||
test-0/0 log line 1
|
||||
|
||||
|
||||
Test results:
|
||||
Pass: 8
|
||||
Fail: 2
|
||||
Total: 10
|
||||
|
||||
Total duration: 1s
|
||||
Avg. duration: 200ms
|
||||
`
|
||||
|
||||
out := bytes.NewBuffer(nil)
|
||||
results.PrintText(out)
|
||||
|
||||
require.Equal(t, expected, out.String())
|
||||
}
|
@ -108,7 +108,7 @@ func (r *TestRun) Run(ctx context.Context) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *TestRun) Cleanup(ctx context.Context) error {
|
||||
func (r *TestRun) Cleanup(ctx context.Context) (err error) {
|
||||
c, ok := r.runner.(Cleanable)
|
||||
if !ok {
|
||||
return nil
|
||||
@ -121,5 +121,14 @@ func (r *TestRun) Cleanup(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.Cleanup(ctx, r.id)
|
||||
defer func() {
|
||||
e := recover()
|
||||
if e != nil {
|
||||
err = xerrors.Errorf("panic: %v", e)
|
||||
}
|
||||
}()
|
||||
|
||||
err = c.Cleanup(ctx, r.id)
|
||||
//nolint:revive // we use named returns because we mutate it in a defer
|
||||
return
|
||||
}
|
||||
|
@ -4,25 +4,42 @@ import (
|
||||
"context"
|
||||
cryptorand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// TestFn is a function that can be run by an ExecutionStrategy.
|
||||
type TestFn func(ctx context.Context) error
|
||||
|
||||
// ExecutionStrategy defines how a TestHarness should execute a set of runs. It
|
||||
// essentially defines the concurrency model for a given testing session.
|
||||
type ExecutionStrategy interface {
|
||||
// Execute calls each function in whatever way the strategy wants. All
|
||||
// errors returned from the function should be wrapped and returned, but all
|
||||
// given functions must be executed.
|
||||
Run(ctx context.Context, fns []TestFn) ([]error, error)
|
||||
}
|
||||
|
||||
// LinearExecutionStrategy executes all test runs in a linear fashion, one after
|
||||
// the other.
|
||||
type LinearExecutionStrategy struct{}
|
||||
|
||||
var _ ExecutionStrategy = LinearExecutionStrategy{}
|
||||
|
||||
// Execute implements ExecutionStrategy.
|
||||
func (LinearExecutionStrategy) Execute(ctx context.Context, runs []*TestRun) error {
|
||||
for _, run := range runs {
|
||||
_ = run.Run(ctx)
|
||||
// Run implements ExecutionStrategy.
|
||||
func (LinearExecutionStrategy) Run(ctx context.Context, fns []TestFn) ([]error, error) {
|
||||
var errs []error
|
||||
for i, fn := range fns {
|
||||
err := fn(ctx)
|
||||
if err != nil {
|
||||
errs = append(errs, xerrors.Errorf("run %d: %w", i, err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// ConcurrentExecutionStrategy executes all test runs concurrently without any
|
||||
@ -31,21 +48,27 @@ type ConcurrentExecutionStrategy struct{}
|
||||
|
||||
var _ ExecutionStrategy = ConcurrentExecutionStrategy{}
|
||||
|
||||
// Execute implements ExecutionStrategy.
|
||||
func (ConcurrentExecutionStrategy) Execute(ctx context.Context, runs []*TestRun) error {
|
||||
var wg sync.WaitGroup
|
||||
for _, run := range runs {
|
||||
run := run
|
||||
// Run implements ExecutionStrategy.
|
||||
func (ConcurrentExecutionStrategy) Run(ctx context.Context, fns []TestFn) ([]error, error) {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
errs = newErrorsList()
|
||||
)
|
||||
for i, fn := range fns {
|
||||
i, fn := i, fn
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = run.Run(ctx)
|
||||
err := fn(ctx)
|
||||
if err != nil {
|
||||
errs.add(xerrors.Errorf("run %d: %w", i, err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
return errs.errs, nil
|
||||
}
|
||||
|
||||
// ParallelExecutionStrategy executes all test runs concurrently, but limits the
|
||||
@ -56,14 +79,17 @@ type ParallelExecutionStrategy struct {
|
||||
|
||||
var _ ExecutionStrategy = ParallelExecutionStrategy{}
|
||||
|
||||
// Execute implements ExecutionStrategy.
|
||||
func (p ParallelExecutionStrategy) Execute(ctx context.Context, runs []*TestRun) error {
|
||||
var wg sync.WaitGroup
|
||||
sem := make(chan struct{}, p.Limit)
|
||||
// Run implements ExecutionStrategy.
|
||||
func (p ParallelExecutionStrategy) Run(ctx context.Context, fns []TestFn) ([]error, error) {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
errs = newErrorsList()
|
||||
sem = make(chan struct{}, p.Limit)
|
||||
)
|
||||
defer close(sem)
|
||||
|
||||
for _, run := range runs {
|
||||
run := run
|
||||
for i, fn := range fns {
|
||||
i, fn := i, fn
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@ -72,12 +98,15 @@ func (p ParallelExecutionStrategy) Execute(ctx context.Context, runs []*TestRun)
|
||||
wg.Done()
|
||||
}()
|
||||
sem <- struct{}{}
|
||||
_ = run.Run(ctx)
|
||||
err := fn(ctx)
|
||||
if err != nil {
|
||||
errs.add(xerrors.Errorf("run %d: %w", i, err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
return errs.errs, nil
|
||||
}
|
||||
|
||||
// TimeoutExecutionStrategyWrapper is an ExecutionStrategy that wraps another
|
||||
@ -89,41 +118,19 @@ type TimeoutExecutionStrategyWrapper struct {
|
||||
|
||||
var _ ExecutionStrategy = TimeoutExecutionStrategyWrapper{}
|
||||
|
||||
type timeoutRunnerWrapper struct {
|
||||
timeout time.Duration
|
||||
inner Runnable
|
||||
}
|
||||
|
||||
var _ Runnable = timeoutRunnerWrapper{}
|
||||
var _ Cleanable = timeoutRunnerWrapper{}
|
||||
|
||||
func (t timeoutRunnerWrapper) Run(ctx context.Context, id string, logs io.Writer) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, t.timeout)
|
||||
// Run implements ExecutionStrategy.
|
||||
func (t TimeoutExecutionStrategyWrapper) Run(ctx context.Context, fns []TestFn) ([]error, error) {
|
||||
newFns := make([]TestFn, len(fns))
|
||||
for i, fn := range fns {
|
||||
fn := fn
|
||||
newFns[i] = func(ctx context.Context) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, t.Timeout)
|
||||
defer cancel()
|
||||
|
||||
return t.inner.Run(ctx, id, logs)
|
||||
}
|
||||
|
||||
func (t timeoutRunnerWrapper) Cleanup(ctx context.Context, id string) error {
|
||||
c, ok := t.inner.(Cleanable)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.Cleanup(ctx, id)
|
||||
}
|
||||
|
||||
// Execute implements ExecutionStrategy.
|
||||
func (t TimeoutExecutionStrategyWrapper) Execute(ctx context.Context, runs []*TestRun) error {
|
||||
for _, run := range runs {
|
||||
oldRunner := run.runner
|
||||
run.runner = timeoutRunnerWrapper{
|
||||
timeout: t.Timeout,
|
||||
inner: oldRunner,
|
||||
return fn(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
return t.Inner.Execute(ctx, runs)
|
||||
return t.Inner.Run(ctx, newFns)
|
||||
}
|
||||
|
||||
// ShuffleExecutionStrategyWrapper is an ExecutionStrategy that wraps another
|
||||
@ -151,17 +158,35 @@ func (cryptoRandSource) Int63() int64 {
|
||||
|
||||
func (cryptoRandSource) Seed(_ int64) {}
|
||||
|
||||
// Execute implements ExecutionStrategy.
|
||||
func (s ShuffleExecutionStrategyWrapper) Execute(ctx context.Context, runs []*TestRun) error {
|
||||
shuffledRuns := make([]*TestRun, len(runs))
|
||||
copy(shuffledRuns, runs)
|
||||
// Run implements ExecutionStrategy.
|
||||
func (s ShuffleExecutionStrategyWrapper) Run(ctx context.Context, fns []TestFn) ([]error, error) {
|
||||
shuffledFns := make([]TestFn, len(fns))
|
||||
copy(shuffledFns, fns)
|
||||
|
||||
//nolint:gosec // gosec thinks we're using an insecure RNG, but we're not.
|
||||
src := rand.New(cryptoRandSource{})
|
||||
for i := range shuffledRuns {
|
||||
for i := range shuffledFns {
|
||||
j := src.Intn(i + 1)
|
||||
shuffledRuns[i], shuffledRuns[j] = shuffledRuns[j], shuffledRuns[i]
|
||||
shuffledFns[i], shuffledFns[j] = shuffledFns[j], shuffledFns[i]
|
||||
}
|
||||
|
||||
return s.Inner.Execute(ctx, shuffledRuns)
|
||||
return s.Inner.Run(ctx, shuffledFns)
|
||||
}
|
||||
|
||||
type errorsList struct {
|
||||
mut *sync.Mutex
|
||||
errs []error
|
||||
}
|
||||
|
||||
func newErrorsList() *errorsList {
|
||||
return &errorsList{
|
||||
mut: &sync.Mutex{},
|
||||
errs: []error{},
|
||||
}
|
||||
}
|
||||
|
||||
func (l *errorsList) add(err error) {
|
||||
l.mut.Lock()
|
||||
defer l.mut.Unlock()
|
||||
l.errs = append(l.errs, err)
|
||||
}
|
||||
|
@ -22,16 +22,22 @@ func Test_LinearExecutionStrategy(t *testing.T) {
|
||||
lastSeenI int64 = -1
|
||||
count int64
|
||||
)
|
||||
runs := strategyTestData(100, func(_ context.Context, i int, _ io.Writer) error {
|
||||
runs, fns := strategyTestData(100, func(_ context.Context, i int, _ io.Writer) error {
|
||||
atomic.AddInt64(&count, 1)
|
||||
swapped := atomic.CompareAndSwapInt64(&lastSeenI, int64(i-1), int64(i))
|
||||
assert.True(t, swapped)
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
|
||||
if i%2 == 0 {
|
||||
return xerrors.New("error")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
strategy := harness.LinearExecutionStrategy{}
|
||||
err := strategy.Execute(context.Background(), runs)
|
||||
runErrs, err := strategy.Run(context.Background(), fns)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, runErrs, 50)
|
||||
require.EqualValues(t, 100, atomic.LoadInt64(&count))
|
||||
|
||||
lastStartTime := time.Time{}
|
||||
@ -44,15 +50,19 @@ func Test_LinearExecutionStrategy(t *testing.T) {
|
||||
|
||||
//nolint:paralleltest // this tests uses timings to determine if it's working
|
||||
func Test_ConcurrentExecutionStrategy(t *testing.T) {
|
||||
runs := strategyTestData(10, func(_ context.Context, i int, _ io.Writer) error {
|
||||
runs, fns := strategyTestData(10, func(_ context.Context, i int, _ io.Writer) error {
|
||||
time.Sleep(1 * time.Second)
|
||||
if i%2 == 0 {
|
||||
return xerrors.New("error")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
strategy := harness.ConcurrentExecutionStrategy{}
|
||||
|
||||
startTime := time.Now()
|
||||
err := strategy.Execute(context.Background(), runs)
|
||||
runErrs, err := strategy.Run(context.Background(), fns)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, runErrs, 5)
|
||||
|
||||
// Should've taken at least 900ms to run but less than 5 seconds.
|
||||
require.True(t, time.Since(startTime) > 900*time.Millisecond)
|
||||
@ -68,8 +78,11 @@ func Test_ConcurrentExecutionStrategy(t *testing.T) {
|
||||
|
||||
//nolint:paralleltest // this tests uses timings to determine if it's working
|
||||
func Test_ParallelExecutionStrategy(t *testing.T) {
|
||||
runs := strategyTestData(10, func(_ context.Context, _ int, _ io.Writer) error {
|
||||
runs, fns := strategyTestData(10, func(_ context.Context, i int, _ io.Writer) error {
|
||||
time.Sleep(1 * time.Second)
|
||||
if i%2 == 0 {
|
||||
return xerrors.New("error")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
strategy := harness.ParallelExecutionStrategy{
|
||||
@ -78,8 +91,9 @@ func Test_ParallelExecutionStrategy(t *testing.T) {
|
||||
|
||||
startTime := time.Now()
|
||||
time.Sleep(time.Millisecond)
|
||||
err := strategy.Execute(context.Background(), runs)
|
||||
runErrs, err := strategy.Run(context.Background(), fns)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, runErrs, 5)
|
||||
|
||||
// Should've taken at least 1900ms to run but less than 8 seconds.
|
||||
require.True(t, time.Since(startTime) > 1900*time.Millisecond)
|
||||
@ -112,7 +126,7 @@ func Test_ParallelExecutionStrategy(t *testing.T) {
|
||||
|
||||
//nolint:paralleltest // this tests uses timings to determine if it's working
|
||||
func Test_TimeoutExecutionStrategy(t *testing.T) {
|
||||
runs := strategyTestData(1, func(ctx context.Context, _ int, _ io.Writer) error {
|
||||
runs, fns := strategyTestData(1, func(ctx context.Context, _ int, _ io.Writer) error {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
@ -128,8 +142,9 @@ func Test_TimeoutExecutionStrategy(t *testing.T) {
|
||||
Inner: harness.LinearExecutionStrategy{},
|
||||
}
|
||||
|
||||
err := strategy.Execute(context.Background(), runs)
|
||||
runErrs, err := strategy.Run(context.Background(), fns)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, runErrs, 0)
|
||||
|
||||
for _, run := range runs {
|
||||
require.NoError(t, run.Result().Error)
|
||||
@ -138,7 +153,7 @@ func Test_TimeoutExecutionStrategy(t *testing.T) {
|
||||
|
||||
//nolint:paralleltest // this tests uses timings to determine if it's working
|
||||
func Test_ShuffleExecutionStrategyWrapper(t *testing.T) {
|
||||
runs := strategyTestData(100000, func(_ context.Context, i int, _ io.Writer) error {
|
||||
runs, fns := strategyTestData(100000, func(_ context.Context, i int, _ io.Writer) error {
|
||||
// t.Logf("run %d", i)
|
||||
return nil
|
||||
})
|
||||
@ -146,8 +161,9 @@ func Test_ShuffleExecutionStrategyWrapper(t *testing.T) {
|
||||
Inner: harness.LinearExecutionStrategy{},
|
||||
}
|
||||
|
||||
err := strategy.Execute(context.Background(), runs)
|
||||
runErrs, err := strategy.Run(context.Background(), fns)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, runErrs, 0)
|
||||
|
||||
// Ensure not in order by sorting the start time of each run.
|
||||
unsortedTimes := make([]time.Time, len(runs))
|
||||
@ -164,12 +180,15 @@ func Test_ShuffleExecutionStrategyWrapper(t *testing.T) {
|
||||
require.NotEqual(t, unsortedTimes, sortedTimes)
|
||||
}
|
||||
|
||||
func strategyTestData(count int, runFn func(ctx context.Context, i int, logs io.Writer) error) []*harness.TestRun {
|
||||
out := make([]*harness.TestRun, count)
|
||||
func strategyTestData(count int, runFn func(ctx context.Context, i int, logs io.Writer) error) ([]*harness.TestRun, []harness.TestFn) {
|
||||
var (
|
||||
runs = make([]*harness.TestRun, count)
|
||||
fns = make([]harness.TestFn, count)
|
||||
)
|
||||
for i := 0; i < count; i++ {
|
||||
i := i
|
||||
|
||||
out[i] = harness.NewTestRun("test", strconv.Itoa(i), testFns{
|
||||
runs[i] = harness.NewTestRun("test", strconv.Itoa(i), testFns{
|
||||
RunFn: func(ctx context.Context, id string, logs io.Writer) error {
|
||||
if runFn != nil {
|
||||
return runFn(ctx, i, logs)
|
||||
@ -177,7 +196,8 @@ func strategyTestData(count int, runFn func(ctx context.Context, i int, logs io.
|
||||
return nil
|
||||
},
|
||||
})
|
||||
fns[i] = runs[i].Run
|
||||
}
|
||||
|
||||
return out
|
||||
return runs, fns
|
||||
}
|
||||
|
Reference in New Issue
Block a user