Update Jupyter integration for security and scheduler support: - Enhanced security configuration with audit logging - Health monitoring with scheduler event integration - Package manager with network policy enforcement - Service manager with lifecycle hooks - Network manager with tenant isolation - Workspace metadata with tenant tags - Config with resource limits - Podman container integration improvements - Experiment manager with tracking integration - Manifest runner with security checks
419 lines
11 KiB
Go
419 lines
11 KiB
Go
package jupyter
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jfraeys/fetch_ml/internal/config"
|
|
"github.com/jfraeys/fetch_ml/internal/fileutil"
|
|
"github.com/jfraeys/fetch_ml/internal/logging"
|
|
)
|
|
|
|
// WorkspaceMetadata describes the link between one Jupyter workspace
// directory and an experiment, together with the sync settings for that link.
//
// The field order is intentional: encoding/json emits struct fields in
// declaration order, so reordering fields changes the bytes written to disk.
type WorkspaceMetadata struct {
	// LinkedAt is the moment the workspace was linked (set by LinkWorkspace).
	LinkedAt time.Time `json:"linked_at"`

	// LastSync is the time of the most recent sync. The zero value means the
	// workspace has not been synced yet.
	LastSync time.Time `json:"last_sync"`

	// AdditionalData holds free-form key/value pairs (see SetAdditionalData).
	AdditionalData map[string]string `json:"additional_data"`

	// WorkspacePath is the absolute path of the workspace directory; the
	// manager also uses it as the key of its in-memory index.
	WorkspacePath string `json:"workspace_path"`

	// ExperimentID identifies the experiment this workspace is linked to.
	ExperimentID string `json:"experiment_id"`

	// ServiceID is left out of the JSON output when empty.
	// NOTE(review): presumably the Jupyter service instance serving this
	// workspace — confirm against callers.
	ServiceID string `json:"service_id,omitempty"`

	// SyncDirection is "bidirectional" for a freshly linked workspace; any
	// other accepted values are defined by callers of UpdateSyncTime.
	SyncDirection string `json:"sync_direction"`

	// Tags is a duplicate-free list of labels (see AddTag).
	Tags []string `json:"tags"`

	// SyncInterval is serialized as an integer number of nanoseconds, the
	// default JSON encoding of time.Duration.
	SyncInterval time.Duration `json:"sync_interval"`

	// AutoSync records whether automatic syncing was requested (see SetAutoSync).
	AutoSync bool `json:"auto_sync"`
}
|
|
|
|
// WorkspaceMetadataManager manages workspace metadata
|
|
type WorkspaceMetadataManager struct {
|
|
logger *logging.Logger
|
|
metadata map[string]*WorkspaceMetadata
|
|
dataFile string
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewWorkspaceMetadataManager creates a new workspace metadata manager
|
|
func NewWorkspaceMetadataManager(
|
|
logger *logging.Logger,
|
|
dataFile string,
|
|
) *WorkspaceMetadataManager {
|
|
wmm := &WorkspaceMetadataManager{
|
|
logger: logger,
|
|
metadata: make(map[string]*WorkspaceMetadata),
|
|
dataFile: dataFile,
|
|
}
|
|
|
|
// Load existing metadata
|
|
if err := wmm.loadMetadata(); err != nil {
|
|
logger.Warn("failed to load workspace metadata", "error", err)
|
|
}
|
|
|
|
return wmm
|
|
}
|
|
|
|
// LinkWorkspace links a workspace with an experiment
|
|
func (wmm *WorkspaceMetadataManager) LinkWorkspace(
|
|
workspacePath,
|
|
experimentID,
|
|
serviceID string,
|
|
) error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
// Resolve absolute path
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
// Create metadata
|
|
metadata := &WorkspaceMetadata{
|
|
WorkspacePath: absPath,
|
|
ExperimentID: experimentID,
|
|
ServiceID: serviceID,
|
|
LinkedAt: time.Now(),
|
|
LastSync: time.Time{}, // Zero value indicates no sync yet
|
|
SyncDirection: "bidirectional",
|
|
AutoSync: false,
|
|
SyncInterval: 30 * time.Minute,
|
|
Tags: []string{},
|
|
AdditionalData: make(map[string]string),
|
|
}
|
|
|
|
// Store metadata
|
|
wmm.metadata[absPath] = metadata
|
|
|
|
// Save to disk
|
|
if err := wmm.saveMetadata(); err != nil {
|
|
wmm.logger.Error("failed to save workspace metadata", "error", err)
|
|
return err
|
|
}
|
|
|
|
wmm.logger.Info("workspace linked with experiment",
|
|
"workspace", absPath,
|
|
"experiment_id", experimentID,
|
|
"service_id", serviceID)
|
|
|
|
// Create metadata file in workspace
|
|
if err := wmm.createWorkspaceMetadataFile(absPath, metadata); err != nil {
|
|
wmm.logger.Warn("failed to create workspace metadata file", "error", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetWorkspaceMetadata retrieves metadata for a workspace
|
|
func (wmm *WorkspaceMetadataManager) GetWorkspaceMetadata(
|
|
workspacePath string,
|
|
) (*WorkspaceMetadata, error) {
|
|
wmm.mutex.RLock()
|
|
defer wmm.mutex.RUnlock()
|
|
|
|
// Resolve absolute path
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
metadata, exists := wmm.metadata[absPath]
|
|
if !exists {
|
|
return nil, fmt.Errorf("workspace not linked: %s", absPath)
|
|
}
|
|
|
|
return metadata, nil
|
|
}
|
|
|
|
// UpdateSyncTime updates the last sync time for a workspace
|
|
func (wmm *WorkspaceMetadataManager) UpdateSyncTime(workspacePath string, direction string) error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
metadata, exists := wmm.metadata[absPath]
|
|
if !exists {
|
|
return fmt.Errorf("workspace not linked: %s", absPath)
|
|
}
|
|
|
|
metadata.LastSync = time.Now()
|
|
if direction != "" {
|
|
metadata.SyncDirection = direction
|
|
}
|
|
|
|
return wmm.saveMetadata()
|
|
}
|
|
|
|
// ListLinkedWorkspaces returns all linked workspaces
|
|
func (wmm *WorkspaceMetadataManager) ListLinkedWorkspaces() []*WorkspaceMetadata {
|
|
wmm.mutex.RLock()
|
|
defer wmm.mutex.RUnlock()
|
|
|
|
workspaces := make([]*WorkspaceMetadata, 0, len(wmm.metadata))
|
|
for _, metadata := range wmm.metadata {
|
|
workspaces = append(workspaces, metadata)
|
|
}
|
|
|
|
return workspaces
|
|
}
|
|
|
|
// UnlinkWorkspace removes the link between workspace and experiment
|
|
func (wmm *WorkspaceMetadataManager) UnlinkWorkspace(workspacePath string) error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
if _, exists := wmm.metadata[absPath]; !exists {
|
|
return fmt.Errorf("workspace not linked: %s", absPath)
|
|
}
|
|
|
|
delete(wmm.metadata, absPath)
|
|
|
|
// Save to disk
|
|
if err := wmm.saveMetadata(); err != nil {
|
|
wmm.logger.Error("failed to save workspace metadata", "error", err)
|
|
return err
|
|
}
|
|
|
|
// Remove workspace metadata file
|
|
workspaceMetaFile := filepath.Join(absPath, ".jupyter_experiment.json")
|
|
if err := os.Remove(workspaceMetaFile); err != nil && !os.IsNotExist(err) {
|
|
wmm.logger.Warn(
|
|
"failed to remove workspace metadata file",
|
|
"file",
|
|
workspaceMetaFile,
|
|
"error",
|
|
err,
|
|
)
|
|
}
|
|
|
|
wmm.logger.Info("workspace unlinked", "workspace", absPath)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ClearAllMetadata clears all workspace metadata
|
|
func (wmm *WorkspaceMetadataManager) ClearAllMetadata() error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
wmm.metadata = make(map[string]*WorkspaceMetadata)
|
|
|
|
// Save to disk
|
|
if err := wmm.saveMetadata(); err != nil {
|
|
wmm.logger.Error("failed to save cleared workspace metadata", "error", err)
|
|
return err
|
|
}
|
|
|
|
wmm.logger.Info("all workspace metadata cleared")
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetAutoSync enables or disables auto-sync for a workspace
|
|
func (wmm *WorkspaceMetadataManager) SetAutoSync(
|
|
workspacePath string,
|
|
enabled bool,
|
|
interval time.Duration,
|
|
) error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
metadata, exists := wmm.metadata[absPath]
|
|
if !exists {
|
|
return fmt.Errorf("workspace not linked: %s", absPath)
|
|
}
|
|
|
|
metadata.AutoSync = enabled
|
|
if interval > 0 {
|
|
metadata.SyncInterval = interval
|
|
}
|
|
|
|
return wmm.saveMetadata()
|
|
}
|
|
|
|
// AddTag adds a tag to workspace metadata
|
|
func (wmm *WorkspaceMetadataManager) AddTag(workspacePath, tag string) error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
metadata, exists := wmm.metadata[absPath]
|
|
if !exists {
|
|
return fmt.Errorf("workspace not linked: %s", absPath)
|
|
}
|
|
|
|
// Check if tag already exists
|
|
for _, existingTag := range metadata.Tags {
|
|
if existingTag == tag {
|
|
return nil // Tag already exists
|
|
}
|
|
}
|
|
|
|
metadata.Tags = append(metadata.Tags, tag)
|
|
|
|
return wmm.saveMetadata()
|
|
}
|
|
|
|
// SetAdditionalData sets additional data for a workspace
|
|
func (wmm *WorkspaceMetadataManager) SetAdditionalData(workspacePath, key, value string) error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
absPath, err := filepath.Abs(workspacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve workspace path: %w", err)
|
|
}
|
|
|
|
metadata, exists := wmm.metadata[absPath]
|
|
if !exists {
|
|
return fmt.Errorf("workspace not linked: %s", absPath)
|
|
}
|
|
|
|
if metadata.AdditionalData == nil {
|
|
metadata.AdditionalData = make(map[string]string)
|
|
}
|
|
|
|
metadata.AdditionalData[key] = value
|
|
|
|
return wmm.saveMetadata()
|
|
}
|
|
|
|
// loadMetadata loads metadata from disk
|
|
func (wmm *WorkspaceMetadataManager) loadMetadata() error {
|
|
if _, err := os.Stat(wmm.dataFile); os.IsNotExist(err) {
|
|
return nil // No existing metadata
|
|
}
|
|
|
|
data, err := os.ReadFile(wmm.dataFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read metadata file: %w", err)
|
|
}
|
|
|
|
var metadata map[string]*WorkspaceMetadata
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
return fmt.Errorf("failed to parse metadata file: %w", err)
|
|
}
|
|
|
|
wmm.metadata = metadata
|
|
|
|
wmm.logger.Info("workspace metadata loaded", "count", len(metadata))
|
|
|
|
return nil
|
|
}
|
|
|
|
// saveMetadata saves metadata to disk using PathRegistry with crash safety (fsync)
|
|
func (wmm *WorkspaceMetadataManager) saveMetadata() error {
|
|
// Use PathRegistry for consistent directory creation
|
|
paths := config.FromEnv()
|
|
if err := paths.EnsureDir(filepath.Dir(wmm.dataFile)); err != nil {
|
|
return fmt.Errorf("failed to create metadata directory: %w", err)
|
|
}
|
|
|
|
data, err := json.MarshalIndent(wmm.metadata, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal metadata: %w", err)
|
|
}
|
|
|
|
// SECURITY: Write with fsync for crash safety
|
|
if err := fileutil.WriteFileSafe(wmm.dataFile, data, 0644); err != nil {
|
|
return fmt.Errorf("failed to write metadata file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createWorkspaceMetadataFile creates a metadata file in the workspace directory
|
|
func (wmm *WorkspaceMetadataManager) createWorkspaceMetadataFile(
|
|
workspacePath string,
|
|
metadata *WorkspaceMetadata,
|
|
) error {
|
|
workspaceMetaFile := filepath.Join(workspacePath, ".jupyter_experiment.json")
|
|
|
|
// Create a simplified version for the workspace
|
|
workspaceMeta := map[string]interface{}{
|
|
"experiment_id": metadata.ExperimentID,
|
|
"service_id": metadata.ServiceID,
|
|
"linked_at": metadata.LinkedAt.Unix(),
|
|
"last_sync": metadata.LastSync.Unix(),
|
|
"sync_direction": metadata.SyncDirection,
|
|
"auto_sync": metadata.AutoSync,
|
|
"jupyter_integration": true,
|
|
"workspace_path": metadata.WorkspacePath,
|
|
"tags": metadata.Tags,
|
|
}
|
|
|
|
data, err := json.MarshalIndent(workspaceMeta, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal workspace metadata: %w", err)
|
|
}
|
|
|
|
// SECURITY: Write with fsync for crash safety
|
|
if err := fileutil.WriteFileSafe(workspaceMetaFile, data, 0600); err != nil {
|
|
return fmt.Errorf("failed to write workspace metadata file: %w", err)
|
|
}
|
|
|
|
wmm.logger.Info("workspace metadata file created", "file", workspaceMetaFile)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetWorkspacesForExperiment returns all workspaces linked to an experiment
|
|
func (wmm *WorkspaceMetadataManager) GetWorkspacesForExperiment(
|
|
experimentID string,
|
|
) []*WorkspaceMetadata {
|
|
wmm.mutex.RLock()
|
|
defer wmm.mutex.RUnlock()
|
|
|
|
var workspaces []*WorkspaceMetadata
|
|
for _, metadata := range wmm.metadata {
|
|
if metadata.ExperimentID == experimentID {
|
|
workspaces = append(workspaces, metadata)
|
|
}
|
|
}
|
|
|
|
return workspaces
|
|
}
|
|
|
|
// Cleanup removes metadata for workspaces that no longer exist
|
|
func (wmm *WorkspaceMetadataManager) Cleanup() error {
|
|
wmm.mutex.Lock()
|
|
defer wmm.mutex.Unlock()
|
|
|
|
var toRemove []string
|
|
|
|
for workspacePath := range wmm.metadata {
|
|
if _, err := os.Stat(workspacePath); os.IsNotExist(err) {
|
|
toRemove = append(toRemove, workspacePath)
|
|
}
|
|
}
|
|
|
|
for _, workspacePath := range toRemove {
|
|
delete(wmm.metadata, workspacePath)
|
|
wmm.logger.Info("removed metadata for non-existent workspace", "workspace", workspacePath)
|
|
}
|
|
|
|
if len(toRemove) > 0 {
|
|
return wmm.saveMetadata()
|
|
}
|
|
|
|
return nil
|
|
}
|