diff --git a/cli/src/commands/exec.zig b/cli/src/commands/exec.zig deleted file mode 100644 index 8bf91e8..0000000 --- a/cli/src/commands/exec.zig +++ /dev/null @@ -1,14 +0,0 @@ -const std = @import("std"); -const core = @import("../core.zig"); - -// Import modular exec structure -const exec_mod = @import("exec/mod.zig"); - -// Re-export for backward compatibility -pub const ExecMode = exec_mod.ExecMode; -pub const ExecOptions = exec_mod.ExecOptions; - -// Main entry point - delegates to modular implementation -pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { - return exec_mod.run(allocator, args); -} diff --git a/cli/src/commands/queue.zig b/cli/src/commands/queue.zig deleted file mode 100644 index 256a0d8..0000000 --- a/cli/src/commands/queue.zig +++ /dev/null @@ -1,1167 +0,0 @@ -const std = @import("std"); -const Config = @import("../config.zig").Config; -const ws = @import("../net/ws/client.zig"); -const history = @import("../utils/history.zig"); -const crypto = @import("../utils/crypto.zig"); -const protocol = @import("../net/protocol.zig"); -const stdcrypto = std.crypto; -const mode = @import("../mode.zig"); -const db = @import("../db.zig"); -const manifest_lib = @import("../manifest.zig"); -const progress = @import("../utils/progress.zig"); -const common = @import("common.zig"); - -// Use modular queue structure -const queue_mod = @import("queue/mod.zig"); - -// Re-export for backward compatibility -pub const TrackingConfig = queue_mod.TrackingConfig; -pub const QueueOptions = queue_mod.QueueOptions; -pub const parse = queue_mod.parse; -pub const validate = queue_mod.validate; -pub const submit = queue_mod.submit; - -fn resolveCommitHexOrPrefix(allocator: std.mem.Allocator, base_path: []const u8, input: []const u8) ![]u8 { - if (input.len < 7 or input.len > 40) return error.InvalidArgs; - for (input) |c| { - if (!std.ascii.isHex(c)) return error.InvalidArgs; - } - - if (input.len == 40) { - return allocator.dupe(u8, input); - } - - var dir = if (std.fs.path.isAbsolute(base_path)) - try std.fs.openDirAbsolute(base_path, .{ .iterate = true }) - else - try std.fs.cwd().openDir(base_path, .{ .iterate = true }); - defer dir.close(); - - var it = dir.iterate(); - var found: ?[]u8 = null; - errdefer if (found) |s| allocator.free(s); - - while (try it.next()) |entry| { - if (entry.kind != .directory) continue; - const name = entry.name; - if (name.len != 40) continue; - if (!std.mem.startsWith(u8, name, input)) continue; - for (name) |c| { - if (!std.ascii.isHex(c)) break; - } else { - if (found != null) return error.InvalidArgs; - found = try allocator.dupe(u8, name); - } - } - - if (found) |s| return s; - return error.FileNotFound; -} - -pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { - if (args.len == 0) { - try printUsage(); - return error.InvalidArgs; - } - - if (std.mem.eql(u8, args[0], "--help") or std.mem.eql(u8, args[0], "-h")) { - try printUsage(); - return; - } - - // Load config for mode detection - const config = try Config.load(allocator); - defer { - var mut_config = config; - mut_config.deinit(allocator); - } - - // Detect mode early to provide clear error for offline - const mode_result = try mode.detect(allocator, config); - - // Check for --rerun flag - var rerun_id: ?[]const u8 = null; - for (args, 0..) |arg, i| { - if (std.mem.eql(u8, arg, "--rerun") and i + 1 < args.len) { - rerun_id = args[i + 1]; - break; - } - } - - // If --rerun is specified, handle re-queueing - if (rerun_id) |id| { - if (mode.isOffline(mode_result.mode)) { - std.debug.print("ml queue --rerun requires server connection\n", .{}); - return error.RequiresServer; - } - return try handleRerun(allocator, id, args, config); - } - - // Regular queue - requires server - if (mode.isOffline(mode_result.mode)) { - std.debug.print("ml queue requires server connection (use 'ml run' for local execution)\n", .{}); - return error.RequiresServer; - } - - // Continue with regular queue logic... - try executeQueue(allocator, args, config); -} - -fn executeQueue(allocator: std.mem.Allocator, args: []const []const u8, config: Config) !void { - // Support batch operations - multiple job names - var job_names = std.ArrayList([]const u8).initCapacity(allocator, 10) catch |err| { - std.debug.print("Failed to allocate job list: {}\n", .{err}); - return err; - }; - defer job_names.deinit(allocator); - - var commit_id_override: ?[]const u8 = null; - var priority: u8 = 5; - var snapshot_id: ?[]const u8 = null; - var snapshot_sha256: ?[]const u8 = null; - var args_override: ?[]const u8 = null; - var note_override: ?[]const u8 = null; - - // Initialize options with config defaults - var options = QueueOptions{ - .cpu = config.default_cpu, - .memory = config.default_memory, - .gpu = config.default_gpu, - .gpu_memory = config.default_gpu_memory, - .dry_run = config.default_dry_run, - .validate = config.default_validate, - .json = config.default_json, - .secrets = try std.ArrayList([]const u8).initCapacity(allocator, 4), - }; - defer options.secrets.deinit(allocator); - priority = config.default_priority; - - // Tracking configuration - var tracking = TrackingConfig{}; - - // Support passing runner args after "--". - var sep_index: ?usize = null; - for (args, 0..) |a, idx| { - if (std.mem.eql(u8, a, "--")) { - sep_index = idx; - break; - } - } - const pre = args[0..(sep_index orelse args.len)]; - const post = if (sep_index) |si| args[(si + 1)..] else args[0..0]; - - var args_joined: []const u8 = ""; - if (post.len > 0) { - var buf: std.ArrayList(u8) = .{}; - defer buf.deinit(allocator); - for (post, 0..) |a, j| { - if (j > 0) try buf.append(allocator, ' '); - try buf.appendSlice(allocator, a); - } - args_joined = try buf.toOwnedSlice(allocator); - } - defer if (post.len > 0) allocator.free(args_joined); - - // Parse arguments - separate job names from flags - var i: usize = 0; - while (i < pre.len) : (i += 1) { - const arg = pre[i]; - - if (std.mem.startsWith(u8, arg, "--") or std.mem.eql(u8, arg, "-h")) { - // Parse flags - if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) { - try printUsage(); - return; - } - if (std.mem.eql(u8, arg, "--commit") and i + 1 < pre.len) { - if (commit_id_override != null) { - allocator.free(commit_id_override.?); - } - const commit_in = pre[i + 1]; - const commit_hex = resolveCommitHexOrPrefix(allocator, config.worker_base, commit_in) catch |err| { - if (err == error.FileNotFound) { - std.debug.print("No commit matches prefix: {s}\n", .{commit_in}); - return error.InvalidArgs; - } - std.debug.print("Invalid commit id\n", .{}); - return error.InvalidArgs; - }; - defer allocator.free(commit_hex); - - const commit_bytes = crypto.decodeHex(allocator, commit_hex) catch { - std.debug.print("Invalid commit id: must be hex\n", .{}); - return error.InvalidArgs; - }; - if (commit_bytes.len != 20) { - allocator.free(commit_bytes); - std.debug.print("Invalid commit id: expected 20 bytes\n", .{}); - return error.InvalidArgs; - } - commit_id_override = commit_bytes; - i += 1; - } else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) { - priority = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--mlflow")) { - tracking.mlflow = TrackingConfig.MLflowConfig{}; - } else if (std.mem.eql(u8, arg, "--mlflow-uri") and i + 1 < pre.len) { - tracking.mlflow = TrackingConfig.MLflowConfig{ - .mode = "remote", - .tracking_uri = pre[i + 1], - }; - i += 1; - } else if (std.mem.eql(u8, arg, "--tensorboard")) { - tracking.tensorboard = TrackingConfig.TensorBoardConfig{}; - } else if (std.mem.eql(u8, arg, "--wandb-key") and i + 1 < pre.len) { - if (tracking.wandb == null) tracking.wandb = TrackingConfig.WandbConfig{}; - tracking.wandb.?.api_key = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--wandb-project") and i + 1 < pre.len) { - if (tracking.wandb == null) tracking.wandb = TrackingConfig.WandbConfig{}; - tracking.wandb.?.project = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--wandb-entity") and i + 1 < pre.len) { - if (tracking.wandb == null) tracking.wandb = TrackingConfig.WandbConfig{}; - tracking.wandb.?.entity = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--dry-run")) { - options.dry_run = true; - } else if (std.mem.eql(u8, arg, "--validate")) { - options.validate = true; - } else if (std.mem.eql(u8, arg, "--explain")) { - options.explain = true; - } else if (std.mem.eql(u8, arg, "--json")) { - options.json = true; - } else if (std.mem.eql(u8, arg, "--force")) { - options.force = true; - } else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) { - options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) { - options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) { - options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) { - options.gpu_memory = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--snapshot-id") and i + 1 < pre.len) { - snapshot_id = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--snapshot-sha256") and i + 1 < pre.len) { - snapshot_sha256 = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--args") and i + 1 < pre.len) { - args_override = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--note") and i + 1 < pre.len) { - note_override = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) { - options.hypothesis = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) { - options.context = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) { - options.intent = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) { - options.expected_outcome = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--experiment-group") and i + 1 < pre.len) { - options.experiment_group = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) { - options.tags = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--network") and i + 1 < pre.len) { - options.network_mode = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--read-only")) { - options.read_only = true; - } else if (std.mem.eql(u8, arg, "--secret") and i + 1 < pre.len) { - try options.secrets.append(allocator, pre[i + 1]); - i += 1; - } else if (std.mem.eql(u8, arg, "--reservation") and i + 1 < pre.len) { - options.reservation_id = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--gang-size") and i + 1 < pre.len) { - options.gang_size = try std.fmt.parseInt(u32, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--max-wait") and i + 1 < pre.len) { - options.max_wait_time = try std.fmt.parseInt(u32, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--preemptible")) { - options.preemptible = true; - } else if (std.mem.eql(u8, arg, "--worker") and i + 1 < pre.len) { - options.preferred_worker = pre[i + 1]; - i += 1; - } - } else { - // This is a job name - job_names.append(allocator, arg) catch |err| { - std.debug.print("Failed to append job: {}\n", .{err}); - return err; - }; - } - } - - if (job_names.items.len == 0) { - std.debug.print("No job names specified\n", .{}); - return error.InvalidArgs; - } - - const print_next_steps = (!options.json) and (job_names.items.len == 1); - - // Handle special modes - if (options.explain) { - try explainJob(allocator, job_names.items[0], commit_id_override, priority, &options); - return; - } - - if (options.validate) { - try validateJob(allocator, job_names.items[0], commit_id_override, &options); - return; - } - - if (options.dry_run) { - try dryRunJob(allocator, job_names.items[0], commit_id_override, priority, &options); - return; - } - - std.debug.print("Queueing {d} job(s)...\n", .{job_names.items.len}); - - // Generate tracking JSON if needed (simplified for now) - const tracking_json: []const u8 = ""; - - // Process each job - var success_count: usize = 0; - var failed_jobs = std.ArrayList([]const u8).initCapacity(allocator, 10) catch |err| { - std.debug.print("Failed to allocate failed jobs list: {}\n", .{err}); - return err; - }; - defer failed_jobs.deinit(allocator); - - const args_str: []const u8 = if (args_override) |a| a else args_joined; - const note_str: []const u8 = if (note_override) |n| n else ""; - - // Show progress bar for multiple jobs (not in JSON mode) - var pb: ?progress.ProgressBar = null; - if (!options.json and job_names.items.len > 1) { - pb = progress.ProgressBar.init(allocator, job_names.items.len, "Queuing jobs..."); - } - defer if (pb) |*p| p.finish(); - - for (job_names.items, 0..) |job_name, index| { - if (!options.json and job_names.items.len == 1) { - std.debug.print("Processing job: {s}\n", .{job_name}); - } - - queueSingleJob( - allocator, - job_name, - commit_id_override, - priority, - tracking_json, - &options, - snapshot_id, - snapshot_sha256, - args_str, - note_str, - print_next_steps, - ) catch |err| { - std.debug.print("Failed to queue job '{s}': {}\n", .{ job_name, err }); - failed_jobs.append(allocator, job_name) catch |append_err| { - std.debug.print("Failed to track failed job: {}\n", .{append_err}); - }; - continue; - }; - - if (!options.json and job_names.items.len == 1) { - std.debug.print("Successfully queued job '{s}'\n", .{job_name}); - } - success_count += 1; - - // Update progress bar - if (pb) |*p| p.update(index + 1); - } - - // Show summary - std.debug.print("Batch queuing complete.\n", .{}); - std.debug.print("Successfully queued: {d} job(s)\n", .{success_count}); - - if (failed_jobs.items.len > 0) { - std.debug.print("Failed to queue: {d} job(s)\n", .{failed_jobs.items.len}); - for (failed_jobs.items) |failed_job| { - std.debug.print(" - {s}\n", .{failed_job}); - } - } - - if (!options.json and success_count > 0 and job_names.items.len > 1) { - std.debug.print("\nNext steps:\n", .{}); - std.debug.print(" ml status --watch\n", .{}); - } -} - -/// Handle --rerun flag: re-queue a completed run -fn handleRerun(allocator: std.mem.Allocator, run_id: []const u8, args: []const []const u8, cfg: Config) !void { - _ = args; // Override args not implemented yet - - const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key); - defer allocator.free(api_key_hash); - - const ws_url = try cfg.getWebSocketUrl(allocator); - defer allocator.free(ws_url); - - var client = try ws.Client.connect(allocator, ws_url, cfg.api_key); - defer client.close(); - - // Send rerun request to server - try client.sendRerunRequest(run_id, api_key_hash); - - // Wait for response - const message = try client.receiveMessage(allocator); - defer allocator.free(message); - - // Parse response (simplified) - if (std.mem.indexOf(u8, message, "success") != null) { - std.debug.print("Re-queued run {s}\n", .{run_id[0..8]}); - } else { - std.debug.print("Failed to re-queue: {s}\n", .{message}); - return error.RerunFailed; - } -} - -fn generateCommitID(allocator: std.mem.Allocator) ![]const u8 { - var bytes: [20]u8 = undefined; - stdcrypto.random.bytes(&bytes); - return allocator.dupe(u8, &bytes); -} - -fn queueSingleJob( - allocator: std.mem.Allocator, - job_name: []const u8, - commit_override: ?[]const u8, - priority: u8, - tracking_json: []const u8, - options: *const QueueOptions, - snapshot_id: ?[]const u8, - snapshot_sha256: ?[]const u8, - args_str: []const u8, - note_str: []const u8, - print_next_steps: bool, -) !void { - const commit_id = blk: { - if (commit_override) |cid| break :blk cid; - const generated = try generateCommitID(allocator); - break :blk generated; - }; - defer if (commit_override == null) allocator.free(commit_id); - - // Build narrative JSON if any narrative fields are set - const narrative_json = common.buildNarrativeJson(allocator, options) catch null; - defer if (narrative_json) |j| allocator.free(j); - - const config = try Config.load(allocator); - defer { - var mut_config = config; - mut_config.deinit(allocator); - } - - const commit_hex = try crypto.encodeHexLower(allocator, commit_id); - defer allocator.free(commit_hex); - - const api_key_hash = try crypto.hashApiKey(allocator, config.api_key); - defer allocator.free(api_key_hash); - - // Check for existing job with same commit (incremental queue) - if (!options.force) { - const existing = try checkExistingJob(allocator, job_name, commit_id, api_key_hash, config); - if (existing) |ex| { - defer allocator.free(ex); - // Server already has this job - handle duplicate response - try handleDuplicateResponse(allocator, ex, job_name, commit_hex, options); - return; - } - } - - std.debug.print("Queueing job '{s}' with commit {s}...\n", .{ job_name, commit_hex }); - - // Connect to WebSocket and send queue message - const ws_url = try config.getWebSocketUrl(allocator); - defer allocator.free(ws_url); - - var client = try ws.Client.connect(allocator, ws_url, config.api_key); - defer client.close(); - - if ((snapshot_id != null) != (snapshot_sha256 != null)) { - std.debug.print("Both --snapshot-id and --snapshot-sha256 must be set\n", .{}); - return error.InvalidArgs; - } - if (snapshot_id != null and tracking_json.len > 0) { - std.debug.print("Snapshot queueing is not supported with tracking yet\n", .{}); - return error.InvalidArgs; - } - - // Build combined metadata JSON with tracking and/or narrative - const combined_json = blk: { - if (tracking_json.len > 0 and narrative_json != null) { - // Merge tracking and narrative - var buf = try std.ArrayList(u8).initCapacity(allocator, 256); - defer buf.deinit(allocator); - const writer = buf.writer(allocator); - try writer.writeAll("{"); - try writer.writeAll(tracking_json[1 .. tracking_json.len - 1]); // Remove outer braces - try writer.writeAll(","); - try writer.writeAll("\"narrative\":"); - try writer.writeAll(narrative_json.?); - try writer.writeAll("}"); - break :blk try buf.toOwnedSlice(allocator); - } else if (tracking_json.len > 0) { - break :blk try allocator.dupe(u8, tracking_json); - } else if (narrative_json) |nj| { - var buf = try std.ArrayList(u8).initCapacity(allocator, 256); - defer buf.deinit(allocator); - const writer = buf.writer(allocator); - try writer.writeAll("{\"narrative\":"); - try writer.writeAll(nj); - try writer.writeAll("}"); - break :blk try buf.toOwnedSlice(allocator); - } else { - break :blk ""; - } - }; - defer if (combined_json.len > 0 and combined_json.ptr != tracking_json.ptr) allocator.free(combined_json); - - if (combined_json.len > 0) { - try client.sendQueueJobWithTrackingAndResources( - job_name, - commit_id, - priority, - api_key_hash, - combined_json, - options.cpu, - options.memory, - options.gpu, - options.gpu_memory, - ); - } else if (note_str.len > 0 or args_str.len > 0) { - if (note_str.len > 0) { - try client.sendQueueJobWithArgsNoteAndResources( - job_name, - commit_id, - priority, - api_key_hash, - args_str, - note_str, - options.force, - options.cpu, - options.memory, - options.gpu, - options.gpu_memory, - ); - } else { - try client.sendQueueJobWithArgsAndResources( - job_name, - commit_id, - priority, - api_key_hash, - args_str, - options.force, - options.cpu, - options.memory, - options.gpu, - options.gpu_memory, - ); - } - } else if (snapshot_id) |sid| { - try client.sendQueueJobWithSnapshotAndResources( - job_name, - commit_id, - priority, - api_key_hash, - sid, - snapshot_sha256.?, - options.cpu, - options.memory, - options.gpu, - options.gpu_memory, - ); - } else { - try client.sendQueueJobWithResources( - job_name, - commit_id, - priority, - api_key_hash, - options.cpu, - options.memory, - options.gpu, - options.gpu_memory, - ); - } - - // Receive and handle response with duplicate detection - const message = try client.receiveMessage(allocator); - defer allocator.free(message); - - // Try to parse as structured packet first - const packet = protocol.ResponsePacket.deserialize(message, allocator) catch { - // Fallback: handle as plain text/JSON - if (message.len > 0 and message[0] == '{') { - try handleDuplicateResponse(allocator, message, job_name, commit_hex, options); - } else { - std.debug.print("Server response: {s}\n", .{message}); - } - return; - }; - defer packet.deinit(allocator); - - switch (packet.packet_type) { - .success => { - history.record(allocator, job_name, commit_hex) catch |err| { - std.debug.print("Warning: failed to record job in history ({})\n", .{err}); - }; - if (options.json) { - std.debug.print("{{\"success\":true,\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"status\":\"queued\"}}\n", .{ job_name, commit_hex }); - } else { - std.debug.print("Job queued: {s}\n", .{job_name}); - if (print_next_steps) { - const next_steps = try common.formatNextSteps(allocator, job_name, commit_hex); - defer allocator.free(next_steps); - std.debug.print("{s}\n", .{next_steps}); - } - } - }, - .error_packet => { - const err_msg = packet.error_message orelse "Unknown error"; - if (options.json) { - std.debug.print("{{\"success\":false,\"error\":\"{s}\"}}\n", .{err_msg}); - } else { - std.debug.print("Error: {s}\n", .{err_msg}); - } - return error.ServerError; - }, - else => { - try client.handleResponsePacket(packet, "Job queue"); - history.record(allocator, job_name, commit_hex) catch |err| { - std.debug.print("Warning: failed to record job in history ({})\n", .{err}); - }; - if (print_next_steps) { - const next_steps = try common.formatNextSteps(allocator, job_name, commit_hex); - defer allocator.free(next_steps); - std.debug.print("{s}\n", .{next_steps}); - } - }, - } -} - -fn printUsage() !void { - std.debug.print("Usage: ml queue [options] [job_name2 ...]\n\n", .{}); - std.debug.print("Options:\n", .{}); - std.debug.print("\t--priority <1-10>\tJob priority (default: 5)\n", .{}); - std.debug.print("\t--commit \t\tSpecific commit to run\n", .{}); - std.debug.print("\t--snapshot-id \tSnapshot ID to use\n", .{}); - std.debug.print("\t--snapshot-sha256 \tSnapshot SHA256 to use\n", .{}); - std.debug.print("\t--dry-run\t\tShow what would be queued\n", .{}); - std.debug.print("\t--explain \tReason for running\n", .{}); - std.debug.print("\t--json\t\t\tOutput machine-readable JSON\n", .{}); - std.debug.print("\t--help, -h\t\tShow this help message\n", .{}); - std.debug.print("\t--context \tBackground context for this experiment\n", .{}); - std.debug.print("\t--intent \t\tWhat you're trying to accomplish\n", .{}); - std.debug.print("\t--expected-outcome \tWhat you expect to happen\n", .{}); - std.debug.print("\t--experiment-group \tGroup related experiments\n", .{}); - std.debug.print("\t--tags \t\tComma-separated tags (e.g., ablation,batch-size)\n", .{}); - std.debug.print("\nSpecial Modes:\n", .{}); - std.debug.print("\t--rerun \tRe-queue a completed local run to server\n", .{}); - std.debug.print("\t--dry-run\t\tShow what would be queued\n", .{}); - std.debug.print("\t--validate\t\tValidate experiment without queuing\n", .{}); - std.debug.print("\t--explain\t\tExplain what will happen\n", .{}); - std.debug.print("\t--json\t\t\tOutput structured JSON\n", .{}); - std.debug.print("\t--force\t\t\tQueue even if duplicate exists\n", .{}); - std.debug.print("\nTracking:\n", .{}); - std.debug.print("\t--mlflow\t\tEnable MLflow (sidecar)\n", .{}); - std.debug.print("\t--mlflow-uri \tEnable MLflow (remote)\n", .{}); - std.debug.print("\t--tensorboard\t\tEnable TensorBoard\n", .{}); - std.debug.print("\t--wandb-key \tEnable Wandb with API key\n", .{}); - std.debug.print("\t--wandb-project \tSet Wandb project\n", .{}); - std.debug.print("\t--wandb-entity \tSet Wandb entity\n", .{}); - std.debug.print("\nSandboxing:\n", .{}); - std.debug.print("\t--network \tNetwork mode: none, bridge, slirp4netns\n", .{}); - std.debug.print("\t--read-only\t\tMount root filesystem as read-only\n", .{}); - std.debug.print("\t--secret \t\tInject secret as env var (can repeat)\n", .{}); - std.debug.print("\nScheduler Options:\n", .{}); - std.debug.print("\t--reservation \tUse existing GPU reservation\n", .{}); - std.debug.print("\t--gang-size \t\tRequest gang scheduling for multi-node jobs\n", .{}); - std.debug.print("\t--max-wait \tMaximum wait time before failing\n", .{}); - std.debug.print("\t--preemptible\t\tAllow job to be preempted\n", .{}); - std.debug.print("\t--worker \t\tPrefer specific worker\n", .{}); - std.debug.print("\nExamples:\n", .{}); - std.debug.print("\tml queue my_job\t\t\t # Queue a job\n", .{}); - std.debug.print("\tml queue my_job --dry-run\t # Preview submission\n", .{}); - std.debug.print("\tml queue my_job --validate\t # Validate locally\n", .{}); - std.debug.print("\tml queue --rerun abc123\t # Re-queue completed run\n", .{}); - std.debug.print("\tml status --watch\t\t # Watch queue + prewarm\n", .{}); - std.debug.print("\nResearch Examples:\n", .{}); - std.debug.print("\tml queue train.py --hypothesis 'LR scaling improves convergence'\n", .{}); - std.debug.print("\t\t--context 'Following paper XYZ' --tags ablation,lr-scaling\n", .{}); -} - -fn explainJob( - allocator: std.mem.Allocator, - job_name: []const u8, - commit_override: ?[]const u8, - priority: u8, - options: *const QueueOptions, -) !void { - var commit_display: []const u8 = "current-git-head"; - var commit_display_owned: ?[]u8 = null; - defer if (commit_display_owned) |b| allocator.free(b); - if (commit_override) |cid| { - const enc = try crypto.encodeHexLower(allocator, cid); - commit_display_owned = enc; - commit_display = enc; - } - - // Build narrative JSON for display - const narrative_json = common.buildNarrativeJson(allocator, options) catch null; - defer if (narrative_json) |j| allocator.free(j); - - if (options.json) { - const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; - var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"explain\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch |err| { - std.log.err("Failed to format output: {}", .{err}); - return error.FormatError; - }; - try stdout_file.writeAll(formatted); - try writeJSONNullableString(&stdout_file, options.gpu_memory); - if (narrative_json) |nj| { - try stdout_file.writeAll("},\"narrative\":"); - try stdout_file.writeAll(nj); - try stdout_file.writeAll("}\n"); - } else { - try stdout_file.writeAll("}}\n"); - } - return; - } else { - std.debug.print("Job Explanation:\n", .{}); - std.debug.print("\tJob Name: {s}\n", .{job_name}); - std.debug.print("\tCommit ID: {s}\n", .{commit_display}); - std.debug.print("\tPriority: {d}\n", .{priority}); - std.debug.print("\tResources Requested:\n", .{}); - std.debug.print("\t\tCPU: {d} cores\n", .{options.cpu}); - std.debug.print("\t\tMemory: {d} GB\n", .{options.memory}); - std.debug.print("\t\tGPU: {d} device(s)\n", .{options.gpu}); - std.debug.print("\t\tGPU Memory: {s}\n", .{options.gpu_memory orelse "auto"}); - - // Display narrative if provided - if (narrative_json != null) { - std.debug.print("\n\tResearch Narrative:\n", .{}); - if (options.hypothesis) |h| { - std.debug.print("\t\tHypothesis: {s}\n", .{h}); - } - if (options.context) |c| { - std.debug.print("\t\tContext: {s}\n", .{c}); - } - if (options.intent) |i| { - std.debug.print("\t\tIntent: {s}\n", .{i}); - } - if (options.expected_outcome) |eo| { - std.debug.print("\t\tExpected Outcome: {s}\n", .{eo}); - } - if (options.experiment_group) |eg| { - std.debug.print("\t\tExperiment Group: {s}\n", .{eg}); - } - if (options.tags) |t| { - std.debug.print("\t\tTags: {s}\n", .{t}); - } - } - - std.debug.print("\n Action: Job would be queued for execution\n", .{}); - } -} - -fn validateJob( - allocator: std.mem.Allocator, - job_name: []const u8, - commit_override: ?[]const u8, - options: *const QueueOptions, -) !void { - var commit_display: []const u8 = "current-git-head"; - var commit_display_owned: ?[]u8 = null; - defer if (commit_display_owned) |b| allocator.free(b); - if (commit_override) |cid| { - const enc = try crypto.encodeHexLower(allocator, cid); - commit_display_owned = enc; - commit_display = enc; - } - - // Basic local validation - simplified without JSON ObjectMap for now - - // Check if current directory has required files - const train_script_exists = if (std.fs.cwd().access("train.py", .{})) true else |err| switch (err) { - error.FileNotFound => false, - else => false, // Treat other errors as file doesn't exist - }; - const requirements_exists = if (std.fs.cwd().access("requirements.txt", .{})) true else |err| switch (err) { - error.FileNotFound => false, - else => false, // Treat other errors as file doesn't exist - }; - const overall_valid = train_script_exists and requirements_exists; - - if (options.json) { - const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; - var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"validate\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"checks\":{{\"train_py\":{s},\"requirements_txt\":{s}}},\"ok\":{s}}}\n", .{ job_name, commit_display, if (train_script_exists) "true" else "false", if (requirements_exists) "true" else "false", if (overall_valid) "true" else "false" }) catch unreachable; - try stdout_file.writeAll(formatted); - return; - } else { - std.debug.print("Validation Results:\n", .{}); - std.debug.print("\tJob Name: {s}\n", .{job_name}); - std.debug.print("\tCommit ID: {s}\n", .{commit_display}); - - std.debug.print("\tRequired Files:\n", .{}); - const train_status = if (train_script_exists) "yes" else "no"; - const req_status = if (requirements_exists) "yes" else "no"; - std.debug.print("\t\ttrain.py {s}\n", .{train_status}); - std.debug.print("\t\trequirements.txt {s}\n", .{req_status}); - - if (overall_valid) { - std.debug.print("\tValidation passed - job is ready to queue\n", .{}); - } else { - std.debug.print("\tValidation failed - missing required files\n", .{}); - } - } -} - -fn dryRunJob( - allocator: std.mem.Allocator, - job_name: []const u8, - commit_override: ?[]const u8, - priority: u8, - options: *const QueueOptions, -) !void { - var commit_display: []const u8 = "current-git-head"; - var commit_display_owned: ?[]u8 = null; - defer if (commit_display_owned) |b| allocator.free(b); - if (commit_override) |cid| { - const enc = try crypto.encodeHexLower(allocator, cid); - commit_display_owned = enc; - commit_display = enc; - } - - // Build narrative JSON for display - const narrative_json = common.buildNarrativeJson(allocator, options) catch null; - defer if (narrative_json) |j| allocator.free(j); - - if (options.json) { - const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; - var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"dry_run\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch |err| { - std.log.err("Failed to format output: {}", .{err}); - return error.FormatError; - }; - try stdout_file.writeAll(formatted); - try writeJSONNullableString(&stdout_file, options.gpu_memory); - if (narrative_json) |nj| { - try stdout_file.writeAll("},\"narrative\":"); - try stdout_file.writeAll(nj); - try stdout_file.writeAll(",\"would_queue\":true}}\n"); - } else { - try stdout_file.writeAll("},\"would_queue\":true}}\n"); - } - return; - } else { - std.debug.print("Dry Run - Job Queue Preview:\n", .{}); - std.debug.print("\tJob Name: {s}\n", .{job_name}); - std.debug.print("\tCommit ID: {s}\n", .{commit_display}); - std.debug.print("\tPriority: {d}\n", .{priority}); - std.debug.print("\tResources Requested:\n", .{}); - std.debug.print("\t\tCPU: {d} cores\n", .{options.cpu}); - std.debug.print("\t\tMemory: {d} GB\n", .{options.memory}); - std.debug.print("\t\tGPU: {d} device(s)\n", .{options.gpu}); - std.debug.print("\t\tGPU Memory: {s}\n", .{options.gpu_memory orelse "auto"}); - - // Display narrative if provided - if (narrative_json != null) { - std.debug.print("\n\tResearch Narrative:\n", .{}); - if (options.hypothesis) |h| { - std.debug.print("\t\tHypothesis: {s}\n", .{h}); - } - if (options.context) |c| { - std.debug.print("\t\tContext: {s}\n", .{c}); - } - if (options.intent) |i| { - std.debug.print("\t\tIntent: {s}\n", .{i}); - } - if (options.expected_outcome) |eo| { - std.debug.print("\t\tExpected Outcome: {s}\n", .{eo}); - } - if (options.experiment_group) |eg| { - std.debug.print("\t\tExperiment Group: {s}\n", .{eg}); - } - if (options.tags) |t| { - std.debug.print("\t\tTags: {s}\n", .{t}); - } - } - - std.debug.print("\n\tAction: Would queue job\n", .{}); - std.debug.print("\tEstimated queue time: 2-5 minutes\n", .{}); - std.debug.print("\tDry run completed - no job was actually queued\n", .{}); - } -} - -fn writeJSONNullableString(writer: anytype, s: ?[]const u8) !void { - if (s) |val| { - try writeJSONString(writer, val); - } else { - try writer.writeAll("null"); - } -} - -fn writeJSONString(writer: anytype, s: []const u8) !void { - try writer.writeAll("\""); - for (s) |c| { - switch (c) { - '"' => try writer.writeAll("\\\""), - '\\' => try writer.writeAll("\\\\"), - '\n' => try writer.writeAll("\\n"), - '\r' => try writer.writeAll("\\r"), - '\t' => try writer.writeAll("\\t"), - else => { - if (c < 0x20) { - var buf: [6]u8 = undefined; - buf[0] = '\\'; - buf[1] = 'u'; - buf[2] = '0'; - buf[3] = '0'; - buf[4] = hexDigit(@intCast((c >> 4) & 0x0F)); - buf[5] = hexDigit(@intCast(c & 0x0F)); - try writer.writeAll(&buf); - } else { - try writer.writeAll(&[_]u8{c}); - } - }, - } - } - try writer.writeAll("\""); -} - -fn handleDuplicateResponse( - allocator: std.mem.Allocator, - payload: []const u8, - job_name: []const u8, - commit_hex: []const u8, - options: *const QueueOptions, -) !void { - const parsed = std.json.parseFromSlice(std.json.Value, allocator, payload, .{}) catch { - if (options.json) { - std.debug.print("{s}\n", .{payload}); - } else { - std.debug.print("Server response: {s}\n", .{payload}); - } - return; - }; - defer parsed.deinit(); - - const root = parsed.value.object; - const is_dup = root.get("duplicate") != null and root.get("duplicate").?.bool; - if (!is_dup) { - if (options.json) { - std.debug.print("{s}\n", .{payload}); - } else { - std.debug.print("Job queued: {s}\n", .{job_name}); - } - return; - } - - const existing_id = root.get("existing_id").?.string; - const status = root.get("status").?.string; - const queued_by = root.get("queued_by").?.string; - const queued_at = root.get("queued_at").?.integer; - const now = std.time.timestamp(); - const minutes_ago = @divTrunc(now - queued_at, 60); - - if (std.mem.eql(u8, status, "queued") or std.mem.eql(u8, status, "running")) { - if (options.json) { - std.debug.print("{{\"success\":true,\"duplicate\":true,\"existing_id\":\"{s}\",\"status\":\"{s}\",\"queued_by\":\"{s}\",\"minutes_ago\":{d},\"suggested_action\":\"watch\"}}\n", .{ existing_id, status, queued_by, minutes_ago }); - } else { - std.debug.print("\nIdentical job already in progress: {s}\n", .{existing_id[0..8]}); - std.debug.print("\tQueued by {s}, {d} minutes ago\n", .{ queued_by, minutes_ago }); - std.debug.print("\tStatus: {s}\n", .{status}); - std.debug.print("\n\tWatch: ml watch {s}\n", .{existing_id[0..8]}); - std.debug.print("\tRerun: ml queue {s} --commit {s} --force\n", .{ job_name, commit_hex }); - } - } else if (std.mem.eql(u8, status, "completed")) { - const duration_sec = root.get("duration_seconds").?.integer; - const duration_min = @divTrunc(duration_sec, 60); - if (options.json) { - std.debug.print("{{\"success\":true,\"duplicate\":true,\"existing_id\":\"{s}\",\"status\":\"completed\",\"queued_by\":\"{s}\",\"duration_minutes\":{d},\"suggested_action\":\"show\"}}\n", .{ existing_id, queued_by, duration_min }); - } else { - std.debug.print("\nIdentical job already completed: {s}\n", .{existing_id[0..8]}); - std.debug.print(" Queued by {s}\n", .{queued_by}); - const metrics = root.get("metrics"); - if (metrics) |m| { - if (m == .object) { - std.debug.print("\n Results:\n", .{}); - if (m.object.get("accuracy")) |v| { - if (v == .float) std.debug.print(" accuracy: {d:.3}\n", .{v.float}); - } - if (m.object.get("loss")) |v| { - if (v == .float) std.debug.print(" loss: {d:.3}\n", .{v.float}); - } - } - } - std.debug.print("\t\tduration: {d}m\n", .{duration_min}); - std.debug.print("\n\tInspect: ml experiment show {s}\n", .{existing_id[0..8]}); - std.debug.print("\tRerun: ml queue {s} --commit {s} --force\n", .{ job_name, commit_hex }); - } - } else if (std.mem.eql(u8, status, "failed")) { - const error_reason = root.get("error_reason").?.string; - const failure_class = if (root.get("failure_class")) |fc| fc.string else "unknown"; - const exit_code = if (root.get("exit_code")) |ec| ec.integer else 0; - const signal = if (root.get("signal")) |s| s.string else ""; - const log_tail = if (root.get("log_tail")) |lt| lt.string else ""; - const suggestion = if (root.get("suggestion")) |s| s.string else ""; - const retry_count = if (root.get("retry_count")) |rc| rc.integer else 0; - const retry_cap = if (root.get("retry_cap")) |rc| rc.integer else 3; - const auto_retryable = if (root.get("auto_retryable")) |ar| ar.bool else false; - const requires_fix = if (root.get("requires_fix")) |rf| rf.bool else false; - - if (options.json) { - const suggested_action = if (requires_fix) "fix" else if (auto_retryable) "wait" else "requeue"; - std.debug.print("{{\"success\":true,\"duplicate\":true,\"existing_id\":\"{s}\",\"status\":\"failed\",\"failure_class\":\"{s}\",\"exit_code\":{d},\"signal\":\"{s}\",\"error_reason\":\"{s}\",\"retry_count\":{d},\"retry_cap\":{d},\"auto_retryable\":{},\"requires_fix\":{},\"suggested_action\":\"{s}\"}}\n", .{ existing_id, failure_class, exit_code, signal, error_reason, retry_count, retry_cap, auto_retryable, requires_fix, suggested_action }); - } else { - // Print rich failure information based on FailureClass - std.debug.print("\nFAILED {s} {s} failure\n", .{ existing_id[0..8], failure_class }); - - if (signal.len > 0) { - std.debug.print("\tSignal: {s} (exit code: {d})\n", .{ signal, exit_code }); - } else if (exit_code != 0) { - std.debug.print("\tExit code: {d}\n", .{exit_code}); - } - - // Show log tail if available - if (log_tail.len > 0) { - // Truncate long log tails - const display_tail = if (log_tail.len > 160) log_tail[0..160] else log_tail; - std.debug.print("\tLog: {s}...\n", .{display_tail}); - } - - // Show retry history - if (retry_count > 0) { - if (auto_retryable and retry_count < retry_cap) { - std.debug.print("\tRetried: {d}/{d} — auto-retry in progress\n", .{ retry_count, retry_cap }); - } else { - std.debug.print("\tRetried: {d}/{d}\n", .{ retry_count, retry_cap }); - } - } - - // Class-specific guidance per design spec - if (std.mem.eql(u8, failure_class, "infrastructure")) { - std.debug.print("\n\tInfrastructure failure (node died, preempted).\n", .{}); - if (auto_retryable and retry_count < retry_cap) { - std.debug.print("\t-> Auto-retrying transparently (attempt {d}/{d})\n", .{ retry_count + 1, retry_cap }); - } else if (retry_count >= retry_cap) { - std.debug.print("\t-> Retry cap reached. Requires manual intervention.\n", .{}); - std.debug.print("\tResubmit: ml requeue {s}\n", .{existing_id[0..8]}); - } - std.debug.print("\tLogs: ml logs {s}\n", .{existing_id[0..8]}); - } else if (std.mem.eql(u8, failure_class, "code")) { - // CRITICAL RULE: code failures never auto-retry - std.debug.print("\n\tCode failure — auto-retry is blocked.\n", .{}); - std.debug.print("\tYou must fix the code before resubmitting.\n", .{}); - std.debug.print("\t\tView logs: ml logs {s}\n", .{existing_id[0..8]}); - std.debug.print("\n\tAfter fix:\n", .{}); - std.debug.print("\t\tRequeue with same config:\n", .{}); - std.debug.print("\t\t\tml requeue {s}\n", .{existing_id[0..8]}); - std.debug.print("\t\tOr with more resources:\n", .{}); - std.debug.print("\t\t\tml requeue {s} --gpu-memory 16\n", .{existing_id[0..8]}); - } else if (std.mem.eql(u8, failure_class, "data")) { - // Data failures never auto-retry - std.debug.print("\n\tData failure — verification/checksum issue.\n", .{}); - std.debug.print("\tAuto-retry will fail again with same data.\n", .{}); - std.debug.print("\n\tCheck:\n", .{}); - std.debug.print("\t\tDataset availability: ml dataset verify {s}\n", .{existing_id[0..8]}); - std.debug.print("\t\tView logs: ml logs {s}\n", .{existing_id[0..8]}); - std.debug.print("\n\tAfter data issue resolved:\n", .{}); - std.debug.print("\t\t\tml requeue {s}\n", .{existing_id[0..8]}); - } else if (std.mem.eql(u8, failure_class, "resource")) { - std.debug.print("\n\tResource failure — OOM or disk full.\n", .{}); - if (retry_count == 0 and auto_retryable) { - std.debug.print("\t-> Will retry once with backoff (30s delay)\n", .{}); - } else if (retry_count >= 1) { - std.debug.print("\t-> Retried once, failed again with same error.\n", .{}); - std.debug.print("\n\tSuggestion: resubmit with more resources:\n", .{}); - std.debug.print("\t\tml requeue {s} --gpu-memory 16\n", .{existing_id[0..8]}); - std.debug.print("\t\tml requeue {s} --memory 32 --cpu 8\n", .{existing_id[0..8]}); - } - std.debug.print("\n\tCheck capacity: ml status\n", .{}); - std.debug.print("\tLogs: ml logs {s}\n", .{existing_id[0..8]}); - } else { - // Unknown failures - std.debug.print("\n\tUnknown failure — classification unclear.\n", .{}); - std.debug.print("\n\tReview full logs and decide:\n", .{}); - std.debug.print("\t\tml logs {s}\n", .{existing_id[0..8]}); - if (auto_retryable) { - std.debug.print("\n\tOr retry:\n", .{}); - std.debug.print("\t\tml requeue {s}\n", .{existing_id[0..8]}); - } - } - - // Always show the suggestion if available - if (suggestion.len > 0) { - std.debug.print("\n\t{s}\n", .{suggestion}); - } - } - } -} - -fn hexDigit(v: u8) u8 { - return if (v < 10) ('0' + v) else ('a' + (v - 10)); -} - -/// Check if a job with the same commit_id already exists on the server -/// Returns: Optional JSON response from server if duplicate found -fn checkExistingJob( - allocator: std.mem.Allocator, - job_name: []const u8, - commit_id: []const u8, - api_key_hash: []const u8, - config: Config, -) !?[]const u8 { - // Connect to server and query for existing job - const ws_url = try config.getWebSocketUrl(allocator); - defer allocator.free(ws_url); - - var client = try ws.Client.connect(allocator, ws_url, config.api_key); - defer client.close(); - - // Send query for existing job - try client.sendQueryJobByCommit(job_name, commit_id, api_key_hash); - - const message = try client.receiveMessage(allocator); - defer allocator.free(message); - - // Parse response - const parsed = std.json.parseFromSlice(std.json.Value, allocator, message, .{}) catch |err| { - // If JSON parse fails, treat as no duplicate found - std.log.debug("Failed to parse check response: {}", .{err}); - return null; - }; - defer parsed.deinit(); - - const root = parsed.value.object; - - // Check if job exists - if (root.get("exists")) |exists| { - if (!exists.bool) return null; - - // Job exists - copy the full response for caller - return try allocator.dupe(u8, message); - } - - return null; -} diff --git a/cli/src/commands/run.zig b/cli/src/commands/run.zig index 7638282..966f086 100644 --- a/cli/src/commands/run.zig +++ b/cli/src/commands/run.zig @@ -1,424 +1,218 @@ const std = @import("std"); -const db = @import("../db.zig"); -const manifest_lib = @import("../manifest.zig"); const core = @import("../core.zig"); const config = @import("../config.zig"); +const mode = @import("../mode.zig"); +const common = @import("common.zig"); -extern fn execvp(path: [*:0]const u8, argv: [*]const ?[*:0]const u8) c_int; -extern fn waitpid(pid: c_int, status: *c_int, flags: c_int) c_int; +const remote = @import("exec/remote.zig"); +const local = @import("exec/local.zig"); -// Get current environment from libc -extern var environ: [*]const ?[*:0]const u8; +pub const RunMode = enum { + local, + remote, +}; -// Inline macros for wait status parsing (not available as extern on macOS) -fn WIFEXITED(status: c_int) c_int { - return if ((status & 0x7F) == 0) 1 else 0; -} -fn WEXITSTATUS(status: c_int) c_int { - return (status >> 8) & 0xFF; -} -fn WIFSIGNALED(status: c_int) c_int { - return if (((status & 0x7F) != 0x7F) and ((status & 0x7F) != 0)) 1 else 0; -} -fn WTERMSIG(status: c_int) c_int { - return status & 0x7F; -} -const Manifest = manifest_lib.RunManifest; +pub const RunOptions = struct { + cpu: u8 = 1, + memory: u8 = 4, + gpu: u8 = 0, + gpu_memory: ?[]const u8 = null, + priority: u8 = 5, + dry_run: bool = false, + validate: bool = false, + explain: bool = false, + force: bool = false, + hypothesis: ?[]const u8 = null, + context: ?[]const u8 = null, + intent: ?[]const u8 = null, + expected_outcome: ?[]const u8 = null, + tags: ?[]const u8 = null, +}; -/// Run command - always executes locally -/// Usage: -/// ml run # Use entrypoint from config + args -/// ml run --lr 0.001 # Args appended to entrypoint -/// ml run -- python train.py # Explicit command +/// Unified run command - transparently handles local and remote execution pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void { var flags = core.flags.CommonFlags{}; - var command_args = try core.flags.parseCommon(allocator, args, &flags); - defer command_args.deinit(allocator); + var force_local = false; + var force_remote = false; - core.output.setMode(if (flags.json) .json else .text); - - if (flags.help) { - return printUsage(); + // Find "--" separator + var sep_index: ?usize = null; + for (args, 0..) |a, idx| { + if (std.mem.eql(u8, a, "--")) { + sep_index = idx; + break; + } } + const pre = args[0..(sep_index orelse args.len)]; + + // Parse options + var job_name: ?[]const u8 = null; + var options = RunOptions{}; + + var i: usize = 0; + while (i < pre.len) : (i += 1) { + const arg = pre[i]; + + if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) { + try printUsage(); + return; + } else if (std.mem.eql(u8, arg, "--json")) { + flags.json = true; + } else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) { + options.priority = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) { + options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) { + options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) { + options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) { + options.gpu_memory = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--dry-run")) { + options.dry_run = true; + } else if (std.mem.eql(u8, arg, "--validate")) { + options.validate = true; + } else if (std.mem.eql(u8, arg, "--explain")) { + options.explain = true; + } else if (std.mem.eql(u8, arg, "--local")) { + force_local = true; + } else if (std.mem.eql(u8, arg, "--remote")) { + force_remote = true; + } else if (std.mem.eql(u8, arg, "--force")) { + options.force = true; + } else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) { + options.hypothesis = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) { + options.context = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) { + options.intent = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) { + options.expected_outcome = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) { + options.tags = pre[i + 1]; + i += 1; + } else if (!std.mem.startsWith(u8, arg, "-")) { + if (job_name == null) { + job_name = arg; + } + } + } + + if (job_name == null) { + try printUsage(); + return error.InvalidArgs; + } + + // Build args string + var args_str: []const u8 = ""; + if (sep_index) |si| { + const post = args[(si + 1)..]; + if (post.len > 0) { + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); + defer buf.deinit(allocator); + for (post, 0..) |a, j| { + if (j > 0) try buf.append(allocator, ' '); + try buf.appendSlice(allocator, a); + } + args_str = try buf.toOwnedSlice(allocator); + } + } + defer if (sep_index != null and args_str.len > 0) allocator.free(args_str); + const cfg = try config.Config.load(allocator); defer { - var mut_cfg = cfg; - mut_cfg.deinit(allocator); + var mut = cfg; + mut.deinit(allocator); } - // Parse command: entrypoint + args, or explicit -- command - const command = try resolveCommand(allocator, &cfg, command_args.items); - defer freeCommand(allocator, command); + // Determine execution mode + var run_mode: RunMode = undefined; - // Generate run_id - const run_id = try db.generateUUID(allocator); - defer allocator.free(run_id); - - // Determine experiment name - const experiment_name = if (cfg.experiment) |exp| exp.name else "default"; - - // Build artifact path - const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{ - cfg.artifact_path, - experiment_name, - run_id, - }); - defer allocator.free(artifact_path); - - // Create run directory - std.fs.makeDirAbsolute(artifact_path) catch |err| { - if (err != error.PathAlreadyExists) { - std.log.err("Failed to create run directory: {}", .{err}); - return error.MkdirFailed; - } - }; - - // Get DB path and initialize if needed (lazy bootstrap) - const db_path = try cfg.getDBPath(allocator); - defer allocator.free(db_path); - - var database = try initOrOpenDB(allocator, db_path); - defer database.close(); - - // Write run manifest (status=RUNNING) - const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ - artifact_path, - "run_manifest.json", - }); - defer allocator.free(manifest_path); - - const timestamp = try db.currentTimestamp(allocator); - defer allocator.free(timestamp); - - var manifest = Manifest.init(allocator); - manifest.run_id = run_id; - manifest.experiment = experiment_name; - manifest.command = try std.mem.join(allocator, " ", command); - manifest.args = try duplicateStrings(allocator, command); - manifest.started_at = try allocator.dupe(u8, timestamp); - manifest.status = "RUNNING"; - manifest.artifact_path = artifact_path; - manifest.synced = false; - - // Insert run into database - const run_name = try std.fmt.allocPrint(allocator, "run-{s}", .{run_id[0..8]}); - defer allocator.free(run_name); - - const sql = "INSERT INTO ml_runs (run_id, experiment_id, name, status, start_time, synced) VALUES (?, ?, ?, 'RUNNING', ?, 0);"; - const stmt = try database.prepare(sql); - defer db.DB.finalize(stmt); - - try db.DB.bindText(stmt, 1, run_id); - try db.DB.bindText(stmt, 2, experiment_name); - try db.DB.bindText(stmt, 3, run_name); - try db.DB.bindText(stmt, 4, timestamp); - _ = try db.DB.step(stmt); - - // Write manifest - try manifest_lib.writeManifest(manifest, manifest_path, allocator); - - // Fork and execute - const output_log_path = try std.fs.path.join(allocator, &[_][]const u8{ - artifact_path, - "output.log", - }); - defer allocator.free(output_log_path); - - // Execute and capture - const exit_code = try executeAndCapture( - allocator, - command, - output_log_path, - &database, - run_id, - ); - - // Update run status in database - const end_time = try db.currentTimestamp(allocator); - defer allocator.free(end_time); - - const status = if (exit_code == 0) "FINISHED" else "FAILED"; - - const update_sql = "UPDATE ml_runs SET status = ?, end_time = ?, exit_code = ?, pid = NULL WHERE run_id = ?;"; - const update_stmt = try database.prepare(update_sql); - defer db.DB.finalize(update_stmt); - - try db.DB.bindText(update_stmt, 1, status); - try db.DB.bindText(update_stmt, 2, end_time); - try db.DB.bindInt64(update_stmt, 3, exit_code); - try db.DB.bindText(update_stmt, 4, run_id); - _ = try db.DB.step(update_stmt); - - // Update manifest - try manifest_lib.updateManifestStatus(manifest_path, status, exit_code, allocator); - - // Checkpoint WAL - database.checkpointOnExit(); - - // Print result - if (flags.json) { - std.debug.print("{{\"success\":true,\"run_id\":\"{s}\",\"status\":\"{s}\",\"exit_code\":{d}}}\n", .{ - run_id, - status, - exit_code, - }); + if (force_local) { + run_mode = .local; + } else if (force_remote) { + run_mode = .remote; } else { - std.debug.print("[OK] Run {s} complete ({s})\n", .{ run_id[0..8], status }); - if (cfg.sync_uri.len > 0) { - std.debug.print("-> queued for sync\n", .{}); - } - } -} - -/// Resolve command from entrypoint + args, or explicit -- command -fn resolveCommand(allocator: std.mem.Allocator, cfg: *const config.Config, args: []const []const u8) ![][]const u8 { - // Check for explicit -- separator - var double_dash_idx: ?usize = null; - for (args, 0..) |arg, i| { - if (std.mem.eql(u8, arg, "--")) { - double_dash_idx = i; - break; + const mode_result = try mode.detect(allocator, cfg); + run_mode = if (mode.isOnline(mode_result.mode)) .remote else .local; + if (mode_result.warning) |warn| { + std.log.info("{s}", .{warn}); } } - if (double_dash_idx) |idx| { - // Explicit command after -- - if (idx + 1 >= args.len) { - std.log.err("No command provided after --", .{}); - return error.NoCommand; - } - return try allocator.dupe([]const u8, args[idx + 1 ..]); + // Handle special modes + if (options.dry_run) { + return try common.dryRun(allocator, job_name.?, run_mode, &options, args_str); } - // Use entrypoint from config + args - if (cfg.experiment) |exp| { - if (exp.entrypoint.len > 0) { - // Split entrypoint on spaces - var argv = try std.ArrayList([]const u8).initCapacity(allocator, 8); + if (options.validate) { + return try validateJob(allocator, job_name.?, &options); + } - // Parse entrypoint (split on spaces) - var iter = std.mem.splitScalar(u8, exp.entrypoint, ' '); - while (iter.next()) |part| { - if (part.len > 0) { - try argv.append(allocator, try allocator.dupe(u8, part)); - } + if (options.explain) { + return try explainJob(allocator, job_name.?, &options); + } + + // Execute + switch (run_mode) { + .remote => { + try remote.execute(allocator, job_name.?, options.priority, &options, args_str, cfg); + }, + .local => { + const run_id = try local.execute(allocator, job_name.?, &options, args_str, cfg); + try local.markForSync(allocator, run_id); + if (!flags.json) { + std.debug.print("\nRun completed locally (run_id: {s})\n", .{run_id[0..@min(8, run_id.len)]}); + std.debug.print("Will sync to server when connection is available\n", .{}); } - - // Append args - for (args) |arg| { - try argv.append(allocator, try allocator.dupe(u8, arg)); - } - - return try argv.toOwnedSlice(allocator); - } + }, } - - // No entrypoint configured and no explicit command - std.log.err("No entrypoint configured. Set entrypoint in .fetchml/config.toml or use: ml run -- ", .{}); - return error.NoEntrypoint; } -/// Free command array -fn freeCommand(allocator: std.mem.Allocator, command: [][]const u8) void { - for (command) |arg| { - allocator.free(arg); - } - allocator.free(command); +fn validateJob(allocator: std.mem.Allocator, job_name: []const u8, options: *const RunOptions) !void { + _ = options; + const train_script_exists = if (std.fs.cwd().access("train.py", .{})) true else |_| false; + const requirements_exists = if (std.fs.cwd().access("requirements.txt", .{})) true else |_| false; + const overall_valid = train_script_exists and requirements_exists; + + std.debug.print("Validation Results for '{s}':\n", .{job_name}); + std.debug.print(" train.py: {s}\n", .{if (train_script_exists) "yes" else "no"}); + std.debug.print(" requirements.txt: {s}\n", .{if (requirements_exists) "yes" else "no"}); + std.debug.print("\n{s}\n", .{if (overall_valid) "✓ Validation passed" else "✗ Validation failed"}); + _ = allocator; } -/// Duplicate array of strings -fn duplicateStrings(allocator: std.mem.Allocator, strings: []const []const u8) ![][]const u8 { - const result = try allocator.alloc([]const u8, strings.len); - for (strings, 0..) |s, i| { - result[i] = try allocator.dupe(u8, s); - } - return result; -} - -/// Initialize or open database (lazy bootstrap) -fn initOrOpenDB(allocator: std.mem.Allocator, db_path: []const u8) !db.DB { - const db_exists = blk: { - std.fs.accessAbsolute(db_path, .{}) catch |err| { - if (err == error.FileNotFound) break :blk false; - }; - break :blk true; - }; - - const database = try db.DB.init(allocator, db_path); - - if (!db_exists) { - std.log.info("local mode active — tracking to {s}", .{db_path}); - } - - return database; -} - -/// Execute command and capture output, parsing FETCHML_METRIC lines -fn executeAndCapture( - allocator: std.mem.Allocator, - command: []const []const u8, - output_path: []const u8, - database: *db.DB, - run_id: []const u8, -) !i32 { - // Create output file - var output_file = try std.fs.cwd().createFile(output_path, .{}); - defer output_file.close(); - - // Create pipe for stdout - const pipe = try std.posix.pipe(); - defer { - std.posix.close(pipe[0]); - std.posix.close(pipe[1]); - } - - // Fork child process - const pid = try std.posix.fork(); - - if (pid == 0) { - // Child process - std.posix.close(pipe[0]); // Close read end - - // Redirect stdout to pipe - _ = std.posix.dup2(pipe[1], std.posix.STDOUT_FILENO) catch std.process.exit(1); - _ = std.posix.dup2(pipe[1], std.posix.STDERR_FILENO) catch std.process.exit(1); - std.posix.close(pipe[1]); - - // Execute command using execvp (uses current environ) - const c_err = execvp(@ptrCast(command[0].ptr), @ptrCast(command.ptr)); - std.log.err("Failed to execute {s}: {}", .{ command[0], c_err }); - std.process.exit(1); - unreachable; - } - - // Parent process - std.posix.close(pipe[1]); // Close write end - - // Store PID in database - const pid_sql = "UPDATE ml_runs SET pid = ? WHERE run_id = ?;"; - const pid_stmt = try database.prepare(pid_sql); - defer db.DB.finalize(pid_stmt); - try db.DB.bindInt64(pid_stmt, 1, pid); - try db.DB.bindText(pid_stmt, 2, run_id); - _ = try db.DB.step(pid_stmt); - - // Read from pipe and parse FETCHML_METRIC lines - var buf: [4096]u8 = undefined; - var line_buf: [1024]u8 = undefined; - var line_len: usize = 0; - - while (true) { - const bytes_read = std.posix.read(pipe[0], &buf) catch |err| { - if (err == error.WouldBlock or err == error.BrokenPipe) break; - break; - }; - - if (bytes_read == 0) break; - - // Write to output file - try output_file.writeAll(buf[0..bytes_read]); - - // Parse lines - for (buf[0..bytes_read]) |byte| { - if (byte == '\n' or line_len >= line_buf.len - 1) { - if (line_len > 0) { - line_buf[line_len] = 0; - const line = line_buf[0..line_len]; - try parseAndLogMetric(allocator, line, database, run_id); - line_len = 0; - } - } else { - line_buf[line_len] = byte; - line_len += 1; - } - } - } - - // Wait for child - var status: c_int = 0; - _ = waitpid(@intCast(pid), &status, 0); - - // Parse exit code - if (WIFEXITED(status) != 0) { - return WEXITSTATUS(status); - } else if (WIFSIGNALED(status) != 0) { - return 128 + WTERMSIG(status); - } - - return -1; -} - -/// Parse FETCHML_METRIC line and log to database -/// Format: FETCHML_METRIC key=value [step=N] -fn parseAndLogMetric( - allocator: std.mem.Allocator, - line: []const u8, - database: *db.DB, - run_id: []const u8, -) !void { - const trimmed = std.mem.trim(u8, line, " \t\r"); - - // Check prefix - const prefix = "FETCHML_METRIC"; - if (!std.mem.startsWith(u8, trimmed, prefix)) return; - - // Get the rest after prefix - var rest = trimmed[prefix.len..]; - rest = std.mem.trimLeft(u8, rest, " \t"); - - // Parse key=value - var iter = std.mem.splitScalar(u8, rest, ' '); - const kv_part = iter.next() orelse return; - - var kv_iter = std.mem.splitScalar(u8, kv_part, '='); - const key = kv_iter.next() orelse return; - const value_str = kv_iter.next() orelse return; - - // Validate key: [a-zA-Z][a-zA-Z0-9_]* - if (key.len == 0) return; - const first_char = key[0]; - if (!std.ascii.isAlphabetic(first_char)) return; - for (key[1..]) |c| { - if (!std.ascii.isAlphanumeric(c) and c != '_') return; - } - - // Parse value - const value = std.fmt.parseFloat(f64, value_str) catch return; - - // Parse optional step - var step: i64 = 0; - while (iter.next()) |part| { - if (std.mem.startsWith(u8, part, "step=")) { - const step_str = part[5..]; - step = std.fmt.parseInt(i64, step_str, 10) catch 0; - if (step < 0) step = 0; - } - } - - // Insert metric - const sql = "INSERT INTO ml_metrics (run_id, key, value, step) VALUES (?, ?, ?, ?);"; - const stmt = try database.prepare(sql); - defer db.DB.finalize(stmt); - - try db.DB.bindText(stmt, 1, run_id); - try db.DB.bindText(stmt, 2, key); - try db.DB.bindDouble(stmt, 3, value); - try db.DB.bindInt64(stmt, 4, step); - _ = try db.DB.step(stmt); - +fn explainJob(allocator: std.mem.Allocator, job_name: []const u8, options: *const RunOptions) !void { + std.debug.print("Job Explanation for '{s}':\n", .{job_name}); + std.debug.print(" CPU: {d}, Memory: {d}GB, GPU: {d}\n", .{ options.cpu, options.memory, options.gpu }); + if (options.hypothesis) |h| std.debug.print(" Hypothesis: {s}\n", .{h}); + std.debug.print("\n Action: Would execute\n", .{}); _ = allocator; } fn printUsage() !void { - std.debug.print("Usage: ml run [options] [args...]\n", .{}); - std.debug.print("\t\t\tml run -- [args...]\n\n", .{}); - std.debug.print("Execute a run locally with experiment tracking.\n\n", .{}); - std.debug.print("Options:\n", .{}); - std.debug.print("\t--help, -h\tShow this help message\n", .{}); - std.debug.print("\t--json\t\tOutput structured JSON\n\n", .{}); - std.debug.print("Examples:\n", .{}); - std.debug.print("\tml run\t\t\t# Use entrypoint from config\n", .{}); - std.debug.print("\tml run --lr 0.001\t\t# Append args to entrypoint\n", .{}); - std.debug.print("\tml run -- python train.py\t# Run explicit command\n", .{}); + std.debug.print( + \\n + \\ml run [options] [-- ] + \\ + \\Unified run command - handles both local and remote execution. + \\ + \\Options: + \\ --priority <1-10> Job priority (default: 5) + \\ --cpu , --memory , --gpu Resources + \\ --local, --remote Force execution mode + \\ --dry-run, --validate, --explain Preview modes + \\ --hypothesis, --context, --tags Research context + \\ + , .{}); } diff --git a/cli/src/main.zig b/cli/src/main.zig index 80ca7ff..be80af5 100644 --- a/cli/src/main.zig +++ b/cli/src/main.zig @@ -37,9 +37,7 @@ pub fn main() !void { 'a' => if (std.mem.eql(u8, command, "annotate")) { try @import("commands/annotate.zig").execute(allocator, args[2..]); } else handleUnknownCommand(command), - 'e' => if (std.mem.eql(u8, command, "exec")) { - try @import("commands/exec.zig").execute(allocator, args[2..]); - } else if (std.mem.eql(u8, command, "experiment")) { + 'e' => if (std.mem.eql(u8, command, "experiment")) { try @import("commands/experiment.zig").execute(allocator, args[2..]); } else if (std.mem.eql(u8, command, "export")) { try @import("commands/export_cmd.zig").run(allocator, args[2..]); @@ -52,9 +50,6 @@ pub fn main() !void { 'r' => if (std.mem.eql(u8, command, "run")) { try @import("commands/run.zig").execute(allocator, args[2..]); } else handleUnknownCommand(command), - 'q' => if (std.mem.eql(u8, command, "queue")) { - try @import("commands/queue.zig").run(allocator, args[2..]); - } else handleUnknownCommand(command), 'd' => if (std.mem.eql(u8, command, "dataset")) { try @import("commands/dataset.zig").run(allocator, args[2..]); } else handleUnknownCommand(command), @@ -93,10 +88,8 @@ fn printUsage() void { std.debug.print("ML Experiment Manager\n\n", .{}); std.debug.print("Usage: ml [options]\n\n", .{}); std.debug.print("Commands:\n", .{}); - std.debug.print(" exec Execute job (auto local/remote)\n", .{}); + std.debug.print(" run Execute job (auto local/remote)\n", .{}); std.debug.print(" init Initialize project with config\n", .{}); - std.debug.print(" run [args] Execute a run locally\n", .{}); - std.debug.print(" queue Queue job on server\n", .{}); std.debug.print(" annotate Add metadata annotations\n", .{}); std.debug.print(" experiment Manage experiments (create, list, show)\n", .{}); std.debug.print(" logs Fetch or stream run logs\n", .{});