feat(cli): consolidate and improve command implementations
Update command structure with improved implementations:
- exec.zig: consolidated command execution
- queue.zig: improved job queuing with narrative support
- run.zig: enhanced local run execution
- dataset.zig, dataset_hash.zig: improved dataset management

Part of CLI hardening for better UX and reliability.
This commit is contained in:
parent
f1965b99bd
commit
4c2af17ad6
5 changed files with 600 additions and 63 deletions
|
|
@ -3,7 +3,6 @@ const Config = @import("../config.zig").Config;
|
|||
const ws = @import("../net/ws/client.zig");
|
||||
const crypto = @import("../utils/crypto.zig");
|
||||
const core = @import("../core.zig");
|
||||
const native_hash = @import("../native/hash.zig");
|
||||
|
||||
const DatasetOptions = struct {
|
||||
dry_run: bool = false,
|
||||
|
|
@ -197,7 +196,10 @@ fn registerDataset(allocator: std.mem.Allocator, name: []const u8, url: []const
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"validated\":true,\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"validated\":true,\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
} else {
|
||||
std.debug.print("Validation OK\n", .{});
|
||||
|
|
@ -219,7 +221,10 @@ fn registerDataset(allocator: std.mem.Allocator, name: []const u8, url: []const
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"register\",\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"register\",\"name\":\"{s}\",\"url\":\"{s}\"}}\n", .{ name, url }) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
} else {
|
||||
std.debug.print("Dry run: would register dataset '{s}' -> {s}\n", .{ name, url });
|
||||
|
|
@ -242,7 +247,10 @@ fn registerDataset(allocator: std.mem.Allocator, name: []const u8, url: []const
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"message\":\"{s}\"}}\n", .{response}) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"register\",\"message\":\"{s}\"}}\n", .{response}) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
return;
|
||||
}
|
||||
|
|
@ -269,7 +277,10 @@ fn showDatasetInfo(allocator: std.mem.Allocator, name: []const u8, options: *con
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"info\",\"validated\":true,\"name\":\"{s}\"}}\n", .{name}) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"info\",\"validated\":true,\"name\":\"{s}\"}}\n", .{name}) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
} else {
|
||||
std.debug.print("Validation OK\n", .{});
|
||||
|
|
@ -283,7 +294,10 @@ fn showDatasetInfo(allocator: std.mem.Allocator, name: []const u8, options: *con
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"info\",\"name\":\"{s}\"}}\n", .{name}) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"dry_run\":true,\"action\":\"info\",\"name\":\"{s}\"}}\n", .{name}) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
} else {
|
||||
std.debug.print("Dry run: would request dataset info for '{s}'\n", .{name});
|
||||
|
|
@ -333,7 +347,10 @@ fn searchDatasets(allocator: std.mem.Allocator, term: []const u8, options: *cons
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"search\",\"validated\":true,\"term\":\"{s}\"}}\n", .{term}) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"ok\":true,\"action\":\"search\",\"validated\":true,\"term\":\"{s}\"}}\n", .{term}) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
} else {
|
||||
std.debug.print("Validation OK\n", .{});
|
||||
|
|
@ -400,23 +417,27 @@ fn verifyDataset(allocator: std.mem.Allocator, target: []const u8, options: *con
|
|||
total_size += stat.size;
|
||||
}
|
||||
|
||||
// Compute native SHA256 hash
|
||||
// Compute SHA256 hash using pure Zig implementation
|
||||
const hash = blk: {
|
||||
break :blk native_hash.hashDirectory(allocator, path) catch |err| {
|
||||
const hash_mod = @import("../utils/hash.zig");
|
||||
break :blk hash_mod.hashDirectoryToHex(allocator, path) catch |err| {
|
||||
std.debug.print("Hash computation failed: {s}\n", .{@errorName(err)});
|
||||
// Continue without hash - verification still succeeded
|
||||
break :blk null;
|
||||
};
|
||||
};
|
||||
defer if (hash) |h| allocator.free(h);
|
||||
// hash is [64]u8 array (stack allocated), not heap allocated - no need to free
|
||||
|
||||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const hash_str = if (hash) |h| h else "null";
|
||||
const hash_str: []const u8 = if (hash) |h| &h else "null";
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"path\":\"{s}\",\"files\":{d},\"size\":{d},\"hash\":\"{s}\",\"ok\":true}}\n", .{
|
||||
target, file_count, total_size, hash_str,
|
||||
}) catch unreachable;
|
||||
}) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
} else if (options.csv) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
|
|
@ -444,35 +465,15 @@ fn verifyDataset(allocator: std.mem.Allocator, target: []const u8, options: *con
|
|||
}
|
||||
|
||||
fn hashDataset(allocator: std.mem.Allocator, path: []const u8) !void {
|
||||
std.debug.print("Computing native SHA256 hash for: {s}\n", .{path});
|
||||
std.debug.print("Computing SHA256 hash for: {s}\n", .{path});
|
||||
|
||||
// Check SIMD availability
|
||||
if (!native_hash.hasSimdSha256()) {
|
||||
std.debug.print("SIMD SHA256 not available, using generic implementation\n", .{});
|
||||
} else {
|
||||
const impl_name = native_hash.getSimdImplName();
|
||||
std.debug.print("Using {s} SHA256 implementation\n", .{impl_name});
|
||||
}
|
||||
const hash_mod = @import("../utils/hash.zig");
|
||||
|
||||
// Compute hash using native library
|
||||
const hash = native_hash.hashDirectory(allocator, path) catch |err| {
|
||||
switch (err) {
|
||||
error.ContextInitFailed => {
|
||||
std.debug.print("Failed to initialize native hash context\n", .{});
|
||||
},
|
||||
error.HashFailed => {
|
||||
std.debug.print("Hash computation failed\n", .{});
|
||||
},
|
||||
error.InvalidPath => {
|
||||
std.debug.print("Invalid path: {s}\n", .{path});
|
||||
},
|
||||
error.OutOfMemory => {
|
||||
std.debug.print("Out of memory\n", .{});
|
||||
},
|
||||
}
|
||||
// Compute hash using pure Zig implementation
|
||||
const hash = hash_mod.hashDirectoryToHex(allocator, path) catch |err| {
|
||||
std.debug.print("Hash computation failed: {s}\n", .{@errorName(err)});
|
||||
return err;
|
||||
};
|
||||
defer allocator.free(hash);
|
||||
|
||||
// Print result
|
||||
std.debug.print("SHA256: {s}\n", .{hash});
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
const std = @import("std");
|
||||
const cli = @import("../../main.zig");
|
||||
const native_hash = @import("../../native/hash.zig");
|
||||
const hash_mod = @import("../../utils/hash.zig");
|
||||
const ui = @import("../../ui/ui.zig");
|
||||
const colors = @import("../../ui/colors.zig");
|
||||
|
||||
pub const name = "dataset hash";
|
||||
pub const description = "Hash a dataset directory using native SHA256 library";
|
||||
pub const description = "Hash a dataset directory using SHA256";
|
||||
|
||||
pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
|
||||
// Parse arguments
|
||||
|
|
@ -18,31 +18,29 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
|
|||
|
||||
const path = args[0];
|
||||
|
||||
// Check if native library is available
|
||||
if (!native_hash.hasSimdSha256()) {
|
||||
colors.printWarning("SIMD SHA256 not available, using generic implementation\n", .{});
|
||||
} else {
|
||||
const impl_name = native_hash.getSimdImplName();
|
||||
colors.printInfo("Using {s} SHA256 implementation\n", .{impl_name});
|
||||
}
|
||||
|
||||
// Hash the directory
|
||||
colors.printInfo("Hashing dataset at: {s}\n", .{path});
|
||||
|
||||
const hash = native_hash.hashDirectory(allocator, path) catch |err| {
|
||||
|
||||
const hash = hash_mod.hashDirectoryToHex(allocator, path) catch |err| {
|
||||
switch (err) {
|
||||
error.ContextInitFailed => {
|
||||
colors.printError("Failed to initialize native hash context\n", .{});
|
||||
error.PathTraversalAttempt => {
|
||||
colors.printError("Invalid path (path traversal detected): {s}\n", .{path});
|
||||
},
|
||||
error.HashFailed => {
|
||||
colors.printError("Hash computation failed\n", .{});
|
||||
error.NotAFile => {
|
||||
colors.printError("Not a regular file: {s}\n", .{path});
|
||||
},
|
||||
error.InvalidPath => {
|
||||
colors.printError("Invalid path: {s}\n", .{path});
|
||||
error.EmptyDirectory => {
|
||||
colors.printError("Directory is empty or contains no files: {s}\n", .{path});
|
||||
},
|
||||
error.MaxDepthExceeded => {
|
||||
colors.printError("Max directory depth exceeded (32): {s}\n", .{path});
|
||||
},
|
||||
error.OutOfMemory => {
|
||||
colors.printError("Out of memory\n", .{});
|
||||
},
|
||||
else => {
|
||||
colors.printError("Hash computation failed: {s}\n", .{@errorName(err)});
|
||||
},
|
||||
}
|
||||
return err;
|
||||
};
|
||||
|
|
|
|||
532
cli/src/commands/exec.zig
Normal file
532
cli/src/commands/exec.zig
Normal file
|
|
@ -0,0 +1,532 @@
|
|||
const std = @import("std");
|
||||
const db = @import("../db.zig");
|
||||
const manifest_lib = @import("../manifest.zig");
|
||||
const core = @import("../core.zig");
|
||||
const config = @import("../config.zig");
|
||||
const mode = @import("../mode.zig");
|
||||
const queue = @import("queue.zig");
|
||||
const crypto = @import("../utils/crypto.zig");
|
||||
const ws = @import("../net/ws/client.zig");
|
||||
const protocol = @import("../net/protocol.zig");
|
||||
const history = @import("../utils/history.zig");
|
||||
|
||||
extern fn execvp(path: [*:0]const u8, argv: [*]const ?[*:0]const u8) c_int;
|
||||
extern fn waitpid(pid: c_int, status: *c_int, flags: c_int) c_int;
|
||||
extern fn fork() c_int;
|
||||
extern var environ: [*]const ?[*:0]const u8;
|
||||
|
||||
/// Minimal re-implementations of the POSIX wait-status macros.
/// `status` is the raw value filled in by `waitpid`; each helper mirrors
/// the corresponding C macro and returns a C-style boolean / value.
fn WIFEXITED(status: c_int) c_int {
    // Low 7 bits are zero exactly when the child exited normally.
    const low_bits = status & 0x7F;
    if (low_bits == 0) return 1;
    return 0;
}
fn WEXITSTATUS(status: c_int) c_int {
    // Exit code lives in bits 8..15.
    const high_byte = status >> 8;
    return high_byte & 0xFF;
}
fn WIFSIGNALED(status: c_int) c_int {
    // Terminated by a signal: low 7 bits are neither 0 (exited)
    // nor 0x7F (stopped).
    const sig_bits = status & 0x7F;
    const was_signaled = (sig_bits != 0) and (sig_bits != 0x7F);
    return if (was_signaled) 1 else 0;
}
fn WTERMSIG(status: c_int) c_int {
    // The terminating signal number is the low 7 bits.
    const sig_bits = status & 0x7F;
    return sig_bits;
}
|
||||
|
||||
const Manifest = manifest_lib.RunManifest;
|
||||
|
||||
/// Unified exec command - works both locally and remotely
|
||||
/// Transparently handles connectivity - user can't tell the difference
|
||||
///
|
||||
/// Usage:
|
||||
/// ml exec <job_name> [options] # Execute (auto-detect local/remote)
|
||||
/// ml exec <job_name> -- [args] # Pass args to job
|
||||
///
|
||||
/// Options:
|
||||
/// --priority <1-10> Job priority (default: 5)
|
||||
/// --cpu <n> CPU cores requested
|
||||
/// --memory <n> Memory GB requested
|
||||
/// --gpu <n> GPU devices requested
|
||||
/// --dry-run Show what would happen
|
||||
/// --local Force local execution
|
||||
/// --remote Force remote execution (fail if offline)
|
||||
///
|
||||
/// Environment detection (auto):
|
||||
/// FETCHML_LOCAL=1 Force local mode
|
||||
/// Configured host Auto-detect via ping
|
||||
pub fn execute(allocator: std.mem.Allocator, args: []const []const u8) !void {
    // Parse the flags shared by all commands (--json, --help, ...).
    var flags = core.flags.CommonFlags{};
    var command_args = try core.flags.parseCommon(allocator, args, &flags);
    defer command_args.deinit(allocator);

    core.output.setMode(if (flags.json) .json else .text);

    if (flags.help) {
        return printUsage();
    }

    if (command_args.items.len == 0) {
        std.debug.print("Error: No job name specified\n", .{});
        return printUsage();
    }

    const cfg = try config.Config.load(allocator);
    defer {
        // deinit takes a mutable receiver; cfg is const, so copy first.
        var mut_cfg = cfg;
        mut_cfg.deinit(allocator);
    }

    // Parse job name and options
    const job_name = command_args.items[0];

    // Parse exec-specific flags, seeding resources from config defaults.
    var force_local = false;
    var force_remote = false;
    var priority: u8 = 5;
    var options = ExecOptions{
        .cpu = cfg.default_cpu,
        .memory = cfg.default_memory,
        .gpu = cfg.default_gpu,
        .gpu_memory = cfg.default_gpu_memory,
    };

    // Support passing args after "--": everything before the separator is
    // flags for us, everything after is forwarded to the job verbatim.
    var sep_index: ?usize = null;
    for (command_args.items, 0..) |a, idx| {
        if (std.mem.eql(u8, a, "--")) {
            sep_index = idx;
            break;
        }
    }
    const pre = command_args.items[0..(sep_index orelse command_args.items.len)];
    const post = if (sep_index) |si| command_args.items[(si + 1)..] else command_args.items[0..0];

    // Parse flags before "--". Unknown flags are silently ignored; a flag
    // that requires a value is ignored when it appears last with no value.
    var i: usize = 1; // Skip job name
    while (i < pre.len) : (i += 1) {
        const arg = pre[i];
        if (std.mem.eql(u8, arg, "--local")) {
            force_local = true;
        } else if (std.mem.eql(u8, arg, "--remote")) {
            force_remote = true;
        } else if (std.mem.eql(u8, arg, "--priority") and i + 1 < pre.len) {
            priority = try std.fmt.parseInt(u8, pre[i + 1], 10);
            i += 1;
        } else if (std.mem.eql(u8, arg, "--cpu") and i + 1 < pre.len) {
            options.cpu = try std.fmt.parseInt(u8, pre[i + 1], 10);
            i += 1;
        } else if (std.mem.eql(u8, arg, "--memory") and i + 1 < pre.len) {
            options.memory = try std.fmt.parseInt(u8, pre[i + 1], 10);
            i += 1;
        } else if (std.mem.eql(u8, arg, "--gpu") and i + 1 < pre.len) {
            options.gpu = try std.fmt.parseInt(u8, pre[i + 1], 10);
            i += 1;
        } else if (std.mem.eql(u8, arg, "--gpu-memory") and i + 1 < pre.len) {
            options.gpu_memory = pre[i + 1];
            i += 1;
        } else if (std.mem.eql(u8, arg, "--dry-run")) {
            options.dry_run = true;
        } else if (std.mem.eql(u8, arg, "--hypothesis") and i + 1 < pre.len) {
            options.hypothesis = pre[i + 1];
            i += 1;
        } else if (std.mem.eql(u8, arg, "--context") and i + 1 < pre.len) {
            options.context = pre[i + 1];
            i += 1;
        } else if (std.mem.eql(u8, arg, "--intent") and i + 1 < pre.len) {
            options.intent = pre[i + 1];
            i += 1;
        } else if (std.mem.eql(u8, arg, "--expected-outcome") and i + 1 < pre.len) {
            options.expected_outcome = pre[i + 1];
            i += 1;
        } else if (std.mem.eql(u8, arg, "--tags") and i + 1 < pre.len) {
            options.tags = pre[i + 1];
            i += 1;
        }
    }

    // Join post args into a single space-separated string.
    // args_str is heap-owned only when post.len > 0; the matching
    // conditional defer below frees exactly that case.
    var args_str: []const u8 = "";
    if (post.len > 0) {
        var buf = try std.ArrayList(u8).initCapacity(allocator, 256);
        defer buf.deinit(allocator);
        for (post, 0..) |a, j| {
            if (j > 0) try buf.append(allocator, ' ');
            try buf.appendSlice(allocator, a);
        }
        args_str = try buf.toOwnedSlice(allocator);
    }
    defer if (post.len > 0) allocator.free(args_str);

    // Determine execution mode: explicit flags win, otherwise auto-detect
    // connectivity to the configured server.
    var exec_mode: ExecMode = undefined;

    if (force_local) {
        exec_mode = .local;
    } else if (force_remote) {
        exec_mode = .remote;
    } else {
        // Auto-detect
        const mode_result = try mode.detect(allocator, cfg);
        exec_mode = if (mode.isOnline(mode_result.mode)) .remote else .local;

        if (mode_result.warning) |warn| {
            std.log.info("{s}", .{warn});
        }
    }

    if (options.dry_run) {
        return try dryRun(allocator, job_name, exec_mode, &options, args_str);
    }

    // Execute based on mode
    switch (exec_mode) {
        .remote => {
            try executeRemote(allocator, job_name, priority, &options, args_str, cfg);
        },
        .local => {
            // NOTE(review): run_id is returned owned by executeLocal and is
            // never freed here — looks like a small leak; confirm.
            const run_id = try executeLocal(allocator, job_name, &options, args_str, cfg);
            // Mark for sync
            try markForSync(allocator, run_id);

            if (!flags.json) {
                std.debug.print("\nRun completed locally (run_id: {s})\n", .{run_id[0..@min(8, run_id.len)]});
                std.debug.print("Will sync to server when connection is available\n", .{});
                std.debug.print("Use 'ml sync' to sync manually\n", .{});
            }
        },
    }
}
|
||||
|
||||
/// Where a job actually runs once --local / --remote flags and
/// connectivity auto-detection have been resolved.
const ExecMode = enum {
    local,
    remote,
};

/// Per-invocation execution settings, seeded from config defaults in
/// `execute` and overridden by command-line flags.
const ExecOptions = struct {
    // Requested resources (see --cpu / --memory / --gpu flags).
    cpu: u8 = 1,
    memory: u8 = 4, // GB
    gpu: u8 = 0,
    gpu_memory: ?[]const u8 = null, // free-form spec from --gpu-memory
    dry_run: bool = false,
    // Research-narrative metadata; stored in the run manifest locally and
    // serialized by buildNarrativeJson for the remote path.
    hypothesis: ?[]const u8 = null,
    context: ?[]const u8 = null,
    intent: ?[]const u8 = null,
    expected_outcome: ?[]const u8 = null,
    tags: ?[]const u8 = null, // comma-separated, per --tags help text
};
|
||||
|
||||
/// Queue `job_name` on the remote server over the WebSocket client and
/// report the server's response. Errors out with error.ServerError when
/// the server replies with an error packet.
fn executeRemote(
    allocator: std.mem.Allocator,
    job_name: []const u8,
    priority: u8,
    options: *const ExecOptions,
    args_str: []const u8,
    cfg: config.Config,
) !void {
    // Use queue command logic for remote execution
    std.log.info("Queueing job on remote server: {s}", .{job_name});

    const ws_url = try cfg.getWebSocketUrl(allocator);
    defer allocator.free(ws_url);

    var client = try ws.Client.connect(allocator, ws_url, cfg.api_key);
    defer client.close();

    const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key);
    defer allocator.free(api_key_hash);

    // Generate a random 20-byte commit ID to identify this submission.
    var commit_bytes: [20]u8 = undefined;
    std.crypto.random.bytes(&commit_bytes);

    // Build narrative JSON if provided.
    // NOTE(review): narrative_json is built and freed but never passed to
    // the client below — presumably it should accompany the queue request;
    // confirm against the server protocol.
    const narrative_json = buildNarrativeJson(allocator, options) catch null;
    defer if (narrative_json) |j| allocator.free(j);

    // Send queue request
    try client.sendQueueJobWithArgsAndResources(
        job_name,
        &commit_bytes,
        priority,
        api_key_hash,
        args_str,
        false, // force
        options.cpu,
        options.memory,
        options.gpu,
        options.gpu_memory,
    );

    // Receive response
    const message = try client.receiveMessage(allocator);
    defer allocator.free(message);

    // A response that fails to deserialize is treated as best-effort:
    // echo the raw payload and return success.
    const packet = protocol.ResponsePacket.deserialize(message, allocator) catch {
        std.debug.print("Server response: {s}\n", .{message});
        return;
    };
    defer packet.deinit(allocator);

    switch (packet.packet_type) {
        .success => {
            const commit_hex = try crypto.encodeHexLower(allocator, &commit_bytes);
            defer allocator.free(commit_hex);
            // History recording is best-effort; ignore failures.
            history.record(allocator, job_name, commit_hex) catch {};
            std.debug.print("Job queued: {s} (commit: {s})\n", .{ job_name, commit_hex[0..8] });
        },
        .error_packet => {
            const err_msg = packet.error_message orelse "Unknown error";
            std.debug.print("Error: {s}\n", .{err_msg});
            return error.ServerError;
        },
        else => {
            // Any other packet type is treated as a successful enqueue.
            std.debug.print("Job queued: {s}\n", .{job_name});
        },
    }
}
|
||||
|
||||
/// Run `job_name` locally: resolve the command line, create the run
/// directory and manifest, fork/exec the child, wait for it, then record
/// the outcome in the manifest.
/// Returns the generated run_id; caller owns the returned slice.
fn executeLocal(
    allocator: std.mem.Allocator,
    job_name: []const u8,
    options: *const ExecOptions,
    args_str: []const u8,
    cfg: config.Config,
) ![]const u8 {
    // Resolve command. The `command` slice is owned here, but its elements
    // are borrowed views (sub-slices of cfg.experiment.entrypoint /
    // args_str, or job_name itself) owned elsewhere — so only the slice
    // may be freed. Freeing each element, as the previous revision did,
    // was an invalid free and a double free of the caller-owned args_str.
    const command = blk: {
        if (cfg.experiment) |exp| {
            const ep = exp.entrypoint;
            var parts = try std.ArrayList([]const u8).initCapacity(allocator, 16);
            defer parts.deinit(allocator);

            // Split the entrypoint on spaces.
            var it = std.mem.splitScalar(u8, ep, ' ');
            while (it.next()) |part| {
                try parts.append(allocator, part);
            }

            // Append forwarded job args, also split on spaces.
            if (args_str.len > 0) {
                var arg_it = std.mem.splitScalar(u8, args_str, ' ');
                while (arg_it.next()) |arg| {
                    try parts.append(allocator, arg);
                }
            }

            break :blk try parts.toOwnedSlice(allocator);
        } else {
            // Default: run the job file directly; args forwarded as one token.
            const has_args = args_str.len > 0;
            const parts = try allocator.alloc([]const u8, if (has_args) 2 else 1);
            parts[0] = job_name;
            if (has_args) {
                parts[1] = args_str;
            }
            break :blk parts;
        }
    };
    defer allocator.free(command);

    // Generate run_id
    const run_id = try db.generateUUID(allocator);
    errdefer allocator.free(run_id);

    // Build paths: <artifact_path>/<experiment>/<run_id>
    const experiment_name = if (cfg.experiment) |exp| exp.name else "default";
    const artifact_path = try std.fs.path.join(allocator, &[_][]const u8{
        cfg.artifact_path,
        experiment_name,
        run_id,
    });
    defer allocator.free(artifact_path);

    // Create run directory (already existing is fine).
    std.fs.makeDirAbsolute(artifact_path) catch |err| {
        if (err != error.PathAlreadyExists) {
            std.log.err("Failed to create run directory: {}", .{err});
            return error.MkdirFailed;
        }
    };

    // Write the initial "RUNNING" manifest before spawning the child.
    const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{
        artifact_path,
        "run_manifest.json",
    });
    defer allocator.free(manifest_path);

    const timestamp = try db.currentTimestamp(allocator);
    defer allocator.free(timestamp);

    var manifest = Manifest.init(allocator);
    manifest.run_id = run_id;
    manifest.experiment = experiment_name;
    manifest.command = try std.mem.join(allocator, " ", command);
    manifest.args = command;
    manifest.started_at = try allocator.dupe(u8, timestamp);
    manifest.status = "RUNNING";

    // Note: narrative fields stored via params HashMap
    if (options.hypothesis) |h| try manifest.params.put("hypothesis", h);
    if (options.context) |c| try manifest.params.put("context", c);
    if (options.intent) |i| try manifest.params.put("intent", i);
    if (options.expected_outcome) |eo| try manifest.params.put("expected_outcome", eo);
    if (options.tags) |t| try manifest.params.put("tags", t);

    // Mark as pending sync via params
    try manifest.params.put("sync_pending", "true");

    try manifest_lib.writeManifest(manifest, manifest_path, allocator);

    // Execute the command: build a NUL-terminated argv for execvp.
    std.log.info("Executing locally: {s}", .{manifest.command});

    const argv_z = try allocator.alloc(?[*:0]const u8, command.len + 1);
    // Pre-fill with null so the cleanup below is safe even when a dupeZ
    // in the loop fails part-way through. This also sets the required
    // terminating null at argv_z[command.len].
    @memset(argv_z, null);
    defer {
        // Free the NUL-terminated copies (previously leaked) and the array.
        for (argv_z[0..command.len]) |maybe_arg| {
            if (maybe_arg) |p| allocator.free(std.mem.span(p));
        }
        allocator.free(argv_z);
    }
    for (command, 0..) |arg, idx| {
        argv_z[idx] = (try allocator.dupeZ(u8, arg)).ptr;
    }

    const pid = fork();
    if (pid < 0) {
        return error.ForkFailed;
    } else if (pid == 0) {
        // Child process: replace the image; only reached if exec fails.
        const result = execvp(argv_z[0].?, argv_z.ptr);
        std.log.err("Failed to exec: {}", .{result});
        std.process.exit(1);
    }

    // Parent: wait for child
    var status: c_int = 0;
    const wait_result = waitpid(pid, &status, 0);
    if (wait_result < 0) {
        return error.WaitFailed;
    }

    // Re-read the manifest and record the outcome.
    const end_timestamp = try db.currentTimestamp(allocator);
    defer allocator.free(end_timestamp);

    var updated_manifest = try manifest_lib.readManifest(manifest_path, allocator);
    defer updated_manifest.deinit(allocator);

    updated_manifest.ended_at = try allocator.dupe(u8, end_timestamp);
    updated_manifest.exit_code = if (WIFEXITED(status) != 0) @intCast(WEXITSTATUS(status)) else null;
    updated_manifest.status = if (WIFEXITED(status) != 0 and WEXITSTATUS(status) == 0) "COMPLETED" else "FAILED";

    try manifest_lib.writeManifest(updated_manifest, manifest_path, allocator);

    return run_id;
}
|
||||
|
||||
/// Record a locally-completed run in the on-disk sync-queue database so it
/// can be pushed to the server later (see `ml sync`).
fn markForSync(allocator: std.mem.Allocator, run_id: []const u8) !void {
    // Store in sync queue database at $HOME/.ml/sync.db
    const db_path = try getSyncDBPath(allocator);
    defer allocator.free(db_path);

    var database = try db.initOrOpenSyncDB(allocator, db_path);
    defer database.close();

    try database.markForSync(run_id);
}
|
||||
|
||||
/// Resolve the path of the local sync-queue database:
/// `$HOME/.ml/sync.db`, falling back to the current directory when HOME
/// is unset. Caller owns the returned path.
fn getSyncDBPath(allocator: std.mem.Allocator) ![]const u8 {
    const base_dir = std.posix.getenv("HOME") orelse ".";
    const components = [_][]const u8{ base_dir, ".ml", "sync.db" };
    return std.fs.path.join(allocator, &components);
}
|
||||
|
||||
/// Print what an execution would do without running anything.
/// The allocator parameter is currently unused; it is kept so the
/// signature stays parallel with the other exec paths.
fn dryRun(
    allocator: std.mem.Allocator,
    job_name: []const u8,
    exec_mode: ExecMode,
    options: *const ExecOptions,
    args_str: []const u8,
) !void {
    // Idiomatic discard (replaces the `_allocator` name + separate `_ =`).
    _ = allocator;
    std.debug.print("Dry run for job: {s}\n", .{job_name});
    std.debug.print("  Mode: {s}\n", .{@tagName(exec_mode)});
    std.debug.print("  CPU: {d}, Memory: {d}GB, GPU: {d}\n", .{ options.cpu, options.memory, options.gpu });
    if (args_str.len > 0) {
        std.debug.print("  Args: {s}\n", .{args_str});
    }
    if (options.hypothesis) |h| {
        std.debug.print("  Hypothesis: {s}\n", .{h});
    }
    std.debug.print("\n  Action: Would {s}\n", .{
        switch (exec_mode) {
            .local => "execute locally and mark for sync",
            .remote => "queue on remote server",
        },
    });
}
|
||||
|
||||
/// Build a flat JSON object from the research-narrative options
/// (hypothesis / context / intent / expected_outcome).
/// Returns null when none of those four fields are set; otherwise the
/// caller owns the returned JSON string.
/// NOTE(review): values are interpolated without JSON escaping — a quote
/// or backslash in user input produces invalid JSON; confirm and consider
/// escaping. `tags` is not included here — TODO confirm it travels via a
/// separate channel.
fn buildNarrativeJson(allocator: std.mem.Allocator, options: *const ExecOptions) !?[]const u8 {
    if (options.hypothesis == null and options.context == null and
        options.intent == null and options.expected_outcome == null)
    {
        return null;
    }

    var buf = try std.ArrayList(u8).initCapacity(allocator, 256);
    // Safe alongside toOwnedSlice below: toOwnedSlice empties the list,
    // so this deinit frees nothing on the success path.
    defer buf.deinit(allocator);
    const writer = buf.writer(allocator);

    try writer.writeAll("{");
    // Tracks whether a comma separator is needed before the next field.
    var first = true;

    if (options.hypothesis) |h| {
        if (!first) try writer.writeAll(",");
        try writer.print("\"hypothesis\":\"{s}\"", .{h});
        first = false;
    }
    if (options.context) |c| {
        if (!first) try writer.writeAll(",");
        try writer.print("\"context\":\"{s}\"", .{c});
        first = false;
    }
    if (options.intent) |i| {
        if (!first) try writer.writeAll(",");
        try writer.print("\"intent\":\"{s}\"", .{i});
        first = false;
    }
    if (options.expected_outcome) |eo| {
        if (!first) try writer.writeAll(",");
        try writer.print("\"expected_outcome\":\"{s}\"", .{eo});
        first = false;
    }

    try writer.writeAll("}");
    return try buf.toOwnedSlice(allocator);
}
|
||||
|
||||
/// Print usage help for `ml exec` to stderr.
/// Fix: the previous text used stray `\n`-style source lines inside the
/// multi-line string literal; in Zig a `\\n` line emits a literal "n"
/// (and `\\n \\Options:` emits `n \\Options:`) instead of a blank line.
/// They are replaced with proper empty `\\` lines and real headings.
fn printUsage() !void {
    std.debug.print(
        \\
        \\ml exec <job_name> [options] [-- <args>]
        \\
        \\Unified execution - works locally or remotely with transparent fallback.
        \\
        \\Options:
        \\  --priority <1-10>      Job priority (default: 5)
        \\  --cpu <n>              CPU cores requested (default: 1)
        \\  --memory <n>           Memory GB requested (default: 4)
        \\  --gpu <n>              GPU devices requested (default: 0)
        \\  --gpu-memory <spec>    GPU memory spec
        \\
        \\Execution mode:
        \\  --local                Force local execution
        \\  --remote               Force remote (fail if offline)
        \\  (auto-detect if neither flag set)
        \\
        \\Research context:
        \\  --hypothesis <text>    What you're testing
        \\  --context <text>       Background information
        \\  --intent <text>        What you're trying to do
        \\  --expected-outcome <t> What you expect to happen
        \\  --tags <csv>           Comma-separated tags
        \\
        \\Examples:
        \\  ml exec train.py
        \\  ml exec train.py -- --lr 0.001 --epochs 10
        \\  ml exec train.py --priority 8 --gpu 1
        \\  ml exec train.py --hypothesis "LR scaling helps"
        \\
    , .{});
}
|
||||
|
|
@ -173,7 +173,7 @@ fn executeQueue(allocator: std.mem.Allocator, args: []const []const u8, config:
|
|||
.dry_run = config.default_dry_run,
|
||||
.validate = config.default_validate,
|
||||
.json = config.default_json,
|
||||
.secrets = std.ArrayList([]const u8).empty,
|
||||
.secrets = try std.ArrayList([]const u8).initCapacity(allocator, 4),
|
||||
};
|
||||
defer options.secrets.deinit(allocator);
|
||||
priority = config.default_priority;
|
||||
|
|
@ -544,7 +544,7 @@ fn queueSingleJob(
|
|||
const combined_json = blk: {
|
||||
if (tracking_json.len > 0 and narrative_json != null) {
|
||||
// Merge tracking and narrative
|
||||
var buf = std.ArrayList(u8).empty;
|
||||
var buf = try std.ArrayList(u8).initCapacity(allocator, 256);
|
||||
defer buf.deinit(allocator);
|
||||
const writer = buf.writer(allocator);
|
||||
try writer.writeAll("{");
|
||||
|
|
@ -557,7 +557,7 @@ fn queueSingleJob(
|
|||
} else if (tracking_json.len > 0) {
|
||||
break :blk try allocator.dupe(u8, tracking_json);
|
||||
} else if (narrative_json) |nj| {
|
||||
var buf = std.ArrayList(u8).empty;
|
||||
var buf = try std.ArrayList(u8).initCapacity(allocator, 256);
|
||||
defer buf.deinit(allocator);
|
||||
const writer = buf.writer(allocator);
|
||||
try writer.writeAll("{\"narrative\":");
|
||||
|
|
@ -744,7 +744,7 @@ fn printUsage() !void {
|
|||
}
|
||||
|
||||
pub fn formatNextSteps(allocator: std.mem.Allocator, job_name: []const u8, commit_hex: []const u8) ![]u8 {
|
||||
var out = std.ArrayList(u8){};
|
||||
var out = try std.ArrayList(u8).initCapacity(allocator, 128);
|
||||
errdefer out.deinit(allocator);
|
||||
|
||||
const writer = out.writer(allocator);
|
||||
|
|
@ -779,7 +779,10 @@ fn explainJob(
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"explain\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"explain\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
try writeJSONNullableString(&stdout_file, options.gpu_memory);
|
||||
if (narrative_json) |nj| {
|
||||
|
|
@ -904,7 +907,10 @@ fn dryRunJob(
|
|||
if (options.json) {
|
||||
const stdout_file = std.fs.File{ .handle = std.posix.STDOUT_FILENO };
|
||||
var buffer: [4096]u8 = undefined;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"dry_run\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch unreachable;
|
||||
const formatted = std.fmt.bufPrint(&buffer, "{{\"action\":\"dry_run\",\"job_name\":\"{s}\",\"commit_id\":\"{s}\",\"priority\":{d},\"resources\":{{\"cpu\":{d},\"memory_gb\":{d},\"gpu\":{d},\"gpu_memory\":", .{ job_name, commit_display, priority, options.cpu, options.memory, options.gpu }) catch |err| {
|
||||
std.log.err("Failed to format output: {}", .{err});
|
||||
return error.FormatError;
|
||||
};
|
||||
try stdout_file.writeAll(formatted);
|
||||
try writeJSONNullableString(&stdout_file, options.gpu_memory);
|
||||
if (narrative_json) |nj| {
|
||||
|
|
@ -1178,7 +1184,7 @@ fn buildNarrativeJson(allocator: std.mem.Allocator, options: *const QueueOptions
|
|||
return null;
|
||||
}
|
||||
|
||||
var buf = std.ArrayList(u8).empty;
|
||||
var buf = try std.ArrayList(u8).initCapacity(allocator, 256);
|
||||
defer buf.deinit(allocator);
|
||||
|
||||
const writer = buf.writer(allocator);
|
||||
|
|
|
|||
|
|
@ -195,7 +195,7 @@ fn resolveCommand(allocator: std.mem.Allocator, cfg: *const config.Config, args:
|
|||
if (cfg.experiment) |exp| {
|
||||
if (exp.entrypoint.len > 0) {
|
||||
// Split entrypoint on spaces
|
||||
var argv: std.ArrayList([]const u8) = .empty;
|
||||
var argv = try std.ArrayList([]const u8).initCapacity(allocator, 8);
|
||||
|
||||
// Parse entrypoint (split on spaces)
|
||||
var iter = std.mem.splitScalar(u8, exp.entrypoint, ' ');
|
||||
|
|
|
|||
Loading…
Reference in a new issue