Update utility modules:
- File utilities with secure file operations
- Environment pool with resource tracking
- Error types with scheduler error categories
- Logging with audit context support
- Network/SSH with connection pooling
- Privacy/PII handling with tenant boundaries
- Resource manager with scheduler allocation
- Security monitor with audit integration
- Tracking plugins (MLflow, TensorBoard) with auth
- Crypto signing with tenant keys
- Database init with multi-user support
209 lines
4.7 KiB
Go
package tracking

import (
	"context"
	"fmt"
	"sync"

	"github.com/jfraeys/fetch_ml/internal/logging"
)

// ToolMode represents the provisioning mode for a tracking tool.
type ToolMode string

const (
	// ModeSidecar provisions the tool as a sidecar container.
	ModeSidecar ToolMode = "sidecar"
	// ModeRemote points to a remotely managed instance (no local provisioning).
	ModeRemote ToolMode = "remote"
	// ModeDisabled skips provisioning entirely.
	ModeDisabled ToolMode = "disabled"
)

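// ToolConfig specifies how a plugin should be provisioned for a task.
type ToolConfig struct {
	// Settings holds plugin-specific options; see StringSetting for safe reads.
	Settings map[string]any
	// Mode selects sidecar, remote, or disabled provisioning.
	Mode ToolMode
	// Enabled must be true for ProvisionAll to consider the plugin at all.
	Enabled bool
}
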
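// Plugin defines the behaviour every tracking integration must implement.
type Plugin interface {
	// Name returns the key under which the plugin is registered.
	Name() string
	// ProvisionSidecar prepares the tool for a task and returns the
	// environment variables to merge into the task environment.
	ProvisionSidecar(ctx context.Context, taskID string, config ToolConfig) (map[string]string, error)
	// Teardown releases whatever was provisioned for the task.
	Teardown(ctx context.Context, taskID string) error
	// HealthCheck reports whether the tool is usable with the given config.
	HealthCheck(ctx context.Context, config ToolConfig) bool
}
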
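// Registry keeps track of registered plugins and their lifecycle per task.
type Registry struct {
	logger *logging.Logger
	// plugins maps a plugin name to its implementation.
	plugins map[string]Plugin
	// active maps a task ID to the names of the sidecar plugins provisioned for it.
	active map[string][]string
	// mu guards plugins and active.
	mu sync.Mutex
}
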
// NewRegistry returns a new plugin registry.
func NewRegistry(logger *logging.Logger) *Registry {
	return &Registry{
		logger:  logger,
		plugins: make(map[string]Plugin),
		active:  make(map[string][]string),
	}
}

// Register adds a plugin to the registry, replacing any plugin already
// registered under the same name.
func (r *Registry) Register(p Plugin) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.plugins[p.Name()] = p
}

// Get retrieves a plugin by name.
func (r *Registry) Get(name string) (Plugin, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	p, ok := r.plugins[name]
	return p, ok
}

// ProvisionAll provisions configured plugins for a task and merges their environment variables.
// If any plugin is missing or fails to provision, the sidecars already started
// for this call are torn down before the error is returned.
func (r *Registry) ProvisionAll(
	ctx context.Context,
	taskID string,
	configs map[string]ToolConfig,
) (map[string]string, error) {
	if len(configs) == 0 {
		return nil, nil
	}

	env := make(map[string]string)
	var provisioned []string

	for name, cfg := range configs {
		if !cfg.Enabled || cfg.Mode == ModeDisabled {
			continue
		}

		plugin, ok := r.Get(name)
		if !ok {
			// Undo earlier sidecars so they do not leak on this error path.
			r.rollback(ctx, taskID, provisioned)
			return nil, fmt.Errorf("tracking plugin %s not registered", name)
		}

		settingsEnv, err := plugin.ProvisionSidecar(ctx, taskID, cfg)
		if err != nil {
			r.rollback(ctx, taskID, provisioned)
			return nil, fmt.Errorf("failed to provision %s: %w", name, err)
		}

		for k, v := range settingsEnv {
			env[k] = v
		}

		// Only sidecars need a later Teardown; remote instances are not tracked.
		if cfg.Mode == ModeSidecar {
			provisioned = append(provisioned, name)
		}
	}

	if len(provisioned) > 0 {
		r.mu.Lock()
		r.active[taskID] = append(r.active[taskID], provisioned...)
		r.mu.Unlock()
	}

	return env, nil
}

// TeardownAll stops every plugin that was provisioned for a task.
func (r *Registry) TeardownAll(ctx context.Context, taskID string) {
	r.mu.Lock()
	plugins := r.active[taskID]
	delete(r.active, taskID)
	r.mu.Unlock()

	for _, name := range plugins {
		plugin, ok := r.Get(name)
		if !ok {
			continue
		}
		if err := plugin.Teardown(ctx, taskID); err != nil && r.logger != nil {
			r.logger.Warn("tracking teardown failed", "plugin", name, "task_id", taskID, "error", err)
		}
	}
}

// rollback tears down the given plugins in reverse provisioning order.
// Teardown errors are logged, not returned.
func (r *Registry) rollback(ctx context.Context, taskID string, provisioned []string) {
	for i := len(provisioned) - 1; i >= 0; i-- {
		name := provisioned[i]
		plugin, ok := r.Get(name)
		if !ok {
			continue
		}
		if err := plugin.Teardown(ctx, taskID); err != nil && r.logger != nil {
			r.logger.Warn("rollback failed", "plugin", name, "task_id", taskID, "error", err)
		}
	}
}

// PortAllocator manages dynamic port assignments for sidecars.
// Ports are drawn from the half-open range [start, end): end itself is never allocated.
type PortAllocator struct {
	used  map[int]bool
	start int
	end   int
	next  int
	mu    sync.Mutex
}

// NewPortAllocator creates a new allocator for a port range.
// An invalid range (non-positive bounds or end <= start) silently falls back
// to the default range 5500-5600.
func NewPortAllocator(start, end int) *PortAllocator {
	if start <= 0 || end <= 0 || end <= start {
		start = 5500
		end = 5600
	}
	return &PortAllocator{
		start: start,
		end:   end,
		next:  start,
		used:  make(map[int]bool),
	}
}

// Allocate reserves the next available port.
// It scans each port in the range at most once, wrapping from end back to
// start, and returns an error when every port is in use.
func (p *PortAllocator) Allocate() (int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i := 0; i < p.end-p.start; i++ {
		port := p.next
		p.next++
		if p.next >= p.end {
			p.next = p.start
		}
		if !p.used[port] {
			p.used[port] = true
			return port, nil
		}
	}

	return 0, fmt.Errorf("no ports available in range %d-%d", p.start, p.end)
}

// Release frees a previously allocated port.
func (p *PortAllocator) Release(port int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.used, port)
}

// StringSetting safely reads a string from plugin settings.
// It returns "" when settings is nil, the key is absent, or the value is not a string.
func StringSetting(settings map[string]any, key string) string {
	if settings == nil {
		return ""
	}
	if v, ok := settings[key]; ok {
		if str, ok := v.(string); ok {
			return str
		}
	}
	return ""
}
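
The wrap-around scan in `Allocate` and the effect of `Release` are easiest to see on a tiny range. The program below is a standalone sketch, not part of this package: `allocator`, `newAllocator`, `allocate`, and `release` are hypothetical stand-ins that mirror `PortAllocator`, and the range 6000-6003 is an arbitrary choice for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// allocator is a hypothetical stand-in that mirrors PortAllocator:
// it hands out ports from the half-open range [start, end).
type allocator struct {
	mu    sync.Mutex
	used  map[int]bool
	start int
	end   int
	next  int
}

func newAllocator(start, end int) *allocator {
	return &allocator{start: start, end: end, next: start, used: make(map[int]bool)}
}

// allocate checks at most end-start candidates, wrapping from end back to start.
func (a *allocator) allocate() (int, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for i := 0; i < a.end-a.start; i++ {
		port := a.next
		a.next++
		if a.next >= a.end {
			a.next = a.start
		}
		if !a.used[port] {
			a.used[port] = true
			return port, nil
		}
	}
	return 0, errors.New("no ports available")
}

// release marks a port as free so a later scan can hand it out again.
func (a *allocator) release(port int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.used, port)
}

func main() {
	a := newAllocator(6000, 6003) // three usable ports: 6000, 6001, 6002
	for i := 0; i < 3; i++ {
		port, _ := a.allocate()
		fmt.Println("allocated", port)
	}
	if _, err := a.allocate(); err != nil {
		fmt.Println("exhausted:", err)
	}
	a.release(6001)
	port, _ := a.allocate()
	fmt.Println("reused", port)
}
```

Because the cursor keeps advancing rather than restarting at `start`, a freed port is only handed out again once the scan reaches it, which spreads reuse across the range instead of recycling the lowest port immediately.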