Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package cyclemanager | |
| import ( | |
| "context" | |
| "sync" | |
| "testing" | |
| "time" | |
| "github.com/stretchr/testify/assert" | |
| ) | |
| type cycleCallbackProvider struct { | |
| sync.Mutex | |
| firstCycleStarted chan struct{} | |
| cycleCallback CycleCallback | |
| results chan string | |
| } | |
| func newProvider(cycleDuration time.Duration, resultsSize uint) *cycleCallbackProvider { | |
| return newProviderAbortable(cycleDuration, resultsSize, 1) | |
| } | |
| func newProviderAbortable(cycleDuration time.Duration, resultsSize uint, aborts int) *cycleCallbackProvider { | |
| fs := false | |
| p := &cycleCallbackProvider{} | |
| p.results = make(chan string, resultsSize) | |
| p.firstCycleStarted = make(chan struct{}, 1) | |
| p.cycleCallback = func(shouldAbort ShouldAbortCallback) bool { | |
| p.Lock() | |
| if !fs { | |
| p.firstCycleStarted <- struct{}{} | |
| fs = true | |
| } | |
| p.Unlock() | |
| if aborts > 1 { | |
| for i := 0; i < aborts; i++ { | |
| time.Sleep(cycleDuration / time.Duration(aborts)) | |
| if shouldAbort() { | |
| return true | |
| } | |
| } | |
| } else { | |
| time.Sleep(cycleDuration) | |
| } | |
| p.results <- "something wonderful..." | |
| return true | |
| } | |
| return p | |
| } | |
| func TestCycleManager_beforeTimeout(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 1 * time.Millisecond | |
| stopTimeout := 12 * time.Millisecond | |
| p := newProvider(cycleDuration, 1) | |
| var cm CycleManager | |
| t.Run("create new", func(t *testing.T) { | |
| cm = NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| assert.False(t, cm.Running()) | |
| }) | |
| t.Run("start", func(t *testing.T) { | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| assert.True(t, cm.Running()) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| timeoutCtx, cancel := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel() | |
| stopResult := cm.Stop(timeoutCtx) | |
| select { | |
| case <-timeoutCtx.Done(): | |
| t.Fatal(timeoutCtx.Err().Error(), "failed to stop") | |
| case stopped := <-stopResult: | |
| assert.True(t, stopped) | |
| assert.False(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| } | |
| }) | |
| } | |
| func TestCycleManager_beforeTimeoutWithWait(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 1 * time.Millisecond | |
| stopTimeout := 12 * time.Millisecond | |
| p := newProvider(cycleDuration, 1) | |
| var cm CycleManager | |
| t.Run("create new", func(t *testing.T) { | |
| cm = NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| assert.False(t, cm.Running()) | |
| }) | |
| t.Run("start", func(t *testing.T) { | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| assert.True(t, cm.Running()) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| timeoutCtx, cancel := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel() | |
| err := cm.StopAndWait(timeoutCtx) | |
| assert.Nil(t, err) | |
| assert.False(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| } | |
| func TestCycleManager_timeout(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 20 * time.Millisecond | |
| stopTimeout := 12 * time.Millisecond | |
| p := newProvider(cycleDuration, 1) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("timeout is reached", func(t *testing.T) { | |
| timeoutCtx, cancel := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel() | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| stopResult := cm.Stop(timeoutCtx) | |
| select { | |
| case <-timeoutCtx.Done(): | |
| assert.True(t, cm.Running()) | |
| case <-stopResult: | |
| t.Fatal("stopped before timeout") | |
| } | |
| // make sure it is still running | |
| assert.False(t, <-stopResult) | |
| assert.True(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| stopResult := cm.Stop(context.Background()) | |
| assert.True(t, <-stopResult) | |
| assert.False(t, cm.Running()) | |
| }) | |
| } | |
| func TestCycleManager_timeoutWithWait(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 20 * time.Millisecond | |
| stopTimeout := 12 * time.Millisecond | |
| p := newProvider(cycleDuration, 1) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("timeout is reached", func(t *testing.T) { | |
| timeoutCtx, cancel := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel() | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| err := cm.StopAndWait(timeoutCtx) | |
| assert.NotNil(t, err) | |
| assert.Equal(t, "context deadline exceeded", err.Error()) | |
| assert.True(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| stopResult := cm.Stop(context.Background()) | |
| assert.True(t, <-stopResult) | |
| assert.False(t, cm.Running()) | |
| }) | |
| } | |
| func TestCycleManager_doesNotStartMultipleTimes(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 1 * time.Millisecond | |
| startCount := 5 | |
| p := newProvider(cycleDuration, uint(startCount)) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("multiple starts", func(t *testing.T) { | |
| for i := 0; i < startCount; i++ { | |
| cm.Start() | |
| } | |
| <-p.firstCycleStarted | |
| stopResult := cm.Stop(context.Background()) | |
| assert.True(t, <-stopResult) | |
| assert.False(t, cm.Running()) | |
| // just one result produced | |
| assert.Equal(t, 1, len(p.results)) | |
| }) | |
| } | |
| func TestCycleManager_doesNotStartMultipleTimesWithWait(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 1 * time.Millisecond | |
| startCount := 5 | |
| p := newProvider(cycleDuration, uint(startCount)) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("multiple starts", func(t *testing.T) { | |
| for i := 0; i < startCount; i++ { | |
| cm.Start() | |
| } | |
| <-p.firstCycleStarted | |
| err := cm.StopAndWait(context.Background()) | |
| assert.Nil(t, err) | |
| assert.False(t, cm.Running()) | |
| // just one result produced | |
| assert.Equal(t, 1, len(p.results)) | |
| }) | |
| } | |
| func TestCycleManager_handlesMultipleStops(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 1 * time.Millisecond | |
| stopCount := 5 | |
| p := newProvider(cycleDuration, 1) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("multiple stops", func(t *testing.T) { | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| stopResult := make([]chan bool, stopCount) | |
| for i := 0; i < stopCount; i++ { | |
| stopResult[i] = cm.Stop(context.Background()) | |
| } | |
| for i := 0; i < stopCount; i++ { | |
| assert.True(t, <-stopResult[i]) | |
| } | |
| assert.False(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| } | |
| func TestCycleManager_stopsIfNotAllContextsAreCancelled(t *testing.T) { | |
| cycleInterval := 5 * time.Millisecond | |
| cycleDuration := 1 * time.Millisecond | |
| stopTimeout := 5 * time.Millisecond | |
| p := newProvider(cycleDuration, 1) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("multiple stops, few cancelled", func(t *testing.T) { | |
| timeout1Ctx, cancel1 := context.WithTimeout(context.Background(), stopTimeout) | |
| timeout2Ctx, cancel2 := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel1() | |
| defer cancel2() | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| stopResult1 := cm.Stop(timeout1Ctx) | |
| stopResult2 := cm.Stop(timeout2Ctx) | |
| stopResult3 := cm.Stop(context.Background()) | |
| // all produce the same result: cycle was stopped | |
| assert.True(t, <-stopResult1) | |
| assert.True(t, <-stopResult2) | |
| assert.True(t, <-stopResult3) | |
| assert.False(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| } | |
| func TestCycleManager_doesNotStopIfAllContextsAreCancelled(t *testing.T) { | |
| cycleInterval := 50 * time.Millisecond | |
| cycleDuration := 10 * time.Millisecond | |
| stopTimeout := 50 * time.Millisecond | |
| p := newProvider(cycleDuration, 1) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("multiple stops, few cancelled", func(t *testing.T) { | |
| timeout1Ctx, cancel1 := context.WithTimeout(context.Background(), stopTimeout) | |
| timeout2Ctx, cancel2 := context.WithTimeout(context.Background(), stopTimeout) | |
| timeout3Ctx, cancel3 := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel1() | |
| defer cancel2() | |
| defer cancel3() | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| stopResult1 := cm.Stop(timeout1Ctx) | |
| stopResult2 := cm.Stop(timeout2Ctx) | |
| stopResult3 := cm.Stop(timeout3Ctx) | |
| // all produce the same result: cycle was stopped | |
| assert.False(t, <-stopResult1) | |
| assert.False(t, <-stopResult2) | |
| assert.False(t, <-stopResult3) | |
| assert.True(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| stopResult := cm.Stop(context.Background()) | |
| assert.True(t, <-stopResult) | |
| assert.False(t, cm.Running()) | |
| }) | |
| } | |
| func TestCycleManager_cycleCallbackStoppedDueToFrequentStopChecks(t *testing.T) { | |
| cycleInterval := 50 * time.Millisecond | |
| cycleDuration := 300 * time.Millisecond | |
| stopTimeout := 100 * time.Millisecond | |
| // despite cycleDuration is 30ms, cycle callback checks every 20ms (300/15) if it needs to be stopped | |
| p := newProviderAbortable(cycleDuration, 1, 15) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("cycle function stopped before timeout reached", func(t *testing.T) { | |
| timeoutCtx, cancel := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel() | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| err := cm.StopAndWait(timeoutCtx) | |
| assert.Nil(t, err) | |
| assert.False(t, cm.Running()) | |
| assert.Equal(t, 0, len(p.results)) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| stopResult := cm.Stop(context.Background()) | |
| assert.True(t, <-stopResult) | |
| assert.False(t, cm.Running()) | |
| }) | |
| } | |
| func TestCycleManager_cycleCallbackNotStoppedDueToRareStopChecks(t *testing.T) { | |
| cycleInterval := 50 * time.Millisecond | |
| cycleDuration := 300 * time.Millisecond | |
| stopTimeout := 100 * time.Millisecond | |
| // despite cycleDuration is 30ms, cycle callback checks every 150ms (300/2) if it needs to be stopped | |
| p := newProviderAbortable(cycleDuration, 1, 2) | |
| cm := NewManager(NewFixedTicker(cycleInterval), p.cycleCallback) | |
| t.Run("timeout reached", func(t *testing.T) { | |
| timeoutCtx, cancel := context.WithTimeout(context.Background(), stopTimeout) | |
| defer cancel() | |
| cm.Start() | |
| <-p.firstCycleStarted | |
| err := cm.StopAndWait(timeoutCtx) | |
| assert.NotNil(t, err) | |
| assert.Equal(t, "context deadline exceeded", err.Error()) | |
| assert.True(t, cm.Running()) | |
| assert.Equal(t, "something wonderful...", <-p.results) | |
| }) | |
| t.Run("stop", func(t *testing.T) { | |
| stopResult := cm.Stop(context.Background()) | |
| assert.True(t, <-stopResult) | |
| assert.False(t, cm.Running()) | |
| }) | |
| } | |