Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package cyclemanager | |
import ( | |
"context" | |
"fmt" | |
"runtime" | |
"sync" | |
) | |
var _NUMCPU = runtime.NumCPU() | |
type ( | |
// indicates whether cyclemanager's stop was requested to allow safely | |
// abort execution of CycleCallback and stop cyclemanager earlier | |
ShouldAbortCallback func() bool | |
// return value indicates whether actual work was done in the cycle | |
CycleCallback func(shouldAbort ShouldAbortCallback) bool | |
) | |
type CycleManager interface { | |
Start() | |
Stop(ctx context.Context) chan bool | |
StopAndWait(ctx context.Context) error | |
Running() bool | |
} | |
type cycleManager struct { | |
sync.RWMutex | |
cycleCallback CycleCallback | |
cycleTicker CycleTicker | |
running bool | |
stopSignal chan struct{} | |
stopContexts []context.Context | |
stopResults []chan bool | |
} | |
func NewManager(cycleTicker CycleTicker, cycleCallback CycleCallback) CycleManager { | |
return &cycleManager{ | |
cycleCallback: cycleCallback, | |
cycleTicker: cycleTicker, | |
running: false, | |
stopSignal: make(chan struct{}, 1), | |
} | |
} | |
// Starts instance, does not block | |
// Does nothing if instance is already started | |
func (c *cycleManager) Start() { | |
c.Lock() | |
defer c.Unlock() | |
if c.running { | |
return | |
} | |
go func() { | |
c.cycleTicker.Start() | |
defer c.cycleTicker.Stop() | |
for { | |
if c.isStopRequested() { | |
c.Lock() | |
if c.shouldStop() { | |
c.handleStopRequest(true) | |
c.Unlock() | |
break | |
} | |
c.handleStopRequest(false) | |
c.Unlock() | |
continue | |
} | |
c.cycleTicker.CycleExecuted(c.cycleCallback(c.shouldAbortCycleCallback)) | |
} | |
}() | |
c.running = true | |
} | |
// Stops running instance, does not block | |
// Returns channel with final stop result - true / false | |
// | |
// If given context is cancelled before it is handled by stop logic, instance is not stopped | |
// If called multiple times, all contexts have to be cancelled to cancel stop | |
// (any valid will result in stopping instance) | |
// stopResult is the same (consistent) for multiple calls | |
func (c *cycleManager) Stop(ctx context.Context) (stopResult chan bool) { | |
c.Lock() | |
defer c.Unlock() | |
stopResult = make(chan bool, 1) | |
if !c.running { | |
stopResult <- true | |
close(stopResult) | |
return stopResult | |
} | |
if len(c.stopContexts) == 0 { | |
defer func() { | |
c.stopSignal <- struct{}{} | |
}() | |
} | |
c.stopContexts = append(c.stopContexts, ctx) | |
c.stopResults = append(c.stopResults, stopResult) | |
return stopResult | |
} | |
// Stops running instance, waits for stop to occur or context to expire (which comes first) | |
// Returns error if instance was not stopped | |
func (c *cycleManager) StopAndWait(ctx context.Context) error { | |
// if both channels are ready, chan is selected randomly, therefore regardless of | |
// channel selected first, second one is also checked | |
stop := c.Stop(ctx) | |
done := ctx.Done() | |
select { | |
case <-done: | |
select { | |
case stopped := <-stop: | |
if !stopped { | |
return ctx.Err() | |
} | |
default: | |
return ctx.Err() | |
} | |
case stopped := <-stop: | |
if !stopped { | |
if ctx.Err() != nil { | |
return ctx.Err() | |
} | |
return fmt.Errorf("failed to stop cycle") | |
} | |
} | |
return nil | |
} | |
func (c *cycleManager) Running() bool { | |
c.RLock() | |
defer c.RUnlock() | |
return c.running | |
} | |
func (c *cycleManager) shouldStop() bool { | |
for _, ctx := range c.stopContexts { | |
if ctx.Err() == nil { | |
return true | |
} | |
} | |
return false | |
} | |
func (c *cycleManager) shouldAbortCycleCallback() bool { | |
c.RLock() | |
defer c.RUnlock() | |
return c.shouldStop() | |
} | |
func (c *cycleManager) isStopRequested() bool { | |
select { | |
case <-c.stopSignal: | |
case <-c.cycleTicker.C(): | |
// as stop chan has higher priority, | |
// it is checked again in case of ticker was selected over stop if both were ready | |
select { | |
case <-c.stopSignal: | |
default: | |
return false | |
} | |
} | |
return true | |
} | |
func (c *cycleManager) handleStopRequest(stopped bool) { | |
for _, stopResult := range c.stopResults { | |
stopResult <- stopped | |
close(stopResult) | |
} | |
c.running = !stopped | |
c.stopContexts = nil | |
c.stopResults = nil | |
} | |
func NewManagerNoop() CycleManager { | |
return &cycleManagerNoop{running: false} | |
} | |
type cycleManagerNoop struct { | |
running bool | |
} | |
func (c *cycleManagerNoop) Start() { | |
c.running = true | |
} | |
func (c *cycleManagerNoop) Stop(ctx context.Context) chan bool { | |
if !c.running { | |
return c.closedChan(true) | |
} | |
if ctx.Err() != nil { | |
return c.closedChan(false) | |
} | |
c.running = false | |
return c.closedChan(true) | |
} | |
func (c *cycleManagerNoop) StopAndWait(ctx context.Context) error { | |
if <-c.Stop(ctx) { | |
return nil | |
} | |
return ctx.Err() | |
} | |
func (c *cycleManagerNoop) Running() bool { | |
return c.running | |
} | |
func (c *cycleManagerNoop) closedChan(val bool) chan bool { | |
ch := make(chan bool, 1) | |
ch <- val | |
close(ch) | |
return ch | |
} | |