feat(cli): Implement unified run wrapper command

- Fork child process and capture stdout/stderr via pipe
- Parse FETCHML_METRIC key=value [step=N] lines from output
- Write run_manifest.json with run metadata
- Insert/update ml_runs table in SQLite with PID tracking
- Stream output to output.log file
- Support entrypoint from config or explicit command after --
This commit is contained in:
Jeremie Fraeys 2026-02-20 21:28:16 -05:00
parent 551597b5df
commit d0c68772ea
No known key found for this signature in database

View file

@ -1,104 +1,25 @@
const std = @import("std");
const config = @import("../config.zig");
const db = @import("../db.zig");
const core = @import("../core.zig");
const colors = @import("../utils/colors.zig");
const manifest_lib = @import("../manifest.zig");
const Manifest = manifest_lib.RunManifest;
/// Flags recognized by the `run` command's own argument loop.
const RunOptions = struct {
/// Set when `--json` is seen; selects the JSON output branches.
json: bool = false,
/// Set when `--help` or `-h` is seen; makes the command print its usage text.
help: bool = false,
};
/// Run command - always executes locally
/// Usage:
/// ml run # Use entrypoint from config + args
/// ml run --lr 0.001 # Args appended to entrypoint
/// ml run -- python train.py # Explicit command
///
/// NOTE(review): the body shown below still dispatches on a sub-command
/// (`start`, `finish`, `fail`, `list`) and on `cfg.isLocalMode()`, which does
/// not match the usage described above. The text looks like two revisions of
/// the function interleaved by the diff rendering — confirm against the real
/// file before relying on either description.
pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void {
var options = RunOptions{};
// NOTE(review): `command_args` is declared here and again three lines below;
// only one of the two declarations can belong to a given revision.
var command_args = std.ArrayList([]const u8).initCapacity(allocator, 10) catch |err| {
return err;
};
var flags = core.flags.CommonFlags{};
var command_args = try core.flags.parseCommon(allocator, args, &flags);
defer command_args.deinit(allocator);
// Parse flags
// `--json` and `--help`/`-h` are consumed into `options`; every other
// argument is kept, in order, in `command_args`.
var i: usize = 0;
while (i < args.len) : (i += 1) {
const arg = args[i];
if (std.mem.eql(u8, arg, "--json")) {
options.json = true;
} else if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
options.help = true;
} else {
try command_args.append(allocator, arg);
}
}
core.output.init(if (flags.json) .json else .text);
// No sub-command, or help requested: print usage and stop without error.
if (command_args.items.len < 1 or options.help) {
try printUsage();
return;
}
// The first remaining argument selects the sub-command; the rest are
// forwarded to the handler.
const command = command_args.items[0];
// Load config to determine mode
const cfg = try config.Config.load(allocator);
defer {
// Copy into a `var` so `deinit` can be called on it.
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
}
// Each sub-command has a local and a server implementation, chosen by
// `cfg.isLocalMode()`.
if (std.mem.eql(u8, command, "start")) {
if (cfg.isLocalMode()) {
try executeStartLocal(allocator, command_args.items[1..], &options);
} else {
try executeStartServer(allocator, command_args.items[1..], &options);
}
} else if (std.mem.eql(u8, command, "finish")) {
if (cfg.isLocalMode()) {
try executeFinishLocal(allocator, command_args.items[1..], &options);
} else {
try executeFinishServer(allocator, command_args.items[1..], &options);
}
} else if (std.mem.eql(u8, command, "fail")) {
if (cfg.isLocalMode()) {
try executeFailLocal(allocator, command_args.items[1..], &options);
} else {
try executeFailServer(allocator, command_args.items[1..], &options);
}
} else if (std.mem.eql(u8, command, "list")) {
if (cfg.isLocalMode()) {
try executeListLocal(allocator, &options);
} else {
try executeListServer(allocator, &options);
}
} else {
// Unknown sub-command: report it (JSON or colored text). Note that this
// branch does not return an error to the caller.
if (options.json) {
std.debug.print("{{\"success\":false,\"command\":\"run\",\"error\":\"Unknown command: {s}\"}}\n", .{command});
} else {
colors.printError("Unknown command: {s}\n", .{command});
try printUsage();
}
}
}
// Local mode implementations
fn executeStartLocal(allocator: std.mem.Allocator, args: []const []const u8, options: *const RunOptions) !void {
var experiment_id: ?[]const u8 = null;
var run_name: ?[]const u8 = null;
var i: usize = 0;
while (i < args.len) : (i += 1) {
if (std.mem.eql(u8, args[i], "--experiment") and i + 1 < args.len) {
experiment_id = args[i + 1];
i += 1;
} else if (std.mem.eql(u8, args[i], "--name") and i + 1 < args.len) {
run_name = args[i + 1];
i += 1;
}
}
if (experiment_id == null) {
if (options.json) {
std.debug.print("{{\"success\":false,\"command\":\"run.start\",\"error\":\"--experiment is required\"}}\n", .{});
} else {
colors.printError("Error: --experiment is required\n", .{});
}
return error.MissingArgument;
if (flags.help) {
return printUsage();
}
const cfg = try config.Config.load(allocator);
@ -107,219 +28,379 @@ fn executeStartLocal(allocator: std.mem.Allocator, args: []const []const u8, opt
mut_cfg.deinit(allocator);
}
// Parse command: entrypoint + args, or explicit -- command
const command = try resolveCommand(allocator, &cfg, command_args.items);
defer freeCommand(allocator, command);
// Generate run_id
const run_id = try db.generateUUID(allocator);
defer allocator.free(run_id);
// Determine experiment name
const experiment_name = if (cfg.experiment) |exp| exp.name else "default";
// Build artifact path
const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{
cfg.artifact_path,
experiment_name,
run_id,
});
defer allocator.free(artifact_path);
// Create run directory
std.fs.makeDirAbsolute(artifact_path) catch |err| {
if (err != error.PathAlreadyExists) {
std.log.err("Failed to create run directory: {}", .{err});
return error.MkdirFailed;
}
};
// Get DB path and initialize if needed (lazy bootstrap)
const db_path = try cfg.getDBPath(allocator);
defer allocator.free(db_path);
var database = try db.DB.init(allocator, db_path);
var database = try initOrOpenDB(allocator, db_path);
defer database.close();
const run_id = try db.generateUUID(allocator);
defer allocator.free(run_id);
// Write run manifest (status=RUNNING)
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{
artifact_path,
"run_manifest.json",
});
defer allocator.free(manifest_path);
const timestamp = try db.currentTimestamp(allocator);
defer allocator.free(timestamp);
const sql = "INSERT INTO ml_runs (run_id, experiment_id, name, status, start_time) VALUES (?, ?, ?, 'RUNNING', ?);";
var manifest = Manifest.init(allocator);
manifest.run_id = run_id;
manifest.experiment = experiment_name;
manifest.command = try std.mem.join(allocator, " ", command);
manifest.args = try duplicateStrings(allocator, command);
manifest.started_at = try allocator.dupe(u8, timestamp);
manifest.status = "RUNNING";
manifest.artifact_path = artifact_path;
manifest.synced = false;
// Insert run into database
const run_name = try std.fmt.allocPrint(allocator, "run-{s}", .{run_id[0..8]});
defer allocator.free(run_name);
const sql = "INSERT INTO ml_runs (run_id, experiment_id, name, status, start_time, synced) VALUES (?, ?, ?, 'RUNNING', ?, 0);";
const stmt = try database.prepare(sql);
defer db.DB.finalize(stmt);
try db.DB.bindText(stmt, 1, run_id);
try db.DB.bindText(stmt, 2, experiment_id.?);
try db.DB.bindText(stmt, 3, run_name orelse "unnamed-run");
try db.DB.bindText(stmt, 2, experiment_name);
try db.DB.bindText(stmt, 3, run_name);
try db.DB.bindText(stmt, 4, timestamp);
_ = try db.DB.step(stmt);
if (options.json) {
std.debug.print("{{\"success\":true,\"command\":\"run.start\",\"data\":{{\"run_id\":\"{s}\",\"experiment_id\":\"{s}\",\"name\":\"{s}\",\"status\":\"RUNNING\"}}}}\n", .{ run_id, experiment_id.?, run_name orelse "unnamed-run" });
} else {
colors.printSuccess("✓ Started run: {s}\n", .{run_name orelse "unnamed-run"});
colors.printInfo(" run_id: {s}\n", .{run_id});
colors.printInfo(" experiment_id: {s}\n", .{experiment_id.?});
}
}
// Write manifest
try manifest_lib.writeManifest(manifest, manifest_path);
fn executeFinishLocal(allocator: std.mem.Allocator, args: []const []const u8, options: *const RunOptions) !void {
var run_id: ?[]const u8 = null;
// Fork and execute
const output_log_path = try std.fs.path.join(allocator, &[_][]const u8{
artifact_path,
"output.log",
});
defer allocator.free(output_log_path);
var i: usize = 0;
while (i < args.len) : (i += 1) {
if (std.mem.eql(u8, args[i], "--run") and i + 1 < args.len) {
run_id = args[i + 1];
i += 1;
}
}
// Execute and capture
const exit_code = try executeAndCapture(
allocator,
command,
output_log_path,
&database,
run_id,
);
if (run_id == null) {
if (options.json) {
std.debug.print("{{\"success\":false,\"command\":\"run.finish\",\"error\":\"--run is required\"}}\n", .{});
} else {
colors.printError("Error: --run is required\n", .{});
}
return error.MissingArgument;
}
// Update run status in database
const end_time = try db.currentTimestamp(allocator);
defer allocator.free(end_time);
const cfg = try config.Config.load(allocator);
defer {
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
}
const status = if (exit_code == 0) "FINISHED" else "FAILED";
const db_path = try cfg.getDBPath(allocator);
defer allocator.free(db_path);
const update_sql = "UPDATE ml_runs SET status = ?, end_time = ?, exit_code = ?, pid = NULL WHERE run_id = ?;";
const update_stmt = try database.prepare(update_sql);
defer db.DB.finalize(update_stmt);
var database = try db.DB.init(allocator, db_path);
defer database.close();
try db.DB.bindText(update_stmt, 1, status);
try db.DB.bindText(update_stmt, 2, end_time);
try db.DB.bindInt64(update_stmt, 3, exit_code);
try db.DB.bindText(update_stmt, 4, run_id);
_ = try db.DB.step(update_stmt);
const timestamp = try db.currentTimestamp(allocator);
defer allocator.free(timestamp);
// Update manifest
try manifest_lib.updateManifestStatus(manifest_path, status, exit_code, allocator);
const sql = "UPDATE ml_runs SET status = 'FINISHED', end_time = ? WHERE run_id = ?;";
const stmt = try database.prepare(sql);
defer db.DB.finalize(stmt);
try db.DB.bindText(stmt, 1, timestamp);
try db.DB.bindText(stmt, 2, run_id.?);
_ = try db.DB.step(stmt);
// Checkpoint WAL
database.checkpointOnExit();
if (options.json) {
std.debug.print("{{\"success\":true,\"command\":\"run.finish\",\"data\":{{\"run_id\":\"{s}\",\"status\":\"FINISHED\"}}}}\n", .{run_id.?});
// Print result
if (flags.json) {
std.debug.print("{{\"success\":true,\"run_id\":\"{s}\",\"status\":\"{s}\",\"exit_code\":{d}}}\n", .{
run_id,
status,
exit_code,
});
} else {
colors.printSuccess("✓ Finished run: {s}\n", .{run_id.?});
colors.printSuccess("✓ Run {s} complete ({s})\n", .{ run_id[0..8], status });
if (cfg.sync_uri.len > 0) {
colors.printInfo("↑ queued for sync\n", .{});
}
}
}
fn executeFailLocal(allocator: std.mem.Allocator, args: []const []const u8, options: *const RunOptions) !void {
var run_id: ?[]const u8 = null;
var i: usize = 0;
while (i < args.len) : (i += 1) {
if (std.mem.eql(u8, args[i], "--run") and i + 1 < args.len) {
run_id = args[i + 1];
i += 1;
/// Resolve command from entrypoint + args, or explicit -- command
///
/// Two sources are tried, in this order:
///   1. If `args` contains a literal `--`, everything after the first one is
///      the command. `error.NoCommand` is returned when nothing follows it.
///   2. Otherwise, if `cfg.experiment` is set and its `entrypoint` is
///      non-empty, the entrypoint is split on single spaces (empty pieces are
///      dropped) and every element of `args` is appended after it.
/// If neither applies, a hint is logged and `error.NoEntrypoint` is returned.
///
/// NOTE(review): ownership differs between the two paths. The `--` path
/// copies only the outer slice, so its elements still point at the caller's
/// `args` strings; the entrypoint path duplicates every string. `freeCommand`
/// in this file frees each element — confirm that is safe for the `--` path.
fn resolveCommand(allocator: std.mem.Allocator, cfg: *const config.Config, args: []const []const u8) ![][]const u8 {
// Check for explicit -- separator
var double_dash_idx: ?usize = null;
for (args, 0..) |arg, i| {
if (std.mem.eql(u8, arg, "--")) {
double_dash_idx = i;
break;
}
}
// NOTE(review): `run_id` and `options` are not declared in this function; the
// next five lines look like removed-side residue of the rendered diff —
// confirm against the real file.
if (run_id == null) {
if (options.json) {
std.debug.print("{{\"success\":false,\"command\":\"run.fail\",\"error\":\"--run is required\"}}\n", .{});
} else {
colors.printError("Error: --run is required\n", .{});
if (double_dash_idx) |idx| {
// Explicit command after --
if (idx + 1 >= args.len) {
std.log.err("No command provided after --", .{});
return error.NoCommand;
}
// NOTE(review): two consecutive `return`s; the `MissingArgument` one appears
// to belong to the other revision.
return error.MissingArgument;
// Shallow copy: a new outer slice whose elements alias `args`.
return try allocator.dupe([]const u8, args[idx + 1 ..]);
}
// NOTE(review): `cfg` is already a parameter of this function; this
// re-declaration also looks like diff residue.
const cfg = try config.Config.load(allocator);
// Use entrypoint from config + args
if (cfg.experiment) |exp| {
if (exp.entrypoint.len > 0) {
// Split entrypoint on spaces
// NOTE(review): there is no errdefer on `argv`; if a later `dupe` or `append`
// fails, the strings collected so far are not released.
var argv = std.ArrayList([]const u8).init(allocator);
// Parse entrypoint (split on spaces)
// A plain split on ' ' does not keep quoted arguments that contain spaces
// together.
var iter = std.mem.splitScalar(u8, exp.entrypoint, ' ');
while (iter.next()) |part| {
if (part.len > 0) {
try argv.append(try allocator.dupe(u8, part));
}
}
// Append args
for (args) |arg| {
try argv.append(try allocator.dupe(u8, arg));
}
return argv.toOwnedSlice();
}
}
// No entrypoint configured and no explicit command
std.log.err("No entrypoint configured. Set entrypoint in .fetchml/config.toml or use: ml run -- <command>", .{});
return error.NoEntrypoint;
}
/// Release a command vector: each argument string first, then the slice that
/// held them. Every element is passed to `allocator.free`, so all of them must
/// have been allocated with `allocator`.
fn freeCommand(allocator: std.mem.Allocator, command: [][]const u8) void {
    var idx: usize = 0;
    while (idx < command.len) : (idx += 1) {
        allocator.free(command[idx]);
    }
    allocator.free(command);
}
/// Deep-copy a list of strings.
///
/// Returns a newly allocated slice whose elements are newly allocated copies
/// of the corresponding entries of `strings`. The caller owns both the slice
/// and every element and must free them with the same `allocator`.
///
/// On allocation failure nothing is leaked: the copies made so far and the
/// outer slice are released before the error is returned.
fn duplicateStrings(allocator: std.mem.Allocator, strings: []const []const u8) ![][]const u8 {
    const result = try allocator.alloc([]const u8, strings.len);

    // Number of leading entries of `result` that hold a live allocation.
    var filled: usize = 0;
    errdefer {
        for (result[0..filled]) |copy| allocator.free(copy);
        allocator.free(result);
    }

    for (strings, 0..) |s, i| {
        result[i] = try allocator.dupe(u8, s);
        filled = i + 1;
    }
    return result;
}
/// Open the database at `db_path`, announcing it when the file is new.
///
/// The path is probed with `std.fs.accessAbsolute` before `db.DB.init` is
/// called. Only a `FileNotFound` result counts as "did not exist"; any other
/// probe error is treated the same as success. The informational log line is
/// emitted only after `db.DB.init` has succeeded, and errors from
/// `db.DB.init` are propagated.
fn initOrOpenDB(allocator: std.mem.Allocator, db_path: []const u8) !db.DB {
    var is_fresh = false;
    std.fs.accessAbsolute(db_path, .{}) catch |probe_err| {
        is_fresh = (probe_err == error.FileNotFound);
    };

    const handle = try db.DB.init(allocator, db_path);
    if (is_fresh) {
        std.log.info("local mode active — tracking to {s}", .{db_path});
    }
    return handle;
}
/// Execute command and capture output, parsing FETCHML_METRIC lines
///
/// Forks a child that runs `command` with stdout and stderr redirected into a
/// pipe. The parent records the child's pid in `ml_runs`, copies everything
/// read from the pipe into the file at `output_path`, and hands each
/// newline-terminated line to `parseAndLogMetric`.
///
/// Returns the child's exit status, `128 + signal` when it was terminated by
/// a signal, or -1 when neither case applies. Errors from file creation,
/// pipe/fork, the database calls, writing the log and metric parsing are
/// propagated; pipe read errors only end the read loop.
fn executeAndCapture(
allocator: std.mem.Allocator,
command: []const []const u8,
output_path: []const u8,
database: *db.DB,
run_id: []const u8,
) !i32 {
// Create output file
var output_file = try std.fs.cwd().createFile(output_path, .{});
defer output_file.close();
const output_writer = output_file.writer();
// Create pipe for stdout
const pipe = try std.posix.pipe();
// NOTE(review): this defer closes both pipe ends at scope exit, but the
// parent path below also closes `pipe[1]` explicitly, so that descriptor is
// closed twice on the parent path — confirm and drop one of the two.
defer {
// NOTE(review): `cfg` is not declared in this function; the two `mut_cfg`
// lines look like removed-side residue of the rendered diff.
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
std.posix.close(pipe[0]);
std.posix.close(pipe[1]);
}
// NOTE(review): residue as well (`cfg` is undeclared here).
const db_path = try cfg.getDBPath(allocator);
defer allocator.free(db_path);
// Fork child process
const pid = try std.posix.fork();
// NOTE(review): `database` is already a parameter; this re-declaration looks
// like residue.
var database = try db.DB.init(allocator, db_path);
defer database.close();
if (pid == 0) {
// Child process
std.posix.close(pipe[0]); // Close read end
// NOTE(review): `timestamp` is not used by the child path; looks like residue.
const timestamp = try db.currentTimestamp(allocator);
defer allocator.free(timestamp);
// Redirect stdout to pipe
// Both stdout and stderr are pointed at the same pipe write end, so the two
// streams are interleaved in the captured output. The results of `dup2` are
// discarded.
_ = std.posix.dup2(pipe[1], std.posix.STDOUT_FILENO);
_ = std.posix.dup2(pipe[1], std.posix.STDERR_FILENO);
std.posix.close(pipe[1]);
// NOTE(review): unused SQL string; looks like residue.
const sql = "UPDATE ml_runs SET status = 'FAILED', end_time = ? WHERE run_id = ?;";
// Execute command
// The environment argument is an array holding only the null terminator —
// presumably the child starts with an empty environment; confirm that this
// is intended.
const err = std.posix.execvpe(command[0], command, &[_:null]?[*:0]const u8{null});
// Only reached when the exec call itself failed.
std.log.err("Failed to execute {s}: {}", .{ command[0], err });
std.process.exit(1);
unreachable;
}
// Parent process
std.posix.close(pipe[1]); // Close write end
// Store PID in database
const pid_sql = "UPDATE ml_runs SET pid = ? WHERE run_id = ?;";
const pid_stmt = try database.prepare(pid_sql);
defer db.DB.finalize(pid_stmt);
try db.DB.bindInt64(pid_stmt, 1, pid);
try db.DB.bindText(pid_stmt, 2, run_id);
_ = try db.DB.step(pid_stmt);
// Read from pipe and parse FETCHML_METRIC lines
// `buf` receives raw chunks from the pipe; `line_buf` accumulates the current
// line across chunks, so a line split over two reads is still seen whole.
var buf: [4096]u8 = undefined;
var line_buf: [1024]u8 = undefined;
var line_len: usize = 0;
while (true) {
// Every read error ends the loop; both branches of the handler break.
const bytes_read = std.posix.read(pipe[0], &buf) catch |err| {
if (err == error.WouldBlock or err == error.BrokenPipe) break;
break;
};
// A zero-length read ends the loop.
if (bytes_read == 0) break;
// Write to output file
try output_writer.writeAll(buf[0..bytes_read]);
// Parse lines
for (buf[0..bytes_read]) |byte| {
// A line is flushed on '\n' or when `line_buf` is one byte short of full.
// NOTE(review): in the "full" case the current byte is not stored, so an
// over-long line is split and loses one byte at each split point.
if (byte == '\n' or line_len >= line_buf.len - 1) {
if (line_len > 0) {
line_buf[line_len] = 0;
const line = line_buf[0..line_len];
try parseAndLogMetric(allocator, line, database, run_id);
line_len = 0;
}
} else {
line_buf[line_len] = byte;
line_len += 1;
}
}
}
// NOTE(review): bytes still pending in `line_buf` when the loop ends (output
// that does not end in '\n') are never passed to `parseAndLogMetric`.
// Wait for child
var status: u32 = 0;
_ = std.posix.waitpid(pid, &status, 0);
// Parse exit code
// Uses the `std.os.linux.W` status helpers.
if (std.os.linux.W.IFEXITED(status)) {
return std.os.linux.W.EXITSTATUS(status);
} else if (std.os.linux.W.IFSIGNALED(status)) {
return 128 + std.os.linux.W.TERMSIG(status);
}
return -1;
}
/// Parse FETCHML_METRIC line and log to database
/// Format: FETCHML_METRIC key=value [step=N]
///
/// Lines that do not match are ignored silently (the function returns without
/// error): a missing prefix, no `=` in the first token, a key that does not
/// match `[a-zA-Z][a-zA-Z0-9_]*`, or a value that does not parse as `f64`.
/// `step` defaults to 0; an unparsable or negative `step=` is stored as 0, and
/// when several `step=` tokens are present the last one wins.
/// Errors from the database calls are propagated.
fn parseAndLogMetric(
allocator: std.mem.Allocator,
line: []const u8,
database: *db.DB,
run_id: []const u8,
) !void {
const trimmed = std.mem.trim(u8, line, " \t\r");
// Check prefix
// Only the prefix itself is checked; no separator is required after it.
const prefix = "FETCHML_METRIC";
if (!std.mem.startsWith(u8, trimmed, prefix)) return;
// Get the rest after prefix
var rest = trimmed[prefix.len..];
rest = std.mem.trimLeft(u8, rest, " \t");
// Parse key=value
// The first space-separated token is the `key=value` pair; the remaining
// tokens are scanned for `step=` below.
var iter = std.mem.splitScalar(u8, rest, ' ');
const kv_part = iter.next() orelse return;
var kv_iter = std.mem.splitScalar(u8, kv_part, '=');
const key = kv_iter.next() orelse return;
// Only the text between the first and second '=' is taken as the value.
const value_str = kv_iter.next() orelse return;
// Validate key: [a-zA-Z][a-zA-Z0-9_]*
if (key.len == 0) return;
const first_char = key[0];
if (!std.ascii.isAlphabetic(first_char)) return;
for (key[1..]) |c| {
if (!std.ascii.isAlphanumeric(c) and c != '_') return;
}
// Parse value
const value = std.fmt.parseFloat(f64, value_str) catch return;
// Parse optional step
var step: i64 = 0;
while (iter.next()) |part| {
if (std.mem.startsWith(u8, part, "step=")) {
// 5 == "step=".len
const step_str = part[5..];
step = std.fmt.parseInt(i64, step_str, 10) catch 0;
if (step < 0) step = 0;
}
}
// Insert metric
// A statement is prepared and finalized for every metric line.
const sql = "INSERT INTO ml_metrics (run_id, key, value, step) VALUES (?, ?, ?, ?);";
const stmt = try database.prepare(sql);
defer db.DB.finalize(stmt);
// NOTE(review): the next two binds use `timestamp` (not declared here) and
// unwrap `run_id` with `.?` although it is a plain slice in this function;
// they look like removed-side residue of the rendered diff. The four binds
// after them match the INSERT's four placeholders.
try db.DB.bindText(stmt, 1, timestamp);
try db.DB.bindText(stmt, 2, run_id.?);
try db.DB.bindText(stmt, 1, run_id);
try db.DB.bindText(stmt, 2, key);
try db.DB.bindDouble(stmt, 3, value);
try db.DB.bindInt64(stmt, 4, step);
_ = try db.DB.step(stmt);
// NOTE(review): everything from here to the closing brace refers to
// `options` and a "run.fail" message and appears to be the tail of a removed
// function rather than part of metric parsing — confirm against the real file.
database.checkpointOnExit();
if (options.json) {
std.debug.print("{{\"success\":true,\"command\":\"run.fail\",\"data\":{{\"run_id\":\"{s}\",\"status\":\"FAILED\"}}}}\n", .{run_id.?});
} else {
colors.printSuccess("✓ Marked run as failed: {s}\n", .{run_id.?});
}
}
/// List every row of `ml_runs`, newest `start_time` first.
///
/// Loads the config, opens the database at the configured path and prints
/// either a single JSON document (when `options.json` is set) or one
/// pipe-separated line per run. In text mode a hint is printed when the table
/// is empty.
///
/// Errors from loading the config, resolving the database path, opening the
/// database, and preparing or stepping the statement are propagated.
fn executeListLocal(allocator: std.mem.Allocator, options: *const RunOptions) !void {
    const cfg = try config.Config.load(allocator);
    defer {
        // Copy into a `var` so `deinit` can be called on it.
        var mut_cfg = cfg;
        mut_cfg.deinit(allocator);
    }

    const db_path = try cfg.getDBPath(allocator);
    defer allocator.free(db_path);

    var database = try db.DB.init(allocator, db_path);
    defer database.close();

    const sql = "SELECT run_id, experiment_id, name, status, start_time FROM ml_runs ORDER BY start_time DESC;";
    const stmt = try database.prepare(sql);
    defer db.DB.finalize(stmt);

    if (options.json) {
        // Opens two JSON objects (root and "data") and the "runs" array.
        std.debug.print("{{\"success\":true,\"command\":\"run.list\",\"data\":{{\"runs\":[", .{});
        var first = true;
        while (try db.DB.step(stmt)) {
            if (!first) std.debug.print(",", .{});
            first = false;
            const run_id = db.DB.columnText(stmt, 0);
            const exp_id = db.DB.columnText(stmt, 1);
            const name = db.DB.columnText(stmt, 2);
            const status = db.DB.columnText(stmt, 3);
            const start = db.DB.columnText(stmt, 4);
            // NOTE(review): values are interpolated without JSON escaping; a
            // name containing `"` or `\` would produce invalid JSON.
            std.debug.print("{{\"run_id\":\"{s}\",\"experiment_id\":\"{s}\",\"name\":\"{s}\",\"status\":\"{s}\",\"start_time\":\"{s}\"}}", .{ run_id, exp_id, name, status, start });
        }
        // Close the array, then "data", then the root object. A literal `}`
        // must be written `}}` in a format string, so two closing braces need
        // four characters; the previous "]}}}\n" left one brace unescaped.
        std.debug.print("]}}}}\n", .{});
    } else {
        colors.printInfo("\nRuns:\n", .{});
        colors.printInfo("{s:-<80}\n", .{""});
        var count: usize = 0;
        while (try db.DB.step(stmt)) {
            const run_id = db.DB.columnText(stmt, 0);
            const exp_id = db.DB.columnText(stmt, 1);
            const name = db.DB.columnText(stmt, 2);
            const status = db.DB.columnText(stmt, 3);
            std.debug.print("{s} | {s} | {s} | {s}\n", .{ run_id, exp_id, name, status });
            count += 1;
        }
        if (count == 0) {
            colors.printWarning("No runs found. Start one with: ml run start --experiment <id>\n", .{});
        }
    }
}
// Server mode stubs (to be implemented)
/// Server-mode `run start` placeholder: ignores all arguments and only prints
/// a "not yet implemented" notice.
fn executeStartServer(_: std.mem.Allocator, _: []const []const u8, _: *const RunOptions) !void {
    const notice = "Server mode run start not yet implemented\n";
    std.debug.print(notice, .{});
}
/// Server-mode `run finish` placeholder: ignores all arguments and only prints
/// a "not yet implemented" notice.
fn executeFinishServer(_: std.mem.Allocator, _: []const []const u8, _: *const RunOptions) !void {
    const notice = "Server mode run finish not yet implemented\n";
    std.debug.print(notice, .{});
}
/// Server-mode `run fail` placeholder: ignores all arguments and only prints
/// a "not yet implemented" notice.
fn executeFailServer(_: std.mem.Allocator, _: []const []const u8, _: *const RunOptions) !void {
    const notice = "Server mode run fail not yet implemented\n";
    std.debug.print(notice, .{});
}
/// Server-mode `run list` placeholder: only prints a "not yet implemented"
/// notice.
fn executeListServer(_: std.mem.Allocator, _: *const RunOptions) !void {
std.debug.print("Server mode run list not yet implemented\n", .{});
// NOTE(review): both parameters of this function are `_`, so no `allocator`
// is in scope for the discard below; the line looks like residue from another
// function in the rendered diff — confirm against the real file.
_ = allocator;
}
/// Print the help text for `ml run`.
///
/// NOTE(review): the body contains two different usage texts back to back —
/// a `colors.printInfo` block describing `start`/`finish`/`fail`/`list`
/// sub-commands and a `std.debug.print` block describing the entrypoint /
/// `--` wrapper form. They look like two revisions interleaved by the diff
/// rendering; confirm which one the real file keeps.
fn printUsage() !void {
// Sub-command style usage.
colors.printInfo("Usage: ml run [options] <command> [args]\n", .{});
colors.printInfo("\nOptions:\n", .{});
colors.printInfo(" --json Output structured JSON\n", .{});
colors.printInfo(" --help, -h Show this help message\n", .{});
colors.printInfo("\nCommands:\n", .{});
colors.printInfo(" start --experiment <id> [--name <name>] Start a new run\n", .{});
colors.printInfo(" finish --run <id> Mark run as finished\n", .{});
colors.printInfo(" fail --run <id> Mark run as failed\n", .{});
colors.printInfo(" list List all runs\n", .{});
colors.printInfo("\nExamples:\n", .{});
colors.printInfo(" ml run start --experiment <exp_id> --name training\n", .{});
colors.printInfo(" ml run finish --run <run_id>\n", .{});
// Wrapper style usage: entrypoint from config, or an explicit command after `--`.
std.debug.print("Usage: ml run [options] [args...]\n", .{});
std.debug.print(" ml run -- <command> [args...]\n\n", .{});
std.debug.print("Execute a run locally with experiment tracking.\n\n", .{});
std.debug.print("Options:\n", .{});
std.debug.print(" --help, -h Show this help message\n", .{});
std.debug.print(" --json Output structured JSON\n\n", .{});
std.debug.print("Examples:\n", .{});
std.debug.print(" ml run # Use entrypoint from config\n", .{});
std.debug.print(" ml run --lr 0.001 # Append args to entrypoint\n", .{});
std.debug.print(" ml run -- python train.py # Run explicit command\n", .{});
}