From d1bef0a450bfe3b0d9f31983df72a2104674165c Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Tue, 17 Feb 2026 12:49:53 -0500 Subject: [PATCH] refactor: Phase 3 - fix config/storage boundaries Move schema ownership to infrastructure layer: - Redis keys: config/constants.go -> queue/keys.go (TaskQueueKey, TaskPrefix, etc.) - Filesystem paths: config/paths.go -> storage/paths.go (JobPaths) - Create config/shared.go with RedisConfig, SSHConfig - Update all imports: worker/, api/helpers, api/ws_jobs, api/ws_validate - Clean up: remove duplicates from queue/task.go, queue/queue.go, config/paths.go Build status: Compiles successfully --- internal/api/helpers/validation_helpers.go | 4 +- internal/api/ws_jobs.go | 5 +- internal/api/ws_validate.go | 4 +- internal/config/paths.go | 72 +--------------------- internal/config/shared.go | 61 ++++++++++++++++++ internal/queue/keys.go | 23 +++++++ internal/queue/queue.go | 1 - internal/queue/task.go | 11 ---- internal/storage/paths.go | 55 +++++++++++++++++ internal/worker/execution.go | 11 ++-- tests/unit/config/paths_test.go | 9 +-- 11 files changed, 158 insertions(+), 98 deletions(-) create mode 100644 internal/config/shared.go create mode 100644 internal/queue/keys.go create mode 100644 internal/storage/paths.go diff --git a/internal/api/helpers/validation_helpers.go b/internal/api/helpers/validation_helpers.go index 78404ec..6812680 100644 --- a/internal/api/helpers/validation_helpers.go +++ b/internal/api/helpers/validation_helpers.go @@ -7,10 +7,10 @@ import ( "path/filepath" "strings" - "github.com/jfraeys/fetch_ml/internal/config" "github.com/jfraeys/fetch_ml/internal/experiment" "github.com/jfraeys/fetch_ml/internal/manifest" "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/storage" "github.com/jfraeys/fetch_ml/internal/worker" ) @@ -114,7 +114,7 @@ func FindRunManifestDir(basePath string, jobName string) (dir string, bucket str if strings.TrimSpace(basePath) == "" || 
strings.TrimSpace(jobName) == "" { return "", "", false } - jobPaths := config.NewJobPaths(basePath) + jobPaths := storage.NewJobPaths(basePath) typedRoots := []struct { bucket string root string diff --git a/internal/api/ws_jobs.go b/internal/api/ws_jobs.go index 919c84c..f21be99 100644 --- a/internal/api/ws_jobs.go +++ b/internal/api/ws_jobs.go @@ -15,7 +15,6 @@ import ( "github.com/gorilla/websocket" "github.com/jfraeys/fetch_ml/internal/api/helpers" "github.com/jfraeys/fetch_ml/internal/auth" - "github.com/jfraeys/fetch_ml/internal/config" "github.com/jfraeys/fetch_ml/internal/container" "github.com/jfraeys/fetch_ml/internal/manifest" "github.com/jfraeys/fetch_ml/internal/queue" @@ -71,7 +70,7 @@ func (h *WSHandler) handleAnnotateRun(conn *websocket.Conn, payload []byte) erro return h.sendErrorPacket(conn, ErrorCodeInvalidConfiguration, "Missing api base_path", "") } - jobPaths := config.NewJobPaths(base) + jobPaths := storage.NewJobPaths(base) typedRoots := []struct{ root string }{ {root: jobPaths.RunningPath()}, {root: jobPaths.PendingPath()}, @@ -148,7 +147,7 @@ func (h *WSHandler) handleSetRunNarrative(conn *websocket.Conn, payload []byte) return h.sendErrorPacket(conn, ErrorCodeInvalidConfiguration, "Missing api base_path", "") } - jobPaths := config.NewJobPaths(base) + jobPaths := storage.NewJobPaths(base) typedRoots := []struct{ root string }{ {root: jobPaths.RunningPath()}, {root: jobPaths.PendingPath()}, diff --git a/internal/api/ws_validate.go b/internal/api/ws_validate.go index bc3308e..6d88d38 100644 --- a/internal/api/ws_validate.go +++ b/internal/api/ws_validate.go @@ -11,10 +11,10 @@ import ( "github.com/gorilla/websocket" "github.com/jfraeys/fetch_ml/internal/api/helpers" - "github.com/jfraeys/fetch_ml/internal/config" "github.com/jfraeys/fetch_ml/internal/container" "github.com/jfraeys/fetch_ml/internal/manifest" "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/storage" 
"github.com/jfraeys/fetch_ml/internal/worker" ) @@ -68,7 +68,7 @@ func findRunManifestDir(basePath string, jobName string) (string, string, bool) if strings.TrimSpace(basePath) == "" || strings.TrimSpace(jobName) == "" { return "", "", false } - jobPaths := config.NewJobPaths(basePath) + jobPaths := storage.NewJobPaths(basePath) typedRoots := []struct { bucket string root string diff --git a/internal/config/paths.go b/internal/config/paths.go index 13b4981..d629db4 100644 --- a/internal/config/paths.go +++ b/internal/config/paths.go @@ -1,73 +1,5 @@ // Package config provides shared utilities for the fetch_ml project. package config -import ( - "fmt" - "os" - "path/filepath" - "strings" -) - -// ExpandPath expands environment variables and tilde in a path -func ExpandPath(path string) string { - if path == "" { - return "" - } - expanded := os.ExpandEnv(path) - if strings.HasPrefix(expanded, "~") { - home, err := os.UserHomeDir() - if err == nil { - expanded = filepath.Join(home, expanded[1:]) - } - } - return expanded -} - -// ResolveConfigPath resolves a config file path, checking multiple locations -func ResolveConfigPath(path string) (string, error) { - candidates := []string{path} - if !filepath.IsAbs(path) { - candidates = append(candidates, filepath.Join("configs", path)) - } - - var checked []string - for _, candidate := range candidates { - resolved := ExpandPath(candidate) - checked = append(checked, resolved) - if _, err := os.Stat(resolved); err == nil { - return resolved, nil - } - } - - return "", fmt.Errorf("config file not found (looked in %s)", strings.Join(checked, ", ")) -} - -// JobPaths provides helper methods for job directory paths -type JobPaths struct { - BasePath string -} - -// NewJobPaths creates a new JobPaths instance -func NewJobPaths(basePath string) *JobPaths { - return &JobPaths{BasePath: basePath} -} - -// PendingPath returns the path to pending jobs directory -func (j *JobPaths) PendingPath() string { - return 
filepath.Join(j.BasePath, "pending") -} - -// RunningPath returns the path to running jobs directory -func (j *JobPaths) RunningPath() string { - return filepath.Join(j.BasePath, "running") -} - -// FinishedPath returns the path to finished jobs directory -func (j *JobPaths) FinishedPath() string { - return filepath.Join(j.BasePath, "finished") -} - -// FailedPath returns the path to failed jobs directory -func (j *JobPaths) FailedPath() string { - return filepath.Join(j.BasePath, "failed") -} +// ExpandPath and ResolveConfigPath now live in shared.go; JobPaths moved to internal/storage. +// This file is kept for backward compatibility during migration diff --git a/internal/config/shared.go b/internal/config/shared.go new file mode 100644 index 0000000..19317e3 --- /dev/null +++ b/internal/config/shared.go @@ -0,0 +1,61 @@ +// Package config provides shared configuration types for fetch_ml. +// These types are embedded by per-command config structs. +package config + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +// RedisConfig holds Redis connection settings +type RedisConfig struct { + Addr string `yaml:"addr" json:"addr"` + Password string `yaml:"password" json:"password"` + DB int `yaml:"db" json:"db"` +} + +// SSHConfig holds SSH connection settings +type SSHConfig struct { + Host string `yaml:"host" json:"host"` + Port int `yaml:"port" json:"port"` + User string `yaml:"user" json:"user"` + KeyPath string `yaml:"key_path" json:"key_path"` + KnownHosts string `yaml:"known_hosts" json:"known_hosts"` +} + +// ExpandPath expands environment variables and tilde in a path +// This remains in config package as it's a config parsing utility +func ExpandPath(path string) string { + if path == "" { + return "" + } + expanded := os.ExpandEnv(path) + if strings.HasPrefix(expanded, "~") { + home, err := os.UserHomeDir() + if err == nil { + expanded = filepath.Join(home, expanded[1:]) + } + } + return expanded +} + +// ResolveConfigPath resolves a config file path, checking multiple locations 
+func ResolveConfigPath(path string) (string, error) { + candidates := []string{path} + if !filepath.IsAbs(path) { + candidates = append(candidates, filepath.Join("configs", path)) + } + + var checked []string + for _, candidate := range candidates { + resolved := ExpandPath(candidate) + checked = append(checked, resolved) + if _, err := os.Stat(resolved); err == nil { + return resolved, nil + } + } + + return "", fmt.Errorf("config file not found (looked in %s)", strings.Join(checked, ", ")) +} diff --git a/internal/queue/keys.go b/internal/queue/keys.go new file mode 100644 index 0000000..db4962b --- /dev/null +++ b/internal/queue/keys.go @@ -0,0 +1,23 @@ +// Package queue provides task queue functionality +package queue + +// Redis key prefixes - schema ownership belongs with the queue package +const ( + TaskQueueKey = "ml:queue" + TaskPrefix = "ml:task:" + JobMetricsPrefix = "ml:metrics:" + TaskStatusPrefix = "ml:status:" + DatasetPrefix = "ml:dataset:" + WorkerHeartbeat = "ml:workers:heartbeat" + WorkerPrewarmKey = "ml:workers:prewarm:" + PrewarmGCRequestKey = "ml:prewarm:gc_request" +) + +// Task status constants +const ( + TaskStatusQueued = "queued" + TaskStatusRunning = "running" + TaskStatusCompleted = "completed" + TaskStatusFailed = "failed" + TaskStatusCancelled = "cancelled" +) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index ac81519..9b10c09 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -16,7 +16,6 @@ const ( defaultLeaseDuration = 30 * time.Minute defaultMaxRetries = 3 defaultBlockTimeout = 1 * time.Second - PrewarmGCRequestKey = "ml:prewarm:gc_request" ) // TaskQueue manages ML experiment tasks via Redis diff --git a/internal/queue/task.go b/internal/queue/task.go index 5aeb626..e335011 100644 --- a/internal/queue/task.go +++ b/internal/queue/task.go @@ -1,7 +1,6 @@ package queue import ( - "github.com/jfraeys/fetch_ml/internal/config" "github.com/jfraeys/fetch_ml/internal/domain" ) @@ -16,13 +15,3 @@ 
type ( TensorBoardTrackingConfig = domain.TensorBoardTrackingConfig WandbTrackingConfig = domain.WandbTrackingConfig ) - -// Redis key constants -var ( - TaskQueueKey = config.RedisTaskQueueKey - TaskPrefix = config.RedisTaskPrefix - TaskStatusPrefix = config.RedisTaskStatusPrefix - WorkerHeartbeat = config.RedisWorkerHeartbeat - WorkerPrewarmKey = config.RedisWorkerPrewarmKey - JobMetricsPrefix = config.RedisJobMetricsPrefix -) diff --git a/internal/storage/paths.go b/internal/storage/paths.go new file mode 100644 index 0000000..99e84f1 --- /dev/null +++ b/internal/storage/paths.go @@ -0,0 +1,55 @@ +// Package storage provides filesystem storage utilities for the fetch_ml project. +// This package owns schema for filesystem paths used by the storage layer. +package storage + +import ( + "os" + "path/filepath" + "strings" +) + +// ExpandPath expands environment variables and tilde in a path +func ExpandPath(path string) string { + if path == "" { + return "" + } + expanded := os.ExpandEnv(path) + if strings.HasPrefix(expanded, "~") { + home, err := os.UserHomeDir() + if err == nil { + expanded = filepath.Join(home, expanded[1:]) + } + } + return expanded +} + +// JobPaths provides helper methods for job directory paths +// This belongs in the storage layer because it defines filesystem schema +type JobPaths struct { + BasePath string +} + +// NewJobPaths creates a new JobPaths instance +func NewJobPaths(basePath string) *JobPaths { + return &JobPaths{BasePath: basePath} +} + +// PendingPath returns the path to pending jobs directory +func (j *JobPaths) PendingPath() string { + return filepath.Join(j.BasePath, "pending") +} + +// RunningPath returns the path to running jobs directory +func (j *JobPaths) RunningPath() string { + return filepath.Join(j.BasePath, "running") +} + +// FinishedPath returns the path to finished jobs directory +func (j *JobPaths) FinishedPath() string { + return filepath.Join(j.BasePath, "finished") +} + +// FailedPath returns the path to 
failed jobs directory +func (j *JobPaths) FailedPath() string { + return filepath.Join(j.BasePath, "failed") +} diff --git a/internal/worker/execution.go b/internal/worker/execution.go index d8c7f36..834e60d 100644 --- a/internal/worker/execution.go +++ b/internal/worker/execution.go @@ -21,6 +21,7 @@ import ( "github.com/jfraeys/fetch_ml/internal/fileutil" "github.com/jfraeys/fetch_ml/internal/manifest" "github.com/jfraeys/fetch_ml/internal/queue" + "github.com/jfraeys/fetch_ml/internal/storage" "github.com/jfraeys/fetch_ml/internal/telemetry" "github.com/jfraeys/fetch_ml/internal/tracking" ) @@ -263,7 +264,7 @@ func (w *Worker) runJob(ctx context.Context, task *queue.Task, cudaVisibleDevice m.MarkFinished(now, &exitCode, err) m.Metadata["failure_phase"] = "stage_experiment_files" }) - failedDir := filepath.Join(config.NewJobPaths(w.config.BasePath).FailedPath(), task.JobName) + failedDir := filepath.Join(storage.NewJobPaths(w.config.BasePath).FailedPath(), task.JobName) _ = os.MkdirAll(filepath.Dir(failedDir), 0750) _ = os.RemoveAll(failedDir) _ = os.Rename(jobDir, failedDir) @@ -286,7 +287,7 @@ func (w *Worker) runJob(ctx context.Context, task *queue.Task, cudaVisibleDevice m.MarkFinished(now, &exitCode, err) m.Metadata["failure_phase"] = "stage_snapshot" }) - failedDir := filepath.Join(config.NewJobPaths(w.config.BasePath).FailedPath(), task.JobName) + failedDir := filepath.Join(storage.NewJobPaths(w.config.BasePath).FailedPath(), task.JobName) _ = os.MkdirAll(filepath.Dir(failedDir), 0750) _ = os.RemoveAll(failedDir) _ = os.Rename(jobDir, failedDir) @@ -474,7 +475,7 @@ func copyDir(src, dst string) error { func (w *Worker) setupJobDirectories( task *queue.Task, ) (jobDir, outputDir, logFile string, err error) { - jobPaths := config.NewJobPaths(w.config.BasePath) + jobPaths := storage.NewJobPaths(w.config.BasePath) pendingDir := jobPaths.PendingPath() jobDir = filepath.Join(pendingDir, task.JobName) outputDir = filepath.Join(jobPaths.RunningPath(), 
task.JobName) @@ -611,7 +612,7 @@ func (w *Worker) executeJob( }) finalizeStart := time.Now() - jobPaths := config.NewJobPaths(w.config.BasePath) + jobPaths := storage.NewJobPaths(w.config.BasePath) var dest string if err != nil { dest = filepath.Join(jobPaths.FailedPath(), task.JobName) @@ -763,7 +764,7 @@ func (w *Worker) executeContainerJob( containerWorkspace = config.DefaultContainerWorkspace } - jobPaths := config.NewJobPaths(w.config.BasePath) + jobPaths := storage.NewJobPaths(w.config.BasePath) stagingStart := time.Now() // Optional: provision tracking tools for this task diff --git a/tests/unit/config/paths_test.go b/tests/unit/config/paths_test.go index 84bec58..9afede2 100644 --- a/tests/unit/config/paths_test.go +++ b/tests/unit/config/paths_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/jfraeys/fetch_ml/internal/config" + "github.com/jfraeys/fetch_ml/internal/storage" ) func TestExpandPath(t *testing.T) { @@ -145,7 +146,7 @@ func TestNewJobPaths(t *testing.T) { t.Parallel() // Enable parallel execution basePath := "/test/base" - jobPaths := config.NewJobPaths(basePath) + jobPaths := storage.NewJobPaths(basePath) if jobPaths.BasePath != basePath { t.Errorf("Expected BasePath %s, got %s", basePath, jobPaths.BasePath) @@ -156,7 +157,7 @@ func TestJobPathsMethods(t *testing.T) { t.Parallel() // Enable parallel execution basePath := "/test/base" - jobPaths := config.NewJobPaths(basePath) + jobPaths := storage.NewJobPaths(basePath) // Test all path methods tests := []struct { @@ -184,7 +185,7 @@ func TestJobPathsWithComplexBase(t *testing.T) { t.Parallel() // Enable parallel execution basePath := "/very/complex/base/path/with/subdirs" - jobPaths := config.NewJobPaths(basePath) + jobPaths := storage.NewJobPaths(basePath) expectedPending := filepath.Join(basePath, "pending") if jobPaths.PendingPath() != expectedPending { @@ -200,7 +201,7 @@ func TestJobPathsWithComplexBase(t *testing.T) { func TestJobPathsEmptyBase(t *testing.T) { t.Parallel() // 
Enable parallel execution - jobPaths := config.NewJobPaths("") + jobPaths := storage.NewJobPaths("") // Should still work with empty base path expectedPending := "pending"