feat(cli): add progress tracking and sync management

Add progress reporting and offline sync infrastructure:
- progress.zig: progress bars and status reporting
- sync_manager.zig: offline run synchronization manager

Supports resilient operation with server connectivity issues.
This commit is contained in:
Jeremie Fraeys 2026-03-04 20:23:17 -05:00
parent 524f440fe4
commit f1965b99bd
No known key found for this signature in database
2 changed files with 358 additions and 0 deletions

193
cli/src/sync_manager.zig Normal file
View file

@ -0,0 +1,193 @@
const std = @import("std");
const db = @import("db.zig");
const manifest_lib = @import("manifest.zig");
const config = @import("config.zig");
const ws = @import("net/ws/client.zig");
const crypto = @import("utils/crypto.zig");
const protocol = @import("net/protocol.zig");
const mode = @import("mode.zig");
/// Auto-sync manager for handling offline runs
/// Automatically syncs pending runs when connection is restored
pub const AutoSync = struct {
allocator: std.mem.Allocator,
const Self = @This();
pub fn init(allocator: std.mem.Allocator) Self {
return .{
.allocator = allocator,
};
}
/// Check for and sync any pending runs
/// Should be called periodically or when connection is restored
pub fn syncPendingRuns(self: Self) !SyncResult {
const cfg = try config.Config.load(self.allocator);
defer {
var mut_cfg = cfg;
mut_cfg.deinit(self.allocator);
}
// Check if we're online
const mode_result = try mode.detect(self.allocator, cfg);
if (mode.isOffline(mode_result.mode)) {
return .{ .synced = 0, .failed = 0, .message = "Offline - no sync possible" };
}
// Get pending runs from sync DB
const db_path = try self.getSyncDBPath();
defer self.allocator.free(db_path);
var database = try db.initOrOpenSyncDB(self.allocator, db_path);
defer database.close();
const pending = try database.getPendingRuns(self.allocator);
defer {
for (pending) |run_id| {
self.allocator.free(run_id);
}
self.allocator.free(pending);
}
if (pending.len == 0) {
return .{ .synced = 0, .failed = 0, .message = "No pending runs to sync" };
}
std.log.info("Found {d} pending run(s) to sync", .{pending.len});
// Connect to server
const ws_url = try cfg.getWebSocketUrl(self.allocator);
defer self.allocator.free(ws_url);
var client = try ws.Client.connect(self.allocator, ws_url, cfg.api_key);
defer client.close();
const api_key_hash = try crypto.hashApiKey(self.allocator, cfg.api_key);
defer self.allocator.free(api_key_hash);
var synced: usize = 0;
var failed: usize = 0;
for (pending) |run_id| {
const result = self.syncSingleRun(run_id, cfg, &client, api_key_hash) catch |err| {
std.log.err("Failed to sync run {s}: {}", .{ run_id[0..@min(8, run_id.len)], err });
failed += 1;
continue;
};
if (result) {
// Mark as synced in database
try database.markAsSynced(run_id);
synced += 1;
std.log.info("Synced run {s}", .{run_id[0..@min(8, run_id.len)]});
} else {
failed += 1;
}
}
const msg = try std.fmt.allocPrint(self.allocator, "Synced {d}/{d} runs", .{ synced, pending.len });
return .{
.synced = synced,
.failed = failed,
.message = msg,
};
}
/// Sync a single run to the server
fn syncSingleRun(
self: Self,
run_id: []const u8,
cfg: config.Config,
client: *ws.Client,
api_key_hash: []const u8,
) !bool {
// Find the run manifest
const manifest_path = try self.findManifestPath(run_id, cfg);
defer if (manifest_path) |p| self.allocator.free(p);
if (manifest_path == null) {
std.log.warn("Could not find manifest for run {s}", .{run_id[0..@min(8, run_id.len)]});
return false;
}
// Read manifest
var manifest = try manifest_lib.readManifest(manifest_path.?, self.allocator);
defer manifest.deinit(self.allocator);
// Send sync request to server
try client.sendSyncRunRequest(
run_id,
manifest.job_name orelse "unnamed",
manifest.command,
manifest.status orelse "UNKNOWN",
manifest.exit_code,
api_key_hash,
);
// Wait for response
const response = try client.receiveMessage(self.allocator);
defer self.allocator.free(response);
// Parse response
if (std.mem.indexOf(u8, response, "success") != null) {
return true;
} else {
std.log.warn("Server rejected sync: {s}", .{response});
return false;
}
}
/// Find the manifest path for a run
fn findManifestPath(self: Self, run_id: []const u8, cfg: config.Config) !?[]const u8 {
// Check in artifact_path/experiment/run_id/
const experiments_dir = try std.fs.openDirAbsolute(cfg.artifact_path, .{ .iterate = true });
defer experiments_dir.close();
var iter = experiments_dir.iterate();
while (try iter.next()) |entry| {
if (entry.kind != .directory) continue;
const run_dir_path = try std.fs.path.join(self.allocator, &[_][]const u8{
cfg.artifact_path,
entry.name,
run_id,
});
const manifest_path = try std.fs.path.join(self.allocator, &[_][]const u8{
run_dir_path,
"run_manifest.json",
});
defer self.allocator.free(manifest_path);
if (std.fs.accessAbsolute(manifest_path, .{})) {
return run_dir_path; // Return the directory path
} else |_| {
self.allocator.free(run_dir_path);
}
}
return null;
}
fn getSyncDBPath(self: Self) ![]const u8 {
const home = std.posix.getenv("HOME") orelse ".";
return std.fs.path.join(self.allocator, &[_][]const u8{ home, ".ml", "sync.db" });
}
};
pub const SyncResult = struct {
synced: usize,
failed: usize,
message: []const u8,
pub fn deinit(self: SyncResult, allocator: std.mem.Allocator) void {
allocator.free(self.message);
}
};
/// Convenience function to sync pending runs
pub fn syncPendingRuns(allocator: std.mem.Allocator) !SyncResult {
const auto_sync = AutoSync.init(allocator);
return auto_sync.syncPendingRuns();
}

165
cli/src/utils/progress.zig Normal file
View file

@ -0,0 +1,165 @@
const std = @import("std");
const colors = @import("colors.zig");
/// Progress bar for long-running operations
pub const ProgressBar = struct {
allocator: std.mem.Allocator,
total: u64,
current: u64,
width: usize,
start_time: i64,
label: []const u8,
const Self = @This();
pub fn init(allocator: std.mem.Allocator, total: u64, label: []const u8) Self {
return .{
.allocator = allocator,
.total = total,
.current = 0,
.width = 40,
.start_time = std.time.timestamp(),
.label = label,
};
}
/// Update progress and redraw
pub fn update(self: *Self, current: u64) void {
self.current = current;
self.draw();
}
/// Increment progress by amount
pub fn increment(self: *Self, amount: u64) void {
self.current += amount;
self.draw();
}
/// Draw progress bar to stdout
pub fn draw(self: *Self) void {
const percent = if (self.total > 0)
@min(100, @as(u64, self.current * 100 / self.total))
else
0;
const filled = if (self.total > 0)
@min(self.width, @as(usize, self.current * self.width / self.total))
else
0;
const elapsed = std.time.timestamp() - self.start_time;
const rate = if (elapsed > 0) self.current / @as(u64, @intCast(elapsed)) else 0;
// Build bar
var bar: [256]u8 = undefined;
var i: usize = 0;
// Clear line and move cursor to start
bar[i] = '\r';
i += 1;
// Add label
const label_len = @min(self.label.len, 30);
@memcpy(bar[i..][0..label_len], self.label[0..label_len]);
i += label_len;
if (label_len < 30) {
bar[i] = ' ';
i += 1;
}
// Opening bracket
bar[i] = '[';
i += 1;
// Filled portion
for (0..filled) |_| {
bar[i] = '█';
i += 1;
}
// Empty portion
for (filled..self.width) |_| {
bar[i] = '░';
i += 1;
}
// Closing bracket
bar[i] = ']';
i += 1;
// Percentage
const percent_str = std.fmt.bufPrint(bar[i..][0..20], " {d:>3}%", .{percent}) catch " ???%";
i += percent_str.len;
// Rate
if (rate > 0) {
const rate_str = std.fmt.bufPrint(bar[i..][0..30], " ({d} it/s)", .{rate}) catch "";
i += rate_str.len;
}
// Write to stdout
const stdout = std.io.getStdOut();
_ = stdout.write(bar[0..i]) catch {};
}
/// Finish and print newline
pub fn finish(self: *Self) void {
self.current = self.total;
self.draw();
const stdout = std.io.getStdOut();
_ = stdout.writeAll("\n") catch {};
}
};
/// Spinner for indeterminate operations
pub const Spinner = struct {
frames: []const []const u8,
frame_index: usize,
label: []const u8,
last_update: i64,
const Self = @This();
pub fn init(label: []const u8) Self {
return .{
.frames = &[_][]const u8{ "", "", "", "", "", "", "", "", "", "" },
.frame_index = 0,
.label = label,
.last_update = std.time.timestamp(),
};
}
/// Draw next frame
pub fn tick(self: *Self) void {
const now = std.time.timestamp();
if (now - self.last_update < 0) return; // Only update every 100ms
self.last_update = now;
self.frame_index = (self.frame_index + 1) % self.frames.len;
self.draw();
}
fn draw(self: *Self) void {
const frame = self.frames[self.frame_index];
const stdout = std.io.getStdOut();
var buf: [256]u8 = undefined;
const msg = std.fmt.bufPrint(&buf, "\r{s} {s}", .{ frame, self.label }) catch return;
_ = stdout.write(msg) catch {};
}
/// Finish and clear
pub fn finish(self: *Self) void {
const stdout = std.io.getStdOut();
_ = stdout.writeAll("\r\x1b[K") catch {}; // Clear line
_ = std.fmt.format(stdout.writer(), "✓ {s}\n", .{self.label}) catch {};
}
};
/// Convenience functions
pub fn showProgress(total: u64, label: []const u8) ProgressBar {
return ProgressBar.init(std.heap.c_allocator, total, label);
}
pub fn showSpinner(label: []const u8) Spinner {
return Spinner.init(label);
}