fetch_ml/cli/src/commands/watch.zig
Jeremie Fraeys 34186675dc
refactor(cli): use config.getWebSocketUrl() helper across all commands
Replace inline WebSocket URL construction with Config.getWebSocketUrl()
helper method in all command files. This eliminates code duplication
and ensures consistent URL formatting across the CLI.

Files updated:
- annotate.zig, dataset.zig, experiment.zig, logs.zig
- narrative.zig, prune.zig, queue.zig, requeue.zig
- sync.zig, validate.zig, watch.zig

The helper properly handles ws:// vs wss:// based on port (443).
2026-02-18 13:05:43 -05:00

154 lines
5.3 KiB
Zig

const std = @import("std");
const Config = @import("../config.zig").Config;
const crypto = @import("../utils/crypto.zig");
const rsync = @import("../utils/rsync_embedded.zig");
const ws = @import("../net/ws/client.zig");
pub fn run(allocator: std.mem.Allocator, args: []const []const u8) !void {
if (args.len == 0) {
printUsage();
return error.InvalidArgs;
}
// Global flags
for (args) |arg| {
if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
printUsage();
return;
}
}
const path = args[0];
var job_name: ?[]const u8 = null;
var priority: u8 = 5;
var should_queue = false;
var json: bool = false;
// Parse flags
var i: usize = 1;
while (i < args.len) : (i += 1) {
if (std.mem.eql(u8, args[i], "--name") and i + 1 < args.len) {
job_name = args[i + 1];
i += 1;
} else if (std.mem.eql(u8, args[i], "--priority") and i + 1 < args.len) {
priority = try std.fmt.parseInt(u8, args[i + 1], 10);
i += 1;
} else if (std.mem.eql(u8, args[i], "--queue")) {
should_queue = true;
} else if (std.mem.eql(u8, args[i], "--json")) {
json = true;
}
}
const config = try Config.load(allocator);
defer {
var mut_config = config;
mut_config.deinit(allocator);
}
if (json) {
std.debug.print("{\"ok\":true,\"action\":\"watch\",\"path\":\"{s}\",\"queued\":{s}}\n", .{ path, if (should_queue) "true" else "false" });
} else {
std.debug.print("Watching {s} for changes...\n", .{path});
std.debug.print("Press Ctrl+C to stop\n", .{});
}
// Initial sync
var last_commit_id = try syncAndQueue(allocator, path, job_name, priority, should_queue, config);
defer allocator.free(last_commit_id);
// Watch for changes
var watcher = try std.fs.cwd().openDir(path, .{ .iterate = true });
defer watcher.close();
var last_modified: u64 = 0;
while (true) {
// Check for file changes
var modified = false;
var walker = try watcher.walk(allocator);
defer walker.deinit();
while (try walker.next()) |entry| {
if (entry.kind == .file) {
const file = try watcher.openFile(entry.path, .{});
defer file.close();
const stat = try file.stat();
if (stat.mtime > last_modified) {
last_modified = @intCast(stat.mtime);
modified = true;
}
}
}
if (modified) {
if (!json) {
std.debug.print("\nChanges detected, syncing...\n", .{});
}
const new_commit_id = try syncAndQueue(allocator, path, job_name, priority, should_queue, config);
defer allocator.free(new_commit_id);
if (!std.mem.eql(u8, last_commit_id, new_commit_id)) {
allocator.free(last_commit_id);
last_commit_id = try allocator.dupe(u8, new_commit_id);
if (!json) {
std.debug.print("✓ Synced new version: {s}\n", .{last_commit_id[0..8]});
}
}
}
// Wait before checking again
std.Thread.sleep(2_000_000_000); // 2 seconds in nanoseconds
}
}
fn syncAndQueue(allocator: std.mem.Allocator, path: []const u8, job_name: ?[]const u8, priority: u8, should_queue: bool, config: Config) ![]u8 {
// Calculate commit ID
const commit_id = try crypto.hashDirectory(allocator, path);
// Sync files via rsync
const remote_path = try std.fmt.allocPrint(
allocator,
"{s}@{s}:{s}/{s}/files/",
.{ config.worker_user, config.worker_host, config.worker_base, commit_id },
);
defer allocator.free(remote_path);
try rsync.sync(allocator, path, remote_path, config.worker_port);
if (should_queue) {
const actual_job_name = job_name orelse commit_id[0..8];
const api_key_hash = try crypto.hashApiKey(allocator, config.api_key);
defer allocator.free(api_key_hash);
// Connect to WebSocket and queue job
const ws_url = try config.getWebSocketUrl(allocator);
defer allocator.free(ws_url);
var client = try ws.Client.connect(allocator, ws_url, config.api_key);
defer client.close();
try client.sendQueueJob(actual_job_name, commit_id, priority, api_key_hash);
const response = try client.receiveMessage(allocator);
defer allocator.free(response);
if (response.len > 0 and response[0] == 0x00) {
std.debug.print("✓ Job queued successfully: {s}\n", .{actual_job_name});
}
}
return commit_id;
}
fn printUsage() void {
std.debug.print("Usage: ml watch <path> [options]\n\n", .{});
std.debug.print("Options:\n", .{});
std.debug.print(" --name <job> Override job name when used with --queue\n", .{});
std.debug.print(" --priority <N> Priority to use when queueing (default: 5)\n", .{});
std.debug.print(" --queue Queue on every sync\n", .{});
std.debug.print(" --json Emit a single JSON line describing watch start\n", .{});
std.debug.print(" --help, -h Show this help message\n", .{});
}