refactor(cli): consolidate shared utilities and remove code duplication

Extract common helper functions from multiple command files into shared
utility modules:

- Create cli/src/utils/json.zig with json.getString(), getInt(), getFloat(), getBool()
- Create cli/src/utils/manifest.zig with readFileAlloc(), resolvePathWithBase(),
  resolvePathById(), readJobNameFromManifest()
- Add ResponsePacket.deinit() method to net/protocol.zig for consistent cleanup
- Update info.zig, annotate.zig, narrative.zig, requeue.zig to use shared utilities
- Update utils.zig exports for new modules

Eliminates duplicate implementations of:
- jsonGetString() and jsonGetInt() in 4 files
- readFileAlloc() in 4 files
- resolveManifestPath*() functions in 4 files
- ResponsePacket cleanup defer blocks (replaced with .deinit())

Builds cleanly with zig build --release=fast
This commit is contained in:
Jeremie Fraeys 2026-02-18 13:19:40 -05:00
parent 34186675dc
commit 1597c20b73
No known key found for this signature in database
8 changed files with 229 additions and 524 deletions

View file

@ -4,6 +4,8 @@ const Config = @import("../config.zig").Config;
const crypto = @import("../utils/crypto.zig");
const io = @import("../utils/io.zig");
const ws = @import("../net/ws/client.zig");
const protocol = @import("../net/protocol.zig");
const manifest = @import("../utils/manifest.zig");
pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
if (args.len == 0) {
@ -75,7 +77,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
const resolved_base = base_override orelse cfg.worker_base;
const manifest_path = resolveManifestPathWithBase(allocator, target, resolved_base) catch |err| {
const manifest_path = manifest.resolvePathWithBase(allocator, target, resolved_base) catch |err| {
if (err == error.FileNotFound) {
colors.printError(
"Could not locate run_manifest.json for '{s}'. Provide a path, or use --base <path> to scan finished/failed/running/pending.\n",
@ -86,7 +88,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
};
defer allocator.free(manifest_path);
const job_name = try readJobNameFromManifest(allocator, manifest_path);
const job_name = try manifest.readJobNameFromManifest(allocator, manifest_path);
defer allocator.free(job_name);
const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key);
@ -104,16 +106,12 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
const msg = try client.receiveMessage(allocator);
defer allocator.free(msg);
const packet = @import("../net/protocol.zig").ResponsePacket.deserialize(msg, allocator) catch {
const packet = protocol.ResponsePacket.deserialize(msg, allocator) catch {
var out = io.stdoutWriter();
try out.print("{s}\n", .{msg});
return error.InvalidPacket;
};
defer {
if (packet.success_message) |m| allocator.free(m);
if (packet.error_message) |m| allocator.free(m);
if (packet.error_details) |m| allocator.free(m);
}
defer packet.deinit(allocator);
const Result = struct {
ok: bool,
@ -153,127 +151,6 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
colors.printInfo("Job: {s}\n", .{job_name});
}
fn readJobNameFromManifest(allocator: std.mem.Allocator, manifest_path: []const u8) ![]u8 {
const data = try readFileAlloc(allocator, manifest_path);
defer allocator.free(data);
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{});
defer parsed.deinit();
if (parsed.value != .object) return error.InvalidManifest;
const root = parsed.value.object;
const job_name = jsonGetString(root, "job_name") orelse "";
if (std.mem.trim(u8, job_name, " \t\r\n").len == 0) {
return error.InvalidManifest;
}
return allocator.dupe(u8, job_name);
}
fn resolveManifestPathWithBase(
allocator: std.mem.Allocator,
input: []const u8,
base: []const u8,
) ![]u8 {
var cwd = std.fs.cwd();
if (std.fs.path.isAbsolute(input)) {
if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| {
var mutable_dir = dir;
defer mutable_dir.close();
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
if (std.fs.openFileAbsolute(input, .{}) catch null) |file| {
var mutable_file = file;
defer mutable_file.close();
return allocator.dupe(u8, input);
}
return resolveManifestPathById(allocator, input, base);
}
const stat = cwd.statFile(input) catch |err| {
if (err == error.FileNotFound) {
return resolveManifestPathById(allocator, input, base);
}
return err;
};
if (stat.kind == .directory) {
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
return allocator.dupe(u8, input);
}
fn resolveManifestPathById(allocator: std.mem.Allocator, id: []const u8, base_path: []const u8) ![]u8 {
if (std.mem.trim(u8, id, " \t\r\n").len == 0) {
return error.FileNotFound;
}
if (base_path.len == 0) {
return error.FileNotFound;
}
const roots = [_][]const u8{ "finished", "failed", "running", "pending" };
for (roots) |root| {
const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root });
defer allocator.free(root_path);
var dir = if (std.fs.path.isAbsolute(root_path))
(std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue)
else
(std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue);
defer dir.close();
var it = dir.iterate();
while (try it.next()) |entry| {
if (entry.kind != .directory) continue;
const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name });
defer allocator.free(run_dir);
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" });
defer allocator.free(manifest_path);
const file = if (std.fs.path.isAbsolute(manifest_path))
(std.fs.openFileAbsolute(manifest_path, .{}) catch continue)
else
(std.fs.cwd().openFile(manifest_path, .{}) catch continue);
defer file.close();
const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue;
defer allocator.free(data);
const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue;
defer parsed.deinit();
if (parsed.value != .object) continue;
const obj = parsed.value.object;
const run_id = jsonGetString(obj, "run_id") orelse "";
const task_id = jsonGetString(obj, "task_id") orelse "";
if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) {
return allocator.dupe(u8, manifest_path);
}
}
}
return error.FileNotFound;
}
fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
var file = if (std.fs.path.isAbsolute(path))
try std.fs.openFileAbsolute(path, .{})
else
try std.fs.cwd().openFile(path, .{});
defer file.close();
return file.readToEndAlloc(allocator, 1024 * 1024);
}
fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
const v = obj.get(key) orelse return null;
if (v != .string) return null;
return v.string;
}
fn printUsage() !void {
colors.printInfo("Usage: ml annotate <path|run_id|task_id> --note <text> [--author <name>] [--base <path>] [--json]\n", .{});
colors.printInfo("\nExamples:\n", .{});

View file

@ -2,6 +2,8 @@ const std = @import("std");
const colors = @import("../utils/colors.zig");
const Config = @import("../config.zig").Config;
const io = @import("../utils/io.zig");
const json = @import("../utils/json.zig");
const manifest = @import("../utils/manifest.zig");
pub const Options = struct {
json: bool = false,
@ -47,7 +49,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
return error.InvalidArgs;
}
const manifest_path = resolveManifestPathWithBase(allocator, target_path.?, opts.base) catch |err| {
const manifest_path = manifest.resolvePathWithBase(allocator, target_path.?, opts.base) catch |err| {
if (err == error.FileNotFound) {
colors.printError(
"Could not locate run_manifest.json for '{s}'. Provide a path, or use --base <path> to scan finished/failed/running/pending.\n",
@ -58,7 +60,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
};
defer allocator.free(manifest_path);
const data = try readFileAlloc(allocator, manifest_path);
const data = try manifest.readFileAlloc(allocator, manifest_path);
defer allocator.free(data);
if (opts.json) {
@ -77,31 +79,31 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
const root = parsed.value.object;
const run_id = jsonGetString(root, "run_id") orelse "";
const task_id = jsonGetString(root, "task_id") orelse "";
const job_name = jsonGetString(root, "job_name") orelse "";
const run_id = json.getString(root, "run_id") orelse "";
const task_id = json.getString(root, "task_id") orelse "";
const job_name = json.getString(root, "job_name") orelse "";
const commit_id = jsonGetString(root, "commit_id") orelse "";
const worker_version = jsonGetString(root, "worker_version") orelse "";
const podman_image = jsonGetString(root, "podman_image") orelse "";
const commit_id = json.getString(root, "commit_id") orelse "";
const worker_version = json.getString(root, "worker_version") orelse "";
const podman_image = json.getString(root, "podman_image") orelse "";
const snapshot_id = jsonGetString(root, "snapshot_id") orelse "";
const snapshot_sha = jsonGetString(root, "snapshot_sha256") orelse "";
const snapshot_id = json.getString(root, "snapshot_id") orelse "";
const snapshot_sha = json.getString(root, "snapshot_sha256") orelse "";
const command = jsonGetString(root, "command") orelse "";
const cmd_args = jsonGetString(root, "args") orelse "";
const command = json.getString(root, "command") orelse "";
const cmd_args = json.getString(root, "args") orelse "";
const exit_code = jsonGetInt(root, "exit_code");
const err_msg = jsonGetString(root, "error") orelse "";
const exit_code = json.getInt(root, "exit_code");
const err_msg = json.getString(root, "error") orelse "";
const created_at = jsonGetString(root, "created_at") orelse "";
const started_at = jsonGetString(root, "started_at") orelse "";
const ended_at = jsonGetString(root, "ended_at") orelse "";
const created_at = json.getString(root, "created_at") orelse "";
const started_at = json.getString(root, "started_at") orelse "";
const ended_at = json.getString(root, "ended_at") orelse "";
const staging_ms = jsonGetInt(root, "staging_duration_ms") orelse 0;
const exec_ms = jsonGetInt(root, "execution_duration_ms") orelse 0;
const finalize_ms = jsonGetInt(root, "finalize_duration_ms") orelse 0;
const total_ms = jsonGetInt(root, "total_duration_ms") orelse 0;
const staging_ms = json.getInt(root, "staging_duration_ms") orelse 0;
const exec_ms = json.getInt(root, "execution_duration_ms") orelse 0;
const finalize_ms = json.getInt(root, "finalize_duration_ms") orelse 0;
const total_ms = json.getInt(root, "total_duration_ms") orelse 0;
colors.printInfo("run_manifest: {s}\n", .{manifest_path});
@ -148,137 +150,6 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
}
}
fn resolveManifestPath(allocator: std.mem.Allocator, input: []const u8) ![]u8 {
return resolveManifestPathWithBase(allocator, input, null);
}
fn resolveManifestPathWithBase(
allocator: std.mem.Allocator,
input: []const u8,
base_override: ?[]const u8,
) ![]u8 {
var cwd = std.fs.cwd();
if (std.fs.path.isAbsolute(input)) {
if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| {
var mutable_dir = dir;
defer mutable_dir.close();
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
if (std.fs.openFileAbsolute(input, .{}) catch null) |file| {
var mutable_file = file;
defer mutable_file.close();
return allocator.dupe(u8, input);
}
return resolveManifestPathById(allocator, input, base_override);
}
const stat = cwd.statFile(input) catch |err| {
if (err == error.FileNotFound) {
return resolveManifestPathById(allocator, input, base_override);
}
return err;
};
if (stat.kind == .directory) {
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
return allocator.dupe(u8, input);
}
fn resolveManifestPathById(
allocator: std.mem.Allocator,
id: []const u8,
base_override: ?[]const u8,
) ![]u8 {
if (std.mem.trim(u8, id, " \t\r\n").len == 0) {
return error.FileNotFound;
}
var cfg: ?Config = null;
defer if (cfg) |*c| c.deinit(allocator);
const base_path: []const u8 = blk: {
if (base_override) |b| break :blk b;
cfg = Config.load(allocator) catch {
break :blk "";
};
break :blk cfg.?.worker_base;
};
if (base_path.len == 0) {
return error.FileNotFound;
}
const roots = [_][]const u8{ "finished", "failed", "running", "pending" };
for (roots) |root| {
const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root });
defer allocator.free(root_path);
var dir = if (std.fs.path.isAbsolute(root_path))
(std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue)
else
(std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue);
defer dir.close();
var it = dir.iterate();
while (try it.next()) |entry| {
if (entry.kind != .directory) continue;
const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name });
defer allocator.free(run_dir);
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" });
defer allocator.free(manifest_path);
const file = if (std.fs.path.isAbsolute(manifest_path))
(std.fs.openFileAbsolute(manifest_path, .{}) catch continue)
else
(std.fs.cwd().openFile(manifest_path, .{}) catch continue);
defer file.close();
const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue;
defer allocator.free(data);
const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue;
defer parsed.deinit();
if (parsed.value != .object) continue;
const obj = parsed.value.object;
const run_id = jsonGetString(obj, "run_id") orelse "";
const task_id = jsonGetString(obj, "task_id") orelse "";
if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) {
return allocator.dupe(u8, manifest_path);
}
}
}
return error.FileNotFound;
}
fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
var file = if (std.fs.path.isAbsolute(path))
try std.fs.openFileAbsolute(path, .{})
else
try std.fs.cwd().openFile(path, .{});
defer file.close();
const max_bytes: usize = 10 * 1024 * 1024;
return file.readToEndAlloc(allocator, max_bytes);
}
fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
const v = obj.get(key) orelse return null;
if (v == .string) return v.string;
return null;
}
fn jsonGetInt(obj: std.json.ObjectMap, key: []const u8) ?i64 {
const v = obj.get(key) orelse return null;
return switch (v) {
.integer => v.integer,
else => null,
};
}
fn printUsage() !void {
colors.printInfo("Usage:\n", .{});
std.debug.print(" ml info <run_dir_or_manifest_path_or_id> [--json] [--base <path>]\n", .{});
@ -295,7 +166,7 @@ test "resolveManifestPath uses run_manifest.json for directories" {
try tmp.dir.makeDir("run");
const run_abs = try tmp.dir.realpathAlloc(allocator, "run");
defer allocator.free(run_abs);
const got = try resolveManifestPath(allocator, run_abs);
const got = try manifest.resolvePathWithBase(allocator, run_abs, null);
try std.testing.expect(std.mem.endsWith(u8, got, "run/run_manifest.json"));
}
@ -321,6 +192,6 @@ test "resolveManifestPath resolves by task id when base is provided" {
const base_abs = try tmp.dir.realpathAlloc(allocator, ".");
defer allocator.free(base_abs);
const got = try resolveManifestPathWithBase(allocator, "task-123", base_abs);
const got = try manifest.resolvePathWithBase(allocator, "task-123", base_abs);
try std.testing.expect(std.mem.endsWith(u8, got, "finished/run-a/run_manifest.json"));
}

View file

@ -4,6 +4,8 @@ const Config = @import("../config.zig").Config;
const crypto = @import("../utils/crypto.zig");
const io = @import("../utils/io.zig");
const ws = @import("../net/ws/client.zig");
const protocol = @import("../net/protocol.zig");
const manifest = @import("../utils/manifest.zig");
pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
if (argv.len == 0) {
@ -98,7 +100,7 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
}
const resolved_base = base_override orelse cfg.worker_base;
const manifest_path = resolveManifestPathWithBase(allocator, target, resolved_base) catch |err| {
const manifest_path = manifest.resolvePathWithBase(allocator, target, resolved_base) catch |err| {
if (err == error.FileNotFound) {
colors.printError(
"Could not locate run_manifest.json for '{s}'. Provide a path, or use --base <path> to scan finished/failed/running/pending.\n",
@ -109,7 +111,7 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
};
defer allocator.free(manifest_path);
const job_name = try readJobNameFromManifest(allocator, manifest_path);
const job_name = try manifest.readJobNameFromManifest(allocator, manifest_path);
defer allocator.free(job_name);
const patch_json = try buildPatchJSON(
@ -139,16 +141,12 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
const msg = try client.receiveMessage(allocator);
defer allocator.free(msg);
const packet = @import("../net/protocol.zig").ResponsePacket.deserialize(msg, allocator) catch {
const packet = protocol.ResponsePacket.deserialize(msg, allocator) catch {
var out = io.stdoutWriter();
try out.print("{s}\n", .{msg});
return error.InvalidPacket;
};
defer {
if (packet.success_message) |m| allocator.free(m);
if (packet.error_message) |m| allocator.free(m);
if (packet.error_details) |m| allocator.free(m);
}
defer packet.deinit(allocator);
const Result = struct {
ok: bool,
@ -238,123 +236,6 @@ fn buildPatchJSON(
return out.toOwnedSlice(allocator);
}
fn readJobNameFromManifest(allocator: std.mem.Allocator, manifest_path: []const u8) ![]u8 {
const data = try readFileAlloc(allocator, manifest_path);
defer allocator.free(data);
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{});
defer parsed.deinit();
if (parsed.value != .object) return error.InvalidManifest;
const root = parsed.value.object;
const job_name = jsonGetString(root, "job_name") orelse "";
if (std.mem.trim(u8, job_name, " \t\r\n").len == 0) {
return error.InvalidManifest;
}
return allocator.dupe(u8, job_name);
}
fn resolveManifestPathWithBase(allocator: std.mem.Allocator, input: []const u8, base: []const u8) ![]u8 {
var cwd = std.fs.cwd();
if (std.fs.path.isAbsolute(input)) {
if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| {
var mutable_dir = dir;
defer mutable_dir.close();
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
if (std.fs.openFileAbsolute(input, .{}) catch null) |file| {
var mutable_file = file;
defer mutable_file.close();
return allocator.dupe(u8, input);
}
return resolveManifestPathById(allocator, input, base);
}
const stat = cwd.statFile(input) catch |err| {
if (err == error.FileNotFound) {
return resolveManifestPathById(allocator, input, base);
}
return err;
};
if (stat.kind == .directory) {
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
return allocator.dupe(u8, input);
}
fn resolveManifestPathById(allocator: std.mem.Allocator, id: []const u8, base_path: []const u8) ![]u8 {
if (std.mem.trim(u8, id, " \t\r\n").len == 0) {
return error.FileNotFound;
}
if (base_path.len == 0) {
return error.FileNotFound;
}
const roots = [_][]const u8{ "finished", "failed", "running", "pending" };
for (roots) |root| {
const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root });
defer allocator.free(root_path);
var dir = if (std.fs.path.isAbsolute(root_path))
(std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue)
else
(std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue);
defer dir.close();
var it = dir.iterate();
while (try it.next()) |entry| {
if (entry.kind != .directory) continue;
const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name });
defer allocator.free(run_dir);
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" });
defer allocator.free(manifest_path);
const file = if (std.fs.path.isAbsolute(manifest_path))
(std.fs.openFileAbsolute(manifest_path, .{}) catch continue)
else
(std.fs.cwd().openFile(manifest_path, .{}) catch continue);
defer file.close();
const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue;
defer allocator.free(data);
const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue;
defer parsed.deinit();
if (parsed.value != .object) continue;
const obj = parsed.value.object;
const run_id = jsonGetString(obj, "run_id") orelse "";
const task_id = jsonGetString(obj, "task_id") orelse "";
if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) {
return allocator.dupe(u8, manifest_path);
}
}
}
return error.FileNotFound;
}
fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
var file = if (std.fs.path.isAbsolute(path))
try std.fs.openFileAbsolute(path, .{})
else
try std.fs.cwd().openFile(path, .{});
defer file.close();
return file.readToEndAlloc(allocator, 1024 * 1024);
}
fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
const v = obj.get(key) orelse return null;
if (v != .string) return null;
return v.string;
}
fn printUsage() !void {
colors.printInfo("Usage: ml narrative set <path|run_id|task_id> [fields]\n", .{});
colors.printInfo("\nFields:\n", .{});

View file

@ -4,6 +4,8 @@ const Config = @import("../config.zig").Config;
const crypto = @import("../utils/crypto.zig");
const ws = @import("../net/ws/client.zig");
const protocol = @import("../net/protocol.zig");
const manifest = @import("../utils/manifest.zig");
const json = @import("../utils/json.zig");
pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
if (argv.len == 0) {
@ -138,10 +140,10 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
};
if (commit_hex.len == 0) {
const manifest_path = try resolveManifestPath(allocator, target, cfg.worker_base);
const manifest_path = try manifest.resolvePathWithBase(allocator, target, cfg.worker_base);
defer allocator.free(manifest_path);
const data = try readFileAlloc(allocator, manifest_path);
const data = try manifest.readFileAlloc(allocator, manifest_path);
defer allocator.free(data);
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{});
@ -150,14 +152,14 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
if (parsed.value != .object) return error.InvalidManifest;
const root = parsed.value.object;
commit_hex = jsonGetString(root, "commit_id") orelse "";
commit_hex = json.getString(root, "commit_id") orelse "";
if (commit_hex.len != 40) {
colors.printError("run manifest missing commit_id\n", .{});
return error.InvalidManifest;
}
if (job_name_override == null) {
const j = jsonGetString(root, "job_name") orelse "";
const j = json.getString(root, "job_name") orelse "";
if (j.len > 0) job_name = j;
}
@ -220,13 +222,7 @@ pub fn run(allocator: std.mem.Allocator, argv: []const []const u8) !void {
}
return;
};
defer {
if (packet.success_message) |m| allocator.free(m);
if (packet.error_message) |m| allocator.free(m);
if (packet.error_details) |m| allocator.free(m);
if (packet.data_payload) |m| allocator.free(m);
if (packet.data_type) |m| allocator.free(m);
}
defer packet.deinit(allocator);
switch (packet.packet_type) {
.success => {
@ -289,6 +285,11 @@ fn handleDuplicateResponse(
}
}
fn printUsage() !void {
colors.printInfo("Usage:\n", .{});
colors.printInfo(" ml requeue <commit_id|run_id|task_id|path> [--name <job>] [--priority <n>] [--cpu <n>] [--memory <gb>] [--gpu <n>] [--gpu-memory <gb>] [--args <string>] [--note <string>] [--force] -- <args...>\n", .{});
}
fn isHexLowerOrUpper(s: []const u8) bool {
for (s) |c| {
if (!std.ascii.isHex(c)) return false;
@ -325,101 +326,3 @@ fn resolveCommitPrefix(allocator: std.mem.Allocator, base_path: []const u8, pref
colors.printError("No commit matches prefix: {s}\n", .{prefix});
return error.FileNotFound;
}
fn resolveManifestPath(allocator: std.mem.Allocator, input: []const u8, base_path: []const u8) ![]u8 {
var cwd = std.fs.cwd();
if (std.fs.path.isAbsolute(input)) {
if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| {
var mutable_dir = dir;
defer mutable_dir.close();
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
if (std.fs.openFileAbsolute(input, .{}) catch null) |file| {
var mutable_file = file;
defer mutable_file.close();
return allocator.dupe(u8, input);
}
return resolveManifestPathById(allocator, input, base_path);
}
const stat = cwd.statFile(input) catch |err| {
if (err == error.FileNotFound) {
return resolveManifestPathById(allocator, input, base_path);
}
return err;
};
if (stat.kind == .directory) {
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
return allocator.dupe(u8, input);
}
fn resolveManifestPathById(allocator: std.mem.Allocator, id: []const u8, base_path: []const u8) ![]u8 {
const roots = [_][]const u8{ "finished", "failed", "running", "pending" };
for (roots) |root| {
const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root });
defer allocator.free(root_path);
var dir = if (std.fs.path.isAbsolute(root_path))
(std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue)
else
(std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue);
defer dir.close();
var it = dir.iterate();
while (try it.next()) |entry| {
if (entry.kind != .directory) continue;
const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name });
defer allocator.free(run_dir);
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" });
defer allocator.free(manifest_path);
const file = if (std.fs.path.isAbsolute(manifest_path))
(std.fs.openFileAbsolute(manifest_path, .{}) catch continue)
else
(std.fs.cwd().openFile(manifest_path, .{}) catch continue);
defer file.close();
const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue;
defer allocator.free(data);
const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue;
defer parsed.deinit();
if (parsed.value != .object) continue;
const obj = parsed.value.object;
const run_id = jsonGetString(obj, "run_id") orelse "";
const task_id = jsonGetString(obj, "task_id") orelse "";
if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) {
return allocator.dupe(u8, manifest_path);
}
}
}
return error.FileNotFound;
}
fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
var file = if (std.fs.path.isAbsolute(path))
try std.fs.openFileAbsolute(path, .{})
else
try std.fs.cwd().openFile(path, .{});
defer file.close();
return try file.readToEndAlloc(allocator, 1024 * 1024);
}
fn jsonGetString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
const v = obj.get(key) orelse return null;
if (v != .string) return null;
return v.string;
}
fn printUsage() !void {
colors.printInfo("Usage:\n", .{});
colors.printInfo(" ml requeue <commit_id|run_id|task_id|path> [--name <job>] [--priority <n>] [--cpu <n>] [--memory <gb>] [--gpu <n>] [--gpu-memory <gb>] [--args <string>] [--note <string>] [--force] -- <args...>\n", .{});
}

