diff --git a/cli/src/commands/annotate.zig b/cli/src/commands/annotate.zig index a300c97..686fe0a 100644 --- a/cli/src/commands/annotate.zig +++ b/cli/src/commands/annotate.zig @@ -4,6 +4,8 @@ const Config = @import("../config.zig").Config; const crypto = @import("../utils/crypto.zig"); const io = @import("../utils/io.zig"); const ws = @import("../net/ws/client.zig"); +const protocol = @import("../net/protocol.zig"); +const manifest = @import("../utils/manifest.zig"); pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { if (args.len == 0) { @@ -75,7 +77,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { const resolved_base = base_override orelse cfg.worker_base; - const manifest_path = resolveManifestPathWithBase(allocator, target, resolved_base) catch |err| { + const manifest_path = manifest.resolvePathWithBase(allocator, target, resolved_base) catch |err| { if (err == error.FileNotFound) { colors.printError( "Could not locate run_manifest.json for '{s}'. Provide a path, or use --base to scan finished/failed/running/pending.\n", @@ -86,7 +88,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { }; defer allocator.free(manifest_path); - const job_name = try readJobNameFromManifest(allocator, manifest_path); + const job_name = try manifest.readJobNameFromManifest(allocator, manifest_path); defer allocator.free(job_name); const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key); @@ -104,16 +106,12 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { const msg = try client.receiveMessage(allocator); defer allocator.free(msg); - const packet = @import("../net/protocol.zig").ResponsePacket.deserialize(msg, allocator) catch { + const packet = protocol.ResponsePacket.deserialize(msg, allocator) catch { var out = io.stdoutWriter(); try out.print("{s}\n", .{msg}); return error.InvalidPacket; }; - defer { - if (packet.success_message) |m| allocator.free(m); - if (packet.error_message) |m| allocator.free(m); - if (packet.error_details) |m| allocator.free(m); - } + defer packet.deinit(allocator); const Result = struct { ok: bool, @@ -153,127 +151,6 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { colors.printInfo("Job: {s}\n", .{job_name}); } -fn readJobNameFromManifest(allocator: std.mem.Allocator, manifest_path: []const u8) ![]u8 { - const data = try readFileAlloc(allocator, manifest_path); - defer allocator.free(data); - - const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{}); - defer parsed.deinit(); - - if (parsed.value != .object) return error.InvalidManifest; - const root = parsed.value.object; - - const job_name = jsonGetString(root, "job_name") orelse ""; - if (std.mem.trim(u8, job_name, " \t\r\n").len == 0) { - return error.InvalidManifest; - } - return allocator.dupe(u8, job_name); -} - -fn resolveManifestPathWithBase( - allocator: std.mem.Allocator, - input: []const u8, - base: []const u8, -) ![]u8 { - var cwd = std.fs.cwd(); - - if (std.fs.path.isAbsolute(input)) { - if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| { - var mutable_dir = dir; - defer mutable_dir.close(); - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - if (std.fs.openFileAbsolute(input, .{}) catch null) |file| { - var mutable_file = file; - defer mutable_file.close(); - return allocator.dupe(u8, input); - } - return resolveManifestPathById(allocator, input, base); - } - - const stat = cwd.statFile(input) catch |err| { - if (err == error.FileNotFound) { - return resolveManifestPathById(allocator, input, base); - } - return err; - }; - - if (stat.kind == .directory) { - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - - return allocator.dupe(u8, input); -} - -fn resolveManifestPathById(allocator: std.mem.Allocator, id: []const u8, base_path: []const u8) ![]u8 { - if (std.mem.trim(u8, id, " \t\r\n").len == 0) { - return error.FileNotFound; - } - if (base_path.len == 0) { - return error.FileNotFound; - } - - const roots = [_][]const u8{ "finished", "failed", "running", "pending" }; - for (roots) |root| { - const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root }); - defer allocator.free(root_path); - - var dir = if (std.fs.path.isAbsolute(root_path)) - (std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue) - else - (std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue); - defer dir.close(); - - var it = dir.iterate(); - while (try it.next()) |entry| { - if (entry.kind != .directory) continue; - - const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name }); - defer allocator.free(run_dir); - const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" }); - defer allocator.free(manifest_path); - - const file = if (std.fs.path.isAbsolute(manifest_path)) - (std.fs.openFileAbsolute(manifest_path, .{}) catch continue) - else - (std.fs.cwd().openFile(manifest_path, .{}) catch continue); - defer file.close(); - - const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue; - defer allocator.free(data); - - const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue; - defer parsed.deinit(); - if (parsed.value != .object) continue; - - const obj = parsed.value.object; - const run_id = jsonGetString(obj, "run_id") orelse ""; - const task_id = jsonGetString(obj, "task_id") orelse ""; - if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) { - return allocator.dupe(u8, manifest_path); - } - } - } - - return error.FileNotFound; -} - -fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 { - var file = if (std.fs.path.isAbsolute(path)) - try std.fs.openFileAbsolute(path, .{}) - else - try std.fs.cwd().openFile(path, .{}); - defer file.close(); - - return file.readToEndAlloc(allocator, 1024 * 1024); -} - -fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 { - const v = obj.get(key) orelse return null; - if (v != .string) return null; - return v.string; -} - fn printUsage() !void { colors.printInfo("Usage: ml annotate --note [--author ] [--base ] [--json]\n", .{}); colors.printInfo("\nExamples:\n", .{}); diff --git a/cli/src/commands/info.zig b/cli/src/commands/info.zig index 126743b..a34f53a 100644 --- a/cli/src/commands/info.zig +++ b/cli/src/commands/info.zig @@ -2,6 +2,8 @@ const std = @import("std"); const colors = @import("../utils/colors.zig"); const Config = @import("../config.zig").Config; const io = @import("../utils/io.zig"); +const json = @import("../utils/json.zig"); +const manifest = @import("../utils/manifest.zig"); pub const Options = struct { json: bool = false, @@ -47,7 +49,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { return error.InvalidArgs; } - const manifest_path = resolveManifestPathWithBase(allocator, target_path.?, opts.base) catch |err| { + const manifest_path = manifest.resolvePathWithBase(allocator, target_path.?, opts.base) catch |err| { if (err == error.FileNotFound) { colors.printError( "Could not locate run_manifest.json for '{s}'. Provide a path, or use --base to scan finished/failed/running/pending.\n", @@ -58,7 +60,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { }; defer allocator.free(manifest_path); - const data = try readFileAlloc(allocator, manifest_path); + const data = try manifest.readFileAlloc(allocator, manifest_path); defer allocator.free(data); if (opts.json) { @@ -77,31 +79,31 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { const root = parsed.value.object; - const run_id = jsonGetString(root, "run_id") orelse ""; - const task_id = jsonGetString(root, "task_id") orelse ""; - const job_name = jsonGetString(root, "job_name") orelse ""; + const run_id = json.getString(root, "run_id") orelse ""; + const task_id = json.getString(root, "task_id") orelse ""; + const job_name = json.getString(root, "job_name") orelse ""; - const commit_id = jsonGetString(root, "commit_id") orelse ""; - const worker_version = jsonGetString(root, "worker_version") orelse ""; - const podman_image = jsonGetString(root, "podman_image") orelse ""; + const commit_id = json.getString(root, "commit_id") orelse ""; + const worker_version = json.getString(root, "worker_version") orelse ""; + const podman_image = json.getString(root, "podman_image") orelse ""; - const snapshot_id = jsonGetString(root, "snapshot_id") orelse ""; - const snapshot_sha = jsonGetString(root, "snapshot_sha256") orelse ""; + const snapshot_id = json.getString(root, "snapshot_id") orelse ""; + const snapshot_sha = json.getString(root, "snapshot_sha256") orelse ""; - const command = jsonGetString(root, "command") orelse ""; - const cmd_args = jsonGetString(root, "args") orelse ""; + const command = json.getString(root, "command") orelse ""; + const cmd_args = json.getString(root, "args") orelse ""; - const exit_code = jsonGetInt(root, "exit_code"); - const err_msg = jsonGetString(root, "error") orelse ""; + const exit_code = json.getInt(root, "exit_code"); + const err_msg = json.getString(root, "error") orelse ""; - const created_at = jsonGetString(root, "created_at") orelse ""; - const started_at = jsonGetString(root, "started_at") orelse ""; - const ended_at = jsonGetString(root, "ended_at") orelse ""; + const created_at = json.getString(root, "created_at") orelse ""; + const started_at = json.getString(root, "started_at") orelse ""; + const ended_at = json.getString(root, "ended_at") orelse ""; - const staging_ms = jsonGetInt(root, "staging_duration_ms") orelse 0; - const exec_ms = jsonGetInt(root, "execution_duration_ms") orelse 0; - const finalize_ms = jsonGetInt(root, "finalize_duration_ms") orelse 0; - const total_ms = jsonGetInt(root, "total_duration_ms") orelse 0; + const staging_ms = json.getInt(root, "staging_duration_ms") orelse 0; + const exec_ms = json.getInt(root, "execution_duration_ms") orelse 0; + const finalize_ms = json.getInt(root, "finalize_duration_ms") orelse 0; + const total_ms = json.getInt(root, "total_duration_ms") orelse 0; colors.printInfo("run_manifest: {s}\n", .{manifest_path}); @@ -148,137 +150,6 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void { } } -fn resolveManifestPath(allocator: std.mem.Allocator, input: []const u8) ![]u8 { - return resolveManifestPathWithBase(allocator, input, null); -} - -fn resolveManifestPathWithBase( - allocator: std.mem.Allocator, - input: []const u8, - base_override: ?[]const u8, -) ![]u8 { - var cwd = std.fs.cwd(); - - if (std.fs.path.isAbsolute(input)) { - if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| { - var mutable_dir = dir; - defer mutable_dir.close(); - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - if (std.fs.openFileAbsolute(input, .{}) catch null) |file| { - var mutable_file = file; - defer mutable_file.close(); - return allocator.dupe(u8, input); - } - return resolveManifestPathById(allocator, input, base_override); - } - - const stat = cwd.statFile(input) catch |err| { - if (err == error.FileNotFound) { - return resolveManifestPathById(allocator, input, base_override); - } - return err; - }; - - if (stat.kind == .directory) { - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - - return allocator.dupe(u8, input); -} - -fn resolveManifestPathById( - allocator: std.mem.Allocator, - id: []const u8, - base_override: ?[]const u8, -) ![]u8 { - if (std.mem.trim(u8, id, " \t\r\n").len == 0) { - return error.FileNotFound; - } - - var cfg: ?Config = null; - defer if (cfg) |*c| c.deinit(allocator); - - const base_path: []const u8 = blk: { - if (base_override) |b| break :blk b; - cfg = Config.load(allocator) catch { - break :blk ""; - }; - break :blk cfg.?.worker_base; - }; - if (base_path.len == 0) { - return error.FileNotFound; - } - - const roots = [_][]const u8{ "finished", "failed", "running", "pending" }; - for (roots) |root| { - const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root }); - defer allocator.free(root_path); - - var dir = if (std.fs.path.isAbsolute(root_path)) - (std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue) - else - (std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue); - defer dir.close(); - - var it = dir.iterate(); - while (try it.next()) |entry| { - if (entry.kind != .directory) continue; - - const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name }); - defer allocator.free(run_dir); - const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" }); - defer allocator.free(manifest_path); - - const file = if (std.fs.path.isAbsolute(manifest_path)) - (std.fs.openFileAbsolute(manifest_path, .{}) catch continue) - else - (std.fs.cwd().openFile(manifest_path, .{}) catch continue); - defer file.close(); - - const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue; - defer allocator.free(data); - - const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue; - defer parsed.deinit(); - if (parsed.value != .object) continue; - - const obj = parsed.value.object; - const run_id = jsonGetString(obj, "run_id") orelse ""; - const task_id = jsonGetString(obj, "task_id") orelse ""; - if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) { - return allocator.dupe(u8, manifest_path); - } - } - } - - return error.FileNotFound; -} - -fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 { - var file = if (std.fs.path.isAbsolute(path)) - try std.fs.openFileAbsolute(path, .{}) - else - try std.fs.cwd().openFile(path, .{}); - defer file.close(); - const max_bytes: usize = 10 * 1024 * 1024; - return file.readToEndAlloc(allocator, max_bytes); -} - -fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 { - const v = obj.get(key) orelse return null; - if (v == .string) return v.string; - return null; -} - -fn jsonGetInt(obj: std.json.ObjectMap, key: []const u8) ?i64 { - const v = obj.get(key) orelse return null; - return switch (v) { - .integer => v.integer, - else => null, - }; -} - fn printUsage() !void { colors.printInfo("Usage:\n", .{}); std.debug.print(" ml info [--json] [--base ]\n", .{}); @@ -295,7 +166,7 @@ test "resolveManifestPath uses run_manifest.json for directories" { try tmp.dir.makeDir("run"); const run_abs = try tmp.dir.realpathAlloc(allocator, "run"); defer allocator.free(run_abs); - const got = try resolveManifestPath(allocator, run_abs); + const got = try manifest.resolvePathWithBase(allocator, run_abs, null); try std.testing.expect(std.mem.endsWith(u8, got, "run/run_manifest.json")); } @@ -321,6 +192,6 @@ test "resolveManifestPath resolves by task id when base is provided" { const base_abs = try tmp.dir.realpathAlloc(allocator, "."); defer allocator.free(base_abs); - const got = try resolveManifestPathWithBase(allocator, "task-123", base_abs); + const got = try manifest.resolvePathWithBase(allocator, "task-123", base_abs); try std.testing.expect(std.mem.endsWith(u8, got, "finished/run-a/run_manifest.json")); } diff --git a/cli/src/commands/narrative.zig b/cli/src/commands/narrative.zig index 3e81fd0..908b7c6 100644 --- a/cli/src/commands/narrative.zig +++ b/cli/src/commands/narrative.zig @@ -4,6 +4,8 @@ const Config = @import("../config.zig").Config; const crypto = @import("../utils/crypto.zig"); const io = @import("../utils/io.zig"); const ws = @import("../net/ws/client.zig"); +const protocol = @import("../net/protocol.zig"); +const manifest = @import("../utils/manifest.zig"); pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { if (argv.len == 0) { @@ -98,7 +100,7 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { } const resolved_base = base_override orelse cfg.worker_base; - const manifest_path = resolveManifestPathWithBase(allocator, target, resolved_base) catch |err| { + const manifest_path = manifest.resolvePathWithBase(allocator, target, resolved_base) catch |err| { if (err == error.FileNotFound) { colors.printError( "Could not locate run_manifest.json for '{s}'. Provide a path, or use --base to scan finished/failed/running/pending.\n", @@ -109,7 +111,7 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { }; defer allocator.free(manifest_path); - const job_name = try readJobNameFromManifest(allocator, manifest_path); + const job_name = try manifest.readJobNameFromManifest(allocator, manifest_path); defer allocator.free(job_name); const patch_json = try buildPatchJSON( @@ -139,16 +141,12 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { const msg = try client.receiveMessage(allocator); defer allocator.free(msg); - const packet = @import("../net/protocol.zig").ResponsePacket.deserialize(msg, allocator) catch { + const packet = protocol.ResponsePacket.deserialize(msg, allocator) catch { var out = io.stdoutWriter(); try out.print("{s}\n", .{msg}); return error.InvalidPacket; }; - defer { - if (packet.success_message) |m| allocator.free(m); - if (packet.error_message) |m| allocator.free(m); - if (packet.error_details) |m| allocator.free(m); - } + defer packet.deinit(allocator); const Result = struct { ok: bool, @@ -238,123 +236,6 @@ fn buildPatchJSON( return out.toOwnedSlice(allocator); } -fn readJobNameFromManifest(allocator: std.mem.Allocator, manifest_path: []const u8) ![]u8 { - const data = try readFileAlloc(allocator, manifest_path); - defer allocator.free(data); - - const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{}); - defer parsed.deinit(); - - if (parsed.value != .object) return error.InvalidManifest; - const root = parsed.value.object; - - const job_name = jsonGetString(root, "job_name") orelse ""; - if (std.mem.trim(u8, job_name, " \t\r\n").len == 0) { - return error.InvalidManifest; - } - return allocator.dupe(u8, job_name); -} - -fn resolveManifestPathWithBase(allocator: std.mem.Allocator, input: []const u8, base: []const u8) ![]u8 { - var cwd = std.fs.cwd(); - - if (std.fs.path.isAbsolute(input)) { - if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| { - var mutable_dir = dir; - defer mutable_dir.close(); - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - if (std.fs.openFileAbsolute(input, .{}) catch null) |file| { - var mutable_file = file; - defer mutable_file.close(); - return allocator.dupe(u8, input); - } - return resolveManifestPathById(allocator, input, base); - } - - const stat = cwd.statFile(input) catch |err| { - if (err == error.FileNotFound) { - return resolveManifestPathById(allocator, input, base); - } - return err; - }; - - if (stat.kind == .directory) { - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - - return allocator.dupe(u8, input); -} - -fn resolveManifestPathById(allocator: std.mem.Allocator, id: []const u8, base_path: []const u8) ![]u8 { - if (std.mem.trim(u8, id, " \t\r\n").len == 0) { - return error.FileNotFound; - } - if (base_path.len == 0) { - return error.FileNotFound; - } - - const roots = [_][]const u8{ "finished", "failed", "running", "pending" }; - for (roots) |root| { - const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root }); - defer allocator.free(root_path); - - var dir = if (std.fs.path.isAbsolute(root_path)) - (std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue) - else - (std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue); - defer dir.close(); - - var it = dir.iterate(); - while (try it.next()) |entry| { - if (entry.kind != .directory) continue; - - const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name }); - defer allocator.free(run_dir); - const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" }); - defer allocator.free(manifest_path); - - const file = if (std.fs.path.isAbsolute(manifest_path)) - (std.fs.openFileAbsolute(manifest_path, .{}) catch continue) - else - (std.fs.cwd().openFile(manifest_path, .{}) catch continue); - defer file.close(); - - const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue; - defer allocator.free(data); - - const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue; - defer parsed.deinit(); - if (parsed.value != .object) continue; - - const obj = parsed.value.object; - const run_id = jsonGetString(obj, "run_id") orelse ""; - const task_id = jsonGetString(obj, "task_id") orelse ""; - if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) { - return allocator.dupe(u8, manifest_path); - } - } - } - - return error.FileNotFound; -} - -fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 { - var file = if (std.fs.path.isAbsolute(path)) - try std.fs.openFileAbsolute(path, .{}) - else - try std.fs.cwd().openFile(path, .{}); - defer file.close(); - - return file.readToEndAlloc(allocator, 1024 * 1024); -} - -fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 { - const v = obj.get(key) orelse return null; - if (v != .string) return null; - return v.string; -} - fn printUsage() !void { colors.printInfo("Usage: ml narrative set [fields]\n", .{}); colors.printInfo("\nFields:\n", .{}); diff --git a/cli/src/commands/requeue.zig b/cli/src/commands/requeue.zig index 92ab0ec..a4304b9 100644 --- a/cli/src/commands/requeue.zig +++ b/cli/src/commands/requeue.zig @@ -4,6 +4,8 @@ const Config = @import("../config.zig").Config; const crypto = @import("../utils/crypto.zig"); const ws = @import("../net/ws/client.zig"); const protocol = @import("../net/protocol.zig"); +const manifest = @import("../utils/manifest.zig"); +const json = @import("../utils/json.zig"); pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { if (argv.len == 0) { @@ -138,10 +140,10 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { }; if (commit_hex.len == 0) { - const manifest_path = try resolveManifestPath(allocator, target, cfg.worker_base); + const manifest_path = try manifest.resolvePathWithBase(allocator, target, cfg.worker_base); defer allocator.free(manifest_path); - const data = try readFileAlloc(allocator, manifest_path); + const data = try manifest.readFileAlloc(allocator, manifest_path); defer allocator.free(data); const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{}); @@ -150,14 +152,14 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { if (parsed.value != .object) return error.InvalidManifest; const root = parsed.value.object; - commit_hex = jsonGetString(root, "commit_id") orelse ""; + commit_hex = json.getString(root, "commit_id") orelse ""; if (commit_hex.len != 40) { colors.printError("run manifest missing commit_id\n", .{}); return error.InvalidManifest; } if (job_name_override == null) { - const j = jsonGetString(root, "job_name") orelse ""; + const j = json.getString(root, "job_name") orelse ""; if (j.len > 0) job_name = j; } @@ -220,13 +222,7 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void { } return; }; - defer { - if (packet.success_message) |m| allocator.free(m); - if (packet.error_message) |m| allocator.free(m); - if (packet.error_details) |m| allocator.free(m); - if (packet.data_payload) |m| allocator.free(m); - if (packet.data_type) |m| allocator.free(m); - } + defer packet.deinit(allocator); switch (packet.packet_type) { .success => { @@ -289,6 +285,11 @@ fn handleDuplicateResponse( } } +fn printUsage() !void { + colors.printInfo("Usage:\n", .{}); + colors.printInfo(" ml requeue [--name ] [--priority ] [--cpu ] [--memory ] [--gpu ] [--gpu-memory ] [--args ] [--note ] [--force] -- \n", .{}); +} + fn isHexLowerOrUpper(s: []const u8) bool { for (s) |c| { if (!std.ascii.isHex(c)) return false; @@ -325,101 +326,3 @@ fn resolveCommitPrefix(allocator: std.mem.Allocator, base_path: []const u8, pref colors.printError("No commit matches prefix: {s}\n", .{prefix}); return error.FileNotFound; } - -fn resolveManifestPath(allocator: std.mem.Allocator, input: []const u8, base_path: []const u8) ![]u8 { - var cwd = std.fs.cwd(); - - if (std.fs.path.isAbsolute(input)) { - if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| { - var mutable_dir = dir; - defer mutable_dir.close(); - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - if (std.fs.openFileAbsolute(input, .{}) catch null) |file| { - var mutable_file = file; - defer mutable_file.close(); - return allocator.dupe(u8, input); - } - return resolveManifestPathById(allocator, input, base_path); - } - - const stat = cwd.statFile(input) catch |err| { - if (err == error.FileNotFound) { - return resolveManifestPathById(allocator, input, base_path); - } - return err; - }; - - if (stat.kind == .directory) { - return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); - } - - return allocator.dupe(u8, input); -} - -fn resolveManifestPathById(allocator: std.mem.Allocator, id: []const u8, base_path: []const u8) ![]u8 { - const roots = [_][]const u8{ "finished", "failed", "running", "pending" }; - for (roots) |root| { - const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root }); - defer allocator.free(root_path); - - var dir = if (std.fs.path.isAbsolute(root_path)) - (std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue) - else - (std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue); - defer dir.close(); - - var it = dir.iterate(); - while (try it.next()) |entry| { - if (entry.kind != .directory) continue; - - const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name }); - defer allocator.free(run_dir); - const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" }); - defer allocator.free(manifest_path); - - const file = if (std.fs.path.isAbsolute(manifest_path)) - (std.fs.openFileAbsolute(manifest_path, .{}) catch continue) - else - (std.fs.cwd().openFile(manifest_path, .{}) catch continue); - defer file.close(); - - const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue; - defer allocator.free(data); - - const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue; - defer parsed.deinit(); - if (parsed.value != .object) continue; - - const obj = parsed.value.object; - const run_id = jsonGetString(obj, "run_id") orelse ""; - const task_id = jsonGetString(obj, "task_id") orelse ""; - if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) { - return allocator.dupe(u8, manifest_path); - } - } - } - - return error.FileNotFound; -} - -fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 { - var file = if (std.fs.path.isAbsolute(path)) - try std.fs.openFileAbsolute(path, .{}) - else - try std.fs.cwd().openFile(path, .{}); - defer file.close(); - - return try file.readToEndAlloc(allocator, 1024 * 1024); -} - -fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 { - const v = obj.get(key) orelse return null; - if (v != .string) return null; - return v.string; -} - -fn printUsage() !void { - colors.printInfo("Usage:\n", .{}); - colors.printInfo(" ml requeue [--name ] [--priority ] [--cpu ] [--memory ] [--gpu ] [--gpu-memory ] [--args ] [--note ] [--force] -- \n", .{}); -} diff --git a/cli/src/net/protocol.zig b/cli/src/net/protocol.zig index 7ed4491..ec1b966 100644 --- a/cli/src/net/protocol.zig +++ b/cli/src/net/protocol.zig @@ -295,6 +295,18 @@ pub const ResponsePacket = struct { else => "UNKNOWN", }; } + + /// Free all allocated memory in the packet + pub fn deinit(self: ResponsePacket, allocator: std.mem.Allocator) void { + if (self.success_message) |msg| allocator.free(msg); + if (self.error_message) |msg| allocator.free(msg); + if (self.error_details) |details| allocator.free(details); + if (self.progress_message) |msg| allocator.free(msg); + if (self.status_data) |data| allocator.free(data); + if (self.data_type) |dtype| allocator.free(dtype); + if (self.data_payload) |payload| allocator.free(payload); + if (self.log_message) |msg| allocator.free(msg); + } }; /// Helper function to write string with length prefix @@ -390,13 +402,7 @@ test "deserialize data packet (varint lengths)" { try buf.appendSlice(allocator, "{}"); const packet = try ResponsePacket.deserialize(buf.items, allocator); - defer { - if (packet.success_message) |msg| allocator.free(msg); - if (packet.error_message) |msg| allocator.free(msg); - if (packet.error_details) |details| allocator.free(details); - if (packet.data_type) |dtype| allocator.free(dtype); - if (packet.data_payload) |payload| allocator.free(payload); - } + defer packet.deinit(allocator); try std.testing.expectEqual(PacketType.data, packet.packet_type); try std.testing.expectEqual(@as(u64, 1), packet.timestamp); diff --git a/cli/src/utils.zig b/cli/src/utils.zig index b1f6d38..384e7e0 100644 --- a/cli/src/utils.zig +++ b/cli/src/utils.zig @@ -5,7 +5,9 @@ pub const crypto = @import("utils/crypto.zig"); pub const flags = @import("utils/flags.zig"); pub const history = @import("utils/history.zig"); pub const io = @import("utils/io.zig"); +pub const json = @import("utils/json.zig"); pub const logging = @import("utils/logging.zig"); +pub const manifest = @import("utils/manifest.zig"); pub const rsync = @import("utils/rsync.zig"); pub const rsync_embedded = @import("utils/rsync_embedded.zig"); pub const rsync_embedded_binary = @import("utils/rsync_embedded_binary.zig"); diff --git a/cli/src/utils/json.zig b/cli/src/utils/json.zig new file mode 100644 index 0000000..5b3699c --- /dev/null +++ b/cli/src/utils/json.zig @@ -0,0 +1,35 @@ +const std = @import("std"); + +/// Get a string value from a JSON object map +pub fn getString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 { + const v = obj.get(key) orelse return null; + if (v != .string) return null; + return v.string; +} + +/// Get an integer value from a JSON object map +pub fn getInt(obj: std.json.ObjectMap, key: []const u8) ?i64 { + const v = obj.get(key) orelse return null; + switch (v) { + .integer => |i| return i, + .float => |f| return @intFromFloat(f), + else => return null, + } +} + +/// Get a float value from a JSON object map +pub fn getFloat(obj: std.json.ObjectMap, key: []const u8) ?f64 { + const v = obj.get(key) orelse return null; + switch (v) { + .float => |f| return f, + .integer => |i| return @floatFromInt(i), + else => return null, + } +} + +/// Get a boolean value from a JSON object map +pub fn getBool(obj: std.json.ObjectMap, key: []const u8) ?bool { + const v = obj.get(key) orelse return null; + if (v != .bool) return null; + return v.bool; +} diff --git a/cli/src/utils/manifest.zig b/cli/src/utils/manifest.zig new file mode 100644 index 0000000..2338163 --- /dev/null +++ b/cli/src/utils/manifest.zig @@ -0,0 +1,130 @@ +const std = @import("std"); +const json = @import("json.zig"); + +/// Read entire file into allocated memory +pub fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 { + var file = if (std.fs.path.isAbsolute(path)) + try std.fs.openFileAbsolute(path, .{}) + else + try std.fs.cwd().openFile(path, .{}); + defer file.close(); + + return file.readToEndAlloc(allocator, 1024 * 1024); +} + +/// Resolve manifest path from input (path, run_id, or task_id) +pub fn resolvePathWithBase( + allocator: std.mem.Allocator, + input: []const u8, + base: ?[]const u8, +) ![]u8 { + var cwd = std.fs.cwd(); + + if (std.fs.path.isAbsolute(input)) { + if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| { + var mutable_dir = dir; + defer mutable_dir.close(); + return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); + } + if (std.fs.openFileAbsolute(input, .{}) catch null) |file| { + var mutable_file = file; + defer mutable_file.close(); + return allocator.dupe(u8, input); + } + if (base) |b| { + return resolvePathById(allocator, input, b); + } + return error.FileNotFound; + } + + const stat = cwd.statFile(input) catch |err| { + if (err == error.FileNotFound) { + if (base) |b| { + return resolvePathById(allocator, input, b); + } + } + return err; + }; + + if (stat.kind == .directory) { + return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" }); + } + + return allocator.dupe(u8, input); +} + +/// Resolve manifest path by searching for run_id or task_id in base directory +pub fn resolvePathById( + allocator: std.mem.Allocator, + id: []const u8, + base_path: []const u8, +) ![]u8 { + if (std.mem.trim(u8, id, " \t\r\n").len == 0) { + return error.FileNotFound; + } + if (base_path.len == 0) { + return error.FileNotFound; + } + + const roots = [_][]const u8{ "finished", "failed", "running", "pending" }; + for (roots) |root| { + const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root }); + defer allocator.free(root_path); + + var dir = if (std.fs.path.isAbsolute(root_path)) + (std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue) + else + (std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue); + defer dir.close(); + + var it = dir.iterate(); + while (try it.next()) |entry| { + if (entry.kind != .directory) continue; + + const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name }); + defer allocator.free(run_dir); + const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" }); + defer allocator.free(manifest_path); + + const file = if (std.fs.path.isAbsolute(manifest_path)) + (std.fs.openFileAbsolute(manifest_path, .{}) catch continue) + else + (std.fs.cwd().openFile(manifest_path, .{}) catch continue); + defer file.close(); + + const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue; + defer allocator.free(data); + + const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue; + defer parsed.deinit(); + if (parsed.value != .object) continue; + + const obj = parsed.value.object; + const run_id = json.getString(obj, "run_id") orelse ""; + const task_id = json.getString(obj, "task_id") orelse ""; + if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) { + return allocator.dupe(u8, manifest_path); + } + } + } + + return error.FileNotFound; +} + +/// Read job_name from a manifest file +pub fn readJobNameFromManifest(allocator: std.mem.Allocator, manifest_path: []const u8) ![]u8 { + const data = try readFileAlloc(allocator, manifest_path); + defer allocator.free(data); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{}); + defer parsed.deinit(); + + if (parsed.value != .object) return error.InvalidManifest; + const root = parsed.value.object; + + const job_name = json.getString(root, "job_name") orelse ""; + if (std.mem.trim(u8, job_name, " \t\r\n").len == 0) { + return error.InvalidManifest; + } + return allocator.dupe(u8, job_name); +}