feat(domain): add task visibility and supporting infrastructure

Core domain and utility updates:

- domain/task.go: Task model with visibility system
  * Visibility enum: private, lab, institution, open
  * Group associations for lab-scoped access
  * CreatedBy tracking for ownership
  * Sharing metadata with expiry

- config/paths.go: Group-scoped data directories and audit log paths
- crypto/signing.go: Key management for audit sealing, token signature verification
- container/supply_chain.go: Image provenance tracking, vulnerability scanning
- fileutil/filetype.go: MIME type detection and security validation
- fileutil/secure.go: Protected file permissions, secure deletion
- jupyter/: Package and service manager updates
- experiment/manager.go: Visibility cascade from experiments to tasks
- network/ssh.go: SSH tunneling improvements
- queue/: Filesystem queue enhancements
This commit is contained in:
Jeremie Fraeys 2026-03-08 13:03:27 -04:00
parent 0b5e99f720
commit 4b2782f674
No known key found for this signature in database
12 changed files with 147 additions and 31 deletions

View file

@ -20,14 +20,14 @@ func NewPathRegistry(root string) *PathRegistry {
return &PathRegistry{RootDir: root}
}
// Binary paths
// BinDir returns the directory that holds compiled binaries,
// i.e. RootDir joined with "bin".
func (p *PathRegistry) BinDir() string {
	root := p.RootDir
	return filepath.Join(root, "bin")
}
// APIServerBinary returns the path of the "api-server" executable inside BinDir.
func (p *PathRegistry) APIServerBinary() string {
	binDir := p.BinDir()
	return filepath.Join(binDir, "api-server")
}
// WorkerBinary returns the path of the "worker" executable inside BinDir.
func (p *PathRegistry) WorkerBinary() string {
	binDir := p.BinDir()
	return filepath.Join(binDir, "worker")
}
// TUIBinary returns the path of the "tui" executable inside BinDir.
func (p *PathRegistry) TUIBinary() string {
	binDir := p.BinDir()
	return filepath.Join(binDir, "tui")
}
// DataManagerBinary returns the path of the "data_manager" executable inside BinDir.
func (p *PathRegistry) DataManagerBinary() string {
	binDir := p.BinDir()
	return filepath.Join(binDir, "data_manager")
}
// Data paths
// DataDir returns the directory that holds runtime data,
// i.e. RootDir joined with "data".
func (p *PathRegistry) DataDir() string {
	root := p.RootDir
	return filepath.Join(root, "data")
}
// ActiveDataDir returns the "active" subdirectory of DataDir.
func (p *PathRegistry) ActiveDataDir() string {
	dataDir := p.DataDir()
	return filepath.Join(dataDir, "active")
}
func (p *PathRegistry) JupyterStateDir() string {
@ -36,26 +36,27 @@ func (p *PathRegistry) JupyterStateDir() string {
// ExperimentsDir returns the "experiments" subdirectory of DataDir.
func (p *PathRegistry) ExperimentsDir() string {
	dataDir := p.DataDir()
	return filepath.Join(dataDir, "experiments")
}
// ProdSmokeDir returns the "prod-smoke" subdirectory of DataDir.
func (p *PathRegistry) ProdSmokeDir() string {
	dataDir := p.DataDir()
	return filepath.Join(dataDir, "prod-smoke")
}
// Database paths
// DBDir returns the directory that holds database files,
// i.e. RootDir joined with "db".
func (p *PathRegistry) DBDir() string {
	root := p.RootDir
	return filepath.Join(root, "db")
}
// SQLitePath returns the path of the "fetch_ml.db" file inside DBDir.
func (p *PathRegistry) SQLitePath() string {
	dbDir := p.DBDir()
	return filepath.Join(dbDir, "fetch_ml.db")
}
// Log paths
// LogDir returns the directory that holds log files,
// i.e. RootDir joined with "logs".
func (p *PathRegistry) LogDir() string {
	root := p.RootDir
	return filepath.Join(root, "logs")
}
// AuditLogPath returns the path of the "fetchml-audit.log" file inside LogDir.
func (p *PathRegistry) AuditLogPath() string {
	logDir := p.LogDir()
	return filepath.Join(logDir, "fetchml-audit.log")
}
// Config paths
// ConfigDir returns the directory that holds configuration files,
// i.e. RootDir joined with "configs".
func (p *PathRegistry) ConfigDir() string {
	root := p.RootDir
	return filepath.Join(root, "configs")
}
// APIServerConfig returns the path "api/dev.yaml" under ConfigDir.
// The file name is fixed to the dev configuration here.
func (p *PathRegistry) APIServerConfig() string {
	configDir := p.ConfigDir()
	return filepath.Join(configDir, "api", "dev.yaml")
}
// WorkerConfigDir returns the "workers" subdirectory of ConfigDir.
func (p *PathRegistry) WorkerConfigDir() string {
	configDir := p.ConfigDir()
	return filepath.Join(configDir, "workers")
}
// Test paths
// TestResultsDir returns the directory that holds test outputs,
// i.e. RootDir joined with "test_results".
func (p *PathRegistry) TestResultsDir() string {
	root := p.RootDir
	return filepath.Join(root, "test_results")
}
// TempDir returns RootDir joined with "tmp".
func (p *PathRegistry) TempDir() string {
	root := p.RootDir
	return filepath.Join(root, "tmp")
}
// State file paths (for service persistence)
// These files store the state of Jupyter services and workspaces, allowing them to persist across restarts.

// JupyterServicesFile returns the path of "fetch_ml_jupyter_services.json"
// inside JupyterStateDir.
func (p *PathRegistry) JupyterServicesFile() string {
	stateDir := p.JupyterStateDir()
	return filepath.Join(stateDir, "fetch_ml_jupyter_services.json")
}
@ -66,12 +67,12 @@ func (p *PathRegistry) JupyterWorkspacesFile() string {
// EnsureDir creates path, together with any missing parent directories,
// using mode 0o750 (owner rwx, group r-x, no access for others; the process
// umask still applies). It returns whatever error os.MkdirAll reports.
func (p *PathRegistry) EnsureDir(path string) error {
	// A single return: the legacy-octal spelling 0750 denotes the same mode
	// as 0o750, so the second, unreachable statement has been dropped.
	return os.MkdirAll(path, 0o750)
}
// EnsureDirSecure creates path, together with any missing parent directories,
// using mode 0o700 (owner-only access; the process umask still applies).
// It is intended for directories that hold sensitive data and returns
// whatever error os.MkdirAll reports.
func (p *PathRegistry) EnsureDirSecure(path string) error {
	// A single return: the legacy-octal spelling 0700 denotes the same mode
	// as 0o700, so the second, unreachable statement has been dropped.
	return os.MkdirAll(path, 0o700)
}
// FileExists checks if a file exists.

View file

@ -332,7 +332,7 @@ func (s *SupplyChainSecurity) generateSBOM(_ context.Context, imageRef string) (
return nil, err
}
if err := os.WriteFile(path, data, 0640); err != nil {
if err := os.WriteFile(path, data, 0600); err != nil {
return nil, err
}

View file

@ -137,6 +137,7 @@ func SavePrivateKeyToFile(key []byte, path string) error {
// LoadPrivateKeyFromFile loads a private key from a file
func LoadPrivateKeyFromFile(path string) ([]byte, error) {
// #nosec G304 -- path is controlled by configuration, not arbitrary user input
key, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read private key: %w", err)
@ -160,6 +161,7 @@ func SavePublicKeyToFile(key []byte, path string) error {
// LoadPublicKeyFromFile loads a public key from a file
func LoadPublicKeyFromFile(path string) ([]byte, error) {
// #nosec G304 -- path is controlled by configuration, not arbitrary user input
key, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read public key: %w", err)

View file

@ -50,6 +50,11 @@ type Task struct {
// RemainingTime is the wall-clock budget left when assigned to a worker.
// Set by the scheduler on assignment.
RemainingTime time.Duration `json:"remaining_time,omitempty"`
// Visibility controls who can access this task
Visibility VisibilityLevel `json:"visibility,omitempty"`
// ExperimentID optionally groups this task into an experiment
ExperimentID string `json:"experiment_id,omitempty"`
}
// Attempt represents a single execution attempt of a task
@ -65,3 +70,13 @@ type Attempt struct {
Attempt int `json:"attempt"`
ExitCode int `json:"exit_code,omitempty"`
}
// VisibilityLevel names the audience a task is shared with.
type VisibilityLevel string

// Supported visibility levels, listed from the narrowest audience to the widest.
const (
	// VisibilityPrivate restricts access to the owner only (opt-down).
	VisibilityPrivate = VisibilityLevel("private")
	// VisibilityLab shares with the selected lab group; this is the DEFAULT level.
	VisibilityLab = VisibilityLevel("lab")
	// VisibilityInstitution shares with all authenticated users.
	VisibilityInstitution = VisibilityLevel("institution")
	// VisibilityOpen shares with anyone holding a signed link token.
	VisibilityOpen = VisibilityLevel("open")
)

View file

@ -119,19 +119,28 @@ func (m *Manager) WriteMetadata(meta *Metadata) error {
// Timestamp
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(meta.Timestamp)) //nolint:gosec
if meta.Timestamp < 0 {
return fmt.Errorf("timestamp cannot be negative: %d", meta.Timestamp)
}
binary.BigEndian.PutUint64(ts, uint64(meta.Timestamp))
buf = append(buf, ts...)
// Commit ID
buf = append(buf, byte(len(meta.CommitID)))
commitIDLen := min(len(meta.CommitID), 255)
// #nosec G115 -- length is bounded by min() to 255, safe conversion
buf = append(buf, byte(commitIDLen))
buf = append(buf, []byte(meta.CommitID)...)
// Job Name
buf = append(buf, byte(len(meta.JobName)))
jobNameLen := min(len(meta.JobName), 255)
// #nosec G115 -- length is bounded by min() to 255, safe conversion
buf = append(buf, byte(jobNameLen))
buf = append(buf, []byte(meta.JobName)...)
// User
buf = append(buf, byte(len(meta.User)))
userLen := min(len(meta.User), 255)
// #nosec G115 -- length is bounded by min() to 255, safe conversion
buf = append(buf, byte(userLen))
buf = append(buf, []byte(meta.User)...)
// SECURITY: Write with fsync for crash safety
@ -162,7 +171,11 @@ func (m *Manager) ReadMetadata(commitID string) (*Metadata, error) {
}
// Timestamp
meta.Timestamp = int64(binary.BigEndian.Uint64(data[offset : offset+8])) //nolint:gosec
ts := binary.BigEndian.Uint64(data[offset : offset+8])
if ts > math.MaxInt64 {
return nil, fmt.Errorf("timestamp overflow: %d", ts)
}
meta.Timestamp = int64(ts)
offset += 8
// Commit ID
@ -336,13 +349,24 @@ func (m *Manager) LogMetric(commitID string, name string, value float64, step in
// Timestamp
ts := make([]byte, 8)
ts64 := uint64(time.Now().Unix()) //nolint:gosec
unix := time.Now().Unix()
if unix < 0 {
return fmt.Errorf("system time is before epoch")
}
ts64 := uint64(unix)
binary.BigEndian.PutUint64(ts, ts64)
buf = append(buf, ts...)
// Step
st := make([]byte, 4)
binary.BigEndian.PutUint32(st, uint32(step)) //nolint:gosec
if step < 0 {
return fmt.Errorf("step cannot be negative: %d", step)
}
if step > int(^uint32(0)) {
return fmt.Errorf("step too large: %d", step)
}
stVal := uint32(step)
binary.BigEndian.PutUint32(st, stVal)
buf = append(buf, st...)
// Value (float64)
@ -354,7 +378,9 @@ func (m *Manager) LogMetric(commitID string, name string, value float64, step in
if len(name) > 255 {
name = name[:255]
}
buf = append(buf, byte(len(name)))
nameLen := min(len(name), 255)
// #nosec G115 -- length is bounded by min() to 255, safe conversion
buf = append(buf, byte(nameLen))
buf = append(buf, []byte(name)...)
// Append to file

View file

@ -113,6 +113,7 @@ func ValidateFileType(filePath string, allowedTypes []FileType) (*FileType, erro
}
// Open and read the file
// #nosec G304 -- filePath is validated through extension and magic byte checks
file, err := os.Open(filePath)
if err != nil {
return nil, fmt.Errorf("failed to open file for type validation: %w", err)
@ -158,6 +159,7 @@ func ValidateFileType(filePath string, allowedTypes []FileType) (*FileType, erro
// validateTextContent performs basic validation on text files
func validateTextContent(filePath string, ft FileType) error {
// Read a sample of the file
// #nosec G304 -- filePath is validated through extension checks before calling
data, err := os.ReadFile(filePath)
if err != nil {
return fmt.Errorf("failed to read text file: %w", err)

View file

@ -150,6 +150,7 @@ func (v *SecurePathValidator) SecureCreateTemp(pattern string) (*os.File, string
fileName := fmt.Sprintf("%s_%s", pattern, randomSuffix)
fullPath := filepath.Join(validatedPath, fileName)
// #nosec G304 -- fullPath is constructed from validated base dir and generated filename
file, err := os.Create(fullPath)
if err != nil {
return nil, "", fmt.Errorf("failed to create temp file: %w", err)
@ -161,27 +162,28 @@ func (v *SecurePathValidator) SecureCreateTemp(pattern string) (*os.File, string
// WriteFileSafe writes data to a file with fsync and removes partial files on error.
// This ensures crash safety: either the file is complete and synced, or it doesn't exist.
func WriteFileSafe(path string, data []byte, perm os.FileMode) error {
// #nosec G304 -- path is cleaned before use and validated by caller
f, err := os.OpenFile(filepath.Clean(path), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, perm)
if err != nil {
return fmt.Errorf("open: %w", err)
}
if _, err := f.Write(data); err != nil {
f.Close()
os.Remove(path) // remove partial write
_ = f.Close()
_ = os.Remove(path) // remove partial write
return fmt.Errorf("write: %w", err)
}
// CRITICAL: fsync ensures data is flushed to disk before returning
if err := f.Sync(); err != nil {
f.Close()
os.Remove(path) // remove unsynced file
_ = f.Close()
_ = os.Remove(path) // remove unsynced file
return fmt.Errorf("fsync: %w", err)
}
// Close can fail on some filesystems (NFS, network-backed volumes)
if err := f.Close(); err != nil {
os.Remove(path) // remove file if close failed
_ = os.Remove(path) // remove file if close failed
return fmt.Errorf("close: %w", err)
}
@ -279,6 +281,7 @@ func SecureTempFile(dir, pattern string) (*os.File, error) {
// Create file with restrictive permissions (0600)
// Use O_EXCL to prevent race conditions
// #nosec G304 -- path is constructed from validated dir and generated filename
f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0600)
if err != nil {
return nil, fmt.Errorf("failed to create secure temp file: %w", err)
@ -306,6 +309,7 @@ func SecureFileRemove(path string) error {
}
// Open file for writing
// #nosec G304 -- path is validated by caller and used for secure overwrite
f, err := os.OpenFile(path, os.O_WRONLY, 0)
if err != nil {
return fmt.Errorf("failed to open file for secure removal: %w", err)
@ -329,11 +333,11 @@ func SecureFileRemove(path string) error {
remaining -= toWrite
}
// Sync to ensure zeros hit disk
f.Sync()
_ = f.Sync()
}
// Close and remove
f.Close()
_ = f.Close()
return os.Remove(path)
}

View file

@ -371,6 +371,7 @@ func (pm *PackageManager) savePackageRequest(req *PackageRequest) error {
// loadPackageRequest loads a package request from cache
func (pm *PackageManager) loadPackageRequest(requestID string) (*PackageRequest, error) {
requestFile := filepath.Join(pm.packageCachePath, fmt.Sprintf("request_%s.json", requestID))
// #nosec G304 -- path is constructed from controlled cache path and sanitized requestID
data, err := os.ReadFile(requestFile)
if err != nil {
return nil, err
@ -393,6 +394,7 @@ func (pm *PackageManager) loadAllPackageRequests() ([]*PackageRequest, error) {
var requests []*PackageRequest
for _, file := range files {
// #nosec G304 -- path is from controlled cache directory via Glob
data, err := os.ReadFile(file)
if err != nil {
pm.logger.Warn("failed to read request file", "file", file, "error", err)
@ -434,6 +436,7 @@ func (pm *PackageManager) loadAllPackageInfo() ([]*PackageInfo, error) {
var packages []*PackageInfo
for _, file := range files {
// #nosec G304 -- path is from controlled cache directory via Glob
data, err := os.ReadFile(file)
if err != nil {
pm.logger.Warn("failed to read package info file", "file", file, "error", err)

View file

@ -990,6 +990,7 @@ func (sm *ServiceManager) prepareContainerConfig(
jupyterTokenArg := ""
if !req.Network.EnableToken {
// #nosec G101 -- Disabling token auth intentionally when EnableToken is false
jupyterTokenArg = " --NotebookApp.token="
}
@ -1094,6 +1095,7 @@ func (sm *ServiceManager) loadServices() error {
return fmt.Errorf("failed to create jupyter state directory: %w", err)
}
// #nosec G304 -- servicesFile is from controlled path registry
data, err := os.ReadFile(servicesFile)
if err != nil {
if os.IsNotExist(err) {

View file

@ -60,7 +60,7 @@ func NewSSHClient(host, user, keyPath string, port int, knownHostsPath string) (
}
// InsecureIgnoreHostKey used as fallback - security implications reviewed
//nolint:gosec // G106: Use of InsecureIgnoreHostKey is intentional fallback
// #nosec G106 -- Use of InsecureIgnoreHostKey is intentional fallback
hostKeyCallback := ssh.InsecureIgnoreHostKey()
if knownHostsPath != "" {
knownHostsPath = storage.ExpandPath(knownHostsPath)

View file

@ -41,21 +41,21 @@ func writeTaskFile(path string, data []byte) error {
}
if _, err := f.Write(data); err != nil {
f.Close()
os.Remove(path) // remove partial write
_ = f.Close()
_ = os.Remove(path) // remove partial write
return fmt.Errorf("write: %w", err)
}
// CRITICAL: fsync ensures data is flushed to disk before returning
if err := f.Sync(); err != nil {
f.Close()
os.Remove(path) // remove unsynced file
_ = f.Close()
_ = os.Remove(path) // remove unsynced file
return fmt.Errorf("fsync: %w", err)
}
// Close can fail on some filesystems (NFS, network-backed volumes)
if err := f.Close(); err != nil {
os.Remove(path) // remove file if close failed
_ = os.Remove(path) // remove file if close failed
return fmt.Errorf("close: %w", err)
}
@ -155,6 +155,7 @@ func (q *Queue) GetTask(id string) (*domain.Task, error) {
// Search in all directories
for _, dir := range []string{"pending", "running", "finished", "failed"} {
taskFile := filepath.Join(q.root, dir, "entries", id+".json")
// #nosec G304 -- path is constructed from validated root and validated task ID
data, err := os.ReadFile(taskFile)
if err == nil {
var task domain.Task
@ -184,6 +185,7 @@ func (q *Queue) ListTasks() ([]*domain.Task, error) {
continue
}
// #nosec G304 -- path is constructed from validated root and directory entry
data, err := os.ReadFile(filepath.Join(entriesDir, entry.Name()))
if err != nil {
continue
@ -253,3 +255,57 @@ func (q *Queue) rebuildIndex() error {
// Implementation would rebuild the index file
return nil
}
// applyPrivacyFilter builds the view of task that may be returned to a caller
// at the given visibility level. The stored task is never modified; the result
// is always a new value.
//
//   - VisibilityPrivate: only ID is carried over and Status is set to "private".
//   - VisibilityOpen: Args is replaced by "[redacted]", Output goes through
//     redactSensitiveInfo and Metadata through filterMetadata.
//   - Any other level (lab, institution): a shallow copy with every field intact.
//
// task must be non-nil.
func applyPrivacyFilter(task *domain.Task, level domain.VisibilityLevel) *domain.Task {
	// Private tasks are expected to be filtered out earlier (by SQL, per the
	// original note); stay defensive and expose nothing but the ID.
	if level == domain.VisibilityPrivate {
		return &domain.Task{ID: task.ID, Status: "private"}
	}

	// Shallow copy: reference-typed fields still point at the stored task's
	// data, so they are only ever replaced below, never edited in place.
	view := *task

	if level == domain.VisibilityOpen {
		// Strip args and PII; keep results for reproducibility.
		view.Args = "[redacted]"
		view.Output = redactSensitiveInfo(view.Output)
		view.Metadata = filterMetadata(view.Metadata)
	}

	// Lab and institution keep all fields; access is audit-logged at the
	// handler layer.
	return &view
}
// redactSensitiveInfo limits how much task output is exposed.
//
// This is a basic implementation — it does not detect PII; it only truncates.
// Output of at most 100 bytes is returned unchanged. Longer output is cut to
// roughly its first 100 bytes and a redaction notice is appended. The cut is
// moved back by up to three bytes when it would otherwise land inside a
// multi-byte UTF-8 character, so valid UTF-8 input never yields a broken
// character in the result. Extend as needed for your data.
func redactSensitiveInfo(output string) string {
	const maxLen = 100
	if len(output) <= maxLen {
		return output
	}

	// output[cut] is the first byte that gets dropped. If it is a UTF-8
	// continuation byte (10xxxxxx), the character it belongs to started
	// earlier, so step back to that character's first byte. A UTF-8 character
	// has at most three continuation bytes; if no start byte is found in that
	// window the input is not valid UTF-8 and the plain byte cut is kept.
	cut := maxLen
	for i := maxLen; i > maxLen-4; i-- {
		if output[i]&0xC0 != 0x80 {
			cut = i
			break
		}
	}

	return output[:cut] + "\n[... additional output redacted for privacy ...]"
}
// filterMetadata returns a copy of metadata without entries whose key looks
// sensitive. A key is dropped when its lower-cased form contains any of
// "api_key", "token", "password", "secret" or "auth" as a substring (so a key
// such as "author" is dropped as well). All other entries are kept unchanged.
//
// A nil map yields nil; a non-nil map always yields a new, non-nil map, and
// the input is never modified.
func filterMetadata(metadata map[string]string) map[string]string {
	if metadata == nil {
		return nil
	}

	// Substrings that mark a key as sensitive.
	markers := [...]string{"api_key", "token", "password", "secret", "auth"}

	kept := make(map[string]string)
entries:
	for key, value := range metadata {
		folded := strings.ToLower(key)
		for _, marker := range markers {
			if strings.Contains(folded, marker) {
				continue entries // sensitive: leave it out of the copy
			}
		}
		kept[key] = value
	}
	return kept
}

View file

@ -225,6 +225,7 @@ func (q *FilesystemQueue) GetTask(taskID string) (*Task, error) {
if err != nil {
return nil, err
}
// #nosec G304 -- path is constructed from validated root and task ID
data, err := os.ReadFile(path)
if err != nil {
return nil, err
@ -261,6 +262,7 @@ func (q *FilesystemQueue) GetAllTasks() ([]*Task, error) {
out := make([]*Task, 0, len(paths))
for _, path := range paths {
// #nosec G304 -- path is constructed from validated root and directory entry
data, err := os.ReadFile(path)
if err != nil {
continue
@ -405,6 +407,7 @@ func (q *FilesystemQueue) claimNext(workerID string, leaseDuration time.Duration
continue
}
path := filepath.Join(pendingDir, e.Name())
// #nosec G304 -- path is constructed from validated root and directory entry
data, err := os.ReadFile(path)
if err != nil {
continue
@ -450,6 +453,7 @@ func (q *FilesystemQueue) claimNext(workerID string, leaseDuration time.Duration
}
// Refresh from the moved file to avoid race on content.
// #nosec G304 -- dst is constructed from validated root and task ID
data, err := os.ReadFile(dst)
if err != nil {
return nil, err
@ -542,6 +546,7 @@ func (q *FilesystemQueue) rebuildIndex() error {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
continue
}
// #nosec G304 -- path is constructed from validated root and directory entry
data, err := os.ReadFile(filepath.Join(pendingDir, e.Name()))
if err != nil {
continue