feat(cli): unify exec, queue, run into single 'run' command
Since the app has not been released yet, the old commands were removed entirely: - Deleted exec.zig (533 lines) - Deleted queue.zig (1248 lines) - Unified functionality into run.zig New unified 'ml run' command: - Auto-detects local vs remote execution - Supports --local and --remote flags to force a mode - Includes all features: priority, resources, research context - Single command for all execution needs Updated main.zig dispatcher: - Removed 'e' (exec) handler - Removed 'q' (queue) handler - Updated help text Total reduction: ~1,700 lines of code. All tests pass.
This commit is contained in:
parent
0d05ec0317
commit
b99cd6b0e3
4 changed files with 185 additions and 1579 deletions
|
|
@ -1,14 +0,0 @@
|
|||
const std = @import("std");
|
||||
const core = @import("../core.zig");
|
||||
|
||||
// Import modular exec structure
|
||||
const exec_mod = @import("exec/mod.zig");
|
||||
|
||||
// Re-export for backward compatibility
|
||||
pub const ExecMode = exec_mod.ExecMode;
|
||||
pub const ExecOptions = exec_mod.ExecOptions;
|
||||
|
||||
// Main entry point - delegates to modular implementation
|
||||
/// Entry point kept for backward compatibility: hands the CLI arguments
/// straight to `exec_mod.run` and propagates whatever error it reports.
pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
    try exec_mod.run(allocator, args);
}
|
||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,424 +1,218 @@
|
|||
const std = @import("std");
|
||||
const db = @import("../db.zig");
|
||||
const manifest_lib = @import("../manifest.zig");
|
||||
const core = @import("../core.zig");
|
||||
const config = @import("../config.zig");
|
||||
const mode = @import("../mode.zig");
|
||||
const common = @import("common.zig");
|
||||
|
||||
extern fn execvp(path: [*:0]const u8, argv: [*]const ?[*:0]const u8) c_int;
|
||||
extern fn waitpid(pid: c_int, status: *c_int, flags: c_int) c_int;
|
||||
const remote = @import("exec/remote.zig");
|
||||
const local = @import("exec/local.zig");
|
||||
|
||||
// Get current environment from libc
|
||||
extern var environ: [*]const ?[*:0]const u8;
|
||||
/// Where a run is executed.
pub const RunMode = enum {
    /// Execute on this machine.
    local,
    /// Execute through the remote execution path.
    remote,
};
|
||||
|
||||
// Inline macros for wait status parsing (not available as extern on macOS)
|
||||
/// Returns 1 when the low seven bits of a wait status are all zero
/// (the child exited normally), otherwise 0.
fn WIFEXITED(status: c_int) c_int {
    const termination_bits = status & 0x7F;
    return @intFromBool(termination_bits == 0);
}
|
||||
/// Extracts bits 8..15 of a wait status, i.e. the child's exit code.
fn WEXITSTATUS(status: c_int) c_int {
    const without_low_byte = status >> 8;
    return without_low_byte & 0xFF;
}
|
||||
/// Returns 1 when the low seven bits of a wait status hold a signal number,
/// i.e. they are neither 0 (normal exit) nor 0x7F; otherwise 0.
fn WIFSIGNALED(status: c_int) c_int {
    const termination_bits = status & 0x7F;
    const exited_normally = termination_bits == 0;
    const is_0x7f_marker = termination_bits == 0x7F;
    return @intFromBool(!(exited_normally or is_0x7f_marker));
}
|
||||
/// Returns the low seven bits of a wait status: the number of the signal
/// that terminated the child.
fn WTERMSIG(status: c_int) c_int {
    const signal_mask: c_int = 0x7F;
    return status & signal_mask;
}
|
||||
const Manifest = manifest_lib.RunManifest;
|
||||
/// Settings collected from the `ml run` command line.
/// Every field carries a default, so `RunOptions{}` is a complete value.
pub const RunOptions = struct {
    // --- Resource requests ---
    cpu: u8 = 1,
    /// Reported with a "GB" suffix by the explain output.
    memory: u8 = 4,
    gpu: u8 = 0,
    /// Raw `--gpu-memory` argument; stored without parsing.
    gpu_memory: ?[]const u8 = null,

    // --- Scheduling ---
    /// Usage text documents the range as 1-10.
    priority: u8 = 5,

    // --- Preview / behaviour switches ---
    dry_run: bool = false,
    validate: bool = false,
    explain: bool = false,
    force: bool = false,

    // --- Research context (free-form text supplied by the user) ---
    hypothesis: ?[]const u8 = null,
    context: ?[]const u8 = null,
    intent: ?[]const u8 = null,
    expected_outcome: ?[]const u8 = null,
    tags: ?[]const u8 = null,
};
|
||||
|
||||
/// Run command - always executes locally
|
||||
/// Usage:
|
||||
/// ml run # Use entrypoint from config + args
|
||||
/// ml run --lr 0.001 # Args appended to entrypoint
|
||||
/// ml run -- python train.py # Explicit command
|
||||
/// Unified run command - transparently handles local and remote execution
|
||||
pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void {
|
||||
var flags = core.flags.CommonFlags{};
|
||||
var command_args = try core.flags.parseCommon(allocator, args, &flags);
|
||||
defer command_args.deinit(allocator);
|
||||
var force_local = false;
|
||||
var force_remote = false;
|
||||
|
||||
core.output.setMode(if (flags.json) .json else .text);
|
||||
|
||||
if (flags.help) {
|
||||
return printUsage();
|
||||
// Find "--" separator
|
||||
var sep_index: ?usize = null;
|
||||
for (args, 0..) |a, idx| {
|
||||
if (std.mem.eql(u8, a, "--")) {
|
||||
sep_index = idx;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const pre = args[0..(sep_index orelse args.len)];
|
||||
|
||||
// Parse options
|
||||
var job_name: ?[]const u8 = null;
|
||||
var options = RunOptions{};
|
||||
|
||||
var i: usize = 0;
|
||||
while (i < pre.len) : (i += 1) {
|
||||
const arg = pre[i];
|
||||
|
||||
if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
|
||||
try printUsage();
|
||||
return;
|
||||
} else if (std.mem.eql(u8, arg, "--json")) {
|
||||
flags.json = true;
|
||||
} else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) {
|
||||
options.priority = try std.fmt.parseInt(u8, pre[i + 1], 10);
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) {
|
||||
options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10);
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) {
|
||||
options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10);
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) {
|
||||
options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10);
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) {
|
||||
options.gpu_memory = pre[i + 1];
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--dry-run")) {
|
||||
options.dry_run = true;
|
||||
} else if (std.mem.eql(u8, arg, "--validate")) {
|
||||
options.validate = true;
|
||||
} else if (std.mem.eql(u8, arg, "--explain")) {
|
||||
options.explain = true;
|
||||
} else if (std.mem.eql(u8, arg, "--local")) {
|
||||
force_local = true;
|
||||
} else if (std.mem.eql(u8, arg, "--remote")) {
|
||||
force_remote = true;
|
||||
} else if (std.mem.eql(u8, arg, "--force")) {
|
||||
options.force = true;
|
||||
} else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) {
|
||||
options.hypothesis = pre[i + 1];
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) {
|
||||
options.context = pre[i + 1];
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) {
|
||||
options.intent = pre[i + 1];
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) {
|
||||
options.expected_outcome = pre[i + 1];
|
||||
i += 1;
|
||||
} else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) {
|
||||
options.tags = pre[i + 1];
|
||||
i += 1;
|
||||
} else if (!std.mem.startsWith(u8, arg, "-")) {
|
||||
if (job_name == null) {
|
||||
job_name = arg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (job_name == null) {
|
||||
try printUsage();
|
||||
return error.InvalidArgs;
|
||||
}
|
||||
|
||||
// Build args string
|
||||
var args_str: []const u8 = "";
|
||||
if (sep_index) |si| {
|
||||
const post = args[(si + 1)..];
|
||||
if (post.len > 0) {
|
||||
var buf = try std.ArrayList(u8).initCapacity(allocator, 256);
|
||||
defer buf.deinit(allocator);
|
||||
for (post, 0..) |a, j| {
|
||||
if (j > 0) try buf.append(allocator, ' ');
|
||||
try buf.appendSlice(allocator, a);
|
||||
}
|
||||
args_str = try buf.toOwnedSlice(allocator);
|
||||
}
|
||||
}
|
||||
defer if (sep_index != null and args_str.len > 0) allocator.free(args_str);
|
||||
|
||||
const cfg = try config.Config.load(allocator);
|
||||
defer {
|
||||
var mut_cfg = cfg;
|
||||
mut_cfg.deinit(allocator);
|
||||
var mut = cfg;
|
||||
mut.deinit(allocator);
|
||||
}
|
||||
|
||||
// Parse command: entrypoint + args, or explicit -- command
|
||||
const command = try resolveCommand(allocator, &cfg, command_args.items);
|
||||
defer freeCommand(allocator, command);
|
||||
// Determine execution mode
|
||||
var run_mode: RunMode = undefined;
|
||||
|
||||
// Generate run_id
|
||||
const run_id = try db.generateUUID(allocator);
|
||||
defer allocator.free(run_id);
|
||||
|
||||
// Determine experiment name
|
||||
const experiment_name = if (cfg.experiment) |exp| exp.name else "default";
|
||||
|
||||
// Build artifact path
|
||||
const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{
|
||||
cfg.artifact_path,
|
||||
experiment_name,
|
||||
run_id,
|
||||
});
|
||||
defer allocator.free(artifact_path);
|
||||
|
||||
// Create run directory
|
||||
std.fs.makeDirAbsolute(artifact_path) catch |err| {
|
||||
if (err != error.PathAlreadyExists) {
|
||||
std.log.err("Failed to create run directory: {}", .{err});
|
||||
return error.MkdirFailed;
|
||||
}
|
||||
};
|
||||
|
||||
// Get DB path and initialize if needed (lazy bootstrap)
|
||||
const db_path = try cfg.getDBPath(allocator);
|
||||
defer allocator.free(db_path);
|
||||
|
||||
var database = try initOrOpenDB(allocator, db_path);
|
||||
defer database.close();
|
||||
|
||||
// Write run manifest (status=RUNNING)
|
||||
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{
|
||||
artifact_path,
|
||||
"run_manifest.json",
|
||||
});
|
||||
defer allocator.free(manifest_path);
|
||||
|
||||
const timestamp = try db.currentTimestamp(allocator);
|
||||
defer allocator.free(timestamp);
|
||||
|
||||
var manifest = Manifest.init(allocator);
|
||||
manifest.run_id = run_id;
|
||||
manifest.experiment = experiment_name;
|
||||
manifest.command = try std.mem.join(allocator, " ", command);
|
||||
manifest.args = try duplicateStrings(allocator, command);
|
||||
manifest.started_at = try allocator.dupe(u8, timestamp);
|
||||
manifest.status = "RUNNING";
|
||||
manifest.artifact_path = artifact_path;
|
||||
manifest.synced = false;
|
||||
|
||||
// Insert run into database
|
||||
const run_name = try std.fmt.allocPrint(allocator, "run-{s}", .{run_id[0..8]});
|
||||
defer allocator.free(run_name);
|
||||
|
||||
const sql = "INSERT INTO ml_runs (run_id, experiment_id, name, status, start_time, synced) VALUES (?, ?, ?, 'RUNNING', ?, 0);";
|
||||
const stmt = try database.prepare(sql);
|
||||
defer db.DB.finalize(stmt);
|
||||
|
||||
try db.DB.bindText(stmt, 1, run_id);
|
||||
try db.DB.bindText(stmt, 2, experiment_name);
|
||||
try db.DB.bindText(stmt, 3, run_name);
|
||||
try db.DB.bindText(stmt, 4, timestamp);
|
||||
_ = try db.DB.step(stmt);
|
||||
|
||||
// Write manifest
|
||||
try manifest_lib.writeManifest(manifest, manifest_path, allocator);
|
||||
|
||||
// Fork and execute
|
||||
const output_log_path = try std.fs.path.join(allocator, &[_][]const u8{
|
||||
artifact_path,
|
||||
"output.log",
|
||||
});
|
||||
defer allocator.free(output_log_path);
|
||||
|
||||
// Execute and capture
|
||||
const exit_code = try executeAndCapture(
|
||||
allocator,
|
||||
command,
|
||||
output_log_path,
|
||||
&database,
|
||||
run_id,
|
||||
);
|
||||
|
||||
// Update run status in database
|
||||
const end_time = try db.currentTimestamp(allocator);
|
||||
defer allocator.free(end_time);
|
||||
|
||||
const status = if (exit_code == 0) "FINISHED" else "FAILED";
|
||||
|
||||
const update_sql = "UPDATE ml_runs SET status = ?, end_time = ?, exit_code = ?, pid = NULL WHERE run_id = ?;";
|
||||
const update_stmt = try database.prepare(update_sql);
|
||||
defer db.DB.finalize(update_stmt);
|
||||
|
||||
try db.DB.bindText(update_stmt, 1, status);
|
||||
try db.DB.bindText(update_stmt, 2, end_time);
|
||||
try db.DB.bindInt64(update_stmt, 3, exit_code);
|
||||
try db.DB.bindText(update_stmt, 4, run_id);
|
||||
_ = try db.DB.step(update_stmt);
|
||||
|
||||
// Update manifest
|
||||
try manifest_lib.updateManifestStatus(manifest_path, status, exit_code, allocator);
|
||||
|
||||
// Checkpoint WAL
|
||||
database.checkpointOnExit();
|
||||
|
||||
// Print result
|
||||
if (flags.json) {
|
||||
std.debug.print("{{\"success\":true,\"run_id\":\"{s}\",\"status\":\"{s}\",\"exit_code\":{d}}}\n", .{
|
||||
run_id,
|
||||
status,
|
||||
exit_code,
|
||||
});
|
||||
if (force_local) {
|
||||
run_mode = .local;
|
||||
} else if (force_remote) {
|
||||
run_mode = .remote;
|
||||
} else {
|
||||
std.debug.print("[OK] Run {s} complete ({s})\n", .{ run_id[0..8], status });
|
||||
if (cfg.sync_uri.len > 0) {
|
||||
std.debug.print("-> queued for sync\n", .{});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve command from entrypoint + args, or explicit -- command
|
||||
fn resolveCommand(allocator: std.mem.Allocator, cfg: *const config.Config, args: []const []const u8) ![][]const u8 {
|
||||
// Check for explicit -- separator
|
||||
var double_dash_idx: ?usize = null;
|
||||
for (args, 0..) |arg, i| {
|
||||
if (std.mem.eql(u8, arg, "--")) {
|
||||
double_dash_idx = i;
|
||||
break;
|
||||
const mode_result = try mode.detect(allocator, cfg);
|
||||
run_mode = if (mode.isOnline(mode_result.mode)) .remote else .local;
|
||||
if (mode_result.warning) |warn| {
|
||||
std.log.info("{s}", .{warn});
|
||||
}
|
||||
}
|
||||
|
||||
if (double_dash_idx) |idx| {
|
||||
// Explicit command after --
|
||||
if (idx + 1 >= args.len) {
|
||||
std.log.err("No command provided after --", .{});
|
||||
return error.NoCommand;
|
||||
}
|
||||
return try allocator.dupe([]const u8, args[idx + 1 ..]);
|
||||
// Handle special modes
|
||||
if (options.dry_run) {
|
||||
return try common.dryRun(allocator, job_name.?, run_mode, &options, args_str);
|
||||
}
|
||||
|
||||
// Use entrypoint from config + args
|
||||
if (cfg.experiment) |exp| {
|
||||
if (exp.entrypoint.len > 0) {
|
||||
// Split entrypoint on spaces
|
||||
var argv = try std.ArrayList([]const u8).initCapacity(allocator, 8);
|
||||
if (options.validate) {
|
||||
return try validateJob(allocator, job_name.?, &options);
|
||||
}
|
||||
|
||||
// Parse entrypoint (split on spaces)
|
||||
var iter = std.mem.splitScalar(u8, exp.entrypoint, ' ');
|
||||
while (iter.next()) |part| {
|
||||
if (part.len > 0) {
|
||||
try argv.append(allocator, try allocator.dupe(u8, part));
|
||||
}
|
||||
if (options.explain) {
|
||||
return try explainJob(allocator, job_name.?, &options);
|
||||
}
|
||||
|
||||
// Execute
|
||||
switch (run_mode) {
|
||||
.remote => {
|
||||
try remote.execute(allocator, job_name.?, options.priority, &options, args_str, cfg);
|
||||
},
|
||||
.local => {
|
||||
const run_id = try local.execute(allocator, job_name.?, &options, args_str, cfg);
|
||||
try local.markForSync(allocator, run_id);
|
||||
if (!flags.json) {
|
||||
std.debug.print("\nRun completed locally (run_id: {s})\n", .{run_id[0..@min(8, run_id.len)]});
|
||||
std.debug.print("Will sync to server when connection is available\n", .{});
|
||||
}
|
||||
|
||||
// Append args
|
||||
for (args) |arg| {
|
||||
try argv.append(allocator, try allocator.dupe(u8, arg));
|
||||
}
|
||||
|
||||
return try argv.toOwnedSlice(allocator);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// No entrypoint configured and no explicit command
|
||||
std.log.err("No entrypoint configured. Set entrypoint in .fetchml/config.toml or use: ml run -- <command>", .{});
|
||||
return error.NoEntrypoint;
|
||||
}
|
||||
|
||||
/// Free command array
|
||||
fn freeCommand(allocator: std.mem.Allocator, command: [][]const u8) void {
|
||||
for (command) |arg| {
|
||||
allocator.free(arg);
|
||||
}
|
||||
allocator.free(command);
|
||||
/// Print a short validation report for `job_name`.
///
/// Checks that `train.py` and `requirements.txt` are accessible in the
/// current working directory; validation passes only when both are present.
/// `allocator` and `options` are accepted but not used.
fn validateJob(allocator: std.mem.Allocator, job_name: []const u8, options: *const RunOptions) !void {
    _ = allocator;
    _ = options;

    const cwd = std.fs.cwd();
    const has_train_script = if (cwd.access("train.py", .{})) true else |_| false;
    const has_requirements = if (cwd.access("requirements.txt", .{})) true else |_| false;

    const train_answer: []const u8 = if (has_train_script) "yes" else "no";
    const requirements_answer: []const u8 = if (has_requirements) "yes" else "no";
    const verdict: []const u8 = if (has_train_script and has_requirements)
        "✓ Validation passed"
    else
        "✗ Validation failed";

    std.debug.print("Validation Results for '{s}':\n", .{job_name});
    std.debug.print(" train.py: {s}\n", .{train_answer});
    std.debug.print(" requirements.txt: {s}\n", .{requirements_answer});
    std.debug.print("\n{s}\n", .{verdict});
}
|
||||
|
||||
/// Deep-copy a list of strings.
///
/// Returns a newly allocated slice whose elements are independent copies of
/// the inputs. The caller owns the result and must free every element and
/// then the outer slice with the same `allocator`.
///
/// On allocation failure nothing is leaked: the strings copied so far and the
/// outer slice are released before the error is returned.
fn duplicateStrings(allocator: std.mem.Allocator, strings: []const []const u8) ![][]const u8 {
    const result = try allocator.alloc([]const u8, strings.len);

    // Number of leading elements of `result` that hold live copies.
    var copied: usize = 0;
    errdefer {
        for (result[0..copied]) |s| allocator.free(s);
        allocator.free(result);
    }

    for (strings) |s| {
        result[copied] = try allocator.dupe(u8, s);
        copied += 1;
    }
    return result;
}
|
||||
|
||||
/// Open the tracking database at `db_path` (lazy bootstrap).
///
/// Before opening, probes whether the file is already there. Only
/// `error.FileNotFound` counts as "missing"; any other access error is
/// treated as "present". When the file was missing, an informational message
/// is logged after `db.DB.init` succeeds.
fn initOrOpenDB(allocator: std.mem.Allocator, db_path: []const u8) !db.DB {
    const first_use = if (std.fs.accessAbsolute(db_path, .{})) |_|
        false
    else |err|
        err == error.FileNotFound;

    const handle = try db.DB.init(allocator, db_path);

    if (first_use) {
        std.log.info("local mode active — tracking to {s}", .{db_path});
    }
    return handle;
}
|
||||
|
||||
/// Run `command` as a child process and capture its output.
///
/// The child's stdout and stderr are redirected into a pipe. The parent
/// copies everything it reads into the file at `output_path` and passes each
/// line to `parseAndLogMetric` so FETCHML_METRIC lines reach the database.
/// The child's PID is stored in the `ml_runs` row identified by `run_id`.
///
/// Returns the child's exit code, `128 + signal` when it was killed by a
/// signal, or `-1` when the wait status could not be obtained or interpreted.
///
/// Errors: `error.NoCommand` for an empty `command`, plus any allocation,
/// file, pipe, fork or database error.
fn executeAndCapture(
    allocator: std.mem.Allocator,
    command: []const []const u8,
    output_path: []const u8,
    database: *db.DB,
    run_id: []const u8,
) !i32 {
    if (command.len == 0) return error.NoCommand;

    // execvp needs a NULL-terminated array of NUL-terminated C strings.
    // Zig slices are neither, so build a proper argv before forking.
    var argv_arena = std.heap.ArenaAllocator.init(allocator);
    defer argv_arena.deinit();
    const arena = argv_arena.allocator();

    const argv = try arena.allocSentinel(?[*:0]const u8, command.len, null);
    for (command, 0..) |arg, idx| {
        argv[idx] = (try arena.dupeZ(u8, arg)).ptr;
    }

    // Create output file
    var output_file = try std.fs.cwd().createFile(output_path, .{});
    defer output_file.close();

    // Create pipe for the child's stdout/stderr
    const pipe = try std.posix.pipe();

    // Fork child process; on failure both pipe ends are still ours to close.
    const pid = std.posix.fork() catch |err| {
        std.posix.close(pipe[0]);
        std.posix.close(pipe[1]);
        return err;
    };

    if (pid == 0) {
        // Child process
        std.posix.close(pipe[0]); // Close read end

        // Redirect stdout and stderr to the pipe
        std.posix.dup2(pipe[1], std.posix.STDOUT_FILENO) catch std.process.exit(1);
        std.posix.dup2(pipe[1], std.posix.STDERR_FILENO) catch std.process.exit(1);
        std.posix.close(pipe[1]);

        // execvp only returns on failure (uses the current environ)
        const c_err = execvp(argv[0].?, argv.ptr);
        std.log.err("Failed to execute {s}: {}", .{ command[0], c_err });
        std.process.exit(1);
    }

    // Parent process: close the write end exactly once so the read loop sees
    // EOF when the child exits; the read end is closed on every return path.
    std.posix.close(pipe[1]);
    defer std.posix.close(pipe[0]);

    // Store PID in database
    const pid_sql = "UPDATE ml_runs SET pid = ? WHERE run_id = ?;";
    const pid_stmt = try database.prepare(pid_sql);
    defer db.DB.finalize(pid_stmt);
    try db.DB.bindInt64(pid_stmt, 1, pid);
    try db.DB.bindText(pid_stmt, 2, run_id);
    _ = try db.DB.step(pid_stmt);

    // Read from pipe and parse FETCHML_METRIC lines
    var buf: [4096]u8 = undefined;
    var line_buf: [1024]u8 = undefined;
    var line_len: usize = 0;

    while (true) {
        // Any read error ends capture; the child is still reaped below.
        const bytes_read = std.posix.read(pipe[0], &buf) catch break;
        if (bytes_read == 0) break;

        // Write to output file
        try output_file.writeAll(buf[0..bytes_read]);

        // Split into lines
        for (buf[0..bytes_read]) |byte| {
            if (byte == '\n') {
                if (line_len > 0) {
                    try parseAndLogMetric(allocator, line_buf[0..line_len], database, run_id);
                }
                line_len = 0;
                continue;
            }
            if (line_len == line_buf.len) {
                // Over-long line: flush what we have, then keep the current
                // byte instead of dropping it.
                try parseAndLogMetric(allocator, line_buf[0..line_len], database, run_id);
                line_len = 0;
            }
            line_buf[line_len] = byte;
            line_len += 1;
        }
    }

    // Output that ends without a trailing newline still forms a line.
    if (line_len > 0) {
        try parseAndLogMetric(allocator, line_buf[0..line_len], database, run_id);
    }

    // Wait for child; a failed wait must not be mistaken for exit code 0.
    var status: c_int = 0;
    if (waitpid(@intCast(pid), &status, 0) < 0) return -1;

    // Parse exit code
    if (WIFEXITED(status) != 0) {
        return WEXITSTATUS(status);
    } else if (WIFSIGNALED(status) != 0) {
        return 128 + WTERMSIG(status);
    }

    return -1;
}
|
||||
|
||||
/// Parse FETCHML_METRIC line and log to database
|
||||
/// Format: FETCHML_METRIC key=value [step=N]
|
||||
fn parseAndLogMetric(
|
||||
allocator: std.mem.Allocator,
|
||||
line: []const u8,
|
||||
database: *db.DB,
|
||||
run_id: []const u8,
|
||||
) !void {
|
||||
const trimmed = std.mem.trim(u8, line, " \t\r");
|
||||
|
||||
// Check prefix
|
||||
const prefix = "FETCHML_METRIC";
|
||||
if (!std.mem.startsWith(u8, trimmed, prefix)) return;
|
||||
|
||||
// Get the rest after prefix
|
||||
var rest = trimmed[prefix.len..];
|
||||
rest = std.mem.trimLeft(u8, rest, " \t");
|
||||
|
||||
// Parse key=value
|
||||
var iter = std.mem.splitScalar(u8, rest, ' ');
|
||||
const kv_part = iter.next() orelse return;
|
||||
|
||||
var kv_iter = std.mem.splitScalar(u8, kv_part, '=');
|
||||
const key = kv_iter.next() orelse return;
|
||||
const value_str = kv_iter.next() orelse return;
|
||||
|
||||
// Validate key: [a-zA-Z][a-zA-Z0-9_]*
|
||||
if (key.len == 0) return;
|
||||
const first_char = key[0];
|
||||
if (!std.ascii.isAlphabetic(first_char)) return;
|
||||
for (key[1..]) |c| {
|
||||
if (!std.ascii.isAlphanumeric(c) and c != '_') return;
|
||||
}
|
||||
|
||||
// Parse value
|
||||
const value = std.fmt.parseFloat(f64, value_str) catch return;
|
||||
|
||||
// Parse optional step
|
||||
var step: i64 = 0;
|
||||
while (iter.next()) |part| {
|
||||
if (std.mem.startsWith(u8, part, "step=")) {
|
||||
const step_str = part[5..];
|
||||
step = std.fmt.parseInt(i64, step_str, 10) catch 0;
|
||||
if (step < 0) step = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Insert metric
|
||||
const sql = "INSERT INTO ml_metrics (run_id, key, value, step) VALUES (?, ?, ?, ?);";
|
||||
const stmt = try database.prepare(sql);
|
||||
defer db.DB.finalize(stmt);
|
||||
|
||||
try db.DB.bindText(stmt, 1, run_id);
|
||||
try db.DB.bindText(stmt, 2, key);
|
||||
try db.DB.bindDouble(stmt, 3, value);
|
||||
try db.DB.bindInt64(stmt, 4, step);
|
||||
_ = try db.DB.step(stmt);
|
||||
|
||||
/// Print what running `job_name` would involve, without running it:
/// the requested CPU / memory / GPU figures and, when set, the hypothesis.
/// `allocator` is accepted but not used.
fn explainJob(allocator: std.mem.Allocator, job_name: []const u8, options: *const RunOptions) !void {
    _ = allocator;

    const print = std.debug.print;
    print("Job Explanation for '{s}':\n", .{job_name});
    print(" CPU: {d}, Memory: {d}GB, GPU: {d}\n", .{ options.cpu, options.memory, options.gpu });
    if (options.hypothesis) |hypothesis| {
        print(" Hypothesis: {s}\n", .{hypothesis});
    }
    print("\n Action: Would execute\n", .{});
}
|
||||
|
||||
fn printUsage() !void {
|
||||
std.debug.print("Usage: ml run [options] [args...]\n", .{});
|
||||
std.debug.print("\t\t\tml run -- <command> [args...]\n\n", .{});
|
||||
std.debug.print("Execute a run locally with experiment tracking.\n\n", .{});
|
||||
std.debug.print("Options:\n", .{});
|
||||
std.debug.print("\t--help, -h\tShow this help message\n", .{});
|
||||
std.debug.print("\t--json\t\tOutput structured JSON\n\n", .{});
|
||||
std.debug.print("Examples:\n", .{});
|
||||
std.debug.print("\tml run\t\t\t# Use entrypoint from config\n", .{});
|
||||
std.debug.print("\tml run --lr 0.001\t\t# Append args to entrypoint\n", .{});
|
||||
std.debug.print("\tml run -- python train.py\t# Run explicit command\n", .{});
|
||||
std.debug.print(
|
||||
\\n
|
||||
\\ml run <job_name> [options] [-- <args>]
|
||||
\\
|
||||
\\Unified run command - handles both local and remote execution.
|
||||
\\
|
||||
\\Options:
|
||||
\\ --priority <1-10> Job priority (default: 5)
|
||||
\\ --cpu <n>, --memory <n>, --gpu <n> Resources
|
||||
\\ --local, --remote Force execution mode
|
||||
\\ --dry-run, --validate, --explain Preview modes
|
||||
\\ --hypothesis, --context, --tags Research context
|
||||
\\
|
||||
, .{});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,9 +37,7 @@ pub fn main() !void {
|
|||
'a' => if (std.mem.eql(u8, command, "annotate")) {
|
||||
try @import("commands/annotate.zig").execute(allocator, args[2..]);
|
||||
} else handleUnknownCommand(command),
|
||||
'e' => if (std.mem.eql(u8, command, "exec")) {
|
||||
try @import("commands/exec.zig").execute(allocator, args[2..]);
|
||||
} else if (std.mem.eql(u8, command, "experiment")) {
|
||||
'e' => if (std.mem.eql(u8, command, "experiment")) {
|
||||
try @import("commands/experiment.zig").execute(allocator, args[2..]);
|
||||
} else if (std.mem.eql(u8, command, "export")) {
|
||||
try @import("commands/export_cmd.zig").run(allocator, args[2..]);
|
||||
|
|
@ -52,9 +50,6 @@ pub fn main() !void {
|
|||
'r' => if (std.mem.eql(u8, command, "run")) {
|
||||
try @import("commands/run.zig").execute(allocator, args[2..]);
|
||||
} else handleUnknownCommand(command),
|
||||
'q' => if (std.mem.eql(u8, command, "queue")) {
|
||||
try @import("commands/queue.zig").run(allocator, args[2..]);
|
||||
} else handleUnknownCommand(command),
|
||||
'd' => if (std.mem.eql(u8, command, "dataset")) {
|
||||
try @import("commands/dataset.zig").run(allocator, args[2..]);
|
||||
} else handleUnknownCommand(command),
|
||||
|
|
@ -93,10 +88,8 @@ fn printUsage() void {
|
|||
std.debug.print("ML Experiment Manager\n\n", .{});
|
||||
std.debug.print("Usage: ml <command> [options]\n\n", .{});
|
||||
std.debug.print("Commands:\n", .{});
|
||||
std.debug.print(" exec <job> Execute job (auto local/remote)\n", .{});
|
||||
std.debug.print(" run <job> Execute job (auto local/remote)\n", .{});
|
||||
std.debug.print(" init Initialize project with config\n", .{});
|
||||
std.debug.print(" run [args] Execute a run locally\n", .{});
|
||||
std.debug.print(" queue <job> Queue job on server\n", .{});
|
||||
std.debug.print(" annotate <id> Add metadata annotations\n", .{});
|
||||
std.debug.print(" experiment Manage experiments (create, list, show)\n", .{});
|
||||
std.debug.print(" logs <id> Fetch or stream run logs\n", .{});
|
||||
|
|
|
|||
Loading…
Reference in a new issue