View file

@ -295,6 +295,18 @@ pub const ResponsePacket = struct {
else => "UNKNOWN",
};
}
/// Free all allocated memory in the packet
pub fn deinit(self: ResponsePacket, allocator: std.mem.Allocator) void {
if (self.success_message) |msg| allocator.free(msg);
if (self.error_message) |msg| allocator.free(msg);
if (self.error_details) |details| allocator.free(details);
if (self.progress_message) |msg| allocator.free(msg);
if (self.status_data) |data| allocator.free(data);
if (self.data_type) |dtype| allocator.free(dtype);
if (self.data_payload) |payload| allocator.free(payload);
if (self.log_message) |msg| allocator.free(msg);
}
};
/// Helper function to write string with length prefix
@ -390,13 +402,7 @@ test "deserialize data packet (varint lengths)" {
try buf.appendSlice(allocator, "{}");
const packet = try ResponsePacket.deserialize(buf.items, allocator);
defer {
if (packet.success_message) |msg| allocator.free(msg);
if (packet.error_message) |msg| allocator.free(msg);
if (packet.error_details) |details| allocator.free(details);
if (packet.data_type) |dtype| allocator.free(dtype);
if (packet.data_payload) |payload| allocator.free(payload);
}
defer packet.deinit(allocator);
try std.testing.expectEqual(PacketType.data, packet.packet_type);
try std.testing.expectEqual(@as(u64, 1), packet.timestamp);

View file

@ -5,7 +5,9 @@ pub const crypto = @import("utils/crypto.zig");
pub const flags = @import("utils/flags.zig");
pub const history = @import("utils/history.zig");
pub const io = @import("utils/io.zig");
pub const json = @import("utils/json.zig");
pub const logging = @import("utils/logging.zig");
pub const manifest = @import("utils/manifest.zig");
pub const rsync = @import("utils/rsync.zig");
pub const rsync_embedded = @import("utils/rsync_embedded.zig");
pub const rsync_embedded_binary = @import("utils/rsync_embedded_binary.zig");

35
cli/src/utils/json.zig Normal file
View file

@ -0,0 +1,35 @@
const std = @import("std");
/// Get a string value from a JSON object map
pub fn getString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
const v = obj.get(key) orelse return null;
if (v != .string) return null;
return v.string;
}
/// Get an integer value from a JSON object map
pub fn getInt(obj: std.json.ObjectMap, key: []const u8) ?i64 {
const v = obj.get(key) orelse return null;
switch (v) {
.integer => |i| return i,
.float => |f| return @intFromFloat(f),
else => return null,
}
}
/// Get a float value from a JSON object map
pub fn getFloat(obj: std.json.ObjectMap, key: []const u8) ?f64 {
const v = obj.get(key) orelse return null;
switch (v) {
.float => |f| return f,
.integer => |i| return @floatFromInt(i),
else => return null,
}
}
/// Get a boolean value from a JSON object map
pub fn getBool(obj: std.json.ObjectMap, key: []const u8) ?bool {
const v = obj.get(key) orelse return null;
if (v != .bool) return null;
return v.bool;
}

