refactor(cli): Update network handlers and info command

Replace Unicode symbols with ASCII in handshake.zig

Add [OK]/[FAIL] status indicators in response_handlers.zig

Simplify info.zig output formatting
This commit is contained in:
Jeremie Fraeys 2026-02-23 14:12:22 -05:00
parent 2b7319dc2e
commit fd317c9791
No known key found for this signature in database
3 changed files with 85 additions and 213 deletions

View file

@ -1,5 +1,4 @@
const std = @import("std");
const colors = @import("../utils/colors.zig");
const Config = @import("../config.zig").Config;
const io = @import("../utils/io.zig");
const json = @import("../utils/json.zig");
@ -27,23 +26,23 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
} else if (std.mem.startsWith(u8, arg, "--help")) {
return printUsage();
} else if (std.mem.startsWith(u8, arg, "--")) {
core.output.errorMsg("info", "Unknown option");
core.output.err("Unknown option");
return error.InvalidArgs;
} else {
target_path = arg;
}
}
core.output.init(if (flags.json) .json else .text);
core.output.setMode(if (flags.json) .json else .text);
if (target_path == null) {
core.output.errorMsg("info", "No target path specified");
core.output.err("No target path specified");
return printUsage();
}
const manifest_path = manifest.resolvePathWithBase(allocator, target_path.?, base) catch |err| {
if (err == error.FileNotFound) {
core.output.errorMsgDetailed("info", "Manifest not found", "Provide a path or use --base <path>");
core.output.err("Manifest not found");
}
return err;
};
@ -64,7 +63,7 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
defer parsed.deinit();
if (parsed.value != .object) {
colors.printError("run manifest is not a JSON object\n", .{});
core.output.err("run manifest is not a JSON object");
return error.InvalidManifest;
}
@ -96,54 +95,51 @@ pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
const finalize_ms = json.getInt(root, "finalize_duration_ms") orelse 0;
const total_ms = json.getInt(root, "total_duration_ms") orelse 0;
colors.printInfo("run_manifest: {s}\n", .{manifest_path});
std.debug.print("run_manifest\t{s}\n", .{manifest_path});
if (job_name.len > 0) colors.printInfo("job_name: {s}\n", .{job_name});
if (run_id.len > 0) colors.printInfo("run_id: {s}\n", .{run_id});
if (task_id.len > 0) colors.printInfo("task_id: {s}\n", .{task_id});
if (job_name.len > 0) std.debug.print("job_name\t{s}\n", .{job_name});
if (run_id.len > 0) std.debug.print("run_id\t{s}\n", .{run_id});
if (task_id.len > 0) std.debug.print("task_id\t{s}\n", .{task_id});
if (commit_id.len > 0) colors.printInfo("commit_id: {s}\n", .{commit_id});
if (worker_version.len > 0) colors.printInfo("worker_version: {s}\n", .{worker_version});
if (podman_image.len > 0) colors.printInfo("podman_image: {s}\n", .{podman_image});
if (commit_id.len > 0) std.debug.print("commit_id\t{s}\n", .{commit_id});
if (worker_version.len > 0) std.debug.print("worker_version\t{s}\n", .{worker_version});
if (podman_image.len > 0) std.debug.print("podman_image\t{s}\n", .{podman_image});
if (snapshot_id.len > 0) colors.printInfo("snapshot_id: {s}\n", .{snapshot_id});
if (snapshot_sha.len > 0) colors.printInfo("snapshot_sha256: {s}\n", .{snapshot_sha});
if (snapshot_id.len > 0) std.debug.print("snapshot_id\t{s}\n", .{snapshot_id});
if (snapshot_sha.len > 0) std.debug.print("snapshot_sha256\t{s}\n", .{snapshot_sha});
if (command.len > 0) {
if (cmd_args.len > 0) {
colors.printInfo("command: {s} {s}\n", .{ command, cmd_args });
std.debug.print("command\t{s} {s}\n", .{ command, cmd_args });
} else {
colors.printInfo("command: {s}\n", .{command});
std.debug.print("command\t{s}\n", .{command});
}
}
if (created_at.len > 0) colors.printInfo("created_at: {s}\n", .{created_at});
if (started_at.len > 0) colors.printInfo("started_at: {s}\n", .{started_at});
if (ended_at.len > 0) colors.printInfo("ended_at: {s}\n", .{ended_at});
if (created_at.len > 0) std.debug.print("created_at\t{s}\n", .{created_at});
if (started_at.len > 0) std.debug.print("started_at\t{s}\n", .{started_at});
if (ended_at.len > 0) std.debug.print("ended_at\t{s}\n", .{ended_at});
if (total_ms > 0 or staging_ms > 0 or exec_ms > 0 or finalize_ms > 0) {
colors.printInfo(
"durations_ms: total={d} staging={d} execution={d} finalize={d}\n",
.{ total_ms, staging_ms, exec_ms, finalize_ms },
);
std.debug.print("durations_ms\ttotal={d}\tstaging={d}\texecution={d}\tfinalize={d}\n", .{ total_ms, staging_ms, exec_ms, finalize_ms });
}
if (exit_code) |ec| {
if (ec == 0 and err_msg.len == 0) {
colors.printSuccess("exit_code: 0\n", .{});
std.debug.print("exit_code\t0\n", .{});
} else {
colors.printWarning("exit_code: {d}\n", .{ec});
std.debug.print("exit_code\t{d}\n", .{ec});
}
}
if (err_msg.len > 0) {
colors.printWarning("error: {s}\n", .{err_msg});
std.debug.print("error\t{s}\n", .{err_msg});
}
}
fn printUsage() !void {
colors.printInfo("Usage:\n", .{});
std.debug.print(" ml info <run_dir_or_manifest_path_or_id> [--json] [--base <path>]\n", .{});
std.debug.print("Usage:\n", .{});
std.debug.print("\tml info <run_dir_or_manifest_path_or_id> [--json] [--base <path>]\n", .{});
}
test "resolveManifestPath uses run_manifest.json for directories" {

View file

@ -54,48 +54,48 @@ pub fn handshake(
if (std.mem.indexOf(u8, response, "101 Switching Protocols") == null) {
if (std.mem.indexOf(u8, response, "404 Not Found") != null) {
std.debug.print("\nWebSocket Connection Failed\n", .{});
std.debug.print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n", .{});
std.debug.print("\nWebSocket Connection Failed\n", .{});
std.debug.print("-------------------------------------------------------\n\n", .{});
std.debug.print("The WebSocket endpoint '/ws' was not found on the server.\n\n", .{});
std.debug.print("This usually means:\n", .{});
std.debug.print(" API server is not running\n", .{});
std.debug.print(" Incorrect server address in config\n", .{});
std.debug.print(" Different service running on that port\n\n", .{});
std.debug.print("\t* API server is not running\n", .{});
std.debug.print("\t* Incorrect server address in config\n", .{});
std.debug.print("\t* Different service running on that port\n\n", .{});
std.debug.print("To diagnose:\n", .{});
std.debug.print(" Verify server address: Check ~/.ml/config.toml\n", .{});
std.debug.print(" Test connectivity: curl http://<server>:<port>/health\n", .{});
std.debug.print(" Contact your server administrator if the issue persists\n\n", .{});
std.debug.print("\t* Verify server address: Check ~/.ml/config.toml\n", .{});
std.debug.print("\t* Test connectivity: curl http://<server>:<port>/health\n", .{});
std.debug.print("\t* Contact your server administrator if the issue persists\n\n", .{});
return error.EndpointNotFound;
} else if (std.mem.indexOf(u8, response, "401 Unauthorized") != null) {
std.debug.print("\nAuthentication Failed\n", .{});
std.debug.print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n", .{});
std.debug.print("\nAuthentication Failed\n", .{});
std.debug.print("-------------------------------------------------------\n\n", .{});
std.debug.print("Invalid or missing API key.\n\n", .{});
std.debug.print("To fix:\n", .{});
std.debug.print(" Verify API key in ~/.ml/config.toml matches server configuration\n", .{});
std.debug.print(" Request a new API key from your administrator if needed\n\n", .{});
std.debug.print("\t* Verify API key in ~/.ml/config.toml matches server configuration\n", .{});
std.debug.print("\t* Request a new API key from your administrator if needed\n\n", .{});
return error.AuthenticationFailed;
} else if (std.mem.indexOf(u8, response, "403 Forbidden") != null) {
std.debug.print("\nAccess Denied\n", .{});
std.debug.print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n", .{});
std.debug.print("\nAccess Denied\n", .{});
std.debug.print("-------------------------------------------------------\n\n", .{});
std.debug.print("Your API key doesn't have permission for this operation.\n\n", .{});
std.debug.print("To fix:\n", .{});
std.debug.print(" Contact your administrator to grant necessary permissions\n\n", .{});
std.debug.print("\t* Contact your administrator to grant necessary permissions\n\n", .{});
return error.PermissionDenied;
} else if (std.mem.indexOf(u8, response, "503 Service Unavailable") != null) {
std.debug.print("\nServer Unavailable\n", .{});
std.debug.print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n", .{});
std.debug.print("\nServer Unavailable\n", .{});
std.debug.print("-------------------------------------------------------\n\n", .{});
std.debug.print("The server is temporarily unavailable.\n\n", .{});
std.debug.print("This could be due to:\n", .{});
std.debug.print(" Server maintenance\n", .{});
std.debug.print(" High load\n", .{});
std.debug.print(" Server restart\n\n", .{});
std.debug.print("\t* Server maintenance\n", .{});
std.debug.print("\t* High load\n", .{});
std.debug.print("\t* Server restart\n\n", .{});
std.debug.print("To resolve:\n", .{});
std.debug.print(" Wait a moment and try again\n", .{});
std.debug.print(" Contact administrator if the issue persists\n\n", .{});
std.debug.print("\t* Wait a moment and try again\n", .{});
std.debug.print("\t* Contact administrator if the issue persists\n\n", .{});
return error.ServerUnavailable;
} else {
std.debug.print("\nWebSocket Handshake Failed\n", .{});
std.debug.print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n", .{});
std.debug.print("\nWebSocket Handshake Failed\n", .{});
std.debug.print("-------------------------------------------------------\n\n", .{});
std.debug.print("Expected HTTP 101 Switching Protocols, but received:\n", .{});
const newline_pos = std.mem.indexOf(u8, response, "\r\n") orelse response.len;
@ -103,9 +103,9 @@ pub fn handshake(
std.debug.print(" {s}\n\n", .{status_line});
std.debug.print("To diagnose:\n", .{});
std.debug.print(" Verify server address in ~/.ml/config.toml\n", .{});
std.debug.print(" Check network connectivity to the server\n", .{});
std.debug.print(" Contact your administrator for assistance\n\n", .{});
std.debug.print("\t* Verify server address in ~/.ml/config.toml\n", .{});
std.debug.print("\t* Check network connectivity to the server\n", .{});
std.debug.print("\t* Contact your administrator for assistance\n\n", .{});
return error.HandshakeFailed;
}
}

View file

@ -2,7 +2,6 @@ const deps = @import("deps.zig");
const std = deps.std;
const io = deps.io;
const protocol = deps.protocol;
const colors = deps.colors;
const Client = @import("client.zig").Client;
const utils = @import("utils.zig");
@ -42,7 +41,7 @@ pub fn receiveAndHandleStatusResponse(self: *Client, allocator: std.mem.Allocato
try parseAndDisplayStatusJson(allocator, json_data, options);
}
} else if (packet.packet_type == .error_packet) {
colors.printError("Error: {s}\n", .{packet.error_message orelse "Unknown error"});
std.debug.print("Error: {s}\n", .{packet.error_message orelse "Unknown error"});
} else {
std.debug.print("Unexpected packet type: {s}\n", .{@tagName(packet.packet_type)});
}
@ -61,156 +60,33 @@ fn parseAndDisplayStatusJson(allocator: std.mem.Allocator, json_data: []const u8
var out = io.stdoutWriter();
try out.print("{s}\n", .{json_data});
} else {
// Display user info
if (root.get("user")) |user_obj| {
const user = user_obj.object;
const name = user.get("name").?.string;
const admin = user.get("admin").?.bool;
colors.printInfo("Status retrieved for user: {s} (admin: {})\n", .{ name, admin });
}
// Display system summary
colors.printInfo("\n=== Queue Summary ===\n", .{});
// Display task summary
if (root.get("tasks")) |tasks_obj| {
const tasks = tasks_obj.object;
const total = tasks.get("total").?.integer;
const queued = tasks.get("queued").?.integer;
const running = tasks.get("running").?.integer;
const failed = tasks.get("failed").?.integer;
const completed = tasks.get("completed").?.integer;
colors.printInfo(
"Total: {d} | Queued: {d} | Running: {d} | Failed: {d} | Completed: {d}\n",
.{ total, queued, running, failed, completed },
);
}
// Display queue depth if available
if (root.get("queue_length")) |ql| {
if (ql == .integer) {
colors.printInfo("Queue depth: {d}\n", .{ql.integer});
}
}
const per_section_limit: usize = options.limit orelse 5;
const TaskStatus = enum { queued, running, failed, completed };
const TaskPrinter = struct {
fn statusLabel(s: TaskStatus) []const u8 {
return switch (s) {
.queued => "Queued",
.running => "Running",
.failed => "Failed",
.completed => "Completed",
};
}
fn statusMatch(s: TaskStatus) []const u8 {
return switch (s) {
.queued => "queued",
.running => "running",
.failed => "failed",
.completed => "completed",
};
}
fn shorten(s: []const u8, max_len: usize) []const u8 {
if (s.len <= max_len) return s;
return s[0..max_len];
}
fn printSection(
allocator2: std.mem.Allocator,
queue_items: []const std.json.Value,
status: TaskStatus,
limit2: usize,
) !void {
_ = allocator2;
const label = statusLabel(status);
const want = statusMatch(status);
colors.printInfo("\n{s}:\n", .{label});
var shown: usize = 0;
var position: usize = 0;
for (queue_items) |item| {
if (item != .object) continue;
const obj = item.object;
const st = utils.jsonGetString(obj, "status") orelse "";
if (!std.mem.eql(u8, st, want)) continue;
position += 1;
if (shown >= limit2) continue;
const id = utils.jsonGetString(obj, "id") orelse "";
const job_name = utils.jsonGetString(obj, "job_name") orelse "";
const worker_id = utils.jsonGetString(obj, "worker_id") orelse "";
const err = utils.jsonGetString(obj, "error") orelse "";
const priority = utils.jsonGetInt(obj, "priority") orelse 5;
// Show queue position for queued jobs
const position_str = if (std.mem.eql(u8, want, "queued"))
try std.fmt.allocPrint(std.heap.page_allocator, " [pos {d}]", .{position})
else
"";
defer if (std.mem.eql(u8, want, "queued")) std.heap.page_allocator.free(position_str);
if (std.mem.eql(u8, want, "failed")) {
colors.printWarning("- {s} {s}{s} (P:{d})", .{ shorten(id, 8), job_name, position_str, priority });
if (worker_id.len > 0) {
std.debug.print(" worker={s}", .{worker_id});
}
std.debug.print("\n", .{});
if (err.len > 0) {
std.debug.print(" error: {s}\n", .{shorten(err, 160)});
}
} else if (std.mem.eql(u8, want, "running")) {
colors.printInfo("- {s} {s}{s} (P:{d})", .{ shorten(id, 8), job_name, position_str, priority });
if (worker_id.len > 0) {
std.debug.print(" worker={s}", .{worker_id});
}
std.debug.print("\n", .{});
} else if (std.mem.eql(u8, want, "queued")) {
std.debug.print("- {s} {s}{s} (P:{d})\n", .{ shorten(id, 8), job_name, position_str, priority });
} else {
colors.printSuccess("- {s} {s}{s} (P:{d})\n", .{ shorten(id, 8), job_name, position_str, priority });
}
shown += 1;
}
if (shown == 0) {
std.debug.print(" (none)\n", .{});
} else {
// Indicate there may be more.
var total_for_status: usize = 0;
for (queue_items) |item| {
if (item != .object) continue;
const obj = item.object;
const st = utils.jsonGetString(obj, "status") orelse "";
if (std.mem.eql(u8, st, want)) total_for_status += 1;
}
if (total_for_status > shown) {
std.debug.print(" ... and {d} more\n", .{total_for_status - shown});
}
}
}
};
// TSV output: one line per task
const per_section_limit: usize = options.limit orelse 1000;
if (root.get("queue")) |queue_val| {
if (queue_val == .array) {
const items = queue_val.array.items;
try TaskPrinter.printSection(allocator, items, .queued, per_section_limit);
try TaskPrinter.printSection(allocator, items, .running, per_section_limit);
try TaskPrinter.printSection(allocator, items, .failed, per_section_limit);
try TaskPrinter.printSection(allocator, items, .completed, per_section_limit);
}
}
var count: usize = 0;
if (try Client.formatPrewarmFromStatusRoot(allocator, root)) |section| {
defer allocator.free(section);
colors.printInfo("\n{s}", .{section});
for (items) |item| {
if (count >= per_section_limit) break;
if (item != .object) continue;
const obj = item.object;
const id = utils.jsonGetString(obj, "id") orelse "";
const job_name = utils.jsonGetString(obj, "job_name") orelse "";
const status = utils.jsonGetString(obj, "status") orelse "unknown";
const priority = utils.jsonGetInt(obj, "priority") orelse 5;
const worker_id = utils.jsonGetString(obj, "worker_id") orelse "";
const err_msg = utils.jsonGetString(obj, "error") orelse "";
// TSV: status, id, job_name, priority, worker_id, error
std.debug.print("{s}\t{s}\t{s}\t{d}\t{s}\t{s}\n", .{
status, id, job_name, priority, worker_id, err_msg,
});
count += 1;
}
}
}
}
}
@ -235,15 +111,15 @@ pub fn receiveAndHandleCancelResponse(self: *Client, allocator: std.mem.Allocato
// Display user-friendly output
if (root.get("success")) |success_val| {
if (success_val.bool) {
colors.printSuccess("Job '{s}' canceled successfully\n", .{job_name});
std.debug.print("Job '{s}' canceled successfully\n", .{job_name});
} else {
colors.printError("Failed to cancel job '{s}'\n", .{job_name});
std.debug.print("Failed to cancel job '{s}'\n", .{job_name});
if (root.get("error")) |error_val| {
colors.printError("Error: {s}\n", .{error_val.string});
std.debug.print("Error: {s}\n", .{error_val.string});
}
}
} else {
colors.printInfo("Job '{s}' cancellation processed for user: {s}\n", .{ job_name, user_context.name });
std.debug.print("Job '{s}' cancellation processed for user: {s}\n", .{ job_name, user_context.name });
}
}
} else {
@ -287,8 +163,8 @@ pub fn receiveAndHandleCancelResponse(self: *Client, allocator: std.mem.Allocato
} else if (std.mem.indexOf(u8, cleaned, "Authentication failed") != null) {
std.debug.print("Authentication failed\n", .{});
} else {
colors.printInfo("Job '{s}' cancellation processed for user: {s}\n", .{ job_name, user_context.name });
colors.printInfo("Response: {s}\n", .{cleaned});
std.debug.print("Job '{s}' cancellation processed for user: {s}\n", .{ job_name, user_context.name });
std.debug.print("Response: {s}\n", .{cleaned});
}
}
} else {
@ -309,17 +185,17 @@ pub fn handleResponsePacket(self: *Client, packet: protocol.ResponsePacket, oper
.success => {
if (packet.success_message) |msg| {
if (msg.len > 0) {
std.debug.print(" {s}: {s}\n", .{ operation, msg });
std.debug.print("[OK] {s}: {s}\n", .{ operation, msg });
} else {
std.debug.print(" {s} completed successfully\n", .{operation});
std.debug.print("[OK] {s} completed successfully\n", .{operation});
}
} else {
std.debug.print(" {s} completed successfully\n", .{operation});
std.debug.print("[OK] {s} completed successfully\n", .{operation});
}
},
.error_packet => {
const error_msg = protocol.ResponsePacket.getErrorMessage(packet.error_code.?);
std.debug.print(" {s} failed: {s}\n", .{ operation, error_msg });
std.debug.print("[FAIL] {s} failed: {s}\n", .{ operation, error_msg });
if (packet.error_message) |msg| {
if (msg.len > 0) {