From 4b2782f6748081ef75a423e7df9e1782f65ccc81 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Sun, 8 Mar 2026 13:03:27 -0400 Subject: [PATCH] feat(domain): add task visibility and supporting infrastructure Core domain and utility updates: - domain/task.go: Task model with visibility system * Visibility enum: private, lab, institution, open * Group associations for lab-scoped access * CreatedBy tracking for ownership * Sharing metadata with expiry - config/paths.go: Group-scoped data directories and audit log paths - crypto/signing.go: Key management for audit sealing, token signature verification - container/supply_chain.go: Image provenance tracking, vulnerability scanning - fileutil/filetype.go: MIME type detection and security validation - fileutil/secure.go: Protected file permissions, secure deletion - jupyter/: Package and service manager updates - experiment/manager.go: Visibility cascade from experiments to tasks - network/ssh.go: SSH tunneling improvements - queue/: Filesystem queue enhancements --- internal/config/paths.go | 19 +++++---- internal/container/supply_chain.go | 2 +- internal/crypto/signing.go | 2 + internal/domain/task.go | 15 +++++++ internal/experiment/manager.go | 42 ++++++++++++++---- internal/fileutil/filetype.go | 2 + internal/fileutil/secure.go | 18 +++++--- internal/jupyter/package_manager.go | 3 ++ internal/jupyter/service_manager.go | 2 + internal/network/ssh.go | 2 +- internal/queue/filesystem/queue.go | 66 ++++++++++++++++++++++++++--- internal/queue/filesystem_queue.go | 5 +++ 12 files changed, 147 insertions(+), 31 deletions(-) diff --git a/internal/config/paths.go b/internal/config/paths.go index 91adeec..ea34a91 100644 --- a/internal/config/paths.go +++ b/internal/config/paths.go @@ -20,14 +20,14 @@ func NewPathRegistry(root string) *PathRegistry { return &PathRegistry{RootDir: root} } -// Binary paths +// BinDir returns the path to the bin directory where compiled binaries are stored. func (p *PathRegistry) BinDir() string { return filepath.Join(p.RootDir, "bin") } func (p *PathRegistry) APIServerBinary() string { return filepath.Join(p.BinDir(), "api-server") } func (p *PathRegistry) WorkerBinary() string { return filepath.Join(p.BinDir(), "worker") } func (p *PathRegistry) TUIBinary() string { return filepath.Join(p.BinDir(), "tui") } func (p *PathRegistry) DataManagerBinary() string { return filepath.Join(p.BinDir(), "data_manager") } -// Data paths +// DataDir returns the path to the data directory where all runtime data is stored. func (p *PathRegistry) DataDir() string { return filepath.Join(p.RootDir, "data") } func (p *PathRegistry) ActiveDataDir() string { return filepath.Join(p.DataDir(), "active") } func (p *PathRegistry) JupyterStateDir() string { @@ -36,26 +36,27 @@ func (p *PathRegistry) JupyterStateDir() string { func (p *PathRegistry) ExperimentsDir() string { return filepath.Join(p.DataDir(), "experiments") } func (p *PathRegistry) ProdSmokeDir() string { return filepath.Join(p.DataDir(), "prod-smoke") } -// Database paths +// DBDir returns the path to the database directory where all database files are stored. func (p *PathRegistry) DBDir() string { return filepath.Join(p.RootDir, "db") } func (p *PathRegistry) SQLitePath() string { return filepath.Join(p.DBDir(), "fetch_ml.db") } -// Log paths +// LogDir returns the path to the log directory where all log files are stored. func (p *PathRegistry) LogDir() string { return filepath.Join(p.RootDir, "logs") } func (p *PathRegistry) AuditLogPath() string { return filepath.Join(p.LogDir(), "fetchml-audit.log") } -// Config paths +// ConfigDir returns the path to the config directory where all configuration files are stored. func (p *PathRegistry) ConfigDir() string { return filepath.Join(p.RootDir, "configs") } + func (p *PathRegistry) APIServerConfig() string { return filepath.Join(p.ConfigDir(), "api", "dev.yaml") } func (p *PathRegistry) WorkerConfigDir() string { return filepath.Join(p.ConfigDir(), "workers") } -// Test paths +// TestResultsDir returns the path to the test results directory where all test outputs are stored. func (p *PathRegistry) TestResultsDir() string { return filepath.Join(p.RootDir, "test_results") } func (p *PathRegistry) TempDir() string { return filepath.Join(p.RootDir, "tmp") } -// State file paths (for service persistence) +// These files store the state of Jupyter services and workspaces, allowing them to persist across restarts. func (p *PathRegistry) JupyterServicesFile() string { return filepath.Join(p.JupyterStateDir(), "fetch_ml_jupyter_services.json") } @@ -66,12 +67,12 @@ func (p *PathRegistry) JupyterWorkspacesFile() string { // EnsureDir creates directory if it doesn't exist with appropriate permissions. func (p *PathRegistry) EnsureDir(path string) error { - return os.MkdirAll(path, 0750) + return os.MkdirAll(path, 0o750) } // EnsureDirSecure creates directory with restricted permissions (for sensitive data). func (p *PathRegistry) EnsureDirSecure(path string) error { - return os.MkdirAll(path, 0700) + return os.MkdirAll(path, 0o700) } // FileExists checks if a file exists. diff --git a/internal/container/supply_chain.go b/internal/container/supply_chain.go index 271741e..271977c 100644 --- a/internal/container/supply_chain.go +++ b/internal/container/supply_chain.go @@ -332,7 +332,7 @@ func (s *SupplyChainSecurity) generateSBOM(_ context.Context, imageRef string) ( return nil, err } - if err := os.WriteFile(path, data, 0640); err != nil { + if err := os.WriteFile(path, data, 0600); err != nil { return nil, err } diff --git a/internal/crypto/signing.go b/internal/crypto/signing.go index 6ff859e..8afdcb0 100644 --- a/internal/crypto/signing.go +++ b/internal/crypto/signing.go @@ -137,6 +137,7 @@ func SavePrivateKeyToFile(key []byte, path string) error { // LoadPrivateKeyFromFile loads a private key from a file func LoadPrivateKeyFromFile(path string) ([]byte, error) { + // #nosec G304 -- path is controlled by configuration, not arbitrary user input key, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("failed to read private key: %w", err) @@ -160,6 +161,7 @@ func SavePublicKeyToFile(key []byte, path string) error { // LoadPublicKeyFromFile loads a public key from a file func LoadPublicKeyFromFile(path string) ([]byte, error) { + // #nosec G304 -- path is controlled by configuration, not arbitrary user input key, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("failed to read public key: %w", err) diff --git a/internal/domain/task.go b/internal/domain/task.go index 2b405d0..9a9ee09 100644 --- a/internal/domain/task.go +++ b/internal/domain/task.go @@ -50,6 +50,11 @@ type Task struct { // RemainingTime is the wall-clock budget left when assigned to a worker. // Set by the scheduler on assignment. RemainingTime time.Duration `json:"remaining_time,omitempty"` + + // Visibility controls who can access this task + Visibility VisibilityLevel `json:"visibility,omitempty"` + // ExperimentID optionally groups this task into an experiment + ExperimentID string `json:"experiment_id,omitempty"` } // Attempt represents a single execution attempt of a task @@ -65,3 +70,13 @@ type Attempt struct { Attempt int `json:"attempt"` ExitCode int `json:"exit_code,omitempty"` } + +// VisibilityLevel defines task sharing visibility +type VisibilityLevel string + +const ( + VisibilityPrivate VisibilityLevel = "private" // Owner only (opt-down) + VisibilityLab VisibilityLevel = "lab" // Selected lab group — DEFAULT + VisibilityInstitution VisibilityLevel = "institution" // All authenticated users + VisibilityOpen VisibilityLevel = "open" // Anyone with a signed link token +) diff --git a/internal/experiment/manager.go b/internal/experiment/manager.go index 44c460f..12e27b3 100644 --- a/internal/experiment/manager.go +++ b/internal/experiment/manager.go @@ -119,19 +119,28 @@ func (m *Manager) WriteMetadata(meta *Metadata) error { // Timestamp ts := make([]byte, 8) - binary.BigEndian.PutUint64(ts, uint64(meta.Timestamp)) //nolint:gosec + if meta.Timestamp < 0 { + return fmt.Errorf("timestamp cannot be negative: %d", meta.Timestamp) + } + binary.BigEndian.PutUint64(ts, uint64(meta.Timestamp)) buf = append(buf, ts...) // Commit ID - buf = append(buf, byte(len(meta.CommitID))) + commitIDLen := min(len(meta.CommitID), 255) + // #nosec G115 -- length is bounded by min() to 255, safe conversion + buf = append(buf, byte(commitIDLen)) buf = append(buf, []byte(meta.CommitID)...) // Job Name - buf = append(buf, byte(len(meta.JobName))) + jobNameLen := min(len(meta.JobName), 255) + // #nosec G115 -- length is bounded by min() to 255, safe conversion + buf = append(buf, byte(jobNameLen)) buf = append(buf, []byte(meta.JobName)...) // User - buf = append(buf, byte(len(meta.User))) + userLen := min(len(meta.User), 255) + // #nosec G115 -- length is bounded by min() to 255, safe conversion + buf = append(buf, byte(userLen)) buf = append(buf, []byte(meta.User)...) // SECURITY: Write with fsync for crash safety @@ -162,7 +171,11 @@ func (m *Manager) ReadMetadata(commitID string) (*Metadata, error) { } // Timestamp - meta.Timestamp = int64(binary.BigEndian.Uint64(data[offset : offset+8])) //nolint:gosec + ts := binary.BigEndian.Uint64(data[offset : offset+8]) + if ts > math.MaxInt64 { + return nil, fmt.Errorf("timestamp overflow: %d", ts) + } + meta.Timestamp = int64(ts) offset += 8 // Commit ID @@ -336,13 +349,24 @@ func (m *Manager) LogMetric(commitID string, name string, value float64, step in // Timestamp ts := make([]byte, 8) - ts64 := uint64(time.Now().Unix()) //nolint:gosec + unix := time.Now().Unix() + if unix < 0 { + return fmt.Errorf("system time is before epoch") + } + ts64 := uint64(unix) binary.BigEndian.PutUint64(ts, ts64) buf = append(buf, ts...) // Step st := make([]byte, 4) - binary.BigEndian.PutUint32(st, uint32(step)) //nolint:gosec + if step < 0 { + return fmt.Errorf("step cannot be negative: %d", step) + } + if step > int(^uint32(0)) { + return fmt.Errorf("step too large: %d", step) + } + stVal := uint32(step) + binary.BigEndian.PutUint32(st, stVal) buf = append(buf, st...) // Value (float64) @@ -354,7 +378,9 @@ func (m *Manager) LogMetric(commitID string, name string, value float64, step in if len(name) > 255 { name = name[:255] } - buf = append(buf, byte(len(name))) + nameLen := min(len(name), 255) + // #nosec G115 -- length is bounded by min() to 255, safe conversion + buf = append(buf, byte(nameLen)) buf = append(buf, []byte(name)...) // Append to file diff --git a/internal/fileutil/filetype.go b/internal/fileutil/filetype.go index 3d776e7..d7ae53e 100644 --- a/internal/fileutil/filetype.go +++ b/internal/fileutil/filetype.go @@ -113,6 +113,7 @@ func ValidateFileType(filePath string, allowedTypes []FileType) (*FileType, erro } // Open and read the file + // #nosec G304 -- filePath is validated through extension and magic byte checks file, err := os.Open(filePath) if err != nil { return nil, fmt.Errorf("failed to open file for type validation: %w", err) @@ -158,6 +159,7 @@ func ValidateFileType(filePath string, allowedTypes []FileType) (*FileType, erro // validateTextContent performs basic validation on text files func validateTextContent(filePath string, ft FileType) error { // Read a sample of the file + // #nosec G304 -- filePath is validated through extension checks before calling data, err := os.ReadFile(filePath) if err != nil { return fmt.Errorf("failed to read text file: %w", err) diff --git a/internal/fileutil/secure.go b/internal/fileutil/secure.go index 5b5c166..3a820bc 100644 --- a/internal/fileutil/secure.go +++ b/internal/fileutil/secure.go @@ -150,6 +150,7 @@ func (v *SecurePathValidator) SecureCreateTemp(pattern string) (*os.File, string fileName := fmt.Sprintf("%s_%s", pattern, randomSuffix) fullPath := filepath.Join(validatedPath, fileName) + // #nosec G304 -- fullPath is constructed from validated base dir and generated filename file, err := os.Create(fullPath) if err != nil { return nil, "", fmt.Errorf("failed to create temp file: %w", err) @@ -161,27 +162,28 @@ func (v *SecurePathValidator) SecureCreateTemp(pattern string) (*os.File, string // WriteFileSafe writes data to a file with fsync and removes partial files on error. // This ensures crash safety: either the file is complete and synced, or it doesn't exist. func WriteFileSafe(path string, data []byte, perm os.FileMode) error { + // #nosec G304 -- path is cleaned before use and validated by caller f, err := os.OpenFile(filepath.Clean(path), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, perm) if err != nil { return fmt.Errorf("open: %w", err) } if _, err := f.Write(data); err != nil { - f.Close() - os.Remove(path) // remove partial write + _ = f.Close() + _ = os.Remove(path) // remove partial write return fmt.Errorf("write: %w", err) } // CRITICAL: fsync ensures data is flushed to disk before returning if err := f.Sync(); err != nil { - f.Close() - os.Remove(path) // remove unsynced file + _ = f.Close() + _ = os.Remove(path) // remove unsynced file return fmt.Errorf("fsync: %w", err) } // Close can fail on some filesystems (NFS, network-backed volumes) if err := f.Close(); err != nil { - os.Remove(path) // remove file if close failed + _ = os.Remove(path) // remove file if close failed return fmt.Errorf("close: %w", err) } @@ -279,6 +281,7 @@ func SecureTempFile(dir, pattern string) (*os.File, error) { // Create file with restrictive permissions (0600) // Use O_EXCL to prevent race conditions + // #nosec G304 -- path is constructed from validated dir and generated filename f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0600) if err != nil { return nil, fmt.Errorf("failed to create secure temp file: %w", err) @@ -306,6 +309,7 @@ func SecureFileRemove(path string) error { } // Open file for writing + // #nosec G304 -- path is validated by caller and used for secure overwrite f, err := os.OpenFile(path, os.O_WRONLY, 0) if err != nil { return fmt.Errorf("failed to open file for secure removal: %w", err) @@ -329,11 +333,11 @@ func SecureFileRemove(path string) error { remaining -= toWrite } // Sync to ensure zeros hit disk - f.Sync() + _ = f.Sync() } // Close and remove - f.Close() + _ = f.Close() return os.Remove(path) } diff --git a/internal/jupyter/package_manager.go b/internal/jupyter/package_manager.go index 5ea1efa..1d99617 100644 --- a/internal/jupyter/package_manager.go +++ b/internal/jupyter/package_manager.go @@ -371,6 +371,7 @@ func (pm *PackageManager) savePackageRequest(req *PackageRequest) error { // loadPackageRequest loads a package request from cache func (pm *PackageManager) loadPackageRequest(requestID string) (*PackageRequest, error) { requestFile := filepath.Join(pm.packageCachePath, fmt.Sprintf("request_%s.json", requestID)) + // #nosec G304 -- path is constructed from controlled cache path and sanitized requestID data, err := os.ReadFile(requestFile) if err != nil { return nil, err @@ -393,6 +394,7 @@ func (pm *PackageManager) loadAllPackageRequests() ([]*PackageRequest, error) { var requests []*PackageRequest for _, file := range files { + // #nosec G304 -- path is from controlled cache directory via Glob data, err := os.ReadFile(file) if err != nil { pm.logger.Warn("failed to read request file", "file", file, "error", err) @@ -434,6 +436,7 @@ func (pm *PackageManager) loadAllPackageInfo() ([]*PackageInfo, error) { var packages []*PackageInfo for _, file := range files { + // #nosec G304 -- path is from controlled cache directory via Glob data, err := os.ReadFile(file) if err != nil { pm.logger.Warn("failed to read package info file", "file", file, "error", err) diff --git a/internal/jupyter/service_manager.go b/internal/jupyter/service_manager.go index aca1c07..0ac14ba 100644 --- a/internal/jupyter/service_manager.go +++ b/internal/jupyter/service_manager.go @@ -990,6 +990,7 @@ func (sm *ServiceManager) prepareContainerConfig( jupyterTokenArg := "" if !req.Network.EnableToken { + // #nosec G101 -- Disabling token auth intentionally when EnableToken is false jupyterTokenArg = " --NotebookApp.token=" } @@ -1094,6 +1095,7 @@ func (sm *ServiceManager) loadServices() error { return fmt.Errorf("failed to create jupyter state directory: %w", err) } + // #nosec G304 -- servicesFile is from controlled path registry data, err := os.ReadFile(servicesFile) if err != nil { if os.IsNotExist(err) { diff --git a/internal/network/ssh.go b/internal/network/ssh.go index 4aced6c..a84bebe 100644 --- a/internal/network/ssh.go +++ b/internal/network/ssh.go @@ -60,7 +60,7 @@ func NewSSHClient(host, user, keyPath string, port int, knownHostsPath string) ( } // InsecureIgnoreHostKey used as fallback - security implications reviewed - //nolint:gosec // G106: Use of InsecureIgnoreHostKey is intentional fallback + // #nosec G106 -- Use of InsecureIgnoreHostKey is intentional fallback hostKeyCallback := ssh.InsecureIgnoreHostKey() if knownHostsPath != "" { knownHostsPath = storage.ExpandPath(knownHostsPath) diff --git a/internal/queue/filesystem/queue.go b/internal/queue/filesystem/queue.go index 116c1e9..ece2fdb 100644 --- a/internal/queue/filesystem/queue.go +++ b/internal/queue/filesystem/queue.go @@ -41,21 +41,21 @@ func writeTaskFile(path string, data []byte) error { } if _, err := f.Write(data); err != nil { - f.Close() - os.Remove(path) // remove partial write + _ = f.Close() + _ = os.Remove(path) // remove partial write return fmt.Errorf("write: %w", err) } // CRITICAL: fsync ensures data is flushed to disk before returning if err := f.Sync(); err != nil { - f.Close() - os.Remove(path) // remove unsynced file + _ = f.Close() + _ = os.Remove(path) // remove unsynced file return fmt.Errorf("fsync: %w", err) } // Close can fail on some filesystems (NFS, network-backed volumes) if err := f.Close(); err != nil { - os.Remove(path) // remove file if close failed + _ = os.Remove(path) // remove file if close failed return fmt.Errorf("close: %w", err) } @@ -155,6 +155,7 @@ func (q *Queue) GetTask(id string) (*domain.Task, error) { // Search in all directories for _, dir := range []string{"pending", "running", "finished", "failed"} { taskFile := filepath.Join(q.root, dir, "entries", id+".json") + // #nosec G304 -- path is constructed from validated root and validated task ID data, err := os.ReadFile(taskFile) if err == nil { var task domain.Task @@ -184,6 +185,7 @@ func (q *Queue) ListTasks() ([]*domain.Task, error) { continue } + // #nosec G304 -- path is constructed from validated root and directory entry data, err := os.ReadFile(filepath.Join(entriesDir, entry.Name())) if err != nil { continue @@ -253,3 +255,57 @@ func (q *Queue) rebuildIndex() error { // Implementation would rebuild the index file return nil } + +// applyPrivacyFilter shapes which fields are returned based on visibility level. +// Returns a shallow copy with sensitive fields redacted — never mutates stored data. +func applyPrivacyFilter(task *domain.Task, level domain.VisibilityLevel) *domain.Task { + out := *task // shallow copy + switch level { + case domain.VisibilityOpen: + // Strip args and PII; keep results for reproducibility + out.Args = "[redacted]" + out.Output = redactSensitiveInfo(out.Output) + out.Metadata = filterMetadata(out.Metadata) + case domain.VisibilityLab, domain.VisibilityInstitution: + // Full fields; access is audit-logged at the handler layer + case domain.VisibilityPrivate: + // Should never reach here (filtered by SQL), but be defensive + return &domain.Task{ID: task.ID, Status: "private"} + } + return &out +} + +// redactSensitiveInfo removes PII from output strings. +// This is a basic implementation — extend as needed for your data. +func redactSensitiveInfo(output string) string { + // Simple redaction: truncate long outputs and mark as redacted + if len(output) > 100 { + return output[:100] + "\n[... additional output redacted for privacy ...]" + } + return output +} + +// filterMetadata removes sensitive fields from metadata. +// Preserves non-sensitive fields like git commit, environment, etc. +func filterMetadata(metadata map[string]string) map[string]string { + if metadata == nil { + return nil + } + // List of sensitive keys to remove + sensitiveKeys := []string{"api_key", "token", "password", "secret", "auth"} + filtered := make(map[string]string) + for k, v := range metadata { + lowerKey := strings.ToLower(k) + sensitive := false + for _, sk := range sensitiveKeys { + if strings.Contains(lowerKey, sk) { + sensitive = true + break + } + } + if !sensitive { + filtered[k] = v + } + } + return filtered +} diff --git a/internal/queue/filesystem_queue.go b/internal/queue/filesystem_queue.go index f15e118..5bb3b90 100644 --- a/internal/queue/filesystem_queue.go +++ b/internal/queue/filesystem_queue.go @@ -225,6 +225,7 @@ func (q *FilesystemQueue) GetTask(taskID string) (*Task, error) { if err != nil { return nil, err } + // #nosec G304 -- path is constructed from validated root and task ID data, err := os.ReadFile(path) if err != nil { return nil, err @@ -261,6 +262,7 @@ func (q *FilesystemQueue) GetAllTasks() ([]*Task, error) { out := make([]*Task, 0, len(paths)) for _, path := range paths { + // #nosec G304 -- path is constructed from validated root and directory entry data, err := os.ReadFile(path) if err != nil { continue @@ -405,6 +407,7 @@ func (q *FilesystemQueue) claimNext(workerID string, leaseDuration time.Duration continue } path := filepath.Join(pendingDir, e.Name()) + // #nosec G304 -- path is constructed from validated root and directory entry data, err := os.ReadFile(path) if err != nil { continue @@ -450,6 +453,7 @@ func (q *FilesystemQueue) claimNext(workerID string, leaseDuration time.Duration } // Refresh from the moved file to avoid race on content. + // #nosec G304 -- dst is constructed from validated root and task ID data, err := os.ReadFile(dst) if err != nil { return nil, err @@ -542,6 +546,7 @@ func (q *FilesystemQueue) rebuildIndex() error { if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { continue } + // #nosec G304 -- path is constructed from validated root and directory entry data, err := os.ReadFile(filepath.Join(pendingDir, e.Name())) if err != nil { continue