package scheduler

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// Default port range for service jobs (Jupyter, vLLM, etc.).
const (
	DefaultServicePortStart = 8000
	DefaultServicePortEnd   = 9000
)

// PortAllocator manages dynamic port allocation for service jobs.
//
// It tracks which ports of a configured, inclusive range have been handed
// out and assigns the lowest free one on request. A port stays assigned to
// its task until Release is called; after that it is kept reserved for a
// cooldown period (ttl) before it can be handed out again.
//
// All methods lock an internal mutex, so a PortAllocator is safe for
// concurrent use. It must not be copied after first use.
type PortAllocator struct {
	mu    sync.Mutex
	start int                // first port of the range (inclusive)
	end   int                // last port of the range (inclusive)
	used  map[int]allocation // active and recently released ports
	ttl   time.Duration      // how long a port stays reserved after Release
}

// allocation records who holds a port and, once released, since when.
type allocation struct {
	taskID     string
	allocated  time.Time // when the port was handed out
	releasedAt time.Time // zero while the allocation is still active
}

// NewPortAllocator creates a port allocator for the inclusive range
// [start, end].
//
// Invalid arguments are normalized instead of rejected:
//   - start <= 0 becomes 10000 (above the well-known ports);
//   - end <= 0 or end > 65535 becomes 65535;
//   - if start >= end after that, the whole range falls back to 10000-65535.
//
// The release cooldown defaults to 30 seconds; see SetTTL.
func NewPortAllocator(start, end int) *PortAllocator {
	if start <= 0 {
		start = 10000
	}
	if end <= 0 || end > 65535 {
		end = 65535
	}
	if start >= end {
		start = 10000
		end = 65535
	}

	return &PortAllocator{
		start: start,
		end:   end,
		used:  make(map[int]allocation),
		ttl:   30 * time.Second, // prevent immediate reuse after Release
	}
}

// Allocate assigns the lowest available port in the range to taskID.
//
// A port is available when it is neither held by a task nor in its
// post-release cooldown, and a local TCP bind probe on it succeeds.
// It returns an error when no port in the range qualifies.
func (pa *PortAllocator) Allocate(taskID string) (int, error) {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	// Drop released ports whose cooldown is over so they can be reused.
	pa.cleanupExpired()

	for port := pa.start; port <= pa.end; port++ {
		if _, inUse := pa.used[port]; inUse {
			continue
		}
		// Skip ports that some other process on this host already holds.
		if !pa.isPortAvailable(port) {
			continue
		}
		pa.used[port] = allocation{
			taskID:    taskID,
			allocated: time.Now(),
		}
		return port, nil
	}

	return 0, fmt.Errorf("no ports available in range %d-%d", pa.start, pa.end)
}

// Release frees a port for reuse once the TTL has elapsed.
//
// The entry is kept in the table, stamped with the release time, so the
// port cannot be handed out again until the cooldown is over. Releasing a
// port that is unknown or already released is a no-op; in particular a
// second Release does not extend the cooldown.
func (pa *PortAllocator) Release(port int) {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	alloc, exists := pa.used[port]
	if !exists || !alloc.releasedAt.IsZero() {
		return
	}
	alloc.releasedAt = time.Now()
	pa.used[port] = alloc
}

// GetAllocation returns the ID of the task currently holding port, or the
// empty string if the port is not allocated. A port that has been released
// reports the empty string even while it is still in its cooldown.
func (pa *PortAllocator) GetAllocation(port int) string {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	if alloc, exists := pa.used[port]; exists && alloc.releasedAt.IsZero() {
		return alloc.taskID
	}
	return ""
}

// cleanupExpired removes released allocations whose cooldown has elapsed.
// The caller must hold pa.mu.
func (pa *PortAllocator) cleanupExpired() {
	for port, alloc := range pa.used {
		if pa.isExpired(alloc) {
			delete(pa.used, port)
		}
	}
}

// isExpired reports whether alloc was released at least ttl ago.
// Active (unreleased) allocations never expire. The caller must hold pa.mu
// because pa.ttl is read.
func (pa *PortAllocator) isExpired(alloc allocation) bool {
	if alloc.releasedAt.IsZero() {
		return false
	}
	// >= rather than > so that a TTL of zero makes a released port reusable
	// immediately, independent of clock resolution.
	return time.Since(alloc.releasedAt) >= pa.ttl
}

// isPortAvailable reports whether a TCP listener can currently be opened on
// port on all local interfaces. The probe listener is closed again right
// away, so the result is only a snapshot: another process may still grab
// the port before the task binds it.
func (pa *PortAllocator) isPortAvailable(port int) bool {
	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return false
	}
	// The listener was only a probe; a failure to close it does not change
	// the answer.
	_ = ln.Close()
	return true
}

// AvailableCount returns the number of ports in the range that are neither
// held by a task nor in their post-release cooldown. It does not probe the
// system, so ports occupied by other processes are still counted.
func (pa *PortAllocator) AvailableCount() int {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	pa.cleanupExpired()
	return (pa.end - pa.start + 1) - len(pa.used)
}

// SetTTL changes how long released ports stay reserved (for testing).
// The new value also applies to ports that are already in their cooldown.
func (pa *PortAllocator) SetTTL(ttl time.Duration) {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	pa.ttl = ttl
}