fetch_ml/internal/jupyter/network_manager.go
Jeremie Fraeys 6b2c377680
refactor(jupyter): enhance security and scheduler integration
Update Jupyter integration for security and scheduler support:
- Enhanced security configuration with audit logging
- Health monitoring with scheduler event integration
- Package manager with network policy enforcement
- Service manager with lifecycle hooks
- Network manager with tenant isolation
- Workspace metadata with tenant tags
- Config with resource limits
- Podman container integration improvements
- Experiment manager with tracking integration
- Manifest runner with security checks
2026-02-26 12:06:35 -05:00

376 lines
11 KiB
Go

package jupyter
import (
	"context"
	"crypto/rand"
	"fmt"
	"net"
	"net/url"
	"time"

	"github.com/jfraeys/fetch_ml/internal/logging"
)
// NetworkManager handles network configuration for Jupyter services.
//
// It records which host port belongs to which service and delegates
// range-based allocation to a PortAllocator.
//
// NOTE(review): no locking is visible in this file; presumably callers
// serialize access to a NetworkManager — confirm before sharing one across
// goroutines.
type NetworkManager struct {
// logger receives allocation, release and connectivity messages.
logger *logging.Logger
// portAllocator hands out ports from the configured start/end range.
portAllocator *PortAllocator
usedPorts map[int]string // port -> service_id
}
// PortAllocator manages port allocation for services.
//
// Its own AllocatePort hands out ports from the inclusive range
// [startPort, endPort]. NetworkManager additionally marks preferred and
// reserved ports in usedPorts directly, without a range check, so entries may
// lie outside the range.
type PortAllocator struct {
// usedPorts holds the ports currently marked as taken; released ports are
// deleted from the map rather than set to false.
usedPorts map[int]bool
// startPort is the first port of the allocation range (inclusive).
startPort int
// endPort is the last port of the allocation range (inclusive).
endPort int
}
// NewNetworkManager builds a NetworkManager that logs through logger and
// allocates host ports from the inclusive range startPort..endPort.
//
// The range is passed to NewPortAllocator unchanged; no validation happens
// here.
func NewNetworkManager(logger *logging.Logger, startPort, endPort int) *NetworkManager {
	nm := new(NetworkManager)
	nm.logger = logger
	nm.portAllocator = NewPortAllocator(startPort, endPort)
	nm.usedPorts = make(map[int]string)
	return nm
}
// AllocatePort assigns a host port to serviceID and records the assignment.
//
// A positive preferredPort is used when it is currently free; otherwise a
// warning is logged and the next free port of the allocator's range is taken
// instead. A preferredPort of zero or less means "no preference".
//
// It returns the assigned port, or an error when the allocator has no free
// port left.
func (nm *NetworkManager) AllocatePort(serviceID string, preferredPort int) (int, error) {
	switch {
	case preferredPort <= 0:
		// No preference: continue with range allocation below.
	case nm.isPortAvailable(preferredPort):
		// Record the port in both bookkeeping maps so the allocator never
		// hands it out again.
		nm.usedPorts[preferredPort] = serviceID
		nm.portAllocator.usedPorts[preferredPort] = true
		nm.logger.Info("allocated preferred port", "service_id", serviceID, "port", preferredPort)
		return preferredPort, nil
	default:
		nm.logger.Warn("preferred port not available, allocating alternative",
			"service_id", serviceID, "preferred_port", preferredPort)
	}

	assigned, allocErr := nm.portAllocator.AllocatePort()
	if allocErr != nil {
		return 0, fmt.Errorf("failed to allocate port: %w", allocErr)
	}
	nm.usedPorts[assigned] = serviceID
	nm.logger.Info("allocated port", "service_id", serviceID, "port", assigned)
	return assigned, nil
}
// ReleasePort frees every port recorded for serviceID, both in the manager's
// own map and in the port allocator.
//
// Releasing a service that holds no ports is a silent no-op. The returned
// error is always nil.
func (nm *NetworkManager) ReleasePort(serviceID string) error {
	var freed []int
	// Deleting the current key while ranging over a map is safe in Go.
	for p := range nm.usedPorts {
		if nm.usedPorts[p] != serviceID {
			continue
		}
		delete(nm.usedPorts, p)
		nm.portAllocator.ReleasePort(p)
		freed = append(freed, p)
	}
	if len(freed) == 0 {
		return nil
	}
	nm.logger.Info("released ports", "service_id", serviceID, "ports", freed)
	return nil
}
// GetPortForService returns the port allocated to serviceID.
//
// A service may hold several ports (every AllocatePort or ReservePort call
// adds one). Map iteration order is random, so the lowest port is returned to
// keep the result stable across calls.
//
// It returns an error when no port is recorded for serviceID.
func (nm *NetworkManager) GetPortForService(serviceID string) (int, error) {
	lowest, found := 0, false
	for port, owner := range nm.usedPorts {
		if owner != serviceID {
			continue
		}
		if !found || port < lowest {
			lowest, found = port, true
		}
	}
	if !found {
		return 0, fmt.Errorf("no port allocated for service %s", serviceID)
	}
	return lowest, nil
}
// ValidateNetworkConfig checks that config describes a usable port binding.
//
// It verifies that both ports lie in 1..65535, that the bind address parses as
// an IP address, and that the host port is currently free. An empty
// BindAddress is replaced with "127.0.0.1" in place, so config may be modified
// by this call.
//
// It returns an error describing the first failed check, or an error when
// config is nil.
func (nm *NetworkManager) ValidateNetworkConfig(config *NetworkConfig) error {
	// Guard against a nil config instead of panicking on the first field read.
	if config == nil {
		return fmt.Errorf("network config is nil")
	}

	if config.HostPort <= 0 || config.HostPort > 65535 {
		return fmt.Errorf("invalid host port: %d", config.HostPort)
	}
	if config.ContainerPort <= 0 || config.ContainerPort > 65535 {
		return fmt.Errorf("invalid container port: %d", config.ContainerPort)
	}

	if config.BindAddress == "" {
		config.BindAddress = "127.0.0.1"
	}
	// Only literal IP addresses pass; host names such as "localhost" do not.
	if net.ParseIP(config.BindAddress) == nil {
		return fmt.Errorf("invalid bind address: %s", config.BindAddress)
	}

	if !nm.isPortAvailable(config.HostPort) {
		return fmt.Errorf("port %d is already in use", config.HostPort)
	}
	return nil
}
// PrepareNetworkConfig builds the effective network configuration for
// serviceID and allocates its host port.
//
// It starts from the defaults (container port 8888, bind address 127.0.0.1,
// network "jupyter-network", no authentication, no remote access) and overlays
// userConfig. A nil userConfig means "use the defaults". ContainerPort,
// BindAddress and NetworkName are only overridden when the user supplied a
// non-zero value, so a partially filled userConfig does not erase the
// defaults. userConfig.HostPort is passed to AllocatePort as the preferred
// port.
//
// When a token or password is enabled but not supplied, one is generated.
//
// It returns the prepared configuration, or the error from AllocatePort.
func (nm *NetworkManager) PrepareNetworkConfig(
	serviceID string,
	userConfig *NetworkConfig,
) (*NetworkConfig, error) {
	config := &NetworkConfig{
		ContainerPort:  8888,
		BindAddress:    "127.0.0.1",
		EnableToken:    false,
		EnablePassword: false,
		AllowRemote:    false,
		NetworkName:    "jupyter-network",
	}

	// Zero means "no preference" to AllocatePort.
	preferredPort := 0
	if userConfig != nil {
		if userConfig.ContainerPort != 0 {
			config.ContainerPort = userConfig.ContainerPort
		}
		if userConfig.BindAddress != "" {
			config.BindAddress = userConfig.BindAddress
		}
		if userConfig.NetworkName != "" {
			config.NetworkName = userConfig.NetworkName
		}
		config.EnableToken = userConfig.EnableToken
		config.Token = userConfig.Token
		config.EnablePassword = userConfig.EnablePassword
		config.Password = userConfig.Password
		config.AllowRemote = userConfig.AllowRemote
		preferredPort = userConfig.HostPort
	}

	port, err := nm.AllocatePort(serviceID, preferredPort)
	if err != nil {
		return nil, err
	}
	config.HostPort = port

	if config.EnableToken && config.Token == "" {
		config.Token = nm.generateToken()
	}
	if config.EnablePassword && config.Password == "" {
		config.Password = nm.generatePassword()
	}
	return config, nil
}
// isPortAvailable reports whether port can be handed to a service.
//
// A port already recorded in usedPorts is never available. Otherwise the port
// is probed with a TCP dial (one second timeout) to ":port"; with an empty
// host the net package dials the local system. A failed dial of any kind is
// taken to mean nothing is listening, i.e. the port is free.
func (nm *NetworkManager) isPortAvailable(port int) bool {
	if _, taken := nm.usedPorts[port]; taken {
		return false
	}

	probe := net.Dialer{Timeout: time.Second}
	target := fmt.Sprintf(":%d", port)
	conn, dialErr := probe.DialContext(context.Background(), "tcp", target)
	if dialErr != nil {
		return true // Port is available
	}

	// Something accepted the connection, so the port is in use; the probe
	// connection itself is no longer needed.
	if closeErr := conn.Close(); closeErr != nil {
		nm.logger.Warn("failed to close connection", "error", closeErr)
	}
	return false
}
// generateToken returns a fresh authentication token for a Jupyter service.
//
// The token is "token-" followed by 32 hex characters encoding 16 bytes read
// from crypto/rand, so it cannot be derived from the time of creation and two
// calls never share a value in practice.
//
// It panics if the secure random source cannot be read: there is no error
// return to report it through, and continuing would mean handing out a
// predictable credential.
func (nm *NetworkManager) generateToken() string {
	entropy := make([]byte, 16)
	if _, err := rand.Read(entropy); err != nil {
		panic("jupyter: reading crypto/rand for token: " + err.Error())
	}
	return fmt.Sprintf("token-%x", entropy)
}
// generatePassword returns a fresh password for a Jupyter service.
//
// The password is "pass-" followed by 32 hex characters encoding 16 bytes read
// from crypto/rand, so it cannot be derived from the time of creation and two
// calls never share a value in practice.
//
// It panics if the secure random source cannot be read: there is no error
// return to report it through, and continuing would mean handing out a
// predictable credential.
func (nm *NetworkManager) generatePassword() string {
	entropy := make([]byte, 16)
	if _, err := rand.Read(entropy); err != nil {
		panic("jupyter: reading crypto/rand for password: " + err.Error())
	}
	return fmt.Sprintf("pass-%x", entropy)
}
// GetServiceURL builds the http URL under which the service described by
// config is reached: "http://<BindAddress>:<HostPort>", followed by
// "?token=<Token>" when token authentication is enabled and a token is set.
//
// The host part is assembled with net.JoinHostPort so that IPv6 bind addresses
// are bracketed, and the token is query-escaped so that characters such as
// '&', '#' or spaces cannot corrupt the URL.
func (nm *NetworkManager) GetServiceURL(config *NetworkConfig) string {
	hostPort := net.JoinHostPort(config.BindAddress, fmt.Sprint(config.HostPort))
	serviceURL := "http://" + hostPort

	if config.EnableToken && config.Token != "" {
		serviceURL += "?token=" + url.QueryEscape(config.Token)
	}
	return serviceURL
}
// ValidateRemoteAccess checks that a configuration which allows remote access
// can actually be reached remotely and is protected.
//
// When config.AllowRemote is false nothing is checked. Otherwise the bind
// address must not be local-only — "localhost" or any loopback IP (all of
// 127.0.0.0/8 and ::1, not just the literal "127.0.0.1") — and token or
// password authentication must be enabled.
//
// It returns an error describing the first violated rule, or nil.
func (nm *NetworkManager) ValidateRemoteAccess(config *NetworkConfig) error {
	if !config.AllowRemote {
		return nil
	}

	localOnly := config.BindAddress == "localhost"
	if ip := net.ParseIP(config.BindAddress); ip != nil && ip.IsLoopback() {
		localOnly = true
	}
	if localOnly {
		return fmt.Errorf("remote access enabled but bind address is local only: %s", config.BindAddress)
	}

	if !config.EnableToken && !config.EnablePassword {
		return fmt.Errorf("remote access requires authentication (token or password)")
	}
	return nil
}
// NewPortAllocator returns an allocator for the inclusive port range
// startPort..endPort with no ports marked as used. The range is stored as
// given; it is not validated.
func NewPortAllocator(startPort, endPort int) *PortAllocator {
	pa := new(PortAllocator)
	pa.usedPorts = make(map[int]bool)
	pa.startPort, pa.endPort = startPort, endPort
	return pa
}
// AllocatePort marks the lowest unused port of the range as used and returns
// it. It returns an error when every port from startPort to endPort is taken.
func (pa *PortAllocator) AllocatePort() (int, error) {
	candidate := pa.startPort
	for candidate <= pa.endPort {
		if taken := pa.usedPorts[candidate]; taken {
			candidate++
			continue
		}
		pa.usedPorts[candidate] = true
		return candidate, nil
	}
	return 0, fmt.Errorf("no available ports in range %d-%d", pa.startPort, pa.endPort)
}
// ReleasePort removes port from the set of used ports so that it can be
// handed out again. Releasing a port that is not marked as used is a no-op.
func (pa *PortAllocator) ReleasePort(port int) {
delete(pa.usedPorts, port)
}
// GetAvailablePorts lists, in ascending order, every port of the range that is
// not marked as used. The result is nil when no port is free.
func (pa *PortAllocator) GetAvailablePorts() []int {
	var free []int
	for candidate := pa.startPort; candidate <= pa.endPort; candidate++ {
		if pa.usedPorts[candidate] {
			continue
		}
		free = append(free, candidate)
	}
	return free
}
// GetUsedPorts lists every port currently marked as used. The order follows
// map iteration and is therefore unspecified. The result is nil when no port
// is in use.
func (pa *PortAllocator) GetUsedPorts() []int {
	if len(pa.usedPorts) == 0 {
		return nil
	}
	ports := make([]int, 0, len(pa.usedPorts))
	for p := range pa.usedPorts {
		ports = append(ports, p)
	}
	return ports
}
// IsPortAvailable reports whether port lies inside the allocator's range and
// is not marked as used. Ports outside the range are never available.
func (pa *PortAllocator) IsPortAvailable(port int) bool {
	inRange := port >= pa.startPort && port <= pa.endPort
	return inRange && !pa.usedPorts[port]
}
// GetPortRange returns the allocation range as (startPort, endPort).
func (pa *PortAllocator) GetPortRange() (int, int) {
return pa.startPort, pa.endPort
}
// SetPortRange replaces the allocation range with startPort..endPort.
//
// Both bounds must be positive and startPort must not exceed endPort. The
// change is refused while any used port would fall outside the new range, in
// which case the current range is left untouched.
func (pa *PortAllocator) SetPortRange(startPort, endPort int) error {
	wellFormed := startPort > 0 && endPort > 0 && startPort <= endPort
	if !wellFormed {
		return fmt.Errorf("invalid port range: %d-%d", startPort, endPort)
	}

	// Every port already handed out must remain covered by the new range.
	for inUse := range pa.usedPorts {
		if inUse >= startPort && inUse <= endPort {
			continue
		}
		return fmt.Errorf("cannot change range: port %d is in use and outside new range", inUse)
	}

	pa.startPort, pa.endPort = startPort, endPort
	return nil
}
// Cleanup releases every port held by serviceID. A failure to release is
// logged as a warning and not returned to the caller.
func (nm *NetworkManager) Cleanup(serviceID string) {
	err := nm.ReleasePort(serviceID)
	if err == nil {
		return
	}
	nm.logger.Warn("failed to cleanup network resources", "service_id", serviceID, "error", err)
}
// GetNetworkStatus returns a snapshot of the port bookkeeping.
//
// TotalPorts is the size of the allocator's inclusive range, AvailablePorts
// and UsedPorts are counted from the allocator, and PortRange is formatted as
// "start-end". Services is the number of distinct service IDs holding at least
// one port; a service that owns several ports is counted once.
func (nm *NetworkManager) GetNetworkStatus() *NetworkStatus {
	// usedPorts is keyed by port, so collapse it to the set of owners.
	owners := make(map[string]struct{}, len(nm.usedPorts))
	for _, serviceID := range nm.usedPorts {
		owners[serviceID] = struct{}{}
	}

	pa := nm.portAllocator
	return &NetworkStatus{
		TotalPorts:     pa.endPort - pa.startPort + 1,
		AvailablePorts: len(pa.GetAvailablePorts()),
		UsedPorts:      len(pa.GetUsedPorts()),
		PortRange:      fmt.Sprintf("%d-%d", pa.startPort, pa.endPort),
		Services:       len(owners),
	}
}
// NetworkStatus contains network status information as produced by
// NetworkManager.GetNetworkStatus. The JSON field names are fixed by the
// struct tags.
type NetworkStatus struct {
// PortRange is the allocator range formatted as "start-end".
PortRange string `json:"port_range"`
// TotalPorts is the number of ports in the inclusive allocator range.
TotalPorts int `json:"total_ports"`
// AvailablePorts is the number of ports in the range not marked as used.
AvailablePorts int `json:"available_ports"`
// UsedPorts is the number of ports the allocator has marked as used.
UsedPorts int `json:"used_ports"`
// Services is the service figure GetNetworkStatus derives from the manager's
// port-to-service map.
Services int `json:"services"`
}
// TestConnectivity checks that the service described by config accepts TCP
// connections on BindAddress:HostPort.
//
// The dial honours ctx, so cancelling it or letting its deadline expire aborts
// the attempt; independently of ctx the dial gives up after five seconds. A
// nil ctx is treated as context.Background().
//
// It returns nil when a connection could be opened, otherwise an error
// wrapping the dial failure. The URL in the error message and in the log entry
// has its token removed so that credentials do not leak into logs.
func (nm *NetworkManager) TestConnectivity(ctx context.Context, config *NetworkConfig) error {
	if ctx == nil {
		ctx = context.Background()
	}

	// Used for messages only; never report the token-bearing URL.
	safeURL := stripTokenFromURL(nm.GetServiceURL(config))

	// JoinHostPort brackets IPv6 addresses, which plain "%s:%d" does not.
	addr := net.JoinHostPort(config.BindAddress, fmt.Sprint(config.HostPort))
	dialer := &net.Dialer{Timeout: 5 * time.Second}
	conn, err := dialer.DialContext(ctx, "tcp", addr)
	if err != nil {
		return fmt.Errorf("cannot connect to %s: %w", safeURL, err)
	}
	defer func() {
		if err := conn.Close(); err != nil {
			nm.logger.Warn("failed to close connection", "error", err)
		}
	}()

	nm.logger.Info("connectivity test passed", "url", safeURL)
	return nil
}
// FindAvailablePort returns the lowest port in startPort..endPort (inclusive)
// that isPortAvailable reports as free. The port is only looked up, not
// reserved. It returns an error when no port in the range is free.
func (nm *NetworkManager) FindAvailablePort(startPort, endPort int) (int, error) {
	candidate := startPort
	for candidate <= endPort {
		if nm.isPortAvailable(candidate) {
			return candidate, nil
		}
		candidate++
	}
	return 0, fmt.Errorf("no available ports in range %d-%d", startPort, endPort)
}
// ReservePort assigns the given port to serviceID, recording it in both the
// manager's map and the port allocator. It returns an error, and records
// nothing, when the port is not currently available.
func (nm *NetworkManager) ReservePort(serviceID string, port int) error {
	if free := nm.isPortAvailable(port); !free {
		return fmt.Errorf("port %d is not available", port)
	}

	nm.usedPorts[port] = serviceID
	nm.portAllocator.usedPorts[port] = true

	nm.logger.Info("reserved port", "service_id", serviceID, "port", port)
	return nil
}
// GetServiceForPort returns the ID of the service that holds port. It returns
// an error when the port is not recorded for any service.
func (nm *NetworkManager) GetServiceForPort(port int) (string, error) {
	if owner, ok := nm.usedPorts[port]; ok {
		return owner, nil
	}
	return "", fmt.Errorf("no service using port %d", port)
}