130
cli/src/utils/manifest.zig Normal file
View file

@ -0,0 +1,130 @@
const std = @import("std");
const json = @import("json.zig");
/// Read entire file into allocated memory
pub fn readFileAlloc(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
var file = if (std.fs.path.isAbsolute(path))
try std.fs.openFileAbsolute(path, .{})
else
try std.fs.cwd().openFile(path, .{});
defer file.close();
return file.readToEndAlloc(allocator, 1024 * 1024);
}
/// Resolve manifest path from input (path, run_id, or task_id)
pub fn resolvePathWithBase(
allocator: std.mem.Allocator,
input: []const u8,
base: ?[]const u8,
) ![]u8 {
var cwd = std.fs.cwd();
if (std.fs.path.isAbsolute(input)) {
if (std.fs.openDirAbsolute(input, .{}) catch null) |dir| {
var mutable_dir = dir;
defer mutable_dir.close();
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
if (std.fs.openFileAbsolute(input, .{}) catch null) |file| {
var mutable_file = file;
defer mutable_file.close();
return allocator.dupe(u8, input);
}
if (base) |b| {
return resolvePathById(allocator, input, b);
}
return error.FileNotFound;
}
const stat = cwd.statFile(input) catch |err| {
if (err == error.FileNotFound) {
if (base) |b| {
return resolvePathById(allocator, input, b);
}
}
return err;
};
if (stat.kind == .directory) {
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
}
return allocator.dupe(u8, input);
}
/// Resolve manifest path by searching for run_id or task_id in base directory
pub fn resolvePathById(
allocator: std.mem.Allocator,
id: []const u8,
base_path: []const u8,
) ![]u8 {
if (std.mem.trim(u8, id, " \t\r\n").len == 0) {
return error.FileNotFound;
}
if (base_path.len == 0) {
return error.FileNotFound;
}
const roots = [_][]const u8{ "finished", "failed", "running", "pending" };
for (roots) |root| {
const root_path = try std.fs.path.join(allocator, &[_][]const u8{ base_path, root });
defer allocator.free(root_path);
var dir = if (std.fs.path.isAbsolute(root_path))
(std.fs.openDirAbsolute(root_path, .{ .iterate = true }) catch continue)
else
(std.fs.cwd().openDir(root_path, .{ .iterate = true }) catch continue);
defer dir.close();
var it = dir.iterate();
while (try it.next()) |entry| {
if (entry.kind != .directory) continue;
const run_dir = try std.fs.path.join(allocator, &[_][]const u8{ root_path, entry.name });
defer allocator.free(run_dir);
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{ run_dir, "run_manifest.json" });
defer allocator.free(manifest_path);
const file = if (std.fs.path.isAbsolute(manifest_path))
(std.fs.openFileAbsolute(manifest_path, .{}) catch continue)
else
(std.fs.cwd().openFile(manifest_path, .{}) catch continue);
defer file.close();
const data = file.readToEndAlloc(allocator, 1024 * 1024) catch continue;
defer allocator.free(data);
const parsed = std.json.parseFromSlice(std.json.Value, allocator, data, .{}) catch continue;
defer parsed.deinit();
if (parsed.value != .object) continue;
const obj = parsed.value.object;
const run_id = json.getString(obj, "run_id") orelse "";
const task_id = json.getString(obj, "task_id") orelse "";
if (std.mem.eql(u8, run_id, id) or std.mem.eql(u8, task_id, id)) {
return allocator.dupe(u8, manifest_path);
}
}
}
return error.FileNotFound;
}
/// Read job_name from a manifest file
pub fn readJobNameFromManifest(allocator: std.mem.Allocator, manifest_path: []const u8) ![]u8 {
const data = try readFileAlloc(allocator, manifest_path);
defer allocator.free(data);
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, data, .{});
defer parsed.deinit();
if (parsed.value != .object) return error.InvalidManifest;
const root = parsed.value.object;
const job_name = json.getString(root, "job_name") orelse "";
if (std.mem.trim(u8, job_name, " \t\r\n").len == 0) {
return error.InvalidManifest;
}
return allocator.dupe(u8, job_name);
}