From f1965b99bd25c05e5cfa7367d0b1aac4d5b99caf Mon Sep 17 00:00:00 2001 From: Jeremie Fraeys Date: Wed, 4 Mar 2026 20:23:17 -0500 Subject: [PATCH] feat(cli): add progress tracking and sync management Add progress reporting and offline sync infrastructure: - progress.zig: progress bars and status reporting - sync_manager.zig: offline run synchronization manager Supports resilient operation with server connectivity issues. --- cli/src/sync_manager.zig | 193 +++++++++++++++++++++++++++++++++++++ cli/src/utils/progress.zig | 165 +++++++++++++++++++++++++++++++ 2 files changed, 358 insertions(+) create mode 100644 cli/src/sync_manager.zig create mode 100644 cli/src/utils/progress.zig diff --git a/cli/src/sync_manager.zig b/cli/src/sync_manager.zig new file mode 100644 index 0000000..1237112 --- /dev/null +++ b/cli/src/sync_manager.zig @@ -0,0 +1,193 @@ +const std = @import("std"); +const db = @import("db.zig"); +const manifest_lib = @import("manifest.zig"); +const config = @import("config.zig"); +const ws = @import("net/ws/client.zig"); +const crypto = @import("utils/crypto.zig"); +const protocol = @import("net/protocol.zig"); +const mode = @import("mode.zig"); + +/// Auto-sync manager for handling offline runs +/// Automatically syncs pending runs when connection is restored +pub const AutoSync = struct { + allocator: std.mem.Allocator, + + const Self = @This(); + + pub fn init(allocator: std.mem.Allocator) Self { + return .{ + .allocator = allocator, + }; + } + + /// Check for and sync any pending runs + /// Should be called periodically or when connection is restored + pub fn syncPendingRuns(self: Self) !SyncResult { + const cfg = try config.Config.load(self.allocator); + defer { + var mut_cfg = cfg; + mut_cfg.deinit(self.allocator); + } + + // Check if we're online + const mode_result = try mode.detect(self.allocator, cfg); + if (mode.isOffline(mode_result.mode)) { + return .{ .synced = 0, .failed = 0, .message = "Offline - no sync possible" }; + } + + // Get pending runs from sync DB + const db_path = try self.getSyncDBPath(); + defer self.allocator.free(db_path); + + var database = try db.initOrOpenSyncDB(self.allocator, db_path); + defer database.close(); + + const pending = try database.getPendingRuns(self.allocator); + defer { + for (pending) |run_id| { + self.allocator.free(run_id); + } + self.allocator.free(pending); + } + + if (pending.len == 0) { + return .{ .synced = 0, .failed = 0, .message = "No pending runs to sync" }; + } + + std.log.info("Found {d} pending run(s) to sync", .{pending.len}); + + // Connect to server + const ws_url = try cfg.getWebSocketUrl(self.allocator); + defer self.allocator.free(ws_url); + + var client = try ws.Client.connect(self.allocator, ws_url, cfg.api_key); + defer client.close(); + + const api_key_hash = try crypto.hashApiKey(self.allocator, cfg.api_key); + defer self.allocator.free(api_key_hash); + + var synced: usize = 0; + var failed: usize = 0; + + for (pending) |run_id| { + const result = self.syncSingleRun(run_id, cfg, &client, api_key_hash) catch |err| { + std.log.err("Failed to sync run {s}: {}", .{ run_id[0..@min(8, run_id.len)], err }); + failed += 1; + continue; + }; + + if (result) { + // Mark as synced in database + try database.markAsSynced(run_id); + synced += 1; + std.log.info("Synced run {s}", .{run_id[0..@min(8, run_id.len)]}); + } else { + failed += 1; + } + } + + const msg = try std.fmt.allocPrint(self.allocator, "Synced {d}/{d} runs", .{ synced, pending.len }); + return .{ + .synced = synced, + .failed = failed, + .message = msg, + }; + } + + /// Sync a single run to the server + fn syncSingleRun( + self: Self, + run_id: []const u8, + cfg: config.Config, + client: *ws.Client, + api_key_hash: []const u8, + ) !bool { + // Find the run manifest + const manifest_path = try self.findManifestPath(run_id, cfg); + defer if (manifest_path) |p| self.allocator.free(p); + + if (manifest_path == null) { + std.log.warn("Could not find manifest for run {s}", .{run_id[0..@min(8, run_id.len)]}); + return false; + } + + // Read manifest + var manifest = try manifest_lib.readManifest(manifest_path.?, self.allocator); + defer manifest.deinit(self.allocator); + + // Send sync request to server + try client.sendSyncRunRequest( + run_id, + manifest.job_name orelse "unnamed", + manifest.command, + manifest.status orelse "UNKNOWN", + manifest.exit_code, + api_key_hash, + ); + + // Wait for response + const response = try client.receiveMessage(self.allocator); + defer self.allocator.free(response); + + // Parse response + if (std.mem.indexOf(u8, response, "success") != null) { + return true; + } else { + std.log.warn("Server rejected sync: {s}", .{response}); + return false; + } + } + + /// Find the manifest path for a run + fn findManifestPath(self: Self, run_id: []const u8, cfg: config.Config) !?[]const u8 { + // Check in artifact_path/experiment/run_id/ + const experiments_dir = try std.fs.openDirAbsolute(cfg.artifact_path, .{ .iterate = true }); + defer experiments_dir.close(); + + var iter = experiments_dir.iterate(); + while (try iter.next()) |entry| { + if (entry.kind != .directory) continue; + + const run_dir_path = try std.fs.path.join(self.allocator, &[_][]const u8{ + cfg.artifact_path, + entry.name, + run_id, + }); + + const manifest_path = try std.fs.path.join(self.allocator, &[_][]const u8{ + run_dir_path, + "run_manifest.json", + }); + defer self.allocator.free(manifest_path); + + if (std.fs.accessAbsolute(manifest_path, .{})) { + return run_dir_path; // Return the directory path + } else |_| { + self.allocator.free(run_dir_path); + } + } + + return null; + } + + fn getSyncDBPath(self: Self) ![]const u8 { + const home = std.posix.getenv("HOME") orelse "."; + return std.fs.path.join(self.allocator, &[_][]const u8{ home, ".ml", "sync.db" }); + } +}; + +pub const SyncResult = struct { + synced: usize, + failed: usize, + message: []const u8, + + pub fn deinit(self: SyncResult, allocator: std.mem.Allocator) void { + allocator.free(self.message); + } +}; + +/// Convenience function to sync pending runs +pub fn syncPendingRuns(allocator: std.mem.Allocator) !SyncResult { + const auto_sync = AutoSync.init(allocator); + return auto_sync.syncPendingRuns(); +} diff --git a/cli/src/utils/progress.zig b/cli/src/utils/progress.zig new file mode 100644 index 0000000..de65c04 --- /dev/null +++ b/cli/src/utils/progress.zig @@ -0,0 +1,165 @@ +const std = @import("std"); +const colors = @import("colors.zig"); + +/// Progress bar for long-running operations +pub const ProgressBar = struct { + allocator: std.mem.Allocator, + total: u64, + current: u64, + width: usize, + start_time: i64, + label: []const u8, + + const Self = @This(); + + pub fn init(allocator: std.mem.Allocator, total: u64, label: []const u8) Self { + return .{ + .allocator = allocator, + .total = total, + .current = 0, + .width = 40, + .start_time = std.time.timestamp(), + .label = label, + }; + } + + /// Update progress and redraw + pub fn update(self: *Self, current: u64) void { + self.current = current; + self.draw(); + } + + /// Increment progress by amount + pub fn increment(self: *Self, amount: u64) void { + self.current += amount; + self.draw(); + } + + /// Draw progress bar to stdout + pub fn draw(self: *Self) void { + const percent = if (self.total > 0) + @min(100, @as(u64, self.current * 100 / self.total)) + else + 0; + + const filled = if (self.total > 0) + @min(self.width, @as(usize, self.current * self.width / self.total)) + else + 0; + + const elapsed = std.time.timestamp() - self.start_time; + const rate = if (elapsed > 0) self.current / @as(u64, @intCast(elapsed)) else 0; + + // Build bar + var bar: [256]u8 = undefined; + var i: usize = 0; + + // Clear line and move cursor to start + bar[i] = '\r'; + i += 1; + + // Add label + const label_len = @min(self.label.len, 30); + @memcpy(bar[i..][0..label_len], self.label[0..label_len]); + i += label_len; + if (label_len < 30) { + bar[i] = ' '; + i += 1; + } + + // Opening bracket + bar[i] = '['; + i += 1; + + // Filled portion + for (0..filled) |_| { + bar[i] = '█'; + i += 1; + } + + // Empty portion + for (filled..self.width) |_| { + bar[i] = '░'; + i += 1; + } + + // Closing bracket + bar[i] = ']'; + i += 1; + + // Percentage + const percent_str = std.fmt.bufPrint(bar[i..][0..20], " {d:>3}%", .{percent}) catch " ???%"; + i += percent_str.len; + + // Rate + if (rate > 0) { + const rate_str = std.fmt.bufPrint(bar[i..][0..30], " ({d} it/s)", .{rate}) catch ""; + i += rate_str.len; + } + + // Write to stdout + const stdout = std.io.getStdOut(); + _ = stdout.write(bar[0..i]) catch {}; + } + + /// Finish and print newline + pub fn finish(self: *Self) void { + self.current = self.total; + self.draw(); + const stdout = std.io.getStdOut(); + _ = stdout.writeAll("\n") catch {}; + } +}; + +/// Spinner for indeterminate operations +pub const Spinner = struct { + frames: []const []const u8, + frame_index: usize, + label: []const u8, + last_update: i64, + + const Self = @This(); + + pub fn init(label: []const u8) Self { + return .{ + .frames = &[_][]const u8{ "⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏" }, + .frame_index = 0, + .label = label, + .last_update = std.time.timestamp(), + }; + } + + /// Draw next frame + pub fn tick(self: *Self) void { + const now = std.time.timestamp(); + if (now - self.last_update < 0) return; // Only update every 100ms + self.last_update = now; + + self.frame_index = (self.frame_index + 1) % self.frames.len; + self.draw(); + } + + fn draw(self: *Self) void { + const frame = self.frames[self.frame_index]; + const stdout = std.io.getStdOut(); + var buf: [256]u8 = undefined; + const msg = std.fmt.bufPrint(&buf, "\r{s} {s}", .{ frame, self.label }) catch return; + _ = stdout.write(msg) catch {}; + } + + /// Finish and clear + pub fn finish(self: *Self) void { + const stdout = std.io.getStdOut(); + _ = stdout.writeAll("\r\x1b[K") catch {}; // Clear line + _ = std.fmt.format(stdout.writer(), "✓ {s}\n", .{self.label}) catch {}; + } +}; + +/// Convenience functions +pub fn showProgress(total: u64, label: []const u8) ProgressBar { + return ProgressBar.init(std.heap.c_allocator, total, label); +} + +pub fn showSpinner(label: []const u8) Spinner { + return Spinner.init(label); +}