From d0c68772ea6a1522f9ee30fa010faf5d7d886b28 Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Fri, 20 Feb 2026 21:28:16 -0500 Subject: [PATCH] feat(cli): Implement unified run wrapper command - Fork child process and capture stdout/stderr via pipe - Parse FETCHML_METRIC key=value [step=N] lines from output - Write run_manifest.json with run metadata - Insert/update ml_runs table in SQLite with PID tracking - Stream output to output.log file - Support entrypoint from config or explicit command after -- --- cli/src/commands/run.zig | 595 ++++++++++++++++++++++----------------- 1 file changed, 338 insertions(+), 257 deletions(-) diff --git a/cli/src/commands/run.zig b/cli/src/commands/run.zig index c48afa0..7d79a0f 100644 --- a/cli/src/commands/run.zig +++ b/cli/src/commands/run.zig @@ -1,104 +1,25 @@ const std = @import("std"); const config = @import("../config.zig"); const db = @import("../db.zig"); +const core = @import("../core.zig"); const colors = @import("../utils/colors.zig"); +const manifest_lib = @import("../manifest.zig"); +const Manifest = manifest_lib.RunManifest; -const RunOptions = struct { - json: bool = false, - help: bool = false, -}; - +/// Run command - always executes locally +/// Usage: +/// ml run # Use entrypoint from config + args +/// ml run --lr 0.001 # Args appended to entrypoint +/// ml run -- python train.py # Explicit command pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void { - var options = RunOptions{}; - var command_args = std.ArrayList([]const u8).initCapacity(allocator, 10) catch |err| { - return err; - }; + var flags = core.flags.CommonFlags{}; + var command_args = try core.flags.parseCommon(allocator, args, &flags); defer command_args.deinit(allocator); - // Parse flags - var i: usize = 0; - while (i < args.len) : (i += 1) { - const arg = args[i]; - if (std.mem.eql(u8, arg, "--json")) { - options.json = true; - } else if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) { - options.help = true; - } else { - try command_args.append(allocator, arg); - } - } + core.output.init(if (flags.json) .json else .text); - if (command_args.items.len < 1 or options.help) { - try printUsage(); - return; - } - - const command = command_args.items[0]; - - // Load config to determine mode - const cfg = try config.Config.load(allocator); - defer { - var mut_cfg = cfg; - mut_cfg.deinit(allocator); - } - - if (std.mem.eql(u8, command, "start")) { - if (cfg.isLocalMode()) { - try executeStartLocal(allocator, command_args.items[1..], &options); - } else { - try executeStartServer(allocator, command_args.items[1..], &options); - } - } else if (std.mem.eql(u8, command, "finish")) { - if (cfg.isLocalMode()) { - try executeFinishLocal(allocator, command_args.items[1..], &options); - } else { - try executeFinishServer(allocator, command_args.items[1..], &options); - } - } else if (std.mem.eql(u8, command, "fail")) { - if (cfg.isLocalMode()) { - try executeFailLocal(allocator, command_args.items[1..], &options); - } else { - try executeFailServer(allocator, command_args.items[1..], &options); - } - } else if (std.mem.eql(u8, command, "list")) { - if (cfg.isLocalMode()) { - try executeListLocal(allocator, &options); - } else { - try executeListServer(allocator, &options); - } - } else { - if (options.json) { - std.debug.print("{{\"success\":false,\"command\":\"run\",\"error\":\"Unknown command: {s}\"}}\n", .{command}); - } else { - colors.printError("Unknown command: {s}\n", .{command}); - try printUsage(); - } - } -} - -// Local mode implementations -fn executeStartLocal(allocator: std.mem.Allocator, args: []const []const u8, options: *const RunOptions) !void { - var experiment_id: ?[]const u8 = null; - var run_name: ?[]const u8 = null; - - var i: usize = 0; - while (i < args.len) : (i += 1) { - if (std.mem.eql(u8, args[i], "--experiment") and i + 1 < args.len) { - experiment_id = args[i + 1]; - i += 1; - } else if (std.mem.eql(u8, args[i], "--name") and i + 1 < args.len) { - run_name = args[i + 1]; - i += 1; - } - } - - if (experiment_id == null) { - if (options.json) { - std.debug.print("{{\"success\":false,\"command\":\"run.start\",\"error\":\"--experiment is required\"}}\n", .{}); - } else { - colors.printError("Error: --experiment is required\n", .{}); - } - return error.MissingArgument; + if (flags.help) { + return printUsage(); } const cfg = try config.Config.load(allocator); @@ -107,219 +28,379 @@ fn executeStartLocal(allocator: std.mem.Allocator, args: []const []const u8, opt mut_cfg.deinit(allocator); } + // Parse command: entrypoint + args, or explicit -- command + const command = try resolveCommand(allocator, &cfg, command_args.items); + defer freeCommand(allocator, command); + + // Generate run_id + const run_id = try db.generateUUID(allocator); + defer allocator.free(run_id); + + // Determine experiment name + const experiment_name = if (cfg.experiment) |exp| exp.name else "default"; + + // Build artifact path + const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{ + cfg.artifact_path, + experiment_name, + run_id, + }); + defer allocator.free(artifact_path); + + // Create run directory + std.fs.makeDirAbsolute(artifact_path) catch |err| { + if (err != error.PathAlreadyExists) { + std.log.err("Failed to create run directory: {}", .{err}); + return error.MkdirFailed; + } + }; + + // Get DB path and initialize if needed (lazy bootstrap) const db_path = try cfg.getDBPath(allocator); defer allocator.free(db_path); - var database = try db.DB.init(allocator, db_path); + var database = try initOrOpenDB(allocator, db_path); defer database.close(); - const run_id = try db.generateUUID(allocator); - defer allocator.free(run_id); + // Write run manifest (status=RUNNING) + const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ + artifact_path, + "run_manifest.json", + }); + defer allocator.free(manifest_path); const timestamp = try db.currentTimestamp(allocator); defer allocator.free(timestamp); - const sql = "INSERT INTO ml_runs (run_id, experiment_id, name, status, start_time) VALUES (?, ?, ?, 'RUNNING', ?);"; + var manifest = Manifest.init(allocator); + manifest.run_id = run_id; + manifest.experiment = experiment_name; + manifest.command = try std.mem.join(allocator, " ", command); + manifest.args = try duplicateStrings(allocator, command); + manifest.started_at = try allocator.dupe(u8, timestamp); + manifest.status = "RUNNING"; + manifest.artifact_path = artifact_path; + manifest.synced = false; + + // Insert run into database + const run_name = try std.fmt.allocPrint(allocator, "run-{s}", .{run_id[0..8]}); + defer allocator.free(run_name); + + const sql = "INSERT INTO ml_runs (run_id, experiment_id, name, status, start_time, synced) VALUES (?, ?, ?, 'RUNNING', ?, 0);"; const stmt = try database.prepare(sql); defer db.DB.finalize(stmt); try db.DB.bindText(stmt, 1, run_id); - try db.DB.bindText(stmt, 2, experiment_id.?); - try db.DB.bindText(stmt, 3, run_name orelse "unnamed-run"); + try db.DB.bindText(stmt, 2, experiment_name); + try db.DB.bindText(stmt, 3, run_name); try db.DB.bindText(stmt, 4, timestamp); - _ = try db.DB.step(stmt); - if (options.json) { - std.debug.print("{{\"success\":true,\"command\":\"run.start\",\"data\":{{\"run_id\":\"{s}\",\"experiment_id\":\"{s}\",\"name\":\"{s}\",\"status\":\"RUNNING\"}}}}\n", .{ run_id, experiment_id.?, run_name orelse "unnamed-run" }); - } else { - colors.printSuccess("✓ Started run: {s}\n", .{run_name orelse "unnamed-run"}); - colors.printInfo(" run_id: {s}\n", .{run_id}); - colors.printInfo(" experiment_id: {s}\n", .{experiment_id.?}); - } -} + // Write manifest + try manifest_lib.writeManifest(manifest, manifest_path); -fn executeFinishLocal(allocator: std.mem.Allocator, args: []const []const u8, options: *const RunOptions) !void { - var run_id: ?[]const u8 = null; + // Fork and execute + const output_log_path = try std.fs.path.join(allocator, &[_][]const u8{ + artifact_path, + "output.log", + }); + defer allocator.free(output_log_path); - var i: usize = 0; - while (i < args.len) : (i += 1) { - if (std.mem.eql(u8, args[i], "--run") and i + 1 < args.len) { - run_id = args[i + 1]; - i += 1; - } - } + // Execute and capture + const exit_code = try executeAndCapture( + allocator, + command, + output_log_path, + &database, + run_id, + ); - if (run_id == null) { - if (options.json) { - std.debug.print("{{\"success\":false,\"command\":\"run.finish\",\"error\":\"--run is required\"}}\n", .{}); - } else { - colors.printError("Error: --run is required\n", .{}); - } - return error.MissingArgument; - } + // Update run status in database + const end_time = try db.currentTimestamp(allocator); + defer allocator.free(end_time); - const cfg = try config.Config.load(allocator); - defer { - var mut_cfg = cfg; - mut_cfg.deinit(allocator); - } + const status = if (exit_code == 0) "FINISHED" else "FAILED"; - const db_path = try cfg.getDBPath(allocator); - defer allocator.free(db_path); + const update_sql = "UPDATE ml_runs SET status = ?, end_time = ?, exit_code = ?, pid = NULL WHERE run_id = ?;"; + const update_stmt = try database.prepare(update_sql); + defer db.DB.finalize(update_stmt); - var database = try db.DB.init(allocator, db_path); - defer database.close(); + try db.DB.bindText(update_stmt, 1, status); + try db.DB.bindText(update_stmt, 2, end_time); + try db.DB.bindInt64(update_stmt, 3, exit_code); + try db.DB.bindText(update_stmt, 4, run_id); + _ = try db.DB.step(update_stmt); - const timestamp = try db.currentTimestamp(allocator); - defer allocator.free(timestamp); + // Update manifest + try manifest_lib.updateManifestStatus(manifest_path, status, exit_code, allocator); - const sql = "UPDATE ml_runs SET status = 'FINISHED', end_time = ? WHERE run_id = ?;"; - const stmt = try database.prepare(sql); - defer db.DB.finalize(stmt); - - try db.DB.bindText(stmt, 1, timestamp); - try db.DB.bindText(stmt, 2, run_id.?); - - _ = try db.DB.step(stmt); + // Checkpoint WAL database.checkpointOnExit(); - if (options.json) { - std.debug.print("{{\"success\":true,\"command\":\"run.finish\",\"data\":{{\"run_id\":\"{s}\",\"status\":\"FINISHED\"}}}}\n", .{run_id.?}); + // Print result + if (flags.json) { + std.debug.print("{{\"success\":true,\"run_id\":\"{s}\",\"status\":\"{s}\",\"exit_code\":{d}}}\n", .{ + run_id, + status, + exit_code, + }); } else { - colors.printSuccess("✓ Finished run: {s}\n", .{run_id.?}); + colors.printSuccess("✓ Run {s} complete ({s})\n", .{ run_id[0..8], status }); + if (cfg.sync_uri.len > 0) { + colors.printInfo("↑ queued for sync\n", .{}); + } } } -fn executeFailLocal(allocator: std.mem.Allocator, args: []const []const u8, options: *const RunOptions) !void { - var run_id: ?[]const u8 = null; - - var i: usize = 0; - while (i < args.len) : (i += 1) { - if (std.mem.eql(u8, args[i], "--run") and i + 1 < args.len) { - run_id = args[i + 1]; - i += 1; +/// Resolve command from entrypoint + args, or explicit -- command +fn resolveCommand(allocator: std.mem.Allocator, cfg: *const config.Config, args: []const []const u8) ![][]const u8 { + // Check for explicit -- separator + var double_dash_idx: ?usize = null; + for (args, 0..) |arg, i| { + if (std.mem.eql(u8, arg, "--")) { + double_dash_idx = i; + break; } } - if (run_id == null) { - if (options.json) { - std.debug.print("{{\"success\":false,\"command\":\"run.fail\",\"error\":\"--run is required\"}}\n", .{}); - } else { - colors.printError("Error: --run is required\n", .{}); + if (double_dash_idx) |idx| { + // Explicit command after -- + if (idx + 1 >= args.len) { + std.log.err("No command provided after --", .{}); + return error.NoCommand; } - return error.MissingArgument; + return try allocator.dupe([]const u8, args[idx + 1 ..]); } - const cfg = try config.Config.load(allocator); + // Use entrypoint from config + args + if (cfg.experiment) |exp| { + if (exp.entrypoint.len > 0) { + // Split entrypoint on spaces + var argv = std.ArrayList([]const u8).init(allocator); + + // Parse entrypoint (split on spaces) + var iter = std.mem.splitScalar(u8, exp.entrypoint, ' '); + while (iter.next()) |part| { + if (part.len > 0) { + try argv.append(try allocator.dupe(u8, part)); + } + } + + // Append args + for (args) |arg| { + try argv.append(try allocator.dupe(u8, arg)); + } + + return argv.toOwnedSlice(); + } + } + + // No entrypoint configured and no explicit command + std.log.err("No entrypoint configured. Set entrypoint in .fetchml/config.toml or use: ml run -- ", .{}); + return error.NoEntrypoint; +} + +/// Free command array +fn freeCommand(allocator: std.mem.Allocator, command: [][]const u8) void { + for (command) |arg| { + allocator.free(arg); + } + allocator.free(command); +} + +/// Duplicate array of strings +fn duplicateStrings(allocator: std.mem.Allocator, strings: []const []const u8) ![][]const u8 { + const result = try allocator.alloc([]const u8, strings.len); + for (strings, 0..) |s, i| { + result[i] = try allocator.dupe(u8, s); + } + return result; +} + +/// Initialize or open database (lazy bootstrap) +fn initOrOpenDB(allocator: std.mem.Allocator, db_path: []const u8) !db.DB { + const db_exists = blk: { + std.fs.accessAbsolute(db_path, .{}) catch |err| { + if (err == error.FileNotFound) break :blk false; + }; + break :blk true; + }; + + const database = try db.DB.init(allocator, db_path); + + if (!db_exists) { + std.log.info("local mode active — tracking to {s}", .{db_path}); + } + + return database; +} + +/// Execute command and capture output, parsing FETCHML_METRIC lines +fn executeAndCapture( + allocator: std.mem.Allocator, + command: []const []const u8, + output_path: []const u8, + database: *db.DB, + run_id: []const u8, +) !i32 { + // Create output file + var output_file = try std.fs.cwd().createFile(output_path, .{}); + defer output_file.close(); + const output_writer = output_file.writer(); + + // Create pipe for stdout + const pipe = try std.posix.pipe(); defer { - var mut_cfg = cfg; - mut_cfg.deinit(allocator); + std.posix.close(pipe[0]); + std.posix.close(pipe[1]); } - const db_path = try cfg.getDBPath(allocator); - defer allocator.free(db_path); + // Fork child process + const pid = try std.posix.fork(); - var database = try db.DB.init(allocator, db_path); - defer database.close(); + if (pid == 0) { + // Child process + std.posix.close(pipe[0]); // Close read end - const timestamp = try db.currentTimestamp(allocator); - defer allocator.free(timestamp); + // Redirect stdout to pipe + _ = std.posix.dup2(pipe[1], std.posix.STDOUT_FILENO); + _ = std.posix.dup2(pipe[1], std.posix.STDERR_FILENO); + std.posix.close(pipe[1]); - const sql = "UPDATE ml_runs SET status = 'FAILED', end_time = ? WHERE run_id = ?;"; + // Execute command + const err = std.posix.execvpe(command[0], command, &[_:null]?[*:0]const u8{null}); + std.log.err("Failed to execute {s}: {}", .{ command[0], err }); + std.process.exit(1); + unreachable; + } + + // Parent process + std.posix.close(pipe[1]); // Close write end + + // Store PID in database + const pid_sql = "UPDATE ml_runs SET pid = ? WHERE run_id = ?;"; + const pid_stmt = try database.prepare(pid_sql); + defer db.DB.finalize(pid_stmt); + try db.DB.bindInt64(pid_stmt, 1, pid); + try db.DB.bindText(pid_stmt, 2, run_id); + _ = try db.DB.step(pid_stmt); + + // Read from pipe and parse FETCHML_METRIC lines + var buf: [4096]u8 = undefined; + var line_buf: [1024]u8 = undefined; + var line_len: usize = 0; + + while (true) { + const bytes_read = std.posix.read(pipe[0], &buf) catch |err| { + if (err == error.WouldBlock or err == error.BrokenPipe) break; + break; + }; + + if (bytes_read == 0) break; + + // Write to output file + try output_writer.writeAll(buf[0..bytes_read]); + + // Parse lines + for (buf[0..bytes_read]) |byte| { + if (byte == '\n' or line_len >= line_buf.len - 1) { + if (line_len > 0) { + line_buf[line_len] = 0; + const line = line_buf[0..line_len]; + try parseAndLogMetric(allocator, line, database, run_id); + line_len = 0; + } + } else { + line_buf[line_len] = byte; + line_len += 1; + } + } + } + + // Wait for child + var status: u32 = 0; + _ = std.posix.waitpid(pid, &status, 0); + + // Parse exit code + if (std.os.linux.W.IFEXITED(status)) { + return std.os.linux.W.EXITSTATUS(status); + } else if (std.os.linux.W.IFSIGNALED(status)) { + return 128 + std.os.linux.W.TERMSIG(status); + } + + return -1; +} + +/// Parse FETCHML_METRIC line and log to database +/// Format: FETCHML_METRIC key=value [step=N] +fn parseAndLogMetric( + allocator: std.mem.Allocator, + line: []const u8, + database: *db.DB, + run_id: []const u8, +) !void { + const trimmed = std.mem.trim(u8, line, " \t\r"); + + // Check prefix + const prefix = "FETCHML_METRIC"; + if (!std.mem.startsWith(u8, trimmed, prefix)) return; + + // Get the rest after prefix + var rest = trimmed[prefix.len..]; + rest = std.mem.trimLeft(u8, rest, " \t"); + + // Parse key=value + var iter = std.mem.splitScalar(u8, rest, ' '); + const kv_part = iter.next() orelse return; + + var kv_iter = std.mem.splitScalar(u8, kv_part, '='); + const key = kv_iter.next() orelse return; + const value_str = kv_iter.next() orelse return; + + // Validate key: [a-zA-Z][a-zA-Z0-9_]* + if (key.len == 0) return; + const first_char = key[0]; + if (!std.ascii.isAlphabetic(first_char)) return; + for (key[1..]) |c| { + if (!std.ascii.isAlphanumeric(c) and c != '_') return; + } + + // Parse value + const value = std.fmt.parseFloat(f64, value_str) catch return; + + // Parse optional step + var step: i64 = 0; + while (iter.next()) |part| { + if (std.mem.startsWith(u8, part, "step=")) { + const step_str = part[5..]; + step = std.fmt.parseInt(i64, step_str, 10) catch 0; + if (step < 0) step = 0; + } + } + + // Insert metric + const sql = "INSERT INTO ml_metrics (run_id, key, value, step) VALUES (?, ?, ?, ?);"; const stmt = try database.prepare(sql); defer db.DB.finalize(stmt); - try db.DB.bindText(stmt, 1, timestamp); - try db.DB.bindText(stmt, 2, run_id.?); - + try db.DB.bindText(stmt, 1, run_id); + try db.DB.bindText(stmt, 2, key); + try db.DB.bindDouble(stmt, 3, value); + try db.DB.bindInt64(stmt, 4, step); _ = try db.DB.step(stmt); - database.checkpointOnExit(); - if (options.json) { - std.debug.print("{{\"success\":true,\"command\":\"run.fail\",\"data\":{{\"run_id\":\"{s}\",\"status\":\"FAILED\"}}}}\n", .{run_id.?}); - } else { - colors.printSuccess("✓ Marked run as failed: {s}\n", .{run_id.?}); - } -} - -fn executeListLocal(allocator: std.mem.Allocator, options: *const RunOptions) !void { - const cfg = try config.Config.load(allocator); - defer { - var mut_cfg = cfg; - mut_cfg.deinit(allocator); - } - - const db_path = try cfg.getDBPath(allocator); - defer allocator.free(db_path); - - var database = try db.DB.init(allocator, db_path); - defer database.close(); - - const sql = "SELECT run_id, experiment_id, name, status, start_time FROM ml_runs ORDER BY start_time DESC;"; - const stmt = try database.prepare(sql); - defer db.DB.finalize(stmt); - - if (options.json) { - std.debug.print("{{\"success\":true,\"command\":\"run.list\",\"data\":{{\"runs\":[", .{}); - var first = true; - while (try db.DB.step(stmt)) { - if (!first) std.debug.print(",", .{}); - first = false; - const run_id = db.DB.columnText(stmt, 0); - const exp_id = db.DB.columnText(stmt, 1); - const name = db.DB.columnText(stmt, 2); - const status = db.DB.columnText(stmt, 3); - const start = db.DB.columnText(stmt, 4); - std.debug.print("{{\"run_id\":\"{s}\",\"experiment_id\":\"{s}\",\"name\":\"{s}\",\"status\":\"{s}\",\"start_time\":\"{s}\"}}", .{ run_id, exp_id, name, status, start }); - } - std.debug.print("]}}}\n", .{}); - } else { - colors.printInfo("\nRuns:\n", .{}); - colors.printInfo("{s:-<80}\n", .{""}); - var count: usize = 0; - while (try db.DB.step(stmt)) { - const run_id = db.DB.columnText(stmt, 0); - const exp_id = db.DB.columnText(stmt, 1); - const name = db.DB.columnText(stmt, 2); - const status = db.DB.columnText(stmt, 3); - std.debug.print("{s} | {s} | {s} | {s}\n", .{ run_id, exp_id, name, status }); - count += 1; - } - if (count == 0) { - colors.printWarning("No runs found. Start one with: ml run start --experiment \n", .{}); - } - } -} - -// Server mode stubs (to be implemented) -fn executeStartServer(_: std.mem.Allocator, _: []const []const u8, _: *const RunOptions) !void { - std.debug.print("Server mode run start not yet implemented\n", .{}); -} - -fn executeFinishServer(_: std.mem.Allocator, _: []const []const u8, _: *const RunOptions) !void { - std.debug.print("Server mode run finish not yet implemented\n", .{}); -} - -fn executeFailServer(_: std.mem.Allocator, _: []const []const u8, _: *const RunOptions) !void { - std.debug.print("Server mode run fail not yet implemented\n", .{}); -} - -fn executeListServer(_: std.mem.Allocator, _: *const RunOptions) !void { - std.debug.print("Server mode run list not yet implemented\n", .{}); + _ = allocator; } fn printUsage() !void { - colors.printInfo("Usage: ml run [options] [args]\n", .{}); - colors.printInfo("\nOptions:\n", .{}); - colors.printInfo(" --json Output structured JSON\n", .{}); - colors.printInfo(" --help, -h Show this help message\n", .{}); - colors.printInfo("\nCommands:\n", .{}); - colors.printInfo(" start --experiment [--name ] Start a new run\n", .{}); - colors.printInfo(" finish --run Mark run as finished\n", .{}); - colors.printInfo(" fail --run Mark run as failed\n", .{}); - colors.printInfo(" list List all runs\n", .{}); - colors.printInfo("\nExamples:\n", .{}); - colors.printInfo(" ml run start --experiment --name training\n", .{}); - colors.printInfo(" ml run finish --run \n", .{}); + std.debug.print("Usage: ml run [options] [args...]\n", .{}); + std.debug.print(" ml run -- [args...]\n\n", .{}); + std.debug.print("Execute a run locally with experiment tracking.\n\n", .{}); + std.debug.print("Options:\n", .{}); + std.debug.print(" --help, -h Show this help message\n", .{}); + std.debug.print(" --json Output structured JSON\n\n", .{}); + std.debug.print("Examples:\n", .{}); + std.debug.print(" ml run # Use entrypoint from config\n", .{}); + std.debug.print(" ml run --lr 0.001 # Append args to entrypoint\n", .{}); + std.debug.print(" ml run -- python train.py # Run explicit command\n", .{}); }