diff --git a/cli/src/commands/dataset.zig b/cli/src/commands/dataset.zig index cbe6645..e1a3e06 100644 --- a/cli/src/commands/dataset.zig +++ b/cli/src/commands/dataset.zig @@ -3,7 +3,6 @@ const Config = @import("../config.zig").Config; const ws = @import("../net/ws/client.zig"); const crypto = @import("../utils/crypto.zig"); const core = @import("../core.zig"); -const native_hash = @import("../native/hash.zig"); const DatasetOptions = struct { dry_run: bool = false, @@ -197,7 +196,10 @@ fn registerDataset(allocator: std.mem.Allocator, name: []const u8, url: []const if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"validated\":true,\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"validated\":true,\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); } else { std.debug.print("Validation OK\n", .{}); @@ -219,7 +221,10 @@ fn registerDataset(allocator: std.mem.Allocator, name: []const u8, url: []const if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"register\",\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"register\",\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); } else { std.debug.print("Dry run: would register dataset '{s}' -> {s}\n", .{ name, url }); @@ -242,7 +247,10 @@ fn registerDataset(allocator: std.mem.Allocator, name: []const u8, url: []const if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"message\":\"{s}\"}}\n", .{response}) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"message\":\"{s}\"}}\n", .{response}) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); return; } @@ -269,7 +277,10 @@ fn showDatasetInfo(allocator: std.mem.Allocator, name: []const u8, options: *con if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"info\",\"validated\":true,\"name\":\"{s}\"}}\n", .{name}) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"info\",\"validated\":true,\"name\":\"{s}\"}}\n", .{name}) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); } else { std.debug.print("Validation OK\n", .{}); @@ -283,7 +294,10 @@ fn showDatasetInfo(allocator: std.mem.Allocator, name: []const u8, options: *con if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"info\",\"name\":\"{s}\"}}\n", .{name}) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"info\",\"name\":\"{s}\"}}\n", .{name}) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); } else { std.debug.print("Dry run: would request dataset info for '{s}'\n", .{name}); @@ -333,7 +347,10 @@ fn searchDatasets(allocator: std.mem.Allocator, term: []const u8, options: *cons if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"search\",\"validated\":true,\"term\":\"{s}\"}}\n", .{term}) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"search\",\"validated\":true,\"term\":\"{s}\"}}\n", .{term}) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); } else { std.debug.print("Validation OK\n", .{}); @@ -400,23 +417,27 @@ fn verifyDataset(allocator: std.mem.Allocator, target: []const u8, options: *con total_size += stat.size; } - // Compute native SHA256 hash + // Compute SHA256 hash using pure Zig implementation const hash = blk: { - break :blk native_hash.hashDirectory(allocator, path) catch |err| { + const hash_mod = @import("../utils/hash.zig"); + break :blk hash_mod.hashDirectoryToHex(allocator, path) catch |err| { std.debug.print("Hash computation failed: {s}\n", .{@errorName(err)}); // Continue without hash - verification still succeeded break :blk null; }; }; - defer if (hash) |h| allocator.free(h); + // hash is [64]u8 array (stack allocated), not heap allocated - no need to free if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const hash_str = if (hash) |h| h else "null"; + const hash_str: []const u8 = if (hash) |h| &h else "null"; const formatted = std.fmt.bufPrint(&buffer, "{{\"path\":\"{s}\",\"files\":{d},\"size\":{d},\"hash\":\"{s}\",\"ok\":true}}\n", .{ target, file_count, total_size, hash_str, - }) catch unreachable; + }) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); } else if (options.csv) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; @@ -444,35 +465,15 @@ fn verifyDataset(allocator: std.mem.Allocator, target: []const u8, options: *con } fn hashDataset(allocator: std.mem.Allocator, path: []const u8) !void { - std.debug.print("Computing native SHA256 hash for: {s}\n", .{path}); + std.debug.print("Computing SHA256 hash for: {s}\n", .{path}); - // Check SIMD availability - if (!native_hash.hasSimdSha256()) { - std.debug.print("SIMD SHA256 not available, using generic implementation\n", .{}); - } else { - const impl_name = native_hash.getSimdImplName(); - std.debug.print("Using {s} SHA256 implementation\n", .{impl_name}); - } + const hash_mod = @import("../utils/hash.zig"); - // Compute hash using native library - const hash = native_hash.hashDirectory(allocator, path) catch |err| { - switch (err) { - error.ContextInitFailed => { - std.debug.print("Failed to initialize native hash context\n", .{}); - }, - error.HashFailed => { - std.debug.print("Hash computation failed\n", .{}); - }, - error.InvalidPath => { - std.debug.print("Invalid path: {s}\n", .{path}); - }, - error.OutOfMemory => { - std.debug.print("Out of memory\n", .{}); - }, - } + // Compute hash using pure Zig implementation + const hash = hash_mod.hashDirectoryToHex(allocator, path) catch |err| { + std.debug.print("Hash computation failed: {s}\n", .{@errorName(err)}); return err; }; - defer allocator.free(hash); // Print result std.debug.print("SHA256: {s}\n", .{hash}); diff --git a/cli/src/commands/dataset_hash.zig b/cli/src/commands/dataset_hash.zig index 9c1cb92..fa2618b 100644 --- a/cli/src/commands/dataset_hash.zig +++ b/cli/src/commands/dataset_hash.zig @@ -1,11 +1,11 @@ const std = @import("std"); const cli = @import("../../main.zig"); -const native_hash = @import("../../native/hash.zig"); +const hash_mod = @import("../../utils/hash.zig"); const ui = @import("../../ui/ui.zig"); const colors = @import("../../ui/colors.zig"); pub const name = "dataset hash"; -pub const description = "Hash a dataset directory using native SHA256 library"; +pub const description = "Hash a dataset directory using SHA256"; pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { // Parse arguments @@ -18,31 +18,29 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { const path = args[0]; - // Check if native library is available - if (!native_hash.hasSimdSha256()) { - colors.printWarning("SIMD SHA256 not available, using generic implementation\n", .{}); - } else { - const impl_name = native_hash.getSimdImplName(); - colors.printInfo("Using {s} SHA256 implementation\n", .{impl_name}); - } - // Hash the directory colors.printInfo("Hashing dataset at: {s}\n", .{path}); - - const hash = native_hash.hashDirectory(allocator, path) catch |err| { + + const hash = hash_mod.hashDirectoryToHex(allocator, path) catch |err| { switch (err) { - error.ContextInitFailed => { - colors.printError("Failed to initialize native hash context\n", .{}); + error.PathTraversalAttempt => { + colors.printError("Invalid path (path traversal detected): {s}\n", .{path}); }, - error.HashFailed => { - colors.printError("Hash computation failed\n", .{}); + error.NotAFile => { + colors.printError("Not a regular file: {s}\n", .{path}); }, - error.InvalidPath => { - colors.printError("Invalid path: {s}\n", .{path}); + error.EmptyDirectory => { + colors.printError("Directory is empty or contains no files: {s}\n", .{path}); + }, + error.MaxDepthExceeded => { + colors.printError("Max directory depth exceeded (32): {s}\n", .{path}); }, error.OutOfMemory => { colors.printError("Out of memory\n", .{}); }, + else => { + colors.printError("Hash computation failed: {s}\n", .{@errorName(err)}); + }, } return err; }; diff --git a/cli/src/commands/exec.zig b/cli/src/commands/exec.zig new file mode 100644 index 0000000..21ee6ef --- /dev/null +++ b/cli/src/commands/exec.zig @@ -0,0 +1,532 @@ +const std = @import("std"); +const db = @import("../db.zig"); +const manifest_lib = @import("../manifest.zig"); +const core = @import("../core.zig"); +const config = @import("../config.zig"); +const mode = @import("../mode.zig"); +const queue = @import("queue.zig"); +const crypto = @import("../utils/crypto.zig"); +const ws = @import("../net/ws/client.zig"); +const protocol = @import("../net/protocol.zig"); +const history = @import("../utils/history.zig"); + +extern fn execvp(path: [*:0]const u8, argv: [*]const ?[*:0]const u8) c_int; +extern fn waitpid(pid: c_int, status: *c_int, flags: c_int) c_int; +extern fn fork() c_int; +extern var environ: [*]const ?[*:0]const u8; + +fn WIFEXITED(status: c_int) c_int { + return if ((status & 0x7F) == 0) 1 else 0; +} +fn WEXITSTATUS(status: c_int) c_int { + return (status >> 8) & 0xFF; +} +fn WIFSIGNALED(status: c_int) c_int { + return if (((status & 0x7F) != 0x7F) and ((status & 0x7F) != 0)) 1 else 0; +} +fn WTERMSIG(status: c_int) c_int { + return status & 0x7F; +} + +const Manifest = manifest_lib.RunManifest; + +/// Unified exec command - works both locally and remotely +/// Transparently handles connectivity - user can't tell the difference +/// +/// Usage: +/// ml exec [options] # Execute (auto-detect local/remote) +/// ml exec -- [args] # Pass args to job +/// +/// Options: +/// --priority <1-10> Job priority (default: 5) +/// --cpu CPU cores requested +/// --memory Memory GB requested +/// --gpu GPU devices requested +/// --dry-run Show what would happen +/// --local Force local execution +/// --remote Force remote execution (fail if offline) +/// +/// Environment detection (auto): +/// FETCHML_LOCAL=1 Force local mode +/// Configured host Auto-detect via ping +pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void { + var flags = core.flags.CommonFlags{}; + var command_args = try core.flags.parseCommon(allocator, args, &flags); + defer command_args.deinit(allocator); + + core.output.setMode(if (flags.json) .json else .text); + + if (flags.help) { + return printUsage(); + } + + if (command_args.items.len == 0) { + std.debug.print("Error: No job name specified\n", .{}); + return printUsage(); + } + + const cfg = try config.Config.load(allocator); + defer { + var mut_cfg = cfg; + mut_cfg.deinit(allocator); + } + + // Parse job name and options + const job_name = command_args.items[0]; + + // Parse exec-specific flags + var force_local = false; + var force_remote = false; + var priority: u8 = 5; + var options = ExecOptions{ + .cpu = cfg.default_cpu, + .memory = cfg.default_memory, + .gpu = cfg.default_gpu, + .gpu_memory = cfg.default_gpu_memory, + }; + + // Support passing args after "--" + var sep_index: ?usize = null; + for (command_args.items, 0..) |a, idx| { + if (std.mem.eql(u8, a, "--")) { + sep_index = idx; + break; + } + } + const pre = command_args.items[0..(sep_index orelse command_args.items.len)]; + const post = if (sep_index) |si| command_args.items[(si + 1)..] else command_args.items[0..0]; + + // Parse flags before "--" + var i: usize = 1; // Skip job name + while (i < pre.len) : (i += 1) { + const arg = pre[i]; + if (std.mem.eql(u8, arg, "--local")) { + force_local = true; + } else if (std.mem.eql(u8, arg, "--remote")) { + force_remote = true; + } else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) { + priority = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) { + options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) { + options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) { + options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10); + i += 1; + } else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) { + options.gpu_memory = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--dry-run")) { + options.dry_run = true; + } else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) { + options.hypothesis = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) { + options.context = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) { + options.intent = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) { + options.expected_outcome = pre[i + 1]; + i += 1; + } else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) { + options.tags = pre[i + 1]; + i += 1; + } + } + + // Join post args if any + var args_str: []const u8 = ""; + if (post.len > 0) { + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); + defer buf.deinit(allocator); + for (post, 0..) |a, j| { + if (j > 0) try buf.append(allocator, ' '); + try buf.appendSlice(allocator, a); + } + args_str = try buf.toOwnedSlice(allocator); + } + defer if (post.len > 0) allocator.free(args_str); + + // Determine execution mode + var exec_mode: ExecMode = undefined; + + if (force_local) { + exec_mode = .local; + } else if (force_remote) { + exec_mode = .remote; + } else { + // Auto-detect + const mode_result = try mode.detect(allocator, cfg); + exec_mode = if (mode.isOnline(mode_result.mode)) .remote else .local; + + if (mode_result.warning) |warn| { + std.log.info("{s}", .{warn}); + } + } + + if (options.dry_run) { + return try dryRun(allocator, job_name, exec_mode, &options, args_str); + } + + // Execute based on mode + switch (exec_mode) { + .remote => { + try executeRemote(allocator, job_name, priority, &options, args_str, cfg); + }, + .local => { + const run_id = try executeLocal(allocator, job_name, &options, args_str, cfg); + // Mark for sync + try markForSync(allocator, run_id); + + if (!flags.json) { + std.debug.print("\nRun completed locally (run_id: {s})\n", .{run_id[0..@min(8, run_id.len)]}); + std.debug.print("Will sync to server when connection is available\n", .{}); + std.debug.print("Use 'ml sync' to sync manually\n", .{}); + } + }, + } +} + +const ExecMode = enum { + local, + remote, +}; + +const ExecOptions = struct { + cpu: u8 = 1, + memory: u8 = 4, + gpu: u8 = 0, + gpu_memory: ?[]const u8 = null, + dry_run: bool = false, + hypothesis: ?[]const u8 = null, + context: ?[]const u8 = null, + intent: ?[]const u8 = null, + expected_outcome: ?[]const u8 = null, + tags: ?[]const u8 = null, +}; + +fn executeRemote( + allocator: std.mem.Allocator, + job_name: []const u8, + priority: u8, + options: *const ExecOptions, + args_str: []const u8, + cfg: config.Config, +) !void { + // Use queue command logic for remote execution + std.log.info("Queueing job on remote server: {s}", .{job_name}); + + const ws_url = try cfg.getWebSocketUrl(allocator); + defer allocator.free(ws_url); + + var client = try ws.Client.connect(allocator, ws_url, cfg.api_key); + defer client.close(); + + const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key); + defer allocator.free(api_key_hash); + + // Generate commit ID + var commit_bytes: [20]u8 = undefined; + std.crypto.random.bytes(&commit_bytes); + + // Build narrative JSON if provided + const narrative_json = buildNarrativeJson(allocator, options) catch null; + defer if (narrative_json) |j| allocator.free(j); + + // Send queue request + try client.sendQueueJobWithArgsAndResources( + job_name, + &commit_bytes, + priority, + api_key_hash, + args_str, + false, // force + options.cpu, + options.memory, + options.gpu, + options.gpu_memory, + ); + + // Receive response + const message = try client.receiveMessage(allocator); + defer allocator.free(message); + + const packet = protocol.ResponsePacket.deserialize(message, allocator) catch { + std.debug.print("Server response: {s}\n", .{message}); + return; + }; + defer packet.deinit(allocator); + + switch (packet.packet_type) { + .success => { + const commit_hex = try crypto.encodeHexLower(allocator, &commit_bytes); + defer allocator.free(commit_hex); + history.record(allocator, job_name, commit_hex) catch {}; + std.debug.print("Job queued: {s} (commit: {s})\n", .{ job_name, commit_hex[0..8] }); + }, + .error_packet => { + const err_msg = packet.error_message orelse "Unknown error"; + std.debug.print("Error: {s}\n", .{err_msg}); + return error.ServerError; + }, + else => { + std.debug.print("Job queued: {s}\n", .{job_name}); + }, + } +} + +fn executeLocal( + allocator: std.mem.Allocator, + job_name: []const u8, + options: *const ExecOptions, + args_str: []const u8, + cfg: config.Config, +) ![]const u8 { + // Resolve command + const command = blk: { + if (cfg.experiment) |exp| { + const ep = exp.entrypoint; + var parts = try std.ArrayList([]const u8).initCapacity(allocator, 16); + defer parts.deinit(allocator); + + var it = std.mem.splitScalar(u8, ep, ' '); + while (it.next()) |part| { + try parts.append(allocator, part); + } + + if (args_str.len > 0) { + var arg_it = std.mem.splitScalar(u8, args_str, ' '); + while (arg_it.next()) |arg| { + try parts.append(allocator, arg); + } + } + + break :blk try parts.toOwnedSlice(allocator); + } else { + // Default: python job_name + const has_args = args_str.len > 0; + const parts = try allocator.alloc([]const u8, if (has_args) 2 else 1); + parts[0] = job_name; + if (has_args) { + parts[1] = args_str; + } + break :blk parts; + } + }; + defer { + for (command) |c| allocator.free(c); + allocator.free(command); + } + + // Generate run_id + const run_id = try db.generateUUID(allocator); + errdefer allocator.free(run_id); + + // Build paths + const experiment_name = if (cfg.experiment) |exp| exp.name else "default"; + const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{ + cfg.artifact_path, + experiment_name, + run_id, + }); + defer allocator.free(artifact_path); + + // Create run directory + std.fs.makeDirAbsolute(artifact_path) catch |err| { + if (err != error.PathAlreadyExists) { + std.log.err("Failed to create run directory: {}", .{err}); + return error.MkdirFailed; + } + }; + + // Write manifest + const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ + artifact_path, + "run_manifest.json", + }); + defer allocator.free(manifest_path); + + const timestamp = try db.currentTimestamp(allocator); + defer allocator.free(timestamp); + + var manifest = Manifest.init(allocator); + manifest.run_id = run_id; + manifest.experiment = experiment_name; + manifest.command = try std.mem.join(allocator, " ", command); + manifest.args = command; + manifest.started_at = try allocator.dupe(u8, timestamp); + manifest.status = "RUNNING"; + + // Note: narrative fields stored via params HashMap + if (options.hypothesis) |h| try manifest.params.put("hypothesis", h); + if (options.context) |c| try manifest.params.put("context", c); + if (options.intent) |i| try manifest.params.put("intent", i); + if (options.expected_outcome) |eo| try manifest.params.put("expected_outcome", eo); + if (options.tags) |t| try manifest.params.put("tags", t); + + // Mark as pending sync via params + try manifest.params.put("sync_pending", "true"); + + try manifest_lib.writeManifest(manifest, manifest_path, allocator); + + // Execute the command + std.log.info("Executing locally: {s}", .{manifest.command}); + + const argv_z = try allocator.alloc(?[*:0]const u8, command.len + 1); + defer allocator.free(argv_z); + for (command, 0..) |arg, idx| { + const arg_copy = try allocator.alloc(u8, arg.len + 1); + @memcpy(arg_copy[0..arg.len], arg); + arg_copy[arg.len] = 0; + argv_z[idx] = @ptrCast(arg_copy.ptr); + } + argv_z[command.len] = null; + + const pid = fork(); + if (pid < 0) { + return error.ForkFailed; + } else if (pid == 0) { + // Child process + const result = execvp(argv_z[0].?, argv_z.ptr); + std.log.err("Failed to exec: {}", .{result}); + std.process.exit(1); + } + + // Parent: wait for child + var status: c_int = 0; + const wait_result = waitpid(pid, &status, 0); + if (wait_result < 0) { + return error.WaitFailed; + } + + // Update manifest + const end_timestamp = try db.currentTimestamp(allocator); + defer allocator.free(end_timestamp); + + var updated_manifest = try manifest_lib.readManifest(manifest_path, allocator); + defer updated_manifest.deinit(allocator); + + updated_manifest.ended_at = try allocator.dupe(u8, end_timestamp); + updated_manifest.exit_code = if (WIFEXITED(status) != 0) @intCast(WEXITSTATUS(status)) else null; + updated_manifest.status = if (WIFEXITED(status) != 0 and WEXITSTATUS(status) == 0) "COMPLETED" else "FAILED"; + + try manifest_lib.writeManifest(updated_manifest, manifest_path, allocator); + + return run_id; +} + +fn markForSync(allocator: std.mem.Allocator, run_id: []const u8) !void { + // Store in sync queue database + const db_path = try getSyncDBPath(allocator); + defer allocator.free(db_path); + + var database = try db.initOrOpenSyncDB(allocator, db_path); + defer database.close(); + + try database.markForSync(run_id); +} + +fn getSyncDBPath(allocator: std.mem.Allocator) ![]const u8 { + const home = std.posix.getenv("HOME") orelse "."; + return std.fs.path.join(allocator, &[_][]const u8{ home, ".ml", "sync.db" }); +} + +fn dryRun( + _allocator: std.mem.Allocator, + job_name: []const u8, + exec_mode: ExecMode, + options: *const ExecOptions, + args_str: []const u8, +) !void { + _ = _allocator; + std.debug.print("Dry run for job: {s}\n", .{job_name}); + std.debug.print(" Mode: {s}\n", .{@tagName(exec_mode)}); + std.debug.print(" CPU: {d}, Memory: {d}GB, GPU: {d}\n", .{ options.cpu, options.memory, options.gpu }); + if (args_str.len > 0) { + std.debug.print(" Args: {s}\n", .{args_str}); + } + if (options.hypothesis) |h| { + std.debug.print(" Hypothesis: {s}\n", .{h}); + } + std.debug.print("\n Action: Would {s}\n", .{ + switch (exec_mode) { + .local => "execute locally and mark for sync", + .remote => "queue on remote server", + }, + }); +} + +fn buildNarrativeJson(allocator: std.mem.Allocator, options: *const ExecOptions) !?[]const u8 { + if (options.hypothesis == null and options.context == null and + options.intent == null and options.expected_outcome == null) + { + return null; + } + + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); + defer buf.deinit(allocator); + const writer = buf.writer(allocator); + + try writer.writeAll("{"); + var first = true; + + if (options.hypothesis) |h| { + if (!first) try writer.writeAll(","); + try writer.print("\"hypothesis\":\"{s}\"", .{h}); + first = false; + } + if (options.context) |c| { + if (!first) try writer.writeAll(","); + try writer.print("\"context\":\"{s}\"", .{c}); + first = false; + } + if (options.intent) |i| { + if (!first) try writer.writeAll(","); + try writer.print("\"intent\":\"{s}\"", .{i}); + first = false; + } + if (options.expected_outcome) |eo| { + if (!first) try writer.writeAll(","); + try writer.print("\"expected_outcome\":\"{s}\"", .{eo}); + first = false; + } + + try writer.writeAll("}"); + return try buf.toOwnedSlice(allocator); +} + +fn printUsage() !void { + std.debug.print( + \\n + \\ml exec [options] [-- ] + \\ + \\Unified execution - works locally or remotely with transparent fallback. + \\n \\Options: + \\ --priority <1-10> Job priority (default: 5) + \\ --cpu CPU cores requested (default: 1) + \\ --memory Memory GB requested (default: 4) + \\ --gpu GPU devices requested (default: 0) + \\ --gpu-memory GPU memory spec + \\n \\Execution mode: + \\ --local Force local execution + \\ --remote Force remote (fail if offline) + \\ (auto-detect if neither flag set) + \\n \\Research context: + \\ --hypothesis What you're testing + \\ --context Background information + \\ --intent What you're trying to do + \\ --expected-outcome What you expect to happen + \\ --tags Comma-separated tags + \\n \\Examples: + \\ ml exec train.py + \\ ml exec train.py -- --lr 0.001 --epochs 10 + \\ ml exec train.py --priority 8 --gpu 1 + \\ ml exec train.py --hypothesis "LR scaling helps" + \\ + , .{}); +} diff --git a/cli/src/commands/queue.zig b/cli/src/commands/queue.zig index a007947..eb9cd97 100644 --- a/cli/src/commands/queue.zig +++ b/cli/src/commands/queue.zig @@ -173,7 +173,7 @@ fn executeQueue(allocator: std.mem.Allocator, args: []const []const u8, config: .dry_run = config.default_dry_run, .validate = config.default_validate, .json = config.default_json, - .secrets = std.ArrayList([]const u8).empty, + .secrets = try std.ArrayList([]const u8).initCapacity(allocator, 4), }; defer options.secrets.deinit(allocator); priority = config.default_priority; @@ -544,7 +544,7 @@ fn queueSingleJob( const combined_json = blk: { if (tracking_json.len > 0 and narrative_json != null) { // Merge tracking and narrative - var buf = std.ArrayList(u8).empty; + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); defer buf.deinit(allocator); const writer = buf.writer(allocator); try writer.writeAll("{"); @@ -557,7 +557,7 @@ fn queueSingleJob( } else if (tracking_json.len > 0) { break :blk try allocator.dupe(u8, tracking_json); } else if (narrative_json) |nj| { - var buf = std.ArrayList(u8).empty; + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); defer buf.deinit(allocator); const writer = buf.writer(allocator); try writer.writeAll("{\"narrative\":"); @@ -744,7 +744,7 @@ fn printUsage() !void { } pub fn formatNextSteps(allocator: std.mem.Allocator, job_name: []const u8, commit_hex: []const u8) ![]u8 { - var out = std.ArrayList(u8){}; + var out = try std.ArrayList(u8).initCapacity(allocator, 128); errdefer out.deinit(allocator); const writer = out.writer(allocator); @@ -779,7 +779,10 @@ fn explainJob( if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"explain\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"explain\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); try writeJSONNullableString(&stdout_file, options.gpu_memory); if (narrative_json) |nj| { @@ -904,7 +907,10 @@ fn dryRunJob( if (options.json) { const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO }; var buffer: [4096]u8 = undefined; - const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"dry_run\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch unreachable; + const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"dry_run\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch |err| { + std.log.err("Failed to format output: {}", .{err}); + return error.FormatError; + }; try stdout_file.writeAll(formatted); try writeJSONNullableString(&stdout_file, options.gpu_memory); if (narrative_json) |nj| { @@ -1178,7 +1184,7 @@ fn buildNarrativeJson(allocator: std.mem.Allocator, options: *const QueueOptions return null; } - var buf = std.ArrayList(u8).empty; + var buf = try std.ArrayList(u8).initCapacity(allocator, 256); defer buf.deinit(allocator); const writer = buf.writer(allocator); diff --git a/cli/src/commands/run.zig b/cli/src/commands/run.zig index 59edd0e..7638282 100644 --- a/cli/src/commands/run.zig +++ b/cli/src/commands/run.zig @@ -195,7 +195,7 @@ fn resolveCommand(allocator: std.mem.Allocator, cfg: *const config.Config, args: if (cfg.experiment) |exp| { if (exp.entrypoint.len > 0) { // Split entrypoint on spaces - var argv: std.ArrayList([]const u8) = .empty; + var argv = try std.ArrayList([]const u8).initCapacity(allocator, 8); // Parse entrypoint (split on spaces) var iter = std.mem.splitScalar(u8, exp.entrypoint, ' ');