SemanticSearchPOC / entities /cyclemanager /cyclecallbackgroup.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
10.7 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package cyclemanager
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// CycleCallbackGroup is a container for multiple callbacks. It exposes a
// CycleCallback method that acts as a single callback for all of them, so the
// group itself can be provided to CycleManager.
type CycleCallbackGroup interface {
// Register adds the given CycleCallback to the container under the given
// custom id and returns a controller for the registered callback.
// Options can be used to adjust how the callback is registered.
Register(id string, cycleCallback CycleCallback, options ...RegisterOption) CycleCallbackCtrl
// CycleCallback acts as a single callback for all callbacks added to the
// container.
CycleCallback(shouldAbort ShouldAbortCallback) bool
}
// cycleCallbackGroup is the CycleCallbackGroup implementation returned by
// NewCallbackGroup. The embedded mutex is locked by the methods in this file
// around accesses to nextId, callbackIds, callbacks and the stored metadata.
type cycleCallbackGroup struct {
sync.Mutex
logger logrus.FieldLogger
// custom identifier of the group, logged as "callbacks_id" on callback panic
customId string
// values <= 1 select sequential execution, larger values are used as the
// number of worker goroutines for parallel execution
routinesLimit int
// id that will be assigned to the next registered callback
nextId uint32
// ids of callbacks in registration order; ids of callbacks no longer present
// in the map are removed lazily while cycling
callbackIds []uint32
// metadata of registered callbacks by id
callbacks map[uint32]*cycleCallbackMeta
}
// NewCallbackGroup creates an empty callback group identified by id.
// With routinesLimit <= 1 the group's callbacks are executed one after
// another; otherwise routinesLimit goroutines are used to execute them.
func NewCallbackGroup(id string, logger logrus.FieldLogger, routinesLimit int) CycleCallbackGroup {
	group := new(cycleCallbackGroup)
	group.logger = logger
	group.customId = id
	group.routinesLimit = routinesLimit
	// nextId stays at its zero value, the first callback gets id 0
	group.callbackIds = make([]uint32, 0)
	group.callbacks = make(map[uint32]*cycleCallbackMeta)
	return group
}
// Register stores cycleCallback in the group under a newly assigned numeric id
// and returns a controller bound to that id. The callback starts out active,
// not running and without intervals; non-nil options are applied on top of
// these defaults, in the order given.
func (c *cycleCallbackGroup) Register(id string, cycleCallback CycleCallback, options ...RegisterOption) CycleCallbackCtrl {
	c.Lock()
	defer c.Unlock()

	// runningCtx and intervals are left nil on purpose
	entry := new(cycleCallbackMeta)
	entry.customId = id
	entry.cycleCallback = cycleCallback
	entry.active = true
	entry.started = time.Now()

	for idx := range options {
		// nil options are tolerated and skipped
		if apply := options[idx]; apply != nil {
			apply(entry)
		}
	}

	assignedId := c.nextId
	c.nextId++
	c.callbacks[assignedId] = entry
	c.callbackIds = append(c.callbackIds, assignedId)

	ctrl := &cycleCallbackCtrl{
		callbackId:       assignedId,
		callbackCustomId: id,
		isActive:         c.isActive,
		activate:         c.activate,
		deactivate:       c.deactivate,
		unregister:       c.unregister,
	}
	return ctrl
}
// CycleCallback executes the callbacks of the group and reports whether any
// of them returned true. More than one configured routine selects the
// parallel execution, anything else the sequential one.
func (c *cycleCallbackGroup) CycleCallback(shouldAbort ShouldAbortCallback) bool {
	if limit := c.routinesLimit; limit > 1 {
		return c.cycleCallbackParallel(shouldAbort, limit)
	}
	return c.cycleCallbackSequential(shouldAbort)
}
// cycleCallbackSequential runs the registered callbacks one after another on
// the calling goroutine, following the order of c.callbackIds. It stops early
// once shouldAbort returns true and reports whether any callback returned true.
//
// The group lock is held only while the next callback is selected and marked
// as running; it is released before the callback itself is invoked.
func (c *cycleCallbackGroup) cycleCallbackSequential(shouldAbort ShouldAbortCallback) bool {
anyExecuted := false
i := 0
for {
if shouldAbort() {
break
}
c.Lock()
// no more callbacks left, exit the loop
if i >= len(c.callbackIds) {
c.Unlock()
break
}
callbackId := c.callbackIds[i]
meta, ok := c.callbacks[callbackId]
// callback deleted in the meantime, remove its id
// and proceed to the next one (no "i" increment required)
if !ok {
c.callbackIds = append(c.callbackIds[:i], c.callbackIds[i+1:]...)
c.Unlock()
continue
}
i++
// callback deactivated, proceed to the next one
if !meta.active {
c.Unlock()
continue
}
now := time.Now()
// not enough time passed since previous execution
if meta.intervals != nil && now.Sub(meta.started) < meta.intervals.Get() {
c.Unlock()
continue
}
// callback active, mark as running
runningCtx, cancel := context.WithCancel(context.Background())
meta.runningCtx = runningCtx
meta.started = now
c.Unlock()
// wrapped in a function so that the deferred recover runs right after
// this single callback, not at the end of the whole cycle
func() {
// cancel called in recover, regardless of panic occurred or not
defer c.recover(meta.customId, cancel)
// the callback gets its own abort check: the group-level one extended
// by the current state of the callback, read under the group lock
executed := meta.cycleCallback(func() bool {
if shouldAbort() {
return true
}
// abort if callback deactivated or being unregistered
c.Lock()
defer c.Unlock()
meta, ok := c.callbacks[callbackId]
if !ok || !meta.active || meta.unregisterRequested {
return true
}
return false
})
// not reached if the callback panicked
anyExecuted = executed || anyExecuted
// reset intervals after a callback that did some work,
// advance them after one that did not
if meta.intervals != nil {
if executed {
meta.intervals.Reset()
} else {
meta.intervals.Advance()
}
}
}()
}
return anyExecuted
}
// cycleCallbackParallel runs the registered callbacks concurrently on
// routinesLimit worker goroutines. The calling goroutine walks c.callbackIds
// and hands the ids over an unbuffered channel; each worker picks an id,
// checks under the group lock that the callback is present, active and due,
// marks it as running and executes it outside of the lock.
//
// Dispatching stops once shouldAbort returns true or all ids were handed over.
// The method returns only after all workers have finished and reports whether
// any callback returned true.
func (c *cycleCallbackGroup) cycleCallbackParallel(shouldAbort ShouldAbortCallback, routinesLimit int) bool {
	anyExecuted := false
	ch := make(chan uint32)
	// guards anyExecuted, which may be written by several workers
	lock := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	wg.Add(routinesLimit)

	i := 0
	for r := 0; r < routinesLimit; r++ {
		go func() {
			for callbackId := range ch {
				if shouldAbort() {
					// keep reading from channel until it is closed
					continue
				}
				// per-iteration copy captured by the abort check below
				id := callbackId

				c.Lock()
				meta, ok := c.callbacks[id]
				// callback missing or deactivated, proceed to the next one
				if !ok || !meta.active {
					c.Unlock()
					continue
				}
				now := time.Now()
				// not enough time passed since previous execution
				if meta.intervals != nil && now.Sub(meta.started) < meta.intervals.Get() {
					c.Unlock()
					continue
				}
				// callback active, mark as running
				runningCtx, cancel := context.WithCancel(context.Background())
				meta.runningCtx = runningCtx
				meta.started = now
				c.Unlock()

				// wrapped in a function so that the deferred recover runs right
				// after this single callback and the worker keeps going
				func() {
					// cancel called in recover, regardless of panic occurred or not
					defer c.recover(meta.customId, cancel)

					// same abort check as in the sequential execution: without it a
					// running callback is never told about a pending deactivate or
					// unregister, and those calls wait until it finishes on its own
					executed := meta.cycleCallback(func() bool {
						if shouldAbort() {
							return true
						}
						// abort if callback deactivated or being unregistered
						c.Lock()
						defer c.Unlock()
						current, ok := c.callbacks[id]
						return !ok || !current.active || current.unregisterRequested
					})
					if executed {
						lock.Lock()
						anyExecuted = true
						lock.Unlock()
					}
					// reset intervals after a callback that did some work,
					// advance them after one that did not
					if meta.intervals != nil {
						if executed {
							meta.intervals.Reset()
						} else {
							meta.intervals.Advance()
						}
					}
				}()
			}
			wg.Done()
		}()
	}

	for {
		if shouldAbort() {
			close(ch)
			break
		}

		c.Lock()
		// no more callbacks left, exit the loop
		if i >= len(c.callbackIds) {
			c.Unlock()
			close(ch)
			break
		}
		callbackId := c.callbackIds[i]
		_, ok := c.callbacks[callbackId]
		// callback deleted in the meantime, remove its id
		// and proceed to the next one (no "i" increment required)
		if !ok {
			c.callbackIds = append(c.callbackIds[:i], c.callbackIds[i+1:]...)
			c.Unlock()
			continue
		}
		c.Unlock()

		// blocks until one of the workers is free to take the id
		ch <- callbackId
		i++
	}

	// anyExecuted is read only after all workers are done
	wg.Wait()
	return anyExecuted
}
// recover is meant to be deferred directly around a callback execution. It
// logs a panic raised by the callback, if there was one, and always calls
// cancel afterwards to mark the callback as no longer running.
func (c *cycleCallbackGroup) recover(callbackCustomId string, cancel context.CancelFunc) {
	// the builtin stops a panic only because this method itself is the
	// deferred call
	panicValue := recover()
	if panicValue != nil {
		entry := c.logger.WithFields(logrus.Fields{
			"action":       "cyclemanager",
			"callback_id":  callbackCustomId,
			"callbacks_id": c.customId,
		})
		entry.Errorf("callback panic: %v", panicValue)
	}
	cancel()
}
// mutateCallback applies a change to the callback with the given id once that
// callback is not running (not yet started or already finished).
//
// If the callback is not registered, onNotFound is called and its result
// returned. Otherwise the callback gets its unregisterRequested flag set and,
// if it is currently running, the method waits for it to finish before onFound
// is called and its result returned. Both handlers are invoked with the group
// lock held.
//
// An error of ctx is returned if ctx is already expired on entry or expires
// while waiting for the running callback.
// NOTE(review): unregisterRequested is set for every mutation going through
// this method and is not reset here, also not when ctx expires - confirm
// that callers account for this.
func (c *cycleCallbackGroup) mutateCallback(ctx context.Context, callbackId uint32,
onNotFound func(callbackId uint32) error,
onFound func(callbackId uint32, meta *cycleCallbackMeta) error,
) error {
if ctx.Err() != nil {
return ctx.Err()
}
for {
// mutate callback in collection only if not running (not yet started or finished)
c.Lock()
meta, ok := c.callbacks[callbackId]
if !ok {
err := onNotFound(callbackId)
c.Unlock()
return err
}
// picked up by the abort check handed to a running callback
meta.unregisterRequested = true
runningCtx := meta.runningCtx
if runningCtx == nil || runningCtx.Err() != nil {
err := onFound(callbackId, meta)
c.Unlock()
return err
}
c.Unlock()
// wait for callback to finish
select {
case <-runningCtx.Done():
// get back to the beginning of the loop to make sure meta.runningCtx
// was not changed. If not, loop will finish on runningCtx.Err() != nil check
continue
case <-ctx.Done():
// in case both contexts are ready, but input ctx was selected
// check again running ctx as priority one
if runningCtx.Err() != nil {
// get back to the beginning of the loop to make sure meta.runningCtx
// was not changed. If not, loop will finish on runningCtx.Err() != nil check
continue
}
// input ctx expired
return ctx.Err()
}
}
}
// unregister removes the callback with the given id from the group once it is
// not running. A callback that is not registered (anymore) is not treated as
// an error. The outcome is passed through errorUnregisterCallback.
func (c *cycleCallbackGroup) unregister(ctx context.Context, callbackId uint32, callbackCustomId string) error {
	alreadyGone := func(uint32) error {
		return nil
	}
	// invoked by mutateCallback with the group lock held
	remove := func(id uint32, _ *cycleCallbackMeta) error {
		delete(c.callbacks, id)
		return nil
	}

	mutateErr := c.mutateCallback(ctx, callbackId, alreadyGone, remove)
	return errorUnregisterCallback(callbackCustomId, c.customId, mutateErr)
}
// deactivate marks the callback with the given id as inactive once it is not
// running. A callback that is not registered yields ErrorCallbackNotFound.
// The outcome is passed through errorDeactivateCallback.
func (c *cycleCallbackGroup) deactivate(ctx context.Context, callbackId uint32, callbackCustomId string) error {
	notFound := func(uint32) error {
		return ErrorCallbackNotFound
	}
	// invoked by mutateCallback with the group lock held
	switchOff := func(_ uint32, meta *cycleCallbackMeta) error {
		meta.active = false
		return nil
	}

	mutateErr := c.mutateCallback(ctx, callbackId, notFound, switchOff)
	return errorDeactivateCallback(callbackCustomId, c.customId, mutateErr)
}
// activate marks the callback with the given id as active, so that it is
// executed again by subsequent cycles, and withdraws any abort request left
// on it. It does not wait for a running callback.
//
// An error created by errorActivateCallback from ErrorCallbackNotFound is
// returned if no callback is registered under callbackId.
func (c *cycleCallbackGroup) activate(callbackId uint32, callbackCustomId string) error {
	c.Lock()
	defer c.Unlock()

	meta, ok := c.callbacks[callbackId]
	if !ok {
		return errorActivateCallback(callbackCustomId, c.customId, ErrorCallbackNotFound)
	}

	meta.active = true
	// mutateCallback sets unregisterRequested for deactivation as well and
	// nothing else clears it. Left set, the abort check handed to the
	// reactivated callback would return true on every call, so the callback
	// would be told to abort on every cycle.
	meta.unregisterRequested = false
	return nil
}
// isActive reports whether the callback with the given id is registered and
// currently marked as active.
func (c *cycleCallbackGroup) isActive(callbackId uint32, callbackCustomId string) bool {
	c.Lock()
	defer c.Unlock()

	// unknown callbacks are reported as inactive
	meta, found := c.callbacks[callbackId]
	return found && meta.active
}
// cycleCallbackMeta holds the state a callback group keeps for a single
// registered callback.
type cycleCallbackMeta struct {
// custom identifier given on registration, logged as "callback_id" on panic
customId string
// the registered callback
cycleCallback CycleCallback
// callbacks that are not active are skipped when the group is cycled
active bool
// indicates whether callback is already running - context active
// or not running (already finished) - context expired
// or not running (not yet started) - context nil
runningCtx context.Context
// time of the most recent start of the callback; together with intervals
// decides whether the callback is due
started time.Time
// optional, may be nil; when set, the callback is skipped until
// intervals.Get() has passed since started
intervals CycleIntervals
// set to true if callback is being unregistered, but still running
// (set by mutateCallback, which is used by unregister as well as deactivate)
unregisterRequested bool
}
// cycleCallbackGroupNoop is a CycleCallbackGroup that keeps no callbacks and
// never executes anything.
type cycleCallbackGroupNoop struct{}

// NewCallbackGroupNoop returns a CycleCallbackGroup whose methods do nothing.
func NewCallbackGroupNoop() CycleCallbackGroup {
	return new(cycleCallbackGroupNoop)
}

// Register discards the given callback and returns the controller created by
// NewCallbackCtrlNoop.
func (*cycleCallbackGroupNoop) Register(id string, cycleCallback CycleCallback, options ...RegisterOption) CycleCallbackCtrl {
	return NewCallbackCtrlNoop()
}

// CycleCallback always reports that nothing was executed.
func (*cycleCallbackGroupNoop) CycleCallback(shouldAbort ShouldAbortCallback) bool {
	return false
}
// RegisterOption adjusts the metadata of a callback while it is being
// registered. Register skips nil options.
type RegisterOption func(meta *cycleCallbackMeta)
// AsInactive returns an option that registers the callback as not active.
func AsInactive() RegisterOption {
	markInactive := func(meta *cycleCallbackMeta) {
		meta.active = false
	}
	return markInactive
}
// WithIntervals returns an option that assigns the given intervals to the
// registered callback. For nil intervals a nil option is returned.
func WithIntervals(intervals CycleIntervals) RegisterOption {
	if intervals != nil {
		return func(meta *cycleCallbackMeta) {
			meta.intervals = intervals
			// start time is moved back by the current interval, so the callback
			// can be executed right away instead of waiting for the interval
			// duration to pass first
			now := time.Now()
			meta.started = now.Add(-intervals.Get())
		}
	}
	return nil
}