diff --git a/cli/src/commands/exec.zig b/cli/src/commands/exec.zig index 21ee6ef..8bf91e8 100644 --- a/cli/src/commands/exec.zig +++ b/cli/src/commands/exec.zig @@ -1,532 +1,14 @@ const std = @import("std"); -const db = @import("../db.zig"); -const manifest_lib = @import("../manifest.zig"); const core = @import("../core.zig"); -const config = @import("../config.zig"); -const mode = @import("../mode.zig"); -const queue = @import("queue.zig"); -const crypto = @import("../utils/crypto.zig"); -const ws = @import("../net/ws/client.zig"); -const protocol = @import("../net/protocol.zig"); -const history = @import("../utils/history.zig"); -extern fn execvp(path: [*:0]const u8, argv: [*]const ?[*:0]const u8) c_int; -extern fn waitpid(pid: c_int, status: *c_int, flags: c_int) c_int; -extern fn fork() c_int; -extern var environ: [*]const ?[*:0]const u8; +// Import modular exec structure +const exec_mod = @import("exec/mod.zig"); -fn WIFEXITED(status: c_int) c_int { - return if ((status & 0x7F) == 0) 1 else 0; -} -fn WEXITSTATUS(status: c_int) c_int { - return (status >> 8) & 0xFF; -} -fn WIFSIGNALED(status: c_int) c_int { - return if (((status & 0x7F) != 0x7F) and ((status & 0x7F) != 0)) 1 else 0; -} -fn WTERMSIG(status: c_int) c_int { - return status & 0x7F; -} - -const Manifest = manifest_lib.RunManifest; - -/// Unified exec command - works both locally and remotely -/// Transparently handles connectivity - user can't tell the difference -/// -/// Usage: -/// ml exec [options] # Execute (auto-detect local/remote) -/// ml exec -- [args] # Pass args to job -/// -/// Options: -/// --priority <1-10> Job priority (default: 5) -/// --cpu CPU cores requested -/// --memory Memory GB requested -/// --gpu GPU devices requested -/// --dry-run Show what would happen -/// --local Force local execution -/// --remote Force remote execution (fail if offline) -/// -/// Environment detection (auto): -/// FETCHML_LOCAL=1 Force local mode -/// Configured host Auto-detect via ping -pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void { - var flags = core.flags.CommonFlags{}; - var command_args = try core.flags.parseCommon(allocator, args, &flags); - defer command_args.deinit(allocator); - - core.output.setMode(if (flags.json) .json else .text); - - if (flags.help) { - return printUsage(); - } - - if (command_args.items.len == 0) { - std.debug.print("Error: No job name specified\n", .{}); - return printUsage(); - } - - const cfg = try config.Config.load(allocator); - defer { - var mut_cfg = cfg; - mut_cfg.deinit(allocator); - } - - // Parse job name and options - const job_name = command_args.items[0]; - - // Parse exec-specific flags - var force_local = false; - var force_remote = false; - var priority: u8 = 5; - var options = ExecOptions{ - .cpu = cfg.default_cpu, - .memory = cfg.default_memory, - .gpu = cfg.default_gpu, - .gpu_memory = cfg.default_gpu_memory, - }; - - // Support passing args after "--" - var sep_index: ?usize = null; - for (command_args.items, 0..) |a, idx| { - if (std.mem.eql(u8, a, "--")) { - sep_index = idx; - break; - } - } - const pre = command_args.items[0..(sep_index orelse command_args.items.len)]; - const post = if (sep_index) |si| command_args.items[(si + 1)..] else command_args.items[0..0]; - - // Parse flags before "--" - var i: usize = 1; // Skip job name - while (i < pre.len) : (i += 1) { - const arg = pre[i]; - if (std.mem.eql(u8, arg, "--local")) { - force_local = true; - } else if (std.mem.eql(u8, arg, "--remote")) { - force_remote = true; - } else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) { - priority = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) { - options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) { - options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) { - options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10); - i += 1; - } else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) { - options.gpu_memory = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--dry-run")) { - options.dry_run = true; - } else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) { - options.hypothesis = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) { - options.context = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) { - options.intent = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) { - options.expected_outcome = pre[i + 1]; - i += 1; - } else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) { - options.tags = pre[i + 1]; - i += 1; - } - } - - // Join post args if any - var args_str: []const u8 = ""; - if (post.len > 0) { - var buf = try std.ArrayList(u8).initCapacity(allocator, 256); - defer buf.deinit(allocator); - for (post, 0..) |a, j| { - if (j > 0) try buf.append(allocator, ' '); - try buf.appendSlice(allocator, a); - } - args_str = try buf.toOwnedSlice(allocator); - } - defer if (post.len > 0) allocator.free(args_str); - - // Determine execution mode - var exec_mode: ExecMode = undefined; - - if (force_local) { - exec_mode = .local; - } else if (force_remote) { - exec_mode = .remote; - } else { - // Auto-detect - const mode_result = try mode.detect(allocator, cfg); - exec_mode = if (mode.isOnline(mode_result.mode)) .remote else .local; - - if (mode_result.warning) |warn| { - std.log.info("{s}", .{warn}); - } - } - - if (options.dry_run) { - return try dryRun(allocator, job_name, exec_mode, &options, args_str); - } - - // Execute based on mode - switch (exec_mode) { - .remote => { - try executeRemote(allocator, job_name, priority, &options, args_str, cfg); - }, - .local => { - const run_id = try executeLocal(allocator, job_name, &options, args_str, cfg); - // Mark for sync - try markForSync(allocator, run_id); - - if (!flags.json) { - std.debug.print("\nRun completed locally (run_id: {s})\n", .{run_id[0..@min(8, run_id.len)]}); - std.debug.print("Will sync to server when connection is available\n", .{}); - std.debug.print("Use 'ml sync' to sync manually\n", .{}); - } - }, - } -} - -const ExecMode = enum { - local, - remote, -}; - -const ExecOptions = struct { - cpu: u8 = 1, - memory: u8 = 4, - gpu: u8 = 0, - gpu_memory: ?[]const u8 = null, - dry_run: bool = false, - hypothesis: ?[]const u8 = null, - context: ?[]const u8 = null, - intent: ?[]const u8 = null, - expected_outcome: ?[]const u8 = null, - tags: ?[]const u8 = null, -}; - -fn executeRemote( - allocator: std.mem.Allocator, - job_name: []const u8, - priority: u8, - options: *const ExecOptions, - args_str: []const u8, - cfg: config.Config, -) !void { - // Use queue command logic for remote execution - std.log.info("Queueing job on remote server: {s}", .{job_name}); - - const ws_url = try cfg.getWebSocketUrl(allocator); - defer allocator.free(ws_url); - - var client = try ws.Client.connect(allocator, ws_url, cfg.api_key); - defer client.close(); - - const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key); - defer allocator.free(api_key_hash); - - // Generate commit ID - var commit_bytes: [20]u8 = undefined; - std.crypto.random.bytes(&commit_bytes); - - // Build narrative JSON if provided - const narrative_json = buildNarrativeJson(allocator, options) catch null; - defer if (narrative_json) |j| allocator.free(j); - - // Send queue request - try client.sendQueueJobWithArgsAndResources( - job_name, - &commit_bytes, - priority, - api_key_hash, - args_str, - false, // force - options.cpu, - options.memory, - options.gpu, - options.gpu_memory, - ); - - // Receive response - const message = try client.receiveMessage(allocator); - defer allocator.free(message); - - const packet = protocol.ResponsePacket.deserialize(message, allocator) catch { - std.debug.print("Server response: {s}\n", .{message}); - return; - }; - defer packet.deinit(allocator); - - switch (packet.packet_type) { - .success => { - const commit_hex = try crypto.encodeHexLower(allocator, &commit_bytes); - defer allocator.free(commit_hex); - history.record(allocator, job_name, commit_hex) catch {}; - std.debug.print("Job queued: {s} (commit: {s})\n", .{ job_name, commit_hex[0..8] }); - }, - .error_packet => { - const err_msg = packet.error_message orelse "Unknown error"; - std.debug.print("Error: {s}\n", .{err_msg}); - return error.ServerError; - }, - else => { - std.debug.print("Job queued: {s}\n", .{job_name}); - }, - } -} - -fn executeLocal( - allocator: std.mem.Allocator, - job_name: []const u8, - options: *const ExecOptions, - args_str: []const u8, - cfg: config.Config, -) ![]const u8 { - // Resolve command - const command = blk: { - if (cfg.experiment) |exp| { - const ep = exp.entrypoint; - var parts = try std.ArrayList([]const u8).initCapacity(allocator, 16); - defer parts.deinit(allocator); - - var it = std.mem.splitScalar(u8, ep, ' '); - while (it.next()) |part| { - try parts.append(allocator, part); - } - - if (args_str.len > 0) { - var arg_it = std.mem.splitScalar(u8, args_str, ' '); - while (arg_it.next()) |arg| { - try parts.append(allocator, arg); - } - } - - break :blk try parts.toOwnedSlice(allocator); - } else { - // Default: python job_name - const has_args = args_str.len > 0; - const parts = try allocator.alloc([]const u8, if (has_args) 2 else 1); - parts[0] = job_name; - if (has_args) { - parts[1] = args_str; - } - break :blk parts; - } - }; - defer { - for (command) |c| allocator.free(c); - allocator.free(command); - } - - // Generate run_id - const run_id = try db.generateUUID(allocator); - errdefer allocator.free(run_id); - - // Build paths - const experiment_name = if (cfg.experiment) |exp| exp.name else "default"; - const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{ - cfg.artifact_path, - experiment_name, - run_id, - }); - defer allocator.free(artifact_path); - - // Create run directory - std.fs.makeDirAbsolute(artifact_path) catch |err| { - if (err != error.PathAlreadyExists) { - std.log.err("Failed to create run directory: {}", .{err}); - return error.MkdirFailed; - } - }; - - // Write manifest - const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ - artifact_path, - "run_manifest.json", - }); - defer allocator.free(manifest_path); - - const timestamp = try db.currentTimestamp(allocator); - defer allocator.free(timestamp); - - var manifest = Manifest.init(allocator); - manifest.run_id = run_id; - manifest.experiment = experiment_name; - manifest.command = try std.mem.join(allocator, " ", command); - manifest.args = command; - manifest.started_at = try allocator.dupe(u8, timestamp); - manifest.status = "RUNNING"; - - // Note: narrative fields stored via params HashMap - if (options.hypothesis) |h| try manifest.params.put("hypothesis", h); - if (options.context) |c| try manifest.params.put("context", c); - if (options.intent) |i| try manifest.params.put("intent", i); - if (options.expected_outcome) |eo| try manifest.params.put("expected_outcome", eo); - if (options.tags) |t| try manifest.params.put("tags", t); - - // Mark as pending sync via params - try manifest.params.put("sync_pending", "true"); - - try manifest_lib.writeManifest(manifest, manifest_path, allocator); - - // Execute the command - std.log.info("Executing locally: {s}", .{manifest.command}); - - const argv_z = try allocator.alloc(?[*:0]const u8, command.len + 1); - defer allocator.free(argv_z); - for (command, 0..) |arg, idx| { - const arg_copy = try allocator.alloc(u8, arg.len + 1); - @memcpy(arg_copy[0..arg.len], arg); - arg_copy[arg.len] = 0; - argv_z[idx] = @ptrCast(arg_copy.ptr); - } - argv_z[command.len] = null; - - const pid = fork(); - if (pid < 0) { - return error.ForkFailed; - } else if (pid == 0) { - // Child process - const result = execvp(argv_z[0].?, argv_z.ptr); - std.log.err("Failed to exec: {}", .{result}); - std.process.exit(1); - } - - // Parent: wait for child - var status: c_int = 0; - const wait_result = waitpid(pid, &status, 0); - if (wait_result < 0) { - return error.WaitFailed; - } - - // Update manifest - const end_timestamp = try db.currentTimestamp(allocator); - defer allocator.free(end_timestamp); - - var updated_manifest = try manifest_lib.readManifest(manifest_path, allocator); - defer updated_manifest.deinit(allocator); - - updated_manifest.ended_at = try allocator.dupe(u8, end_timestamp); - updated_manifest.exit_code = if (WIFEXITED(status) != 0) @intCast(WEXITSTATUS(status)) else null; - updated_manifest.status = if (WIFEXITED(status) != 0 and WEXITSTATUS(status) == 0) "COMPLETED" else "FAILED"; - - try manifest_lib.writeManifest(updated_manifest, manifest_path, allocator); - - return run_id; -} - -fn markForSync(allocator: std.mem.Allocator, run_id: []const u8) !void { - // Store in sync queue database - const db_path = try getSyncDBPath(allocator); - defer allocator.free(db_path); - - var database = try db.initOrOpenSyncDB(allocator, db_path); - defer database.close(); - - try database.markForSync(run_id); -} - -fn getSyncDBPath(allocator: std.mem.Allocator) ![]const u8 { - const home = std.posix.getenv("HOME") orelse "."; - return std.fs.path.join(allocator, &[_][]const u8{ home, ".ml", "sync.db" }); -} - -fn dryRun( - _allocator: std.mem.Allocator, - job_name: []const u8, - exec_mode: ExecMode, - options: *const ExecOptions, - args_str: []const u8, -) !void { - _ = _allocator; - std.debug.print("Dry run for job: {s}\n", .{job_name}); - std.debug.print(" Mode: {s}\n", .{@tagName(exec_mode)}); - std.debug.print(" CPU: {d}, Memory: {d}GB, GPU: {d}\n", .{ options.cpu, options.memory, options.gpu }); - if (args_str.len > 0) { - std.debug.print(" Args: {s}\n", .{args_str}); - } - if (options.hypothesis) |h| { - std.debug.print(" Hypothesis: {s}\n", .{h}); - } - std.debug.print("\n Action: Would {s}\n", .{ - switch (exec_mode) { - .local => "execute locally and mark for sync", - .remote => "queue on remote server", - }, - }); -} - -fn buildNarrativeJson(allocator: std.mem.Allocator, options: *const ExecOptions) !?[]const u8 { - if (options.hypothesis == null and options.context == null and - options.intent == null and options.expected_outcome == null) - { - return null; - } - - var buf = try std.ArrayList(u8).initCapacity(allocator, 256); - defer buf.deinit(allocator); - const writer = buf.writer(allocator); - - try writer.writeAll("{"); - var first = true; - - if (options.hypothesis) |h| { - if (!first) try writer.writeAll(","); - try writer.print("\"hypothesis\":\"{s}\"", .{h}); - first = false; - } - if (options.context) |c| { - if (!first) try writer.writeAll(","); - try writer.print("\"context\":\"{s}\"", .{c}); - first = false; - } - if (options.intent) |i| { - if (!first) try writer.writeAll(","); - try writer.print("\"intent\":\"{s}\"", .{i}); - first = false; - } - if (options.expected_outcome) |eo| { - if (!first) try writer.writeAll(","); - try writer.print("\"expected_outcome\":\"{s}\"", .{eo}); - first = false; - } - - try writer.writeAll("}"); - return try buf.toOwnedSlice(allocator); -} - -fn printUsage() !void { - std.debug.print( - \\n - \\ml exec [options] [-- ] - \\ - \\Unified execution - works locally or remotely with transparent fallback. - \\n \\Options: - \\ --priority <1-10> Job priority (default: 5) - \\ --cpu CPU cores requested (default: 1) - \\ --memory Memory GB requested (default: 4) - \\ --gpu GPU devices requested (default: 0) - \\ --gpu-memory GPU memory spec - \\n \\Execution mode: - \\ --local Force local execution - \\ --remote Force remote (fail if offline) - \\ (auto-detect if neither flag set) - \\n \\Research context: - \\ --hypothesis What you're testing - \\ --context Background information - \\ --intent What you're trying to do - \\ --expected-outcome What you expect to happen - \\ --tags Comma-separated tags - \\n \\Examples: - \\ ml exec train.py - \\ ml exec train.py -- --lr 0.001 --epochs 10 - \\ ml exec train.py --priority 8 --gpu 1 - \\ ml exec train.py --hypothesis "LR scaling helps" - \\ - , .{}); +// Re-export for backward compatibility +pub const ExecMode = exec_mod.ExecMode; +pub const ExecOptions = exec_mod.ExecOptions; + +// Main entry point - delegates to modular implementation +pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { + return exec_mod.run(allocator, args); } diff --git a/cli/src/commands/exec/dryrun.zig b/cli/src/commands/exec/dryrun.zig new file mode 100644 index 0000000..dd299a2 --- /dev/null +++ b/cli/src/commands/exec/dryrun.zig @@ -0,0 +1,67 @@ +const std = @import("std"); + +/// Show dry run preview +pub fn dryRun( + _allocator: std.mem.Allocator, + job_name: []const u8, + exec_mode: anytype, + options: anytype, + args_str: []const u8, +) !void { + _ = _allocator; + std.debug.print("Dry run for job: {s}\n", .{job_name}); + std.debug.print(" Mode: {s}\n", .{@tagName(exec_mode)}); + std.debug.print(" CPU: {d}, Memory: {d}GB, GPU: {d}\n", .{ options.cpu, options.memory, options.gpu }); + if (args_str.len > 0) { + std.debug.print(" Args: {s}\n", .{args_str}); + } + if (options.hypothesis) |h| { + std.debug.print(" Hypothesis: {s}\n", .{h}); + } + std.debug.print("\n Action: Would {s}\n", .{ + switch (exec_mode) { + .local => "execute locally and mark for sync", + .remote => "queue on remote server", + }, + }); +} + +/// Build narrative JSON from options +pub fn buildNarrativeJson(allocator: std.mem.Allocator, options: anytype) !?[]const u8 { + if (options.hypothesis == null and options.context == null and + options.intent == null and options.expected_outcome == null) + { + return null; + } + + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); + defer buf.deinit(allocator); + const writer = buf.writer(allocator); + + try writer.writeAll("{"); + var first = true; + + if (options.hypothesis) |h| { + if (!first) try writer.writeAll(","); + try writer.print("\"hypothesis\":\"{s}\"", .{h}); + first = false; + } + if (options.context) |c| { + if (!first) try writer.writeAll(","); + try writer.print("\"context\":\"{s}\"", .{c}); + first = false; + } + if (options.intent) |i| { + if (!first) try writer.writeAll(","); + try writer.print("\"intent\":\"{s}\"", .{i}); + first = false; + } + if (options.expected_outcome) |eo| { + if (!first) try writer.writeAll(","); + try writer.print("\"expected_outcome\":\"{s}\"", .{eo}); + first = false; + } + + try writer.writeAll("}"); + return try buf.toOwnedSlice(allocator); +} diff --git a/cli/src/commands/exec/local.zig b/cli/src/commands/exec/local.zig new file mode 100644 index 0000000..fcfcb8a --- /dev/null +++ b/cli/src/commands/exec/local.zig @@ -0,0 +1,175 @@ +const std = @import("std"); +const db = @import("../../db.zig"); +const manifest_lib = @import("../../manifest.zig"); +const config = @import("../../config.zig"); + +extern fn execvp(path: [*:0]const u8, argv: [*]const ?[*:0]const u8) c_int; +extern fn waitpid(pid: c_int, status: *c_int, flags: c_int) c_int; +extern fn fork() c_int; + +fn WIFEXITED(status: c_int) c_int { + return if ((status & 0x7F) == 0) 1 else 0; +} +fn WEXITSTATUS(status: c_int) c_int { + return (status >> 8) & 0xFF; +} + +const Manifest = manifest_lib.RunManifest; + +/// Execute job locally +pub fn execute( + allocator: std.mem.Allocator, + job_name: []const u8, + options: anytype, + args_str: []const u8, + cfg: config.Config, +) ![]const u8 { + // Resolve command + const command = blk: { + if (cfg.experiment) |exp| { + const ep = exp.entrypoint; + var parts = try std.ArrayList([]const u8).initCapacity(allocator, 16); + defer parts.deinit(allocator); + + var it = std.mem.splitScalar(u8, ep, ' '); + while (it.next()) |part| { + try parts.append(allocator, part); + } + + if (args_str.len > 0) { + var arg_it = std.mem.splitScalar(u8, args_str, ' '); + while (arg_it.next()) |arg| { + try parts.append(allocator, arg); + } + } + + break :blk try parts.toOwnedSlice(allocator); + } else { + // Default: python job_name + const has_args = args_str.len > 0; + const parts = try allocator.alloc([]const u8, if (has_args) 2 else 1); + parts[0] = job_name; + if (has_args) { + parts[1] = args_str; + } + break :blk parts; + } + }; + defer { + for (command) |c| allocator.free(c); + allocator.free(command); + } + + // Generate run_id + const run_id = try db.generateUUID(allocator); + errdefer allocator.free(run_id); + + // Build paths + const experiment_name = if (cfg.experiment) |exp| exp.name else "default"; + const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{ + cfg.artifact_path, + experiment_name, + run_id, + }); + defer allocator.free(artifact_path); + + // Create run directory + std.fs.makeDirAbsolute(artifact_path) catch |err| { + if (err != error.PathAlreadyExists) { + std.log.err("Failed to create run directory: {}", .{err}); + return error.MkdirFailed; + } + }; + + // Write manifest + const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ + artifact_path, + "run_manifest.json", + }); + defer allocator.free(manifest_path); + + const timestamp = try db.currentTimestamp(allocator); + defer allocator.free(timestamp); + + var manifest = Manifest.init(allocator); + manifest.run_id = run_id; + manifest.experiment = experiment_name; + manifest.command = try std.mem.join(allocator, " ", command); + manifest.args = command; + manifest.started_at = try allocator.dupe(u8, timestamp); + manifest.status = "RUNNING"; + + // Note: narrative fields stored via params HashMap + if (options.hypothesis) |h| try manifest.params.put("hypothesis", h); + if (options.context) |c| try manifest.params.put("context", c); + if (options.intent) |i| try manifest.params.put("intent", i); + if (options.expected_outcome) |eo| try manifest.params.put("expected_outcome", eo); + if (options.tags) |t| try manifest.params.put("tags", t); + + // Mark as pending sync via params + try manifest.params.put("sync_pending", "true"); + + try manifest_lib.writeManifest(manifest, manifest_path, allocator); + + // Execute the command + std.log.info("Executing locally: {s}", .{manifest.command}); + + const argv_z = try allocator.alloc(?[*:0]const u8, command.len + 1); + defer allocator.free(argv_z); + for (command, 0..) |arg, idx| { + const arg_copy = try allocator.alloc(u8, arg.len + 1); + @memcpy(arg_copy[0..arg.len], arg); + arg_copy[arg.len] = 0; + argv_z[idx] = @ptrCast(arg_copy.ptr); + } + argv_z[command.len] = null; + + const pid = fork(); + if (pid < 0) { + return error.ForkFailed; + } else if (pid == 0) { + // Child process + const result = execvp(argv_z[0].?, argv_z.ptr); + std.log.err("Failed to exec: {}", .{result}); + std.process.exit(1); + } + + // Parent: wait for child + var status: c_int = 0; + const wait_result = waitpid(pid, &status, 0); + if (wait_result < 0) { + return error.WaitFailed; + } + + // Update manifest + const end_timestamp = try db.currentTimestamp(allocator); + defer allocator.free(end_timestamp); + + var updated_manifest = try manifest_lib.readManifest(manifest_path, allocator); + defer updated_manifest.deinit(allocator); + + updated_manifest.ended_at = try allocator.dupe(u8, end_timestamp); + updated_manifest.exit_code = if (WIFEXITED(status) != 0) @intCast(WEXITSTATUS(status)) else null; + updated_manifest.status = if (WIFEXITED(status) != 0 and WEXITSTATUS(status) == 0) "COMPLETED" else "FAILED"; + + try manifest_lib.writeManifest(updated_manifest, manifest_path, allocator); + + return run_id; +} + +/// Mark run for later sync to server +pub fn markForSync(allocator: std.mem.Allocator, run_id: []const u8) !void { + // Store in sync queue database + const db_path = try getSyncDBPath(allocator); + defer allocator.free(db_path); + + var database = try db.initOrOpenSyncDB(allocator, db_path); + defer database.close(); + + try database.markForSync(run_id); +} + +fn getSyncDBPath(allocator: std.mem.Allocator) ![]const u8 { + const home = std.posix.getenv("HOME") orelse "."; + return std.fs.path.join(allocator, &[_][]const u8{ home, ".ml", "sync.db" }); +} diff --git a/cli/src/commands/exec/mod.zig b/cli/src/commands/exec/mod.zig new file mode 100644 index 0000000..dfb7930 --- /dev/null +++ b/cli/src/commands/exec/mod.zig @@ -0,0 +1,204 @@ +const std = @import("std"); +const core = @import("../../core.zig"); +const config = @import("../../config.zig"); +const mode = @import("../../mode.zig"); + +const remote = @import("remote.zig"); +const local = @import("local.zig"); +const dryrun = @import("dryrun.zig"); + +pub const ExecMode = enum { + local, + remote, +}; + +pub const ExecOptions = struct { + cpu: u8 = 1, + memory: u8 = 4, + gpu: u8 = 0, + gpu_memory: ?[]const u8 = null, + dry_run: bool = false, + hypothesis: ?[]const u8 = null, + context: ?[]const u8 = null, + intent: ?[]const u8 = null, + expected_outcome: ?[]const u8 = null, + tags: ?[]const u8 = null, +}; + +/// Main entry point for exec command +pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { + var flags = core.flags.CommonFlags{}; + var priority: u8 = 5; + var force_local = false; + var force_remote = false; + + // Find "--" separator + var sep_index: ?usize = null; + for (args, 0..) |a, idx| { + if (std.mem.eql(u8, a, "--")) { + sep_index = idx; + break; + } + } + + const pre = args[0..(sep_index orelse args.len)]; + + // Parse options + var job_name: ?[]const u8 = null; + var options = ExecOptions{}; + + var i: usize = 0; + while (i < pre.len) : (i += 1) { + const arg = pre[i]; + + if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) { + try printUsage(); + return; + } else if (std.mem.eql(u8, arg, "--json")) { + flags.json = true; + } else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) { + priority = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) { + options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) { + options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) { + options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) { + options.gpu_memory = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--dry-run")) { + options.dry_run = true; + } else if (std.mem.eql(u8, arg, "--local")) { + force_local = true; + } else if (std.mem.eql(u8, arg, "--remote")) { + force_remote = true; + } else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) { + options.hypothesis = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) { + options.context = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) { + options.intent = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) { + options.expected_outcome = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) { + options.tags = pre[i + 1]; + i += 1; + } else if (!std.mem.startsWith(u8, arg, "-")) { + // First positional arg is job name + if (job_name == null) { + job_name = arg; + } + } + } + + if (job_name == null) { + try printUsage(); + return error.InvalidArgs; + } + + // Build args string from post-separator + var args_str: []const u8 = ""; + if (sep_index) |si| { + const post = args[(si + 1)..]; + if (post.len > 0) { + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); + defer buf.deinit(allocator); + for (post, 0..) |a, j| { + if (j > 0) try buf.append(allocator, ' '); + try buf.appendSlice(allocator, a); + } + args_str = try buf.toOwnedSlice(allocator); + } + } + defer if (sep_index != null and args_str.len > 0) allocator.free(args_str); + + // Load config + const cfg = try config.Config.load(allocator); + defer { + var mut = cfg; + mut.deinit(allocator); + } + + // Determine execution mode + var exec_mode: ExecMode = undefined; + + if (force_local) { + exec_mode = .local; + } else if (force_remote) { + exec_mode = .remote; + } else { + // Auto-detect + const mode_result = try mode.detect(allocator, cfg); + exec_mode = if (mode.isOnline(mode_result.mode)) .remote else .local; + + if (mode_result.warning) |warn| { + std.log.info("{s}", .{warn}); + } + } + + if (options.dry_run) { + return try dryrun.dryRun(allocator, job_name.?, exec_mode, &options, args_str); + } + + // Execute based on mode + switch (exec_mode) { + .remote => { + try remote.execute(allocator, job_name.?, priority, &options, args_str, cfg); + }, + .local => { + const run_id = try local.execute(allocator, job_name.?, &options, args_str, cfg); + // Mark for sync + try local.markForSync(allocator, run_id); + + if (!flags.json) { + std.debug.print("\nRun completed locally (run_id: {s})\n", .{run_id[0..@min(8, run_id.len)]}); + std.debug.print("Will sync to server when connection is available\n", .{}); + std.debug.print("Use 'ml sync' to sync manually\n", .{}); + } + }, + } +} + +fn printUsage() !void { + std.debug.print( + \\n + \\ml exec [options] [-- ] + \\ + \\Unified execution - works locally or remotely with transparent fallback. + \\ + \\Options: + \\ --priority <1-10> Job priority (default: 5) + \\ --cpu CPU cores requested (default: 1) + \\ --memory Memory GB requested (default: 4) + \\ --gpu GPU devices requested (default: 0) + \\ --gpu-memory GPU memory spec + \\ + \\Execution mode: + \\ --local Force local execution + \\ --remote Force remote (fail if offline) + \\ (auto-detect if neither flag set) + \\ + \\Research context: + \\ --hypothesis What you're testing + \\ --context Background information + \\ --intent What you're trying to do + \\ --expected-outcome What you expect to happen + \\ --tags Comma-separated tags + \\ + \\Examples: + \\ ml exec train.py + \\ ml exec train.py -- --lr 0.001 --epochs 10 + \\ ml exec train.py --priority 8 --gpu 1 + \\ ml exec train.py --hypothesis "LR scaling helps" + \\ + , .{}); +} diff --git a/cli/src/commands/exec/remote.zig b/cli/src/commands/exec/remote.zig new file mode 100644 index 0000000..0ac580f --- /dev/null +++ b/cli/src/commands/exec/remote.zig @@ -0,0 +1,116 @@ +const std = @import("std"); +const config = @import("../../config.zig"); +const ws = @import("../../net/ws/client.zig"); +const crypto = @import("../../utils/crypto.zig"); +const protocol = @import("../../net/protocol.zig"); +const history = @import("../../utils/history.zig"); + +/// Execute job on remote server +pub fn execute( + allocator: std.mem.Allocator, + job_name: []const u8, + priority: u8, + options: anytype, + args_str: []const u8, + cfg: config.Config, +) !void { + // Use queue command logic for remote execution + std.log.info("Queueing job on remote server: {s}", .{job_name}); + + const ws_url = try cfg.getWebSocketUrl(allocator); + defer allocator.free(ws_url); + + var client = try ws.Client.connect(allocator, ws_url, cfg.api_key); + defer client.close(); + + const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key); + defer allocator.free(api_key_hash); + + // Generate commit ID + var commit_bytes: [20]u8 = undefined; + std.crypto.random.bytes(&commit_bytes); + + // Build narrative JSON if provided + const narrative_json = buildNarrativeJson(allocator, options) catch null; + defer if (narrative_json) |j| allocator.free(j); + + // Send queue request + try client.sendQueueJobWithArgsAndResources( + job_name, + &commit_bytes, + priority, + api_key_hash, + args_str, + false, // force + options.cpu, + options.memory, + options.gpu, + options.gpu_memory, + ); + + // Receive response + const message = try client.receiveMessage(allocator); + defer allocator.free(message); + + const packet = protocol.ResponsePacket.deserialize(message, allocator) catch { + std.debug.print("Server response: {s}\n", .{message}); + return; + }; + defer packet.deinit(allocator); + + switch (packet.packet_type) { + .success => { + const commit_hex = try crypto.encodeHexLower(allocator, &commit_bytes); + defer allocator.free(commit_hex); + history.record(allocator, job_name, commit_hex) catch {}; + std.debug.print("Job queued: {s} (commit: {s})\n", .{ job_name, commit_hex[0..8] }); + }, + .error_packet => { + const err_msg = packet.error_message orelse "Unknown error"; + std.debug.print("Error: {s}\n", .{err_msg}); + return error.ServerError; + }, + else => { + std.debug.print("Job queued: {s}\n", .{job_name}); + }, + } +} + +fn buildNarrativeJson(allocator: std.mem.Allocator, options: anytype) !?[]const u8 { + if (options.hypothesis == null and options.context == null and + options.intent == null and options.expected_outcome == null) + { + return null; + } + + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); + defer buf.deinit(allocator); + const writer = buf.writer(allocator); + + try writer.writeAll("{"); + var first = true; + + if (options.hypothesis) |h| { + if (!first) try writer.writeAll(","); + try writer.print("\"hypothesis\":\"{s}\"", .{h}); + first = false; + } + if (options.context) |c| { + if (!first) try writer.writeAll(","); + try writer.print("\"context\":\"{s}\"", .{c}); + first = false; + } + if (options.intent) |i| { + if (!first) try writer.writeAll(","); + try writer.print("\"intent\":\"{s}\"", .{i}); + first = false; + } + if (options.expected_outcome) |eo| { + if (!first) try writer.writeAll(","); + try writer.print("\"expected_outcome\":\"{s}\"", .{eo}); + first = false; + } + + try writer.writeAll("}"); + return try buf.toOwnedSlice(allocator); +}