Implement comprehensive audit and security infrastructure:
- Immutable audit logs with platform-specific backends (Linux/Other)
- Sealed log entries with tamper-evident checksums
- Audit alert system for real-time security notifications
- Log rotation with retention policies
- Checkpoint-based audit verification

Add multi-tenant security features:
- Tenant manager with quota enforcement
- Middleware for tenant authentication/authorization
- Per-tenant cryptographic key isolation
- Supply chain security for container verification
- Cross-platform secure file utilities (Unix/Windows)

Add test coverage:
- Unit tests for audit alerts and sealed logs
- Platform-specific audit backend tests
266 lines
7.8 KiB
Go
266 lines
7.8 KiB
Go
// Package tenant provides multi-tenant isolation and resource management.
|
|
package tenant
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/logging"
|
|
)
|
|
|
|
// QuotaManager tracks resource usage per tenant
|
|
type QuotaManager struct {
|
|
usage map[string]*TenantUsage
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// TenantUsage tracks the current resource consumption of a single tenant.
//
// The struct is JSON-serializable; every field carries an explicit
// snake_case key.
type TenantUsage struct {
	// TenantID identifies the tenant this record belongs to.
	TenantID string `json:"tenant_id"`

	// ActiveJobs is the number of jobs currently counted against the tenant.
	ActiveJobs int `json:"active_jobs"`

	// GPUsAllocated is the number of GPUs currently reserved.
	GPUsAllocated int `json:"gpus_allocated"`

	// MemoryGBUsed is the amount of memory currently reserved (GB, going by
	// the field name).
	MemoryGBUsed int `json:"memory_gb_used"`

	// StorageGBUsed is the amount of storage currently reserved (GB, going
	// by the field name).
	StorageGBUsed int `json:"storage_gb_used"`

	// CPUCoresUsed is the number of CPU cores currently reserved.
	CPUCoresUsed int `json:"cpu_cores_used"`

	// ArtifactsThisHour counts artifacts recorded since LastReset.
	ArtifactsThisHour int `json:"artifacts_this_hour"`

	// LastReset is the time the record was created or ArtifactsThisHour was
	// last set back to zero.
	LastReset time.Time `json:"last_reset"`
}
|
|
|
|
// NewQuotaManager creates a new quota manager
|
|
func NewQuotaManager() *QuotaManager {
|
|
return &QuotaManager{
|
|
usage: make(map[string]*TenantUsage),
|
|
}
|
|
}
|
|
|
|
// GetUsage returns current usage for a tenant
|
|
func (qm *QuotaManager) GetUsage(tenantID string) *TenantUsage {
|
|
qm.mu.RLock()
|
|
defer qm.mu.RUnlock()
|
|
|
|
if usage, exists := qm.usage[tenantID]; exists {
|
|
return usage
|
|
}
|
|
return &TenantUsage{
|
|
TenantID: tenantID,
|
|
LastReset: time.Now().UTC(),
|
|
}
|
|
}
|
|
|
|
// CheckQuota verifies if a requested operation fits within tenant quotas
|
|
func (qm *QuotaManager) CheckQuota(tenantID string, quota ResourceQuota, req ResourceRequest) error {
|
|
qm.mu.RLock()
|
|
defer qm.mu.RUnlock()
|
|
|
|
usage := qm.getOrCreateUsage(tenantID)
|
|
|
|
// Reset hourly counters if needed
|
|
if time.Since(usage.LastReset) > time.Hour {
|
|
usage.ArtifactsThisHour = 0
|
|
usage.LastReset = time.Now().UTC()
|
|
}
|
|
|
|
// Check each resource
|
|
if usage.ActiveJobs+req.Jobs > quota.MaxConcurrentJobs {
|
|
return fmt.Errorf("quota exceeded: concurrent jobs %d/%d", usage.ActiveJobs+req.Jobs, quota.MaxConcurrentJobs)
|
|
}
|
|
|
|
if usage.GPUsAllocated+req.GPUs > quota.MaxGPUs {
|
|
return fmt.Errorf("quota exceeded: GPUs %d/%d", usage.GPUsAllocated+req.GPUs, quota.MaxGPUs)
|
|
}
|
|
|
|
if usage.MemoryGBUsed+req.MemoryGB > quota.MaxMemoryGB {
|
|
return fmt.Errorf("quota exceeded: memory %d/%d GB", usage.MemoryGBUsed+req.MemoryGB, quota.MaxMemoryGB)
|
|
}
|
|
|
|
if usage.StorageGBUsed+req.StorageGB > quota.MaxStorageGB {
|
|
return fmt.Errorf("quota exceeded: storage %d/%d GB", usage.StorageGBUsed+req.StorageGB, quota.MaxStorageGB)
|
|
}
|
|
|
|
if usage.CPUCoresUsed+req.CPUCores > quota.MaxCPUCores {
|
|
return fmt.Errorf("quota exceeded: CPU cores %d/%d", usage.CPUCoresUsed+req.CPUCores, quota.MaxCPUCores)
|
|
}
|
|
|
|
if req.Artifacts > 0 && usage.ArtifactsThisHour+req.Artifacts > quota.MaxArtifactsPerHour {
|
|
return fmt.Errorf("quota exceeded: artifacts per hour %d/%d", usage.ArtifactsThisHour+req.Artifacts, quota.MaxArtifactsPerHour)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Allocate reserves resources for a tenant
|
|
func (qm *QuotaManager) Allocate(tenantID string, req ResourceRequest) error {
|
|
qm.mu.Lock()
|
|
defer qm.mu.Unlock()
|
|
|
|
usage := qm.getOrCreateUsage(tenantID)
|
|
|
|
usage.ActiveJobs += req.Jobs
|
|
usage.GPUsAllocated += req.GPUs
|
|
usage.MemoryGBUsed += req.MemoryGB
|
|
usage.StorageGBUsed += req.StorageGB
|
|
usage.CPUCoresUsed += req.CPUCores
|
|
|
|
return nil
|
|
}
|
|
|
|
// Release frees resources for a tenant
|
|
func (qm *QuotaManager) Release(tenantID string, req ResourceRequest) {
|
|
qm.mu.Lock()
|
|
defer qm.mu.Unlock()
|
|
|
|
usage, exists := qm.usage[tenantID]
|
|
if !exists {
|
|
return
|
|
}
|
|
|
|
usage.ActiveJobs = max(0, usage.ActiveJobs-req.Jobs)
|
|
usage.GPUsAllocated = max(0, usage.GPUsAllocated-req.GPUs)
|
|
usage.MemoryGBUsed = max(0, usage.MemoryGBUsed-req.MemoryGB)
|
|
usage.StorageGBUsed = max(0, usage.StorageGBUsed-req.StorageGB)
|
|
usage.CPUCoresUsed = max(0, usage.CPUCoresUsed-req.CPUCores)
|
|
}
|
|
|
|
// RecordArtifact increments the artifact counter for a tenant
|
|
func (qm *QuotaManager) RecordArtifact(tenantID string) {
|
|
qm.mu.Lock()
|
|
defer qm.mu.Unlock()
|
|
|
|
usage := qm.getOrCreateUsage(tenantID)
|
|
|
|
// Reset if needed
|
|
if time.Since(usage.LastReset) > time.Hour {
|
|
usage.ArtifactsThisHour = 0
|
|
usage.LastReset = time.Now().UTC()
|
|
}
|
|
|
|
usage.ArtifactsThisHour++
|
|
}
|
|
|
|
func (qm *QuotaManager) getOrCreateUsage(tenantID string) *TenantUsage {
|
|
if usage, exists := qm.usage[tenantID]; exists {
|
|
return usage
|
|
}
|
|
|
|
usage := &TenantUsage{
|
|
TenantID: tenantID,
|
|
LastReset: time.Now().UTC(),
|
|
}
|
|
qm.usage[tenantID] = usage
|
|
return usage
|
|
}
|
|
|
|
// ResourceRequest represents a request for resources: the amount of each
// resource an operation wants to reserve, check or release. The zero value
// requests nothing.
type ResourceRequest struct {
	// Jobs is the number of concurrent jobs requested.
	Jobs int

	// GPUs is the number of GPUs requested.
	GPUs int

	// MemoryGB is the amount of memory requested (GB, going by the name).
	MemoryGB int

	// StorageGB is the amount of storage requested (GB, going by the name).
	StorageGB int

	// CPUCores is the number of CPU cores requested.
	CPUCores int

	// Artifacts is the number of artifacts the operation intends to produce.
	Artifacts int
}
|
|
|
|
// AuditLogger handles per-tenant audit logging
|
|
type AuditLogger struct {
|
|
basePath string
|
|
loggers map[string]*logging.Logger
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// AuditEventType represents different types of audit events. Its string
// value is what gets written to the audit log.
type AuditEventType string

// Known audit event types.
const (
	// Tenant lifecycle events.
	AuditTenantCreated     AuditEventType = "tenant_created"
	AuditTenantDeactivated AuditEventType = "tenant_deactivated"
	AuditTenantUpdated     AuditEventType = "tenant_updated"

	// Resource events.
	AuditResourceAccess  AuditEventType = "resource_access"
	AuditResourceCreated AuditEventType = "resource_created"
	AuditResourceDeleted AuditEventType = "resource_deleted"

	// Job events.
	AuditJobSubmitted AuditEventType = "job_submitted"
	AuditJobCompleted AuditEventType = "job_completed"
	AuditJobFailed    AuditEventType = "job_failed"

	// Isolation, quota and cryptography events.
	AuditCrossTenantDeny AuditEventType = "cross_tenant_deny"
	AuditQuotaExceeded   AuditEventType = "quota_exceeded"
	AuditWorkerSanitized AuditEventType = "worker_sanitized"
	AuditEncryptionOp    AuditEventType = "encryption_op"
	AuditDecryptionOp    AuditEventType = "decryption_op"
)
|
|
|
|
// AuditEvent represents a single audit log entry
|
|
type AuditEvent struct {
|
|
Type AuditEventType `json:"type"`
|
|
TenantID string `json:"tenant_id"`
|
|
UserID string `json:"user_id,omitempty"`
|
|
ResourceID string `json:"resource_id,omitempty"`
|
|
JobID string `json:"job_id,omitempty"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Success bool `json:"success"`
|
|
Details map[string]any `json:"details,omitempty"`
|
|
IPAddress string `json:"ip_address,omitempty"`
|
|
}
|
|
|
|
// NewAuditLogger creates a new per-tenant audit logger
|
|
func NewAuditLogger(basePath string) *AuditLogger {
|
|
return &AuditLogger{
|
|
basePath: basePath,
|
|
loggers: make(map[string]*logging.Logger),
|
|
}
|
|
}
|
|
|
|
// LogEvent logs an audit event for a specific tenant
|
|
func (al *AuditLogger) LogEvent(ctx context.Context, event AuditEvent) error {
|
|
al.mu.Lock()
|
|
defer al.mu.Unlock()
|
|
|
|
// Get or create tenant-specific logger
|
|
logger, err := al.getOrCreateLogger(event.TenantID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get audit logger for tenant %s: %w", event.TenantID, err)
|
|
}
|
|
|
|
// Log the event
|
|
logger.Info("audit_event",
|
|
"type", event.Type,
|
|
"tenant_id", event.TenantID,
|
|
"user_id", event.UserID,
|
|
"resource_id", event.ResourceID,
|
|
"job_id", event.JobID,
|
|
"timestamp", event.Timestamp.Format(time.RFC3339Nano),
|
|
"success", event.Success,
|
|
"details", event.Details,
|
|
"ip_address", event.IPAddress,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// QueryEvents queries audit events for a tenant (placeholder for future implementation)
|
|
func (al *AuditLogger) QueryEvents(tenantID string, start, end time.Time, eventTypes []AuditEventType) ([]AuditEvent, error) {
|
|
// In production, this would query from a centralized logging system
|
|
// For now, return empty
|
|
return []AuditEvent{}, nil
|
|
}
|
|
|
|
func (al *AuditLogger) getOrCreateLogger(tenantID string) (*logging.Logger, error) {
|
|
if logger, exists := al.loggers[tenantID]; exists {
|
|
return logger, nil
|
|
}
|
|
|
|
// Create tenant audit directory
|
|
tenantAuditPath := filepath.Join(al.basePath, tenantID)
|
|
if err := os.MkdirAll(tenantAuditPath, 0750); err != nil {
|
|
return nil, fmt.Errorf("failed to create audit directory: %w", err)
|
|
}
|
|
|
|
// Create logger for this tenant (JSON format to file)
|
|
logger := logging.NewFileLogger(slog.LevelInfo, true, filepath.Join(tenantAuditPath, "audit.log"))
|
|
|
|
al.loggers[tenantID] = logger
|
|
return logger, nil
|
|
}
|