Spaces:
Configuration error
Configuration error
//go:build p2p | |
// +build p2p | |
package p2p | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"io" | |
"net" | |
"github.com/mudler/edgevpn/pkg/node" | |
"github.com/rs/zerolog/log" | |
) | |
func (f *FederatedServer) Start(ctx context.Context) error { | |
n, err := NewNode(f.p2ptoken) | |
if err != nil { | |
return fmt.Errorf("creating a new node: %w", err) | |
} | |
err = n.Start(ctx) | |
if err != nil { | |
return fmt.Errorf("creating a new node: %w", err) | |
} | |
if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) { | |
log.Debug().Msgf("Discovered node: %s", tunnel.ID) | |
}, false); err != nil { | |
return err | |
} | |
return f.proxy(ctx, n) | |
} | |
func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { | |
log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr) | |
// Open local port for listening | |
l, err := net.Listen("tcp", fs.listenAddr) | |
if err != nil { | |
log.Error().Err(err).Msg("Error listening") | |
return err | |
} | |
go func() { | |
<-ctx.Done() | |
l.Close() | |
}() | |
nodeAnnounce(ctx, node) | |
defer l.Close() | |
for { | |
select { | |
case <-ctx.Done(): | |
return errors.New("context canceled") | |
default: | |
log.Debug().Msgf("New connection from %s", l.Addr().String()) | |
// Listen for an incoming connection. | |
conn, err := l.Accept() | |
if err != nil { | |
fmt.Println("Error accepting: ", err.Error()) | |
continue | |
} | |
// Handle connections in a new goroutine, forwarding to the p2p service | |
go func() { | |
workerID := "" | |
if fs.workerTarget != "" { | |
workerID = fs.workerTarget | |
} else if fs.loadBalanced { | |
log.Debug().Msgf("Load balancing request") | |
workerID = fs.SelectLeastUsedServer() | |
if workerID == "" { | |
log.Debug().Msgf("Least used server not found, selecting random") | |
workerID = fs.RandomServer() | |
} | |
} else { | |
workerID = fs.RandomServer() | |
} | |
if workerID == "" { | |
log.Error().Msg("No available nodes yet") | |
fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect") | |
return | |
} | |
log.Debug().Msgf("Selected node %s", workerID) | |
nodeData, exists := GetNode(fs.service, workerID) | |
if !exists { | |
log.Error().Msgf("Node %s not found", workerID) | |
fs.sendHTMLResponse(conn, 404, "Node not found") | |
return | |
} | |
proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) | |
if fs.loadBalanced { | |
fs.RecordRequest(workerID) | |
} | |
}() | |
} | |
} | |
} | |
// sendHTMLResponse sends a basic HTML response with a status code and a message. | |
// This is extracted to make the HTML content maintainable. | |
func (fs *FederatedServer) sendHTMLResponse(conn net.Conn, statusCode int, message string) { | |
defer conn.Close() | |
// Define the HTML content separately for easier maintenance. | |
htmlContent := fmt.Sprintf("<html><body><h1>%s</h1></body></html>\r\n", message) | |
// Create the HTTP response with dynamic status code and content. | |
response := fmt.Sprintf( | |
"HTTP/1.1 %d %s\r\n"+ | |
"Content-Type: text/html\r\n"+ | |
"Connection: close\r\n"+ | |
"\r\n"+ | |
"%s", | |
statusCode, getHTTPStatusText(statusCode), htmlContent, | |
) | |
// Write the response to the client connection. | |
_, writeErr := io.WriteString(conn, response) | |
if writeErr != nil { | |
log.Error().Err(writeErr).Msg("Error writing response to client") | |
} | |
} | |
// getHTTPStatusText maps an HTTP status code to its reason phrase.
// Only 200, 404 and 503 are recognised; every other code yields
// "Unknown Status".
func getHTTPStatusText(statusCode int) string {
	text := "Unknown Status"
	switch statusCode {
	case 200:
		text = "OK"
	case 404:
		text = "Not Found"
	case 503:
		text = "Service Unavailable"
	}
	return text
}