Some checks failed
Build CLI with Embedded SQLite / build (arm64, aarch64-linux) (push) Waiting to run
Build CLI with Embedded SQLite / build (x86_64, x86_64-linux) (push) Waiting to run
Build CLI with Embedded SQLite / build-macos (arm64) (push) Waiting to run
Build CLI with Embedded SQLite / build-macos (x86_64) (push) Waiting to run
CI/CD Pipeline / Docker Build (push) Blocked by required conditions
Security Scan / Security Analysis (push) Waiting to run
Security Scan / Native Library Security (push) Waiting to run
Checkout test / test (push) Successful in 6s
CI with Native Libraries / Check Build Environment (push) Successful in 12s
CI/CD Pipeline / Test (push) Failing after 5m15s
CI/CD Pipeline / Dev Compose Smoke Test (push) Has been skipped
CI/CD Pipeline / Build (push) Has been skipped
CI/CD Pipeline / Test Scripts (push) Has been skipped
CI/CD Pipeline / Security Scan (push) Failing after 4m49s
Contract Tests / Spec Drift Detection (push) Failing after 13s
Contract Tests / API Contract Tests (push) Has been skipped
Deploy API Docs / Build API Documentation (push) Failing after 36s
Deploy API Docs / Deploy to GitHub Pages (push) Has been skipped
Documentation / build-and-publish (push) Failing after 26s
CI with Native Libraries / Build and Test Native Libraries (push) Has been cancelled
CI with Native Libraries / Build Release Libraries (push) Has been cancelled
386 lines
12 KiB
Go
386 lines
12 KiB
Go
// Package controller provides TUI command handlers
|
|
package controller
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"time"
|
|
|
|
tea "github.com/charmbracelet/bubbletea"
|
|
"github.com/jfraeys/fetch_ml/cmd/tui/internal/model"
|
|
"github.com/jfraeys/fetch_ml/internal/container"
|
|
"github.com/jfraeys/fetch_ml/internal/worker"
|
|
)
|
|
|
|
// shellQuote returns s wrapped in single quotes so it can be embedded in a
// shell command string as one literal word. Every single quote inside s is
// replaced by the sequence '\'' (close quote, escaped quote, reopen quote).
func shellQuote(s string) string {
	const escapedQuote = `'\''`

	var quoted strings.Builder
	quoted.WriteByte('\'')
	quoted.WriteString(strings.Join(strings.Split(s, "'"), escapedQuote))
	quoted.WriteByte('\'')
	return quoted.String()
}
|
|
|
|
// Command factories for loading data
|
|
|
|
func (c *Controller) loadAllData() tea.Cmd {
|
|
return tea.Batch(
|
|
c.loadJobs(),
|
|
c.loadQueue(),
|
|
c.loadGPU(),
|
|
c.loadContainer(),
|
|
c.loadDatasets(),
|
|
)
|
|
}
|
|
|
|
func (c *Controller) loadJobs() tea.Cmd {
|
|
return func() tea.Msg {
|
|
type jobResult struct {
|
|
jobs []model.Job
|
|
err error
|
|
}
|
|
|
|
resultChan := make(chan jobResult, 1)
|
|
go func() {
|
|
var jobs []model.Job
|
|
statusChan := make(chan []model.Job, 4)
|
|
|
|
// Debug: Print paths being used
|
|
c.logger.Info("Loading jobs from paths",
|
|
"pending", c.getPathForStatus(model.StatusPending),
|
|
"running", c.getPathForStatus(model.StatusRunning),
|
|
"finished", c.getPathForStatus(model.StatusFinished),
|
|
"failed", c.getPathForStatus(model.StatusFailed))
|
|
|
|
for _, status := range []model.JobStatus{
|
|
model.StatusPending,
|
|
model.StatusRunning,
|
|
model.StatusFinished,
|
|
model.StatusFailed,
|
|
} {
|
|
go func(s model.JobStatus) {
|
|
path := c.getPathForStatus(s)
|
|
names := c.server.ListDir(path)
|
|
|
|
// Debug: Log what we found
|
|
c.logger.Info("Listed directory", "status", s, "path", path, "count", len(names))
|
|
|
|
var statusJobs []model.Job
|
|
for _, name := range names {
|
|
// Lazy loading: only fetch basic info for list view
|
|
// Full details (GPU, narrative) loaded on selection
|
|
statusJobs = append(statusJobs, model.Job{
|
|
Name: name,
|
|
Status: s,
|
|
// TaskID, Priority, GPU info loaded lazily
|
|
})
|
|
}
|
|
statusChan <- statusJobs
|
|
}(status)
|
|
}
|
|
|
|
for range 4 {
|
|
jobs = append(jobs, <-statusChan...)
|
|
}
|
|
|
|
resultChan <- jobResult{jobs: jobs, err: nil}
|
|
}()
|
|
|
|
result := <-resultChan
|
|
if result.err != nil {
|
|
return model.StatusMsg{Text: "Failed to load jobs: " + result.err.Error(), Level: "error"}
|
|
}
|
|
return model.JobsLoadedMsg(result.jobs)
|
|
}
|
|
}
|
|
|
|
func (c *Controller) loadQueue() tea.Cmd {
|
|
return func() tea.Msg {
|
|
tasks, err := c.taskQueue.GetQueuedTasks()
|
|
if err != nil {
|
|
c.logger.Error("failed to load queue", "error", err)
|
|
return model.StatusMsg{Text: "Failed to load queue: " + err.Error(), Level: "error"}
|
|
}
|
|
c.logger.Info("loaded queue", "task_count", len(tasks))
|
|
return model.TasksLoadedMsg(tasks)
|
|
}
|
|
}
|
|
|
|
func (c *Controller) loadGPU() tea.Cmd {
|
|
return func() tea.Msg {
|
|
type gpuResult struct {
|
|
content string
|
|
err error
|
|
}
|
|
|
|
resultChan := make(chan gpuResult, 1)
|
|
go func() {
|
|
// Try NVML first for accurate GPU info (Linux/Windows with NVIDIA)
|
|
if worker.IsNVMLAvailable() {
|
|
gpus, err := worker.GetAllGPUInfo()
|
|
if err == nil && len(gpus) > 0 {
|
|
var formatted strings.Builder
|
|
formatted.WriteString("GPU Status (NVML)\n")
|
|
formatted.WriteString(strings.Repeat("═", 50) + "\n\n")
|
|
|
|
for _, gpu := range gpus {
|
|
formatted.WriteString(fmt.Sprintf("🎮 GPU %d: %s\n", gpu.Index, gpu.Name))
|
|
formatted.WriteString(fmt.Sprintf(" Utilization: %d%%\n", gpu.Utilization))
|
|
formatted.WriteString(fmt.Sprintf(" Memory: %d/%d MB\n",
|
|
gpu.MemoryUsed/1024/1024, gpu.MemoryTotal/1024/1024))
|
|
formatted.WriteString(fmt.Sprintf(" Temperature: %d°C\n", gpu.Temperature))
|
|
if gpu.PowerDraw > 0 {
|
|
formatted.WriteString(fmt.Sprintf(" Power: %.1f W\n", float64(gpu.PowerDraw)/1000.0))
|
|
}
|
|
if gpu.ClockSM > 0 {
|
|
formatted.WriteString(fmt.Sprintf(" SM Clock: %d MHz\n", gpu.ClockSM))
|
|
}
|
|
formatted.WriteString("\n")
|
|
}
|
|
|
|
c.logger.Info("loaded GPU status", "type", "nvml", "count", len(gpus))
|
|
resultChan <- gpuResult{content: formatted.String(), err: nil}
|
|
return
|
|
}
|
|
}
|
|
|
|
// Try macOS GPU monitoring (development mode on macOS)
|
|
if worker.IsMacOS() {
|
|
gpuStatus, err := worker.FormatMacOSGPUStatus()
|
|
if err == nil && gpuStatus != "" {
|
|
c.logger.Info("loaded GPU status", "type", "macos")
|
|
resultChan <- gpuResult{content: gpuStatus, err: nil}
|
|
return
|
|
}
|
|
}
|
|
|
|
// No GPU monitoring available
|
|
c.logger.Warn("GPU info unavailable", "platform", runtime.GOOS)
|
|
resultChan <- gpuResult{
|
|
content: "GPU info unavailable\n\nNVML: NVIDIA driver not installed or incompatible\nmacOS: system_profiler not available",
|
|
err: fmt.Errorf("no GPU monitoring available on %s", runtime.GOOS),
|
|
}
|
|
}()
|
|
|
|
result := <-resultChan
|
|
return model.GpuLoadedMsg(result.content)
|
|
}
|
|
}
|
|
|
|
func (c *Controller) loadContainer() tea.Cmd {
|
|
return func() tea.Msg {
|
|
resultChan := make(chan string, 1)
|
|
go func() {
|
|
var formatted strings.Builder
|
|
formatted.WriteString("Container Status\n")
|
|
formatted.WriteString(strings.Repeat("═", 50) + "\n\n")
|
|
|
|
formatted.WriteString("📋 Configuration:\n")
|
|
formatted.WriteString(fmt.Sprintf(" Image: %s\n", c.config.PodmanImage))
|
|
formatted.WriteString(fmt.Sprintf(" GPU Devices: %v\n", c.config.GPUDevices))
|
|
formatted.WriteString(fmt.Sprintf(" Workspace: %s\n", c.config.ContainerWorkspace))
|
|
formatted.WriteString(fmt.Sprintf(" Results: %s\n\n", c.config.ContainerResults))
|
|
|
|
cmd := "podman ps -a --format '{{.Names}}|{{.Status}}|{{.Image}}'"
|
|
out, err := c.server.Exec(cmd)
|
|
if err == nil && strings.TrimSpace(out) != "" {
|
|
formatted.WriteString("🐳 Running Containers (Podman):\n")
|
|
lines := strings.Split(strings.TrimSpace(out), "\n")
|
|
for _, line := range lines {
|
|
parts := strings.Split(line, "|")
|
|
if len(parts) >= 3 {
|
|
status := "🟢"
|
|
if strings.Contains(parts[1], "Exited") {
|
|
status = "🔴"
|
|
}
|
|
formatted.WriteString(fmt.Sprintf(" %s %s\n", status, parts[0]))
|
|
formatted.WriteString(fmt.Sprintf(" Status: %s\n", parts[1]))
|
|
formatted.WriteString(fmt.Sprintf(" Image: %s\n\n", parts[2]))
|
|
}
|
|
}
|
|
} else {
|
|
cmd = "docker ps -a --format '{{.Names}}|{{.Status}}|{{.Image}}'"
|
|
out, err = c.server.Exec(cmd)
|
|
if err == nil && strings.TrimSpace(out) != "" {
|
|
formatted.WriteString("🐳 Running Containers (Docker):\n")
|
|
lines := strings.Split(strings.TrimSpace(out), "\n")
|
|
for _, line := range lines {
|
|
parts := strings.Split(line, "|")
|
|
if len(parts) >= 3 {
|
|
status := "🟢"
|
|
if strings.Contains(parts[1], "Exited") {
|
|
status = "🔴"
|
|
}
|
|
formatted.WriteString(fmt.Sprintf(" %s %s\n", status, parts[0]))
|
|
formatted.WriteString(fmt.Sprintf(" Status: %s\n", parts[1]))
|
|
formatted.WriteString(fmt.Sprintf(" Image: %s\n\n", parts[2]))
|
|
}
|
|
}
|
|
} else {
|
|
formatted.WriteString("⚠️ No containers found\n")
|
|
}
|
|
}
|
|
|
|
formatted.WriteString("💻 System Info:\n")
|
|
if podmanVersion, err := c.server.Exec("podman --version"); err == nil {
|
|
formatted.WriteString(fmt.Sprintf(" Podman: %s\n", strings.TrimSpace(podmanVersion)))
|
|
} else if dockerVersion, err := c.server.Exec("docker --version"); err == nil {
|
|
formatted.WriteString(fmt.Sprintf(" Docker: %s\n", strings.TrimSpace(dockerVersion)))
|
|
} else {
|
|
formatted.WriteString(" ⚠️ Container engine not available\n")
|
|
}
|
|
|
|
c.logger.Info("loaded container status")
|
|
resultChan <- formatted.String()
|
|
}()
|
|
|
|
return model.ContainerLoadedMsg(<-resultChan)
|
|
}
|
|
}
|
|
|
|
func (c *Controller) queueJob(jobName string, args string) tea.Cmd {
|
|
return func() tea.Msg {
|
|
resultChan := make(chan model.StatusMsg, 1)
|
|
go func() {
|
|
priority := int64(5)
|
|
if strings.Contains(args, "--priority") {
|
|
_, err := fmt.Sscanf(args, "--priority %d", &priority)
|
|
if err != nil {
|
|
c.logger.Error("invalid priority argument", "args", args, "error", err)
|
|
resultChan <- model.StatusMsg{
|
|
Text: fmt.Sprintf("Invalid priority: %v", err),
|
|
Level: "error",
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
task, err := c.taskQueue.EnqueueTask(jobName, args, priority)
|
|
if err != nil {
|
|
c.logger.Error("failed to queue job", "job_name", jobName, "error", err)
|
|
resultChan <- model.StatusMsg{
|
|
Text: fmt.Sprintf("Failed to queue %s: %v", jobName, err),
|
|
Level: "error",
|
|
}
|
|
return
|
|
}
|
|
|
|
c.logger.Info("job queued", "job_name", jobName, "task_id", task.ID[:8], "priority", priority)
|
|
resultChan <- model.StatusMsg{
|
|
Text: fmt.Sprintf("✓ Queued: %s (ID: %s, P:%d)", jobName, task.ID[:8], priority),
|
|
Level: "success",
|
|
}
|
|
}()
|
|
|
|
return <-resultChan
|
|
}
|
|
}
|
|
|
|
func (c *Controller) deleteJob(jobName string) tea.Cmd {
|
|
return func() tea.Msg {
|
|
if err := container.ValidateJobName(jobName); err != nil {
|
|
return model.StatusMsg{Text: fmt.Sprintf("Invalid job name %s: %v", jobName, err), Level: "error"}
|
|
}
|
|
|
|
jobPath := filepath.Join(c.config.PendingPath(), jobName)
|
|
stamp := time.Now().UTC().Format("20060102-150405")
|
|
archiveRoot := filepath.Join(c.config.BasePath, "archive", "pending", stamp)
|
|
dst := filepath.Join(archiveRoot, jobName)
|
|
cmd := fmt.Sprintf("mkdir -p %s && mv %s %s", shellQuote(archiveRoot), shellQuote(jobPath), shellQuote(dst))
|
|
if _, err := c.server.Exec(cmd); err != nil {
|
|
return model.StatusMsg{Text: fmt.Sprintf("Failed to archive %s: %v", jobName, err), Level: "error"}
|
|
}
|
|
return model.StatusMsg{Text: fmt.Sprintf("✓ Archived: %s", jobName), Level: "success"}
|
|
}
|
|
}
|
|
|
|
func (c *Controller) markFailed(jobName string) tea.Cmd {
|
|
return func() tea.Msg {
|
|
src := filepath.Join(c.config.RunningPath(), jobName)
|
|
dst := filepath.Join(c.config.FailedPath(), jobName)
|
|
if _, err := c.server.Exec(fmt.Sprintf("mv %s %s", src, dst)); err != nil {
|
|
return model.StatusMsg{Text: fmt.Sprintf("Failed to mark failed: %v", err), Level: "error"}
|
|
}
|
|
return model.StatusMsg{Text: fmt.Sprintf("⚠ Marked failed: %s", jobName), Level: "warning"}
|
|
}
|
|
}
|
|
|
|
func (c *Controller) cancelTask(taskID string) tea.Cmd {
|
|
return func() tea.Msg {
|
|
if err := c.taskQueue.CancelTask(taskID); err != nil {
|
|
c.logger.Error("failed to cancel task", "task_id", taskID[:8], "error", err)
|
|
return model.StatusMsg{Text: fmt.Sprintf("Cancel failed: %v", err), Level: "error"}
|
|
}
|
|
c.logger.Info("task cancelled", "task_id", taskID[:8])
|
|
return model.StatusMsg{Text: fmt.Sprintf("✓ Cancelled: %s", taskID[:8]), Level: "success"}
|
|
}
|
|
}
|
|
|
|
func (c *Controller) showQueue(m model.State) tea.Cmd {
|
|
return func() tea.Msg {
|
|
var content strings.Builder
|
|
content.WriteString("Task Queue\n")
|
|
content.WriteString(strings.Repeat("═", 60) + "\n\n")
|
|
|
|
if len(m.QueuedTasks) == 0 {
|
|
content.WriteString("📭 No tasks in queue\n")
|
|
} else {
|
|
for i, task := range m.QueuedTasks {
|
|
statusIcon := "⏳"
|
|
if task.Status == "running" {
|
|
statusIcon = "▶"
|
|
}
|
|
|
|
content.WriteString(fmt.Sprintf("%d. %s %s [ID: %s]\n",
|
|
i+1, statusIcon, task.JobName, task.ID[:8]))
|
|
content.WriteString(fmt.Sprintf(" Priority: %d | Status: %s\n",
|
|
task.Priority, task.Status))
|
|
if task.Args != "" {
|
|
content.WriteString(fmt.Sprintf(" Args: %s\n", task.Args))
|
|
}
|
|
content.WriteString(fmt.Sprintf(" Created: %s\n",
|
|
task.CreatedAt.Format("2006-01-02 15:04:05")))
|
|
|
|
if task.StartedAt != nil {
|
|
duration := time.Since(*task.StartedAt)
|
|
content.WriteString(fmt.Sprintf(" Running for: %s\n",
|
|
duration.Round(time.Second)))
|
|
}
|
|
|
|
if task.Tracking != nil {
|
|
var tools []string
|
|
if task.Tracking.MLflow != nil && task.Tracking.MLflow.Enabled {
|
|
tools = append(tools, "MLflow")
|
|
}
|
|
if task.Tracking.TensorBoard != nil && task.Tracking.TensorBoard.Enabled {
|
|
tools = append(tools, "TensorBoard")
|
|
}
|
|
if task.Tracking.Wandb != nil && task.Tracking.Wandb.Enabled {
|
|
tools = append(tools, "Wandb")
|
|
}
|
|
if len(tools) > 0 {
|
|
content.WriteString(fmt.Sprintf(" Tracking: %s\n", strings.Join(tools, ", ")))
|
|
}
|
|
}
|
|
content.WriteString("\n")
|
|
}
|
|
}
|
|
|
|
return model.QueueLoadedMsg(content.String())
|
|
}
|
|
}
|
|
|
|
func (c *Controller) loadDatasets() tea.Cmd {
|
|
return func() tea.Msg {
|
|
datasets, err := c.taskQueue.ListDatasets()
|
|
if err != nil {
|
|
c.logger.Error("failed to load datasets", "error", err)
|
|
return model.StatusMsg{Text: "Failed to load datasets: " + err.Error(), Level: "error"}
|
|
}
|
|
c.logger.Info("loaded datasets", "count", len(datasets))
|
|
return model.DatasetsLoadedMsg(datasets)
|
|
}
|
|
}
|
|
|
|
func tickCmd() tea.Cmd {
|
|
return tea.Tick(time.Second, func(t time.Time) tea.Msg {
|
|
return model.TickMsg(t)
|
|
})
|
|
}
|