Spaces:
Configuration error
Configuration error
package p2p | |
import ( | |
"fmt" | |
"math/rand/v2" | |
"sync" | |
"github.com/rs/zerolog/log" | |
) | |
const FederatedID = "federated" | |
func NetworkID(networkID, serviceID string) string { | |
if networkID != "" { | |
return fmt.Sprintf("%s_%s", networkID, serviceID) | |
} | |
return serviceID | |
} | |
type FederatedServer struct { | |
sync.Mutex | |
listenAddr, service, p2ptoken string | |
requestTable map[string]int | |
loadBalanced bool | |
workerTarget string | |
} | |
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer { | |
return &FederatedServer{ | |
listenAddr: listenAddr, | |
service: service, | |
p2ptoken: p2pToken, | |
requestTable: map[string]int{}, | |
loadBalanced: loadBalanced, | |
workerTarget: workerTarget, | |
} | |
} | |
func (fs *FederatedServer) RandomServer() string { | |
var tunnelAddresses []string | |
for _, v := range GetAvailableNodes(fs.service) { | |
if v.IsOnline() { | |
tunnelAddresses = append(tunnelAddresses, v.ID) | |
} else { | |
delete(fs.requestTable, v.ID) // make sure it's not tracked | |
log.Info().Msgf("Node %s is offline", v.ID) | |
} | |
} | |
if len(tunnelAddresses) == 0 { | |
return "" | |
} | |
return tunnelAddresses[rand.IntN(len(tunnelAddresses))] | |
} | |
func (fs *FederatedServer) syncTableStatus() { | |
fs.Lock() | |
defer fs.Unlock() | |
currentTunnels := make(map[string]struct{}) | |
for _, v := range GetAvailableNodes(fs.service) { | |
if v.IsOnline() { | |
fs.ensureRecordExist(v.ID) | |
currentTunnels[v.ID] = struct{}{} | |
} | |
} | |
// delete tunnels that don't exist anymore | |
for t := range fs.requestTable { | |
if _, ok := currentTunnels[t]; !ok { | |
delete(fs.requestTable, t) | |
} | |
} | |
} | |
func (fs *FederatedServer) SelectLeastUsedServer() string { | |
fs.syncTableStatus() | |
fs.Lock() | |
defer fs.Unlock() | |
log.Debug().Any("request_table", fs.requestTable).Msgf("SelectLeastUsedServer()") | |
// cycle over requestTable and find the entry with the lower number | |
// if there are multiple entries with the same number, select one randomly | |
// if there are no entries, return an empty string | |
var min int | |
var minKey string | |
for k, v := range fs.requestTable { | |
if min == 0 || v < min { | |
min = v | |
minKey = k | |
} | |
} | |
log.Debug().Any("requests_served", min).Any("request_table", fs.requestTable).Msgf("Selected tunnel %s", minKey) | |
return minKey | |
} | |
func (fs *FederatedServer) RecordRequest(nodeID string) { | |
fs.Lock() | |
defer fs.Unlock() | |
// increment the counter for the nodeID in the requestTable | |
fs.requestTable[nodeID]++ | |
log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Recording request") | |
} | |
func (fs *FederatedServer) ensureRecordExist(nodeID string) { | |
// if the nodeID is not in the requestTable, add it with a counter of 0 | |
_, ok := fs.requestTable[nodeID] | |
if !ok { | |
fs.requestTable[nodeID] = 0 | |
} | |
log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Ensure record exists") | |
} | |