//go:build p2p
// +build p2p

package p2p

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"sync"
	"time"

	"github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/mudler/LocalAI/pkg/utils"
	"github.com/mudler/edgevpn/pkg/config"
	"github.com/mudler/edgevpn/pkg/logger"
	"github.com/mudler/edgevpn/pkg/node"
	"github.com/mudler/edgevpn/pkg/protocol"
	"github.com/mudler/edgevpn/pkg/services"
	"github.com/mudler/edgevpn/pkg/types"
	eutils "github.com/mudler/edgevpn/pkg/utils"
	"github.com/phayes/freeport"
	zlog "github.com/rs/zerolog/log"
)
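
// generateNewConnectionData builds a fresh edgevpn connection configuration
// with random room, rendezvous, mDNS and OTP keys. A zero interval falls back
// to the defaults (360 for the DHT OTP, 9000 for the crypto OTP).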
func generateNewConnectionData(DHTInterval, OTPInterval int) *node.YAMLConnectionConfig {
	maxMessSize := 20 << 20 // 20MB
	keyLength := 43
	if DHTInterval == 0 {
		DHTInterval = 360
	}
	if OTPInterval == 0 {
		OTPInterval = 9000
	}

	return &node.YAMLConnectionConfig{
		MaxMessageSize: maxMessSize,
		RoomName:       eutils.RandStringRunes(keyLength),
		Rendezvous:     eutils.RandStringRunes(keyLength),
		MDNS:           eutils.RandStringRunes(keyLength),
		OTP: node.OTP{
			DHT: node.OTPConfig{
				Key:      eutils.RandStringRunes(keyLength),
				Interval: DHTInterval,
				Length:   keyLength,
			},
			Crypto: node.OTPConfig{
				Key:      eutils.RandStringRunes(keyLength),
				Interval: OTPInterval,
				Length:   keyLength,
			},
		},
	}
}
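
// GenerateToken generates a new connection configuration and returns it as a
// base64-encoded token that peers can share to join the same P2P network.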
func GenerateToken(DHTInterval, OTPInterval int) string {
	// Generate a new config and return it as a base64-encoded token
	return generateNewConnectionData(DHTInterval, OTPInterval).Base64()
}
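
// IsP2PEnabled reports whether this build has P2P support enabled
// (this file is compiled only with the "p2p" build tag).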
func IsP2PEnabled() bool {
	return true
}
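
// nodeID derives a node identifier by prefixing the given string with the
// local hostname.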
func nodeID(s string) string {
	hostname, _ := os.Hostname()
	return fmt.Sprintf("%s-%s", hostname, s)
}
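
// nodeAnnounce periodically publishes this node in the users ledger so that
// other peers accept connections from it.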
func nodeAnnounce(ctx context.Context, node *node.Node) {
	ledger, _ := node.Ledger()

	// Announce ourselves so other nodes accept our connection
	ledger.Announce(
		ctx,
		10*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[node.Host().ID().String()] = &types.User{
				PeerID:    node.Host().ID().String(),
				Timestamp: time.Now().String(),
			}
			ledger.Add(protocol.UsersLedgerKey, updatedMap)
		},
	)
}
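
// proxyP2PConnection resolves the peer currently advertising serviceID on the
// ledger, opens a libp2p stream to it, and proxies the local connection over
// that stream until either side closes.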
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
	ledger, _ := node.Ledger()

	// Look up the peer that currently advertises this service in the ledger
	existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
	service := &types.Service{}
	existingValue.Unmarshal(service)
	// If the service is not found on the blockchain, close the connection and bail out
	if !found {
		zlog.Error().Msg("Service not found on blockchain")
		conn.Close()
		// ll.Debugf("service '%s' not found on blockchain", serviceID)
		return
	}

	// Decode the peer ID
	d, err := peer.Decode(service.PeerID)
	if err != nil {
		zlog.Error().Msg("cannot decode peer")
		conn.Close()
		// ll.Debugf("could not decode peer '%s'", service.PeerID)
		return
	}

	// Open a stream to the peer
	stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
	if err != nil {
		zlog.Error().Err(err).Msg("cannot open stream to peer")
		conn.Close()
		// ll.Debugf("could not open stream '%s'", err.Error())
		return
	}
	// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
	zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())

	closer := make(chan struct{}, 2)
	go copyStream(closer, stream, conn)
	go copyStream(closer, conn, stream)
	<-closer

	stream.Close()
	conn.Close()
}
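
// allocateLocalService listens on listenAddr and forwards every accepted
// connection to the P2P service via proxyP2PConnection. It blocks until the
// listener fails or the context is canceled.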
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
	zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
	// Open local port for listening
	l, err := net.Listen("tcp", listenAddr)
	if err != nil {
		zlog.Error().Err(err).Msg("Error listening")
		return err
	}
	go func() {
		<-ctx.Done()
		l.Close()
	}()

	nodeAnnounce(ctx, node)

	defer l.Close()
	for {
		select {
		case <-ctx.Done():
			return errors.New("context canceled")
		default:
			zlog.Debug().Msg("Waiting for a new connection")
			// Listen for an incoming connection.
			conn, err := l.Accept()
			if err != nil {
				zlog.Error().Err(err).Msg("Error accepting connection")
				continue
			}
			// Handle connections in a new goroutine, forwarding to the p2p service
			go func() {
				proxyP2PConnection(ctx, node, service, conn)
			}()
		}
	}
}

// ServiceDiscoverer is the main entry point of the server: it starts a
// goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the services
// discovered on the network, invoking discoveryFunc for each of them.
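//
// A minimal server-side sketch (hypothetical usage, assuming a started node
// and a valid token):
//
//	n, _ := NewNode(token)
//	_ = n.Start(ctx)
//	_ = ServiceDiscoverer(ctx, n, token, "", func(serviceID string, node NodeData) {
//		// react to a newly discovered worker, e.g. record node.TunnelAddress
//	}, true)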
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
	if err != nil {
		return err
	}
	// TODO: discoveryTunnels should return all the nodes that are available.
	// In that way we would update availableNodes here instead of appending,
	// e.g. with a LastSeen field in NodeData that is updated in discoveryTunnels
	// each time the node is seen.
	// In that case the function below should be idempotent and just keep track of the nodes.
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Error().Msg("Discoverer stopped")
				return
			case tunnel := <-tunnels:
				AddNode(servicesID, tunnel)
				if discoveryFunc != nil {
					discoveryFunc(servicesID, tunnel)
				}
			}
		}
	}()

	return nil
}
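
// discoveryTunnels watches the ledger for workers advertising servicesID and
// sends their NodeData on the returned channel. When allocate is true it also
// starts a local tunnel (via ensureService) for every online worker.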
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
	tunnels := make(chan NodeData)

	ledger, err := n.Ledger()
	if err != nil {
		return nil, fmt.Errorf("getting the ledger: %w", err)
	}
	// get new services, allocate and return to the channel
	// TODO:
	// a function ensureServices that:
	// - starts a service if not started, if the worker is Online
	// - checks that workers are Online, if not cancels the context of allocateLocalService
	// - discoveryTunnels should return all the nodes and the addresses associated with them
	// - the caller should then take care of the fact that we always return fresh information
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Error().Msg("Discoverer stopped")
				return
			default:
				time.Sleep(5 * time.Second)
				data := ledger.LastBlock().Storage[servicesID]
				zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")

				for k, v := range data {
					zlog.Debug().Msgf("New worker found in the ledger data '%s'", k)
					nd := &NodeData{}
					if err := v.Unmarshal(nd); err != nil {
						zlog.Error().Msg("cannot unmarshal node data")
						continue
					}
					ensureService(ctx, n, nd, k, allocate)
					muservice.Lock()
					if _, ok := service[nd.Name]; ok {
						tunnels <- service[nd.Name].NodeData
					}
					muservice.Unlock()
				}
			}
		}
	}()

	return tunnels, nil
}
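
// nodeServiceData couples the discovered worker data with the cancel function
// of the goroutine that serves its local tunnel.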
type nodeServiceData struct {
	NodeData   NodeData
	CancelFunc context.CancelFunc
}
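
// service tracks the currently known workers by name; muservice guards access to it.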
var service = map[string]nodeServiceData{}
var muservice sync.Mutex
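
// ensureService keeps the service map in sync with the ledger: it starts a
// local tunnel for newly seen online workers (when allocate is true), refreshes
// entries for workers that are still online, and tears down tunnels for workers
// that are no longer online.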
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
	muservice.Lock()
	defer muservice.Unlock()
	nd.ServiceID = sserv
	if ndService, found := service[nd.Name]; !found {
		if !nd.IsOnline() {
			// if the node is offline and not tracked yet, do nothing
			zlog.Debug().Msgf("Node %s is offline", nd.ID)
			return
		}
		newCtxm, cancel := context.WithCancel(ctx)
		if allocate {
			// Start the service
			port, err := freeport.GetFreePort()
			if err != nil {
				zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
				return
			}

			tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
			nd.TunnelAddress = tunnelAddress
			go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
			zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
		}
		service[nd.Name] = nodeServiceData{
			NodeData:   *nd,
			CancelFunc: cancel,
		}
	} else {
		// Check if the service is still alive; if not, cancel its context
		if !nd.IsOnline() && !ndService.NodeData.IsOnline() {
			ndService.CancelFunc()
			delete(service, nd.Name)
			zlog.Info().Msgf("Node %s is offline, deleting", nd.ID)
		} else if nd.IsOnline() {
			// update the last seen data inside service
			nd.TunnelAddress = ndService.NodeData.TunnelAddress
			service[nd.Name] = nodeServiceData{
				NodeData:   *nd,
				CancelFunc: ndService.CancelFunc,
			}
			zlog.Debug().Msgf("Node %s is still online", nd.ID)
		}
	}
}

// ExposeService is the P2P worker entry point: it starts a node that exposes
// the local host:port as a service on the network and announces it on the ledger.
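//
// A minimal worker-side sketch (hypothetical usage, assuming a valid token and
// a backend listening on 127.0.0.1:8080):
//
//	n, err := ExposeService(ctx, "127.0.0.1", "8080", token, "")
//	if err != nil {
//		zlog.Fatal().Err(err).Msg("could not expose the local service")
//	}
//	_ = n // the node keeps announcing itself until ctx is canceled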
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	llger := logger.New(log.LevelFatal)
	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}
	// generate a random string for the name
	name := utils.RandString(10)

	// Register the service
	nodeOpts = append(nodeOpts,
		services.RegisterService(llger, time.Duration(60)*time.Second, name, fmt.Sprintf("%s:%s", host, port))...)
	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	err = n.Start(ctx)
	if err != nil {
		return n, fmt.Errorf("starting the node: %w", err)
	}

	ledger, err := n.Ledger()
	if err != nil {
		return n, fmt.Errorf("getting the ledger: %w", err)
	}

	ledger.Announce(
		ctx,
		20*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[name] = &NodeData{
				Name:     name,
				LastSeen: time.Now(),
				ID:       nodeID(name),
			}
			ledger.Add(servicesID, updatedMap)
		},
	)

	return n, err
}
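
// NewNode creates (but does not start) a P2P node configured from the given
// network token.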
func NewNode(token string) (*node.Node, error) {
	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}

	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	return n, nil
}
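
// newNodeOpts translates the network token and the LOCALAI_P2P_* /
// LOCALAI_LIBP2P_LOGLEVEL environment variables into edgevpn node options.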
func newNodeOpts(token string) ([]node.Option, error) {
	llger := logger.New(log.LevelFatal)
	defaultInterval := 10 * time.Second

	// TODO: move this up, expose more config options when creating a node
	noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true"
	enableLimits := os.Getenv("LOCALAI_P2P_ENABLE_LIMITS") == "true"
	loglevel := os.Getenv("LOCALAI_P2P_LOGLEVEL")
	if loglevel == "" {
		loglevel = "info"
	}
	libp2ploglevel := os.Getenv("LOCALAI_LIBP2P_LOGLEVEL")
	if libp2ploglevel == "" {
		libp2ploglevel = "fatal"
	}

	c := config.Config{
		Limit: config.ResourceLimit{
			Enable:   enableLimits,
			MaxConns: 100,
		},
		NetworkToken:   token,
		LowProfile:     false,
		LogLevel:       loglevel,
		Libp2pLogLevel: libp2ploglevel,
		Ledger: config.Ledger{
			SyncInterval:     defaultInterval,
			AnnounceInterval: defaultInterval,
		},
		NAT: config.NAT{
			Service:           true,
			Map:               true,
			RateLimit:         true,
			RateLimitGlobal:   100,
			RateLimitPeer:     100,
			RateLimitInterval: defaultInterval,
		},
		Discovery: config.Discovery{
			DHT:      !noDHT,
			MDNS:     true,
			Interval: 10 * time.Second,
		},
		Connection: config.Connection{
			HolePunch:      true,
			AutoRelay:      true,
			MaxConnections: 1000,
		},
	}

	nodeOpts, _, err := c.ToOpts(llger)
	if err != nil {
		return nil, fmt.Errorf("parsing options: %w", err)
	}

	nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)

	return nodeOpts, nil
}
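
// copyStream copies from src to dst and signals on closer when the copy ends,
// so the proxy loop can tear down both ends of the connection.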
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
	defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
	io.Copy(dst, src)
}