fetch_ml/internal/worker/tenant/manager.go
Jeremie Fraeys a981e89005
feat(security): add audit subsystem and tenant isolation
Implement comprehensive audit and security infrastructure:
- Immutable audit logs with platform-specific backends (Linux/Other)
- Sealed log entries with tamper-evident checksums
- Audit alert system for real-time security notifications
- Log rotation with retention policies
- Checkpoint-based audit verification

Add multi-tenant security features:
- Tenant manager with quota enforcement
- Middleware for tenant authentication/authorization
- Per-tenant cryptographic key isolation
- Supply chain security for container verification
- Cross-platform secure file utilities (Unix/Windows)

Add test coverage:
- Unit tests for audit alerts and sealed logs
- Platform-specific audit backend tests
2026-02-26 12:03:45 -05:00

263 lines
7.3 KiB
Go

// Package tenant provides multi-tenant isolation and resource management.
// This implements Phase 10 Multi-Tenant Server Security.
package tenant
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/jfraeys/fetch_ml/internal/logging"
)
// Tenant represents an isolated tenant in the multi-tenant system.
// Manager stores tenants by pointer, so the values handed out by GetTenant,
// ListTenants and CreateTenant are the manager's own records, not copies.
type Tenant struct {
// ID is the key under which Manager stores the tenant. CreateTenant also uses
// it as the workspace directory name beneath the manager's base path.
ID string `json:"id"`
// Name is the name that was passed to CreateTenant.
Name string `json:"name"`
// CreatedAt is a UTC timestamp taken when CreateTenant built the record.
CreatedAt time.Time `json:"created_at"`
// Config is the configuration that was passed to CreateTenant.
Config TenantConfig `json:"config"`
// Metadata is initialised to an empty, non-nil map by CreateTenant; nothing
// in this file adds entries to it.
Metadata map[string]string `json:"metadata"`
// Active is true after creation and set to false by DeactivateTenant.
// GetTenant rejects inactive tenants and ListTenants omits them.
Active bool `json:"active"`
// LastAccess is a UTC timestamp set at creation and refreshed by every
// successful GetTenant call.
LastAccess time.Time `json:"last_access"`
}
// TenantConfig holds tenant-specific configuration.
// Within this file the value is only stored on the Tenant and its
// IsolationLevel is logged by CreateTenant; no setting is enforced here.
// NOTE(review): enforcement presumably lives in other packages — confirm.
type TenantConfig struct {
// ResourceQuota holds the resource limits for the tenant.
ResourceQuota ResourceQuota `json:"resource_quota"`
// SecurityPolicy holds the security constraints for the tenant.
SecurityPolicy SecurityPolicy `json:"security_policy"`
// IsolationLevel is expected to be one of the Isolation* constants;
// DefaultTenantConfig uses IsolationHard.
IsolationLevel IsolationLevel `json:"isolation_level"`
// AllowedImages is left nil by DefaultTenantConfig.
// Presumably a container image allow-list; the meaning of nil/empty is not
// visible in this file — TODO confirm.
AllowedImages []string `json:"allowed_images"`
// AllowedNetworks is left nil by DefaultTenantConfig.
// Presumably a network allow-list; the meaning of nil/empty is not visible
// in this file — TODO confirm.
AllowedNetworks []string `json:"allowed_networks"`
}
// ResourceQuota defines resource limits per tenant.
// Units are the ones spelled out in the field names and JSON tags (GB, cores,
// hours). How a zero value is interpreted (unlimited vs. none) is not visible
// in this file — TODO confirm against the quota enforcement code.
type ResourceQuota struct {
// MaxConcurrentJobs limits simultaneously running jobs (default config: 5).
MaxConcurrentJobs int `json:"max_concurrent_jobs"`
// MaxGPUs limits the number of GPUs (default config: 1).
MaxGPUs int `json:"max_gpus"`
// MaxMemoryGB limits memory, in GB (default config: 32).
MaxMemoryGB int `json:"max_memory_gb"`
// MaxStorageGB limits storage, in GB (default config: 100).
MaxStorageGB int `json:"max_storage_gb"`
// MaxCPUCores limits CPU cores (default config: 8).
MaxCPUCores int `json:"max_cpu_cores"`
// MaxRuntimeHours limits runtime, in hours (default config: 24).
MaxRuntimeHours int `json:"max_runtime_hours"`
// MaxArtifactsPerHour limits artifacts produced per hour (default config: 10).
MaxArtifactsPerHour int `json:"max_artifacts_per_hour"`
}
// SecurityPolicy defines security constraints for a tenant.
// This file only declares the policy and fills it in DefaultTenantConfig;
// nothing here evaluates it.
type SecurityPolicy struct {
// RequireEncryption is true in the default configuration.
RequireEncryption bool `json:"require_encryption"`
// RequireAuditLogging is true in the default configuration.
RequireAuditLogging bool `json:"require_audit_logging"`
// RequireSandbox is true in the default configuration.
RequireSandbox bool `json:"require_sandbox"`
// ProhibitedPackages is an empty, non-nil slice in the default configuration,
// so it encodes to JSON as [] rather than null.
ProhibitedPackages []string `json:"prohibited_packages"`
// AllowedRegistries defaults to "docker.io" and "ghcr.io".
AllowedRegistries []string `json:"allowed_registries"`
// NetworkPolicy defaults to "restricted"; the set of accepted values is not
// visible in this file — TODO confirm.
NetworkPolicy string `json:"network_policy"`
}
// IsolationLevel defines the degree of tenant isolation.
// It is a string type, so values serialise to JSON as the literals below.
type IsolationLevel string
// Known isolation levels. Nothing in this file validates that a TenantConfig
// carries one of these values.
const (
// IsolationSoft uses namespace/process separation only.
IsolationSoft IsolationLevel = "soft"
// IsolationHard uses container/vm-level separation.
// This is the level chosen by DefaultTenantConfig.
IsolationHard IsolationLevel = "hard"
// IsolationDedicated uses dedicated worker pools per tenant.
IsolationDedicated IsolationLevel = "dedicated"
)
// Manager handles tenant lifecycle and isolation.
// Tenants are kept only in the in-memory map below; this file contains no
// code that persists or reloads them. Because the struct embeds a mutex by
// value it must be used through a pointer, as NewManager returns it.
type Manager struct {
// tenants maps tenant ID to the tenant record.
tenants map[string]*Tenant
// mu is held by every method in this file that reads or modifies tenants.
mu sync.RWMutex
// logger receives informational messages about tenant lifecycle events.
logger *logging.Logger
// basePath is the root directory; each tenant workspace is basePath/<tenant ID>.
basePath string
// quotas is created by NewManager via NewQuotaManager; it is not used
// anywhere else in this file.
quotas *QuotaManager
// auditLog receives tenant-created, tenant-deactivated and worker-sanitized
// events. NewManager points it at basePath/audit.
auditLog *AuditLogger
}
// NewManager builds a tenant Manager rooted at basePath.
//
// basePath (and any missing parents) is created with mode 0750 before the
// manager is assembled. The audit logger is pointed at <basePath>/audit and a
// fresh quota manager is attached. The returned manager starts with no tenants.
//
// An error is returned only when the base directory cannot be created.
func NewManager(basePath string, logger *logging.Logger) (*Manager, error) {
	err := os.MkdirAll(basePath, 0750)
	if err != nil {
		return nil, fmt.Errorf("failed to create tenant base path: %w", err)
	}

	auditPath := filepath.Join(basePath, "audit")

	mgr := &Manager{
		tenants:  make(map[string]*Tenant),
		logger:   logger,
		basePath: basePath,
		quotas:   NewQuotaManager(),
		auditLog: NewAuditLogger(auditPath),
	}
	return mgr, nil
}
// CreateTenant registers a new tenant and creates its on-disk workspace.
//
// The workspace is the directory <basePath>/<id> containing the subdirectories
// artifacts, snapshots, logs and cache, all created with mode 0750. Because id
// becomes a path component it must be a single, non-empty path element: "",
// ".", "..", anything containing a path separator or NUL byte, and the name
// "audit" (NewManager places the audit log at <basePath>/audit) are rejected
// before the filesystem is touched.
//
// An error is returned when id is invalid, when a tenant with the same ID is
// already registered, or when a directory cannot be created. After a directory
// error the tenant is not registered, but directories created so far are left
// in place.
//
// On success the tenant is stored, an informational log line is written, an
// AuditTenantCreated event is emitted, and the manager's own record (not a
// copy) is returned.
func (m *Manager) CreateTenant(ctx context.Context, id, name string, config TenantConfig) (*Tenant, error) {
	// Validate first: id is joined onto basePath below, so an unchecked value
	// such as "../x" would create directories outside the tenant root.
	if err := validateTenantIDForPath(id); err != nil {
		return nil, err
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.tenants[id]; exists {
		return nil, fmt.Errorf("tenant %s already exists", id)
	}

	// One timestamp for both fields so a fresh tenant has CreatedAt == LastAccess.
	now := time.Now().UTC()
	tenant := &Tenant{
		ID:         id,
		Name:       name,
		CreatedAt:  now,
		Config:     config,
		Metadata:   make(map[string]string),
		Active:     true,
		LastAccess: now,
	}

	// Create tenant workspace.
	tenantPath := filepath.Join(m.basePath, id)
	if err := os.MkdirAll(tenantPath, 0750); err != nil {
		return nil, fmt.Errorf("failed to create tenant workspace: %w", err)
	}

	// Create subdirectories.
	for _, subdir := range []string{"artifacts", "snapshots", "logs", "cache"} {
		if err := os.MkdirAll(filepath.Join(tenantPath, subdir), 0750); err != nil {
			return nil, fmt.Errorf("failed to create tenant %s directory: %w", subdir, err)
		}
	}

	// Register only after the workspace exists, so a failed setup never leaves
	// a tenant in the map without its directories.
	m.tenants[id] = tenant

	m.logger.Info("tenant created",
		"tenant_id", id,
		"tenant_name", name,
		"isolation_level", config.IsolationLevel,
	)

	m.auditLog.LogEvent(ctx, AuditEvent{
		Type:      AuditTenantCreated,
		TenantID:  id,
		Timestamp: time.Now().UTC(),
	})

	return tenant, nil
}

// validateTenantIDForPath checks that id can be used as exactly one directory
// name directly beneath the manager's base path.
func validateTenantIDForPath(id string) error {
	switch id {
	case "", ".", "..":
		return fmt.Errorf("invalid tenant id %q: must be a single path element", id)
	case "audit":
		// NewManager stores the audit log at <basePath>/audit; a tenant with
		// this ID would share that location.
		return fmt.Errorf("invalid tenant id %q: name is reserved for the audit log", id)
	}

	// Reject both separator styles on every platform, plus NUL, so an ID that
	// is harmless on one OS cannot become a traversal on another.
	for i := 0; i < len(id); i++ {
		if c := id[i]; c == '/' || c == '\\' || c == 0 || os.IsPathSeparator(c) {
			return fmt.Errorf("invalid tenant id %q: must be a single path element", id)
		}
	}

	// Defence in depth: catches platform-specific forms (e.g. volume names)
	// that the byte scan above does not recognise.
	if filepath.Base(id) != id {
		return fmt.Errorf("invalid tenant id %q: must be a single path element", id)
	}
	return nil
}
// GetTenant returns the active tenant registered under id and refreshes its
// LastAccess timestamp (UTC).
//
// It returns an error when no tenant with that ID exists or when the tenant has
// been deactivated; LastAccess is not touched in either case.
//
// The returned pointer is the manager's own record, not a copy.
// NOTE(review): callers that read or modify fields of the returned tenant do
// so outside the manager's lock — confirm that is acceptable for them.
func (m *Manager) GetTenant(id string) (*Tenant, error) {
	// The write lock is required because LastAccess is mutated below. Doing
	// that under RLock would let concurrent GetTenant calls write the same
	// field at once, which is a data race.
	m.mu.Lock()
	defer m.mu.Unlock()

	tenant, exists := m.tenants[id]
	if !exists {
		return nil, fmt.Errorf("tenant %s not found", id)
	}
	if !tenant.Active {
		return nil, fmt.Errorf("tenant %s is inactive", id)
	}

	tenant.LastAccess = time.Now().UTC()
	return tenant, nil
}
// ValidateTenantAccess decides whether tenantID may touch a resource owned by
// resourceTenantID.
//
// Access is granted (nil) only when the two IDs are identical; every
// cross-tenant request is denied with an error naming both tenants. Neither
// ctx nor the manager's state is consulted.
// NOTE(review): two empty IDs compare equal and are therefore allowed —
// confirm callers reject empty tenant IDs before reaching this check.
func (m *Manager) ValidateTenantAccess(ctx context.Context, tenantID, resourceTenantID string) error {
	// Deny by default: anything that is not the owning tenant is refused.
	if tenantID != resourceTenantID {
		return fmt.Errorf("cross-tenant access denied: tenant %s cannot access resources of tenant %s", tenantID, resourceTenantID)
	}
	return nil
}
// GetTenantWorkspace resolves the workspace directory of a tenant.
//
// The tenant is looked up through GetTenant first, so the call fails with
// GetTenant's error (and an empty path) when the tenant is unknown or inactive.
// Otherwise the result is <basePath>/<tenantID>; the directory itself is not
// checked or created here.
func (m *Manager) GetTenantWorkspace(tenantID string) (string, error) {
	_, err := m.GetTenant(tenantID)
	if err != nil {
		return "", err
	}

	workspace := filepath.Join(m.basePath, tenantID)
	return workspace, nil
}
// DeactivateTenant marks a tenant as inactive (soft delete).
//
// The record stays in the manager and the workspace on disk is left untouched;
// only the Active flag is cleared. The change is logged and an
// AuditTenantDeactivated event is emitted. Deactivating a tenant that is
// already inactive succeeds and emits the event again.
//
// An error is returned when no tenant with the given ID is registered.
func (m *Manager) DeactivateTenant(ctx context.Context, id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	target, found := m.tenants[id]
	if !found {
		return fmt.Errorf("tenant %s not found", id)
	}
	target.Active = false

	m.logger.Info("tenant deactivated", "tenant_id", id)

	event := AuditEvent{
		Type:      AuditTenantDeactivated,
		TenantID:  id,
		Timestamp: time.Now().UTC(),
	}
	m.auditLog.LogEvent(ctx, event)

	return nil
}
// SanitizeForTenant records that the worker is being handed over to another
// tenant.
//
// In its current form the method performs no cleanup: it writes one log line,
// emits an AuditWorkerSanitized event for newTenantID and returns nil. It does
// not check that newTenantID refers to a registered tenant.
func (m *Manager) SanitizeForTenant(ctx context.Context, newTenantID string) error {
	// Log the tenant transition.
	m.logger.Info("sanitizing worker for tenant transition", "new_tenant_id", newTenantID)

	// Placeholder for clearing tenant-specific caches. In production, this
	// would also:
	//   - Clear GPU memory
	//   - Remove temporary files
	//   - Reset environment variables
	//   - Clear any in-memory state

	sanitized := AuditEvent{
		Type:      AuditWorkerSanitized,
		TenantID:  newTenantID,
		Timestamp: time.Now().UTC(),
	}
	m.auditLog.LogEvent(ctx, sanitized)

	return nil
}
// ListTenants collects every tenant whose Active flag is set.
//
// The result holds the manager's own records (pointers, not copies) in map
// iteration order, which is not stable between calls. When no tenant is
// active the result is a nil slice.
func (m *Manager) ListTenants() []*Tenant {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Deliberately left nil rather than pre-allocated, so "no active tenants"
	// keeps yielding nil.
	var result []*Tenant
	for id := range m.tenants {
		candidate := m.tenants[id]
		if !candidate.Active {
			continue
		}
		result = append(result, candidate)
	}
	return result
}
// DefaultTenantConfig builds the baseline configuration for a tenant.
//
// The defaults are: hard isolation; quotas of 5 concurrent jobs, 1 GPU, 32 GB
// memory, 100 GB storage, 8 CPU cores, 24 runtime hours and 10 artifacts per
// hour; encryption, audit logging and sandboxing all required; registries
// docker.io and ghcr.io allowed; network policy "restricted".
// AllowedImages and AllowedNetworks are left nil. Each call returns a fresh
// value with its own slices.
func DefaultTenantConfig() TenantConfig {
	var cfg TenantConfig

	cfg.IsolationLevel = IsolationHard

	cfg.ResourceQuota.MaxConcurrentJobs = 5
	cfg.ResourceQuota.MaxGPUs = 1
	cfg.ResourceQuota.MaxMemoryGB = 32
	cfg.ResourceQuota.MaxStorageGB = 100
	cfg.ResourceQuota.MaxCPUCores = 8
	cfg.ResourceQuota.MaxRuntimeHours = 24
	cfg.ResourceQuota.MaxArtifactsPerHour = 10

	cfg.SecurityPolicy.RequireEncryption = true
	cfg.SecurityPolicy.RequireAuditLogging = true
	cfg.SecurityPolicy.RequireSandbox = true
	// Empty but non-nil, so the field encodes to JSON as [] rather than null.
	cfg.SecurityPolicy.ProhibitedPackages = []string{}
	cfg.SecurityPolicy.AllowedRegistries = []string{"docker.io", "ghcr.io"}
	cfg.SecurityPolicy.NetworkPolicy = "restricted"

	return cfg
}