fetch_ml/internal/worker/tenant/quota.go
Jeremie Fraeys a981e89005
feat(security): add audit subsystem and tenant isolation
Implement comprehensive audit and security infrastructure:
- Immutable audit logs with platform-specific backends (Linux/Other)
- Sealed log entries with tamper-evident checksums
- Audit alert system for real-time security notifications
- Log rotation with retention policies
- Checkpoint-based audit verification

Add multi-tenant security features:
- Tenant manager with quota enforcement
- Middleware for tenant authentication/authorization
- Per-tenant cryptographic key isolation
- Supply chain security for container verification
- Cross-platform secure file utilities (Unix/Windows)

Add test coverage:
- Unit tests for audit alerts and sealed logs
- Platform-specific audit backend tests
2026-02-26 12:03:45 -05:00

266 lines
7.8 KiB
Go

// Package tenant provides multi-tenant isolation and resource management.
package tenant
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
"github.com/jfraeys/fetch_ml/internal/logging"
)
// QuotaManager keeps an in-memory record of how much of each resource every
// tenant is currently consuming.
//
// Build one with NewQuotaManager: the zero value has a nil usage map, and the
// first insertion into it would panic.
type QuotaManager struct {
	// mu guards usage and the *TenantUsage records it points to. Anything that
	// inserts into the map or modifies a record needs the write lock.
	mu sync.RWMutex

	// usage maps a tenant ID to that tenant's running resource totals.
	usage map[string]*TenantUsage
}
// TenantUsage is one tenant's current resource consumption as tracked by a
// QuotaManager. The JSON tags define its serialized form, and the field order
// below is the order fields appear in that JSON.
type TenantUsage struct {
	// TenantID identifies the tenant this record belongs to.
	TenantID string `json:"tenant_id"`

	// ActiveJobs is the number of jobs currently allocated.
	ActiveJobs int `json:"active_jobs"`

	// GPUsAllocated is the number of GPUs currently allocated.
	GPUsAllocated int `json:"gpus_allocated"`

	// MemoryGBUsed is the allocated memory, in GB.
	MemoryGBUsed int `json:"memory_gb_used"`

	// StorageGBUsed is the allocated storage, in GB.
	StorageGBUsed int `json:"storage_gb_used"`

	// CPUCoresUsed is the number of CPU cores currently allocated.
	CPUCoresUsed int `json:"cpu_cores_used"`

	// ArtifactsThisHour counts artifacts recorded since LastReset.
	ArtifactsThisHour int `json:"artifacts_this_hour"`

	// LastReset is when the hourly artifact window last started.
	LastReset time.Time `json:"last_reset"`
}
// NewQuotaManager returns a QuotaManager with an empty usage table, ready to
// use. Each tenant's record is created the first time the tenant is touched.
func NewQuotaManager() *QuotaManager {
	qm := new(QuotaManager)
	qm.usage = map[string]*TenantUsage{}
	return qm
}
// GetUsage returns a snapshot of the current usage for tenantID.
//
// The result is a copy taken while holding the read lock. Callers can read it
// freely without racing concurrent Allocate, Release, CheckQuota or
// RecordArtifact calls, and modifying it does not change the manager's state.
// Returning the stored pointer, as before, let callers touch shared state
// after the lock was released.
//
// For a tenant that has never been seen, the result is a zero-usage record
// whose LastReset is the current UTC time. That record is not inserted into
// the manager.
//
// NOTE: ArtifactsThisHour is reported as stored. The hourly window is only
// rolled over by CheckQuota and RecordArtifact.
func (qm *QuotaManager) GetUsage(tenantID string) *TenantUsage {
	qm.mu.RLock()
	defer qm.mu.RUnlock()

	if usage, exists := qm.usage[tenantID]; exists {
		// TenantUsage holds only value fields, so a shallow copy is a full,
		// independent snapshot.
		snapshot := *usage
		return &snapshot
	}
	return &TenantUsage{
		TenantID:  tenantID,
		LastReset: time.Now().UTC(),
	}
}
// CheckQuota reports whether req fits inside quota, given the tenant's current
// usage. It returns nil when every requested resource fits. Otherwise it
// returns an error describing the first limit that would be exceeded. Limits
// are checked in this order: concurrent jobs, GPUs, memory, storage, CPU
// cores, artifacts per hour.
//
// If more than an hour has passed since the tenant's LastReset, the hourly
// artifact counter is zeroed and LastReset restarted before checking. That
// rollover and the lazy creation of the tenant record (getOrCreateUsage) both
// write shared state, so the write lock is required. A read lock is not
// sufficient and would allow concurrent map writes.
//
// CheckQuota reserves nothing. A later Allocate runs in a separate critical
// section, so two callers can both pass the check before either allocates.
func (qm *QuotaManager) CheckQuota(tenantID string, quota ResourceQuota, req ResourceRequest) error {
	qm.mu.Lock()
	defer qm.mu.Unlock()

	usage := qm.getOrCreateUsage(tenantID)

	// Roll over the hourly artifact window if it has expired.
	if time.Since(usage.LastReset) > time.Hour {
		usage.ArtifactsThisHour = 0
		usage.LastReset = time.Now().UTC()
	}

	if usage.ActiveJobs+req.Jobs > quota.MaxConcurrentJobs {
		return fmt.Errorf("quota exceeded: concurrent jobs %d/%d", usage.ActiveJobs+req.Jobs, quota.MaxConcurrentJobs)
	}
	if usage.GPUsAllocated+req.GPUs > quota.MaxGPUs {
		return fmt.Errorf("quota exceeded: GPUs %d/%d", usage.GPUsAllocated+req.GPUs, quota.MaxGPUs)
	}
	if usage.MemoryGBUsed+req.MemoryGB > quota.MaxMemoryGB {
		return fmt.Errorf("quota exceeded: memory %d/%d GB", usage.MemoryGBUsed+req.MemoryGB, quota.MaxMemoryGB)
	}
	if usage.StorageGBUsed+req.StorageGB > quota.MaxStorageGB {
		return fmt.Errorf("quota exceeded: storage %d/%d GB", usage.StorageGBUsed+req.StorageGB, quota.MaxStorageGB)
	}
	if usage.CPUCoresUsed+req.CPUCores > quota.MaxCPUCores {
		return fmt.Errorf("quota exceeded: CPU cores %d/%d", usage.CPUCoresUsed+req.CPUCores, quota.MaxCPUCores)
	}
	// Artifacts are only checked when the request produces some. A tenant
	// already at its hourly artifact cap can still run artifact-free work.
	if req.Artifacts > 0 && usage.ArtifactsThisHour+req.Artifacts > quota.MaxArtifactsPerHour {
		return fmt.Errorf("quota exceeded: artifacts per hour %d/%d", usage.ArtifactsThisHour+req.Artifacts, quota.MaxArtifactsPerHour)
	}
	return nil
}
// Allocate adds req's jobs, GPUs, memory, storage and CPU cores to the
// tenant's running totals, creating the tenant record if needed. It always
// returns nil.
//
// Allocate takes no quota argument and enforces no limits. Callers presumably
// run CheckQuota first, but the check and this allocation are separate lock
// acquisitions. req.Artifacts is ignored here; artifacts are counted by
// RecordArtifact.
func (qm *QuotaManager) Allocate(tenantID string, req ResourceRequest) error {
	qm.mu.Lock()
	defer qm.mu.Unlock()

	u := qm.getOrCreateUsage(tenantID)
	u.CPUCoresUsed += req.CPUCores
	u.StorageGBUsed += req.StorageGB
	u.MemoryGBUsed += req.MemoryGB
	u.GPUsAllocated += req.GPUs
	u.ActiveJobs += req.Jobs
	return nil
}
// Release subtracts req from the tenant's running totals. Each counter is
// clamped at zero, so releasing more than was allocated cannot drive it
// negative.
//
// Tenants with no usage record are ignored, and no record is created for them.
// req.Artifacts is not used: the hourly artifact counter is never decremented.
func (qm *QuotaManager) Release(tenantID string, req ResourceRequest) {
	qm.mu.Lock()
	defer qm.mu.Unlock()

	u, ok := qm.usage[tenantID]
	if !ok {
		// No usage record for this tenant, so there is nothing to release.
		return
	}

	// drain subtracts delta from cur without going below zero.
	drain := func(cur, delta int) int { return max(0, cur-delta) }

	u.ActiveJobs = drain(u.ActiveJobs, req.Jobs)
	u.GPUsAllocated = drain(u.GPUsAllocated, req.GPUs)
	u.MemoryGBUsed = drain(u.MemoryGBUsed, req.MemoryGB)
	u.StorageGBUsed = drain(u.StorageGBUsed, req.StorageGB)
	u.CPUCoresUsed = drain(u.CPUCoresUsed, req.CPUCores)
}
// RecordArtifact counts one artifact against the tenant's hourly budget,
// creating the tenant record if needed.
//
// If more than an hour has passed since LastReset, the window is restarted
// before the increment: the counter is zeroed and LastReset is set to the
// current UTC time.
func (qm *QuotaManager) RecordArtifact(tenantID string) {
	qm.mu.Lock()
	defer qm.mu.Unlock()

	u := qm.getOrCreateUsage(tenantID)

	if windowExpired := time.Since(u.LastReset) > time.Hour; windowExpired {
		u.LastReset = time.Now().UTC()
		u.ArtifactsThisHour = 0
	}

	u.ArtifactsThisHour++
}
// getOrCreateUsage returns the stored usage record for tenantID. If none
// exists, it first inserts a zeroed record whose LastReset is the current UTC
// time.
//
// It may write to qm.usage, and it returns the pointer held in the map, so the
// caller must hold qm.mu for writing.
func (qm *QuotaManager) getOrCreateUsage(tenantID string) *TenantUsage {
	usage, ok := qm.usage[tenantID]
	if !ok {
		usage = &TenantUsage{
			TenantID:  tenantID,
			LastReset: time.Now().UTC(),
		}
		qm.usage[tenantID] = usage
	}
	return usage
}
// ResourceRequest describes the resources an operation wants to consume. It is
// checked by CheckQuota, added by Allocate and subtracted by Release.
type ResourceRequest struct {
	// Jobs is the number of concurrent jobs requested.
	Jobs int

	// GPUs is the number of GPUs requested.
	GPUs int

	// MemoryGB is the requested memory, in GB.
	MemoryGB int

	// StorageGB is the requested storage, in GB.
	StorageGB int

	// CPUCores is the number of CPU cores requested.
	CPUCores int

	// Artifacts is the number of artifacts the operation will produce. Only
	// CheckQuota looks at it; Allocate and Release ignore it.
	Artifacts int
}
// AuditLogger writes audit events to a separate log per tenant, each stored
// under its own subdirectory of basePath. Per-tenant loggers are created on
// first use and cached. Construct one with NewAuditLogger.
type AuditLogger struct {
	// mu guards loggers.
	mu sync.RWMutex

	// loggers caches the audit logger for each tenant ID.
	loggers map[string]*logging.Logger

	// basePath is the root directory for all tenants' audit logs.
	basePath string
}
// AuditEventType names the kind of action an AuditEvent records. Its string
// value is what is written as the event "type" in each audit log entry.
type AuditEventType string

// Known audit event types. The string values end up in the audit log, so
// changing one changes what is recorded.
const (
	// Tenant lifecycle.
	AuditTenantCreated     AuditEventType = "tenant_created"
	AuditTenantDeactivated AuditEventType = "tenant_deactivated"
	AuditTenantUpdated     AuditEventType = "tenant_updated"

	// Resource access and lifecycle.
	AuditResourceAccess  AuditEventType = "resource_access"
	AuditResourceCreated AuditEventType = "resource_created"
	AuditResourceDeleted AuditEventType = "resource_deleted"

	// Job lifecycle.
	AuditJobSubmitted AuditEventType = "job_submitted"
	AuditJobCompleted AuditEventType = "job_completed"
	AuditJobFailed    AuditEventType = "job_failed"

	// Isolation and policy enforcement.
	AuditCrossTenantDeny AuditEventType = "cross_tenant_deny"
	AuditQuotaExceeded   AuditEventType = "quota_exceeded"
	AuditWorkerSanitized AuditEventType = "worker_sanitized"

	// Cryptographic operations.
	AuditEncryptionOp AuditEventType = "encryption_op"
	AuditDecryptionOp AuditEventType = "decryption_op"
)
// AuditEvent is a single audit log entry. The JSON tags define its serialized
// form; fields tagged omitempty are left out when empty.
type AuditEvent struct {
	// Type is the kind of action being recorded.
	Type AuditEventType `json:"type"`

	// TenantID selects which tenant's audit log the event is written to.
	TenantID string `json:"tenant_id"`

	// UserID optionally identifies the acting user.
	UserID string `json:"user_id,omitempty"`

	// ResourceID optionally identifies the resource acted upon.
	ResourceID string `json:"resource_id,omitempty"`

	// JobID optionally identifies the related job.
	JobID string `json:"job_id,omitempty"`

	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`

	// Success reports whether the recorded action succeeded.
	Success bool `json:"success"`

	// Details carries optional free-form extra context.
	Details map[string]any `json:"details,omitempty"`

	// IPAddress optionally records the originating client address.
	IPAddress string `json:"ip_address,omitempty"`
}
// NewAuditLogger returns an AuditLogger rooted at basePath with an empty
// logger cache. Nothing is created on disk until the first event for a
// tenant is logged.
func NewAuditLogger(basePath string) *AuditLogger {
	al := &AuditLogger{basePath: basePath}
	al.loggers = make(map[string]*logging.Logger)
	return al
}
// LogEvent writes event to the audit log of event.TenantID. On a tenant's
// first event, the tenant's directory and logger are created.
//
// A zero event.Timestamp is replaced with the current UTC time. Otherwise the
// entry would carry "0001-01-01T00:00:00Z", which is useless in an audit
// trail.
//
// Looking up the logger and the logging call both happen while al.mu is held
// exclusively, so LogEvent calls are serialized across all tenants. ctx is
// currently unused.
//
// It returns a wrapped error if the tenant's logger cannot be obtained.
func (al *AuditLogger) LogEvent(ctx context.Context, event AuditEvent) error {
	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now().UTC()
	}

	al.mu.Lock()
	defer al.mu.Unlock()

	// Get or create the tenant-specific logger.
	logger, err := al.getOrCreateLogger(event.TenantID)
	if err != nil {
		return fmt.Errorf("failed to get audit logger for tenant %s: %w", event.TenantID, err)
	}

	logger.Info("audit_event",
		"type", event.Type,
		"tenant_id", event.TenantID,
		"user_id", event.UserID,
		"resource_id", event.ResourceID,
		"job_id", event.JobID,
		"timestamp", event.Timestamp.Format(time.RFC3339Nano),
		"success", event.Success,
		"details", event.Details,
		"ip_address", event.IPAddress,
	)
	return nil
}
// QueryEvents is a placeholder. It is meant to return tenantID's audit events
// between start and end, filtered to eventTypes.
//
// It currently ignores every argument and always returns an empty, non-nil
// slice with a nil error. Callers cannot tell "no matching events" from "not
// implemented".
func (al *AuditLogger) QueryEvents(tenantID string, start, end time.Time, eventTypes []AuditEventType) ([]AuditEvent, error) {
	// TODO: query a centralized logging backend. Until one exists, report no
	// events.
	events := make([]AuditEvent, 0)
	return events, nil
}
// getOrCreateLogger returns the cached audit logger for tenantID. On a cache
// miss it creates <basePath>/<tenantID>/ with mode 0750, then a file logger
// writing to audit.log inside it (the original code notes this is JSON
// format), and caches it. It may insert into al.loggers, so callers must hold
// al.mu for writing.
//
// tenantID becomes a directory name, so it is validated before touching the
// filesystem. It must be a single, non-empty, local path element: not
// absolute, not "." or "..", and containing no path separators. Without this
// check an ID such as "../other" would write the log outside basePath or into
// another tenant's directory.
func (al *AuditLogger) getOrCreateLogger(tenantID string) (*logging.Logger, error) {
	if logger, exists := al.loggers[tenantID]; exists {
		return logger, nil
	}

	// filepath.IsLocal rejects "", "..", absolute and escaping paths.
	// filepath.Base rejects multi-component IDs. "." is local but would alias
	// basePath itself, so it is rejected explicitly.
	if tenantID == "." || !filepath.IsLocal(tenantID) || filepath.Base(tenantID) != tenantID {
		return nil, fmt.Errorf("invalid tenant ID %q for audit log path", tenantID)
	}

	// Create the tenant's audit directory.
	tenantAuditPath := filepath.Join(al.basePath, tenantID)
	if err := os.MkdirAll(tenantAuditPath, 0750); err != nil {
		return nil, fmt.Errorf("failed to create audit directory: %w", err)
	}

	// Create this tenant's logger (JSON format, writing to file).
	logger := logging.NewFileLogger(slog.LevelInfo, true, filepath.Join(tenantAuditPath, "audit.log"))
	al.loggers[tenantID] = logger
	return logger, nil
}