fetch_ml/internal/scheduler/port_allocator.go
Jeremie Fraeys 43e6446587
feat(scheduler): implement multi-tenant job scheduler with gang scheduling
Add new scheduler component for distributed ML workload orchestration:
- Hub-based coordination for multi-worker clusters
- Pacing controller for rate limiting job submissions
- Priority queue with preemption support
- Port allocator for dynamic service discovery
- Protocol handlers for worker-scheduler communication
- Service manager with OS-specific implementations
- Connection management and state persistence
- Template system for service deployment

Includes comprehensive test suite:
- Unit tests for all core components
- Integration tests for distributed scenarios
- Benchmark tests for performance validation
- Mock fixtures for isolated testing

Refs: scheduler-architecture.md
2026-02-26 12:03:23 -05:00

145 lines
3.4 KiB
Go

package scheduler
import (
"fmt"
"net"
"sync"
"time"
)
// Default port range for service jobs (Jupyter, vLLM, etc.).
const (
	// DefaultServicePortStart is the low end of the default service port range.
	DefaultServicePortStart = 8000
	// DefaultServicePortEnd is the high end of the default service port range.
	DefaultServicePortEnd = 9000
)
// PortAllocator hands out ports from a configured range to service jobs and
// remembers which task each port was given to.
//
// Its methods take mu before touching any other field, so a single allocator
// can be shared by concurrent callers.
type PortAllocator struct {
	// mu guards every field below.
	mu sync.Mutex

	// start and end bound the managed range; Allocate scans from start up to
	// and including end.
	start, end int

	// used maps a port number to the record of the task it was handed to.
	used map[int]allocation

	// ttl is the duration isExpired compares entry timestamps against when
	// deciding whether a record can be dropped from used.
	ttl time.Duration
}
// allocation records which task a port was handed to and where that port is
// in its lifecycle (held, or released and cooling down).
type allocation struct {
	taskID    string    // task that requested the port
	allocated time.Time // when Allocate handed the port out

	// releasedAt is the zero time while the task still holds the port. Release
	// sets it to the time of the call, and the TTL cooldown is measured from it.
	releasedAt time.Time
}

// NewPortAllocator creates a port allocator for the inclusive range
// [start, end].
//
// Unusable arguments are replaced rather than rejected: start <= 0 becomes
// 10000, end <= 0 or end > 65535 becomes 65535, and if start >= end after
// those adjustments the whole range falls back to 10000-65535. Released ports
// are held back for 30 seconds before they can be handed out again.
func NewPortAllocator(start, end int) *PortAllocator {
	if start <= 0 {
		start = 10000
	}
	if end <= 0 || end > 65535 {
		end = 65535
	}
	if start >= end {
		start, end = 10000, 65535
	}
	return &PortAllocator{
		start: start,
		end:   end,
		used:  make(map[int]allocation),
		ttl:   30 * time.Second, // prevent immediate reuse of released ports
	}
}

// Allocate assigns the lowest-numbered free port in the range to taskID.
//
// A port is free when it is not tracked (neither held by a task nor cooling
// down after a release) and isPortAvailable reports that it can be used. An
// error is returned when no port in the range qualifies.
func (pa *PortAllocator) Allocate(taskID string) (int, error) {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	// Drop released ports whose cooldown is over so they can be reused.
	pa.cleanupExpired()

	for port := pa.start; port <= pa.end; port++ {
		if _, tracked := pa.used[port]; tracked {
			continue
		}
		// The allocator only knows about its own hand-outs; skip ports the
		// system reports as unavailable.
		if !pa.isPortAvailable(port) {
			continue
		}
		pa.used[port] = allocation{taskID: taskID, allocated: time.Now()}
		return port, nil
	}
	return 0, fmt.Errorf("no ports available in range %d-%d", pa.start, pa.end)
}

// Release marks port as released. The port stays reserved until the TTL has
// elapsed since this call, after which cleanupExpired makes it allocatable
// again.
//
// Releasing a port that is not tracked is a no-op. Releasing a port twice is
// also a no-op, so a repeated call does not restart the cooldown.
func (pa *PortAllocator) Release(port int) {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	alloc, tracked := pa.used[port]
	if !tracked || !alloc.releasedAt.IsZero() {
		return
	}
	// Keep the entry and stamp the release time instead of deleting it, so the
	// port cannot be handed out again until the cooldown is over.
	alloc.releasedAt = time.Now()
	pa.used[port] = alloc
}

// GetAllocation returns the ID of the task currently holding port, or the
// empty string if the port is not tracked or has already been released.
func (pa *PortAllocator) GetAllocation(port int) string {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	alloc, tracked := pa.used[port]
	if !tracked || !alloc.releasedAt.IsZero() {
		return ""
	}
	return alloc.taskID
}

// cleanupExpired removes released allocations whose cooldown has elapsed.
// The caller must hold pa.mu.
func (pa *PortAllocator) cleanupExpired() {
	for port, alloc := range pa.used {
		if pa.isExpired(alloc) {
			delete(pa.used, port)
		}
	}
}

// isExpired reports whether alloc has been released for at least the TTL.
// An allocation that has not been released never expires, so a port held by a
// long-running task is not silently handed to someone else.
func (pa *PortAllocator) isExpired(alloc allocation) bool {
	if alloc.releasedAt.IsZero() {
		return false
	}
	return time.Since(alloc.releasedAt) >= pa.ttl
}
// isPortAvailable reports whether a TCP listener can be opened on port right
// now. It probes by listening on ":<port>" and closing the listener straight
// away; any listen error is treated as "not available".
func (pa *PortAllocator) isPortAvailable(port int) bool {
	addr := fmt.Sprintf(":%d", port)
	if probe, err := net.Listen("tcp", addr); err == nil {
		probe.Close()
		return true
	}
	return false
}
// AvailableCount returns how many ports in the range have no entry in the
// allocator's table, after expired entries have been purged. It does not probe
// the system, so ports bound by other processes are still counted.
func (pa *PortAllocator) AvailableCount() int {
	pa.mu.Lock()
	defer pa.mu.Unlock()

	pa.cleanupExpired()

	rangeSize := pa.end - pa.start + 1 // both bounds are inclusive
	return rangeSize - len(pa.used)
}
// SetTTL replaces the allocator's time-to-live while holding the lock.
// It exists so tests can shorten the TTL.
func (pa *PortAllocator) SetTTL(ttl time.Duration) {
	pa.mu.Lock()
	pa.ttl = ttl
	pa.mu.Unlock()
}