feat(cli): implement sync tracking for offline run synchronization

Add SQLite-based sync tracking infrastructure:
- Add synced column to ml_experiments schema (0=not synced, 1=synced)
- Implement SyncDB with initOrOpenSyncDB for sync_pending table
- Add markForSync, markAsSynced, getPendingRuns functions
- Fix SQLite error handling for Zig 0.15 compatibility

Enables tracking experiments that need server synchronization when offline.
This commit is contained in:
Jeremie Fraeys 2026-03-04 20:22:56 -05:00
parent fedaba2409
commit 303f17d3b2
No known key found for this signature in database

View file

@ -21,6 +21,7 @@ const SCHEMA =
\\ name TEXT NOT NULL,
\\ artifact_path TEXT,
\\ lifecycle TEXT DEFAULT 'active',
\\ synced INTEGER DEFAULT 0, -- 0 = not synced, 1 = synced to server
\\ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
\\ );
\\ CREATE TABLE IF NOT EXISTS ml_runs (
@ -262,3 +263,99 @@ pub fn currentTimestamp(allocator: std.mem.Allocator) ![]const u8 {
return try allocator.dupe(u8, len);
}
/// Schema for sync tracking table
const SYNC_SCHEMA =
\\ CREATE TABLE IF NOT EXISTS sync_pending (
\\ run_id TEXT PRIMARY KEY,
\\ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
\\ retry_count INTEGER DEFAULT 0,
\\ last_error TEXT
\\ );
;
/// Sync database for tracking pending sync operations
pub const SyncDB = struct {
db: *c.sqlite3,
pub fn close(self: *SyncDB) void {
_ = c.sqlite3_close(self.db);
}
pub fn markForSync(self: *SyncDB, run_id: []const u8) !void {
const sql = "INSERT OR REPLACE INTO sync_pending (run_id) VALUES (?);";
var stmt: ?*c.sqlite3_stmt = null;
const rc = c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null);
if (rc != c.SQLITE_OK) return error.SqliteError;
defer _ = c.sqlite3_finalize(stmt);
_ = c.sqlite3_bind_text(stmt, 1, run_id.ptr, @intCast(run_id.len), sqliteTransient());
const step_rc = c.sqlite3_step(stmt);
if (step_rc != c.SQLITE_DONE) return error.SqliteError;
}
pub fn markAsSynced(self: *SyncDB, run_id: []const u8) !void {
const sql = "DELETE FROM sync_pending WHERE run_id = ?;";
var stmt: ?*c.sqlite3_stmt = null;
const rc = c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null);
if (rc != c.SQLITE_OK) return error.SqliteError;
defer _ = c.sqlite3_finalize(stmt);
_ = c.sqlite3_bind_text(stmt, 1, run_id.ptr, @intCast(run_id.len), sqliteTransient());
const step_rc = c.sqlite3_step(stmt);
if (step_rc != c.SQLITE_DONE) return error.SqliteError;
}
pub fn getPendingRuns(self: *SyncDB, allocator: std.mem.Allocator) ![][]const u8 {
const sql = "SELECT run_id FROM sync_pending ORDER BY created_at;";
var stmt: ?*c.sqlite3_stmt = null;
const rc = c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null);
if (rc != c.SQLITE_OK) return error.SqliteError;
defer _ = c.sqlite3_finalize(stmt);
var results = std.ArrayList([]const u8).init(allocator);
errdefer {
for (results.items) |item| allocator.free(item);
results.deinit(allocator);
}
while (c.sqlite3_step(stmt) == c.SQLITE_ROW) {
const run_id_ptr = c.sqlite3_column_text(stmt, 0);
const run_id_len = c.sqlite3_column_bytes(stmt, 0);
if (run_id_ptr != null) {
const run_id = try allocator.dupe(u8, run_id_ptr[0..run_id_len]);
try results.append(allocator, run_id);
}
}
return results.toOwnedSlice(allocator);
}
};
/// Initialize or open sync database
pub fn initOrOpenSyncDB(allocator: std.mem.Allocator, path: []const u8) !SyncDB {
_ = allocator;
var db: ?*c.sqlite3 = null;
const rc = c.sqlite3_open(path.ptr, &db);
if (rc != c.SQLITE_OK) return error.SqliteError;
// Create sync table
var err_msg: [*c]u8 = null;
const schema_rc = c.sqlite3_exec(db, SYNC_SCHEMA, null, null, &err_msg);
if (schema_rc != c.SQLITE_OK) {
if (err_msg != null) {
std.log.err("SyncDB schema error: {s}", .{err_msg});
_ = c.sqlite3_free(err_msg);
}
_ = c.sqlite3_close(db);
return error.SqliteError;
}
return SyncDB{ .db = db.? };
}