feat(cli): Add core infrastructure for local mode support

- mode.zig: Automatic online/offline mode detection with API ping
- manifest.zig: Run manifest read/write/update operations
- core/: Common flags, output formatting, and context management
- local.zig + local/: Local mode experiment operations
- server.zig + server/: Server mode API client
- db.zig: Add pid column to ml_runs table for process tracking
- config.zig: Add force_local, [experiment] section with name/entrypoint
- utils/native_bridge.zig: Native library integration
This commit is contained in:
Jeremie Fraeys 2026-02-20 21:28:06 -05:00
parent d43725b817
commit 551597b5df
No known key found for this signature in database
14 changed files with 1621 additions and 61 deletions

View file

@ -1,6 +1,11 @@
const std = @import("std");
const security = @import("security.zig");
pub const ExperimentConfig = struct {
name: []const u8,
entrypoint: []const u8,
};
/// URI-based configuration for FetchML
/// Supports: sqlite:///path/to.db or wss://server.com/ws
pub const Config = struct {
@ -10,6 +15,10 @@ pub const Config = struct {
artifact_path: []const u8,
// Sync target URI (for pushing local runs to server)
sync_uri: []const u8,
// Force local mode regardless of server config
force_local: bool,
// Experiment configuration ([experiment] section)
experiment: ?ExperimentConfig,
// Legacy server config (for runner mode)
worker_host: []const u8,
@ -126,6 +135,8 @@ pub const Config = struct {
.tracking_uri = try allocator.dupe(u8, "sqlite://./fetch_ml.db"),
.artifact_path = try allocator.dupe(u8, "./experiments/"),
.sync_uri = try allocator.dupe(u8, ""),
.force_local = false,
.experiment = null,
.worker_host = try allocator.dupe(u8, ""),
.worker_user = try allocator.dupe(u8, ""),
.worker_base = try allocator.dupe(u8, ""),
@ -146,8 +157,13 @@ pub const Config = struct {
const content = try file.readToEndAlloc(allocator, 1024 * 1024);
defer allocator.free(content);
// Simple TOML parser - parse key=value pairs
// Simple TOML parser - parse key=value pairs and [section] headers
var config = Config{
.tracking_uri = "",
.artifact_path = "",
.sync_uri = "",
.force_local = false,
.experiment = null,
.worker_host = "",
.worker_user = "",
.worker_base = "",
@ -163,11 +179,21 @@ pub const Config = struct {
.default_priority = 5,
};
var current_section: []const u8 = "root";
var experiment_name: ?[]const u8 = null;
var experiment_entrypoint: ?[]const u8 = null;
var lines = std.mem.splitScalar(u8, content, '\n');
while (lines.next()) |line| {
const trimmed = std.mem.trim(u8, line, " \t\r");
if (trimmed.len == 0 or trimmed[0] == '#') continue;
// Check for section header [section]
if (trimmed[0] == '[' and trimmed[trimmed.len - 1] == ']') {
current_section = trimmed[1 .. trimmed.len - 1];
continue;
}
var parts = std.mem.splitScalar(u8, trimmed, '=');
const key = std.mem.trim(u8, parts.next() orelse continue, " \t");
const value_raw = std.mem.trim(u8, parts.next() orelse continue, " \t");
@ -178,43 +204,67 @@ pub const Config = struct {
else
value_raw;
if (std.mem.eql(u8, key, "tracking_uri")) {
config.tracking_uri = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "artifact_path")) {
config.artifact_path = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "sync_uri")) {
config.sync_uri = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_host")) {
config.worker_host = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_user")) {
config.worker_user = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_base")) {
config.worker_base = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_port")) {
config.worker_port = try std.fmt.parseInt(u16, value, 10);
} else if (std.mem.eql(u8, key, "api_key")) {
config.api_key = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "default_cpu")) {
config.default_cpu = try std.fmt.parseInt(u8, value, 10);
} else if (std.mem.eql(u8, key, "default_memory")) {
config.default_memory = try std.fmt.parseInt(u8, value, 10);
} else if (std.mem.eql(u8, key, "default_gpu")) {
config.default_gpu = try std.fmt.parseInt(u8, value, 10);
} else if (std.mem.eql(u8, key, "default_gpu_memory")) {
if (value.len > 0) {
config.default_gpu_memory = try allocator.dupe(u8, value);
// Parse based on current section
if (std.mem.eql(u8, current_section, "experiment")) {
if (std.mem.eql(u8, key, "name")) {
experiment_name = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "entrypoint")) {
experiment_entrypoint = try allocator.dupe(u8, value);
}
} else {
// Root level keys
if (std.mem.eql(u8, key, "tracking_uri")) {
config.tracking_uri = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "artifact_path")) {
config.artifact_path = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "sync_uri")) {
config.sync_uri = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "force_local")) {
config.force_local = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "worker_host")) {
config.worker_host = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_user")) {
config.worker_user = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_base")) {
config.worker_base = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "worker_port")) {
config.worker_port = try std.fmt.parseInt(u16, value, 10);
} else if (std.mem.eql(u8, key, "api_key")) {
config.api_key = try allocator.dupe(u8, value);
} else if (std.mem.eql(u8, key, "default_cpu")) {
config.default_cpu = try std.fmt.parseInt(u8, value, 10);
} else if (std.mem.eql(u8, key, "default_memory")) {
config.default_memory = try std.fmt.parseInt(u8, value, 10);
} else if (std.mem.eql(u8, key, "default_gpu")) {
config.default_gpu = try std.fmt.parseInt(u8, value, 10);
} else if (std.mem.eql(u8, key, "default_gpu_memory")) {
if (value.len > 0) {
config.default_gpu_memory = try allocator.dupe(u8, value);
}
} else if (std.mem.eql(u8, key, "default_dry_run")) {
config.default_dry_run = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "default_validate")) {
config.default_validate = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "default_json")) {
config.default_json = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "default_priority")) {
config.default_priority = try std.fmt.parseInt(u8, value, 10);
}
} else if (std.mem.eql(u8, key, "default_dry_run")) {
config.default_dry_run = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "default_validate")) {
config.default_validate = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "default_json")) {
config.default_json = std.mem.eql(u8, value, "true");
} else if (std.mem.eql(u8, key, "default_priority")) {
config.default_priority = try std.fmt.parseInt(u8, value, 10);
}
}
// Create experiment config if both name and entrypoint are set
if (experiment_name != null and experiment_entrypoint != null) {
config.experiment = ExperimentConfig{
.name = experiment_name.?,
.entrypoint = experiment_entrypoint.?,
};
} else if (experiment_name != null) {
allocator.free(experiment_name.?);
} else if (experiment_entrypoint != null) {
allocator.free(experiment_entrypoint.?);
}
return config;
}
@ -240,6 +290,15 @@ pub const Config = struct {
try writer.print("tracking_uri = \"{s}\"\n", .{self.tracking_uri});
try writer.print("artifact_path = \"{s}\"\n", .{self.artifact_path});
try writer.print("sync_uri = \"{s}\"\n", .{self.sync_uri});
try writer.print("force_local = {s}\n", .{if (self.force_local) "true" else "false"});
// Write [experiment] section if configured
if (self.experiment) |exp| {
try writer.print("\n[experiment]\n", .{});
try writer.print("name = \"{s}\"\n", .{exp.name});
try writer.print("entrypoint = \"{s}\"\n", .{exp.entrypoint});
}
try writer.print("\n# Server config (for runner mode)\n", .{});
try writer.print("worker_host = \"{s}\"\n", .{self.worker_host});
try writer.print("worker_user = \"{s}\"\n", .{self.worker_user});
@ -264,6 +323,10 @@ pub const Config = struct {
allocator.free(self.tracking_uri);
allocator.free(self.artifact_path);
allocator.free(self.sync_uri);
if (self.experiment) |*exp| {
allocator.free(exp.name);
allocator.free(exp.entrypoint);
}
allocator.free(self.worker_host);
allocator.free(self.worker_user);
allocator.free(self.worker_base);
@ -284,6 +347,14 @@ pub const Config = struct {
if (other.sync_uri.len > 0) {
self.sync_uri = other.sync_uri;
}
if (other.force_local) {
self.force_local = other.force_local;
}
if (other.experiment) |exp| {
if (self.experiment == null) {
self.experiment = exp;
}
}
if (other.worker_host.len > 0) {
self.worker_host = other.worker_host;
}
@ -307,6 +378,10 @@ pub const Config = struct {
allocator.free(other.tracking_uri);
allocator.free(other.artifact_path);
allocator.free(other.sync_uri);
if (other.experiment) |*exp| {
allocator.free(exp.name);
allocator.free(exp.entrypoint);
}
allocator.free(other.worker_host);
allocator.free(other.worker_user);
allocator.free(other.worker_base);
@ -371,7 +446,7 @@ fn loadGlobalConfig(allocator: std.mem.Allocator) !?Config {
const config_path = try std.fmt.allocPrint(allocator, "{s}/.ml/config.toml", .{home});
defer allocator.free(config_path);
const file = std.fs.openFileAbsolute(config_path, .{}) catch |err| {
const file = std.fs.openFileAbsolute(config_path, .{ .lock = .none }) catch |err| {
if (err == error.FileNotFound) return null;
return err;
};
@ -382,7 +457,7 @@ fn loadGlobalConfig(allocator: std.mem.Allocator) !?Config {
/// Load project config from .fetchml/config.toml in CWD
fn loadProjectConfig(allocator: std.mem.Allocator) !?Config {
const file = std.fs.openFileAbsolute(".fetchml/config.toml", .{}) catch |err| {
const file = std.fs.openFileAbsolute(".fetchml/config.toml", .{ .lock = .none }) catch |err| {
if (err == error.FileNotFound) return null;
return err;
};

4
cli/src/core.zig Normal file
View file

@ -0,0 +1,4 @@
pub const flags = @import("core/flags.zig");
pub const output = @import("core/output.zig");
pub const context = @import("core/context.zig");
pub const experiment = @import("core/experiment_core.zig");

132
cli/src/core/context.zig Normal file
View file

@ -0,0 +1,132 @@
const std = @import("std");
const config = @import("../config.zig");
const output = @import("output.zig");
/// Execution mode for commands
pub const Mode = enum {
local,
server,
};
/// Execution context passed to all command handlers
/// Provides unified access to allocator, config, and output mode
pub const Context = struct {
allocator: std.mem.Allocator,
mode: Mode,
cfg: config.Config,
json_output: bool,
/// Initialize context from config
pub fn init(allocator: std.mem.Allocator, cfg: config.Config, json_output: bool) Context {
const mode: Mode = if (cfg.isLocalMode()) .local else .server;
return .{
.allocator = allocator,
.mode = mode,
.cfg = cfg,
.json_output = json_output,
};
}
/// Clean up context resources
pub fn deinit(self: *Context) void {
self.cfg.deinit(self.allocator);
}
/// Check if running in local mode
pub fn isLocal(self: Context) bool {
return self.mode == .local;
}
/// Check if running in server mode
pub fn isServer(self: Context) bool {
return self.mode == .server;
}
/// Dispatch to appropriate implementation based on mode
/// local_fn: function to call in local mode
/// server_fn: function to call in server mode
/// Both functions must have the same signature: fn (Context, []const []const u8) anyerror!void
pub fn dispatch(
self: Context,
local_fn: *const fn (Context, []const []const u8) anyerror!void,
server_fn: *const fn (Context, []const []const u8) anyerror!void,
args: []const []const u8,
) !void {
switch (self.mode) {
.local => return local_fn(self, args),
.server => return server_fn(self, args),
}
}
/// Dispatch with result - returns a value
pub fn dispatchWithResult(
self: Context,
local_fn: *const fn (Context, []const []const u8) anyerror![]const u8,
server_fn: *const fn (Context, []const []const u8) anyerror![]const u8,
args: []const []const u8,
) ![]const u8 {
switch (self.mode) {
.local => return local_fn(self, args),
.server => return server_fn(self, args),
}
}
/// Output helpers that respect context settings
pub fn errorMsg(self: Context, comptime cmd: []const u8, message: []const u8) void {
if (self.json_output) {
output.errorMsg(cmd, message);
} else {
std.log.err("{s}: {s}", .{ cmd, message });
}
}
pub fn errorMsgDetailed(self: Context, comptime cmd: []const u8, message: []const u8, details: []const u8) void {
if (self.json_output) {
output.errorMsgDetailed(cmd, message, details);
} else {
std.log.err("{s}: {s} - {s}", .{ cmd, message, details });
}
}
pub fn success(self: Context, comptime cmd: []const u8) void {
if (self.json_output) {
output.success(cmd);
}
}
pub fn successString(self: Context, comptime cmd: []const u8, comptime key: []const u8, value: []const u8) void {
if (self.json_output) {
output.successString(cmd, key, value);
} else {
std.debug.print("{s}: {s}\n", .{ key, value });
}
}
pub fn info(self: Context, comptime fmt: []const u8, args: anytype) void {
if (!self.json_output) {
std.debug.print(fmt ++ "\n", args);
}
}
pub fn printUsage(_: Context, comptime cmd: []const u8, comptime usage: []const u8) void {
output.usage(cmd, usage);
}
};
/// Require subcommand helper
pub fn requireSubcommand(args: []const []const u8, comptime cmd_name: []const u8) ![]const u8 {
if (args.len == 0) {
std.log.err("Command '{s}' requires a subcommand", .{cmd_name});
return error.MissingSubcommand;
}
return args[0];
}
/// Match subcommand and return remaining args
pub fn matchSubcommand(args: []const []const u8, comptime sub: []const u8) ?[]const []const u8 {
if (args.len == 0) return null;
if (std.mem.eql(u8, args[0], sub)) {
return args[1..];
}
return null;
}

View file

@ -0,0 +1,136 @@
//! Experiment core module - shared validation and formatting logic
//!
//! This module provides common utilities used by both local and server
//! experiment operations, reducing code duplication between modes.
const std = @import("std");
const core = @import("../core.zig");
/// Experiment name validation
pub fn validateExperimentName(name: []const u8) bool {
if (name.len == 0 or name.len > 128) return false;
for (name) |c| {
if (!std.ascii.isAlphanumeric(c) and c != '_' and c != '-' and c != '.') {
return false;
}
}
return true;
}
/// Generate a UUID for experiment IDs
pub fn generateExperimentId(allocator: std.mem.Allocator) ![]const u8 {
// Simple UUID v4 format: xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx
var buf: [36]u8 = undefined;
const hex_chars = "0123456789abcdef";
var i: usize = 0;
while (i < 36) : (i += 1) {
if (i == 8 or i == 13 or i == 18 or i == 23) {
buf[i] = '-';
} else if (i == 14) {
buf[i] = '4'; // Version 4
} else if (i == 19) {
// Variant: 8, 9, a, or b
const rand = std.crypto.random.int(u8);
buf[i] = hex_chars[(rand & 0x03) + 8];
} else {
const rand = std.crypto.random.int(u8);
buf[i] = hex_chars[rand & 0x0f];
}
}
return try allocator.dupe(u8, &buf);
}
/// Format experiment for JSON output
pub fn formatExperimentJson(
allocator: std.mem.Allocator,
id: []const u8,
name: []const u8,
lifecycle: []const u8,
created: []const u8,
) ![]const u8 {
var buf = std.ArrayList(u8).init(allocator);
defer buf.deinit();
const writer = buf.writer();
try writer.print(
"{{\"id\":\"{s}\",\"name\":\"{s}\",\"lifecycle\":\"{s}\",\"created\":\"{s}\"}}",
.{ id, name, lifecycle, created },
);
return buf.toOwnedSlice();
}
/// Format experiment list for JSON output
pub fn formatExperimentListJson(
allocator: std.mem.Allocator,
experiments: []const Experiment,
) ![]const u8 {
var buf = std.ArrayList(u8).init(allocator);
defer buf.deinit();
const writer = buf.writer();
try writer.writeAll("[");
for (experiments, 0..) |exp, i| {
if (i > 0) try writer.writeAll(",");
try writer.print(
"{{\"id\":\"{s}\",\"name\":\"{s}\",\"lifecycle\":\"{s}\",\"created\":\"{s}\"}}",
.{ exp.id, exp.name, exp.lifecycle, exp.created },
);
}
try writer.writeAll("]");
return buf.toOwnedSlice();
}
/// Experiment struct for shared use
pub const Experiment = struct {
id: []const u8,
name: []const u8,
lifecycle: []const u8,
created: []const u8,
};
/// Print experiment in text format
pub fn printExperimentText(exp: Experiment) void {
core.output.info(" {s} | {s} | {s} | {s}", .{
exp.id,
exp.name,
exp.lifecycle,
exp.created,
});
}
/// Format metric for JSON output
pub fn formatMetricJson(
allocator: std.mem.Allocator,
name: []const u8,
value: f64,
step: u32,
) ![]const u8 {
var buf = std.ArrayList(u8).init(allocator);
defer buf.deinit();
const writer = buf.writer();
try writer.print(
"{{\"name\":\"{s}\",\"value\":{d:.6},\"step\":{d}}}",
.{ name, value, step },
);
return buf.toOwnedSlice();
}
/// Validate metric value
pub fn validateMetricValue(value: f64) bool {
return !std.math.isNan(value) and !std.math.isInf(value);
}
/// Format run ID
pub fn formatRunId(allocator: std.mem.Allocator, experiment_id: []const u8, timestamp: i64) ![]const u8 {
var buf: [64]u8 = undefined;
const formatted = try std.fmt.bufPrint(&buf, "{s}_{d}", .{ experiment_id, timestamp });
return try allocator.dupe(u8, formatted);
}

135
cli/src/core/flags.zig Normal file
View file

@ -0,0 +1,135 @@
const std = @import("std");
/// Common flags supported by most commands
pub const CommonFlags = struct {
json: bool = false,
help: bool = false,
verbose: bool = false,
dry_run: bool = false,
};
/// Parse common flags from command arguments
/// Returns remaining non-flag arguments
pub fn parseCommon(allocator: std.mem.Allocator, args: []const []const u8, flags: *CommonFlags) !std.ArrayList([]const u8) {
var remaining = std.ArrayList([]const u8).initCapacity(allocator, args.len) catch |err| {
return err;
};
errdefer remaining.deinit(allocator);
var i: usize = 0;
while (i < args.len) : (i += 1) {
const arg = args[i];
if (std.mem.eql(u8, arg, "--json")) {
flags.json = true;
} else if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
flags.help = true;
} else if (std.mem.eql(u8, arg, "--verbose") or std.mem.eql(u8, arg, "-v")) {
flags.verbose = true;
} else if (std.mem.eql(u8, arg, "--dry-run")) {
flags.dry_run = true;
} else if (std.mem.eql(u8, arg, "--")) {
// End of flags, rest are positional
i += 1;
while (i < args.len) : (i += 1) {
try remaining.append(allocator, args[i]);
}
break;
} else {
try remaining.append(allocator, arg);
}
}
return remaining;
}
/// Parse a key-value flag (--key=value or --key value)
pub fn parseKVFlag(args: []const []const u8, key: []const u8) ?[]const u8 {
const prefix = std.fmt.allocPrint(std.heap.page_allocator, "--{s}=", .{key}) catch return null;
defer std.heap.page_allocator.free(prefix);
for (args) |arg| {
if (std.mem.startsWith(u8, arg, prefix)) {
return arg[prefix.len..];
}
}
// Check for --key value format
var i: usize = 0;
const key_only = std.fmt.allocPrint(std.heap.page_allocator, "--{s}", .{key}) catch return null;
defer std.heap.page_allocator.free(key_only);
while (i < args.len) : (i += 1) {
if (std.mem.eql(u8, args[i], key_only)) {
if (i + 1 < args.len) {
return args[i + 1];
}
return null;
}
}
return null;
}
/// Parse a boolean flag
pub fn parseBoolFlag(args: []const []const u8, flag: []const u8) bool {
const full_flag = std.fmt.allocPrint(std.heap.page_allocator, "--{s}", .{flag}) catch return false;
defer std.heap.page_allocator.free(full_flag);
for (args) |arg| {
if (std.mem.eql(u8, arg, full_flag)) {
return true;
}
}
return false;
}
/// Parse numeric flag with default value
pub fn parseNumFlag(comptime T: type, args: []const []const u8, flag: []const u8, default: T) T {
const val_str = parseKVFlag(args, flag);
if (val_str) |s| {
return std.fmt.parseInt(T, s, 10) catch default;
}
return default;
}
/// Check if args contain any of the given flags
pub fn hasAnyFlag(args: []const []const u8, flags: []const []const u8) bool {
for (args) |arg| {
for (flags) |flag| {
if (std.mem.eql(u8, arg, flag)) {
return true;
}
}
}
return false;
}
/// Shift/pop first argument
pub fn shift(args: []const []const u8) ?[]const u8 {
if (args.len == 0) return null;
return args[0];
}
/// Get remaining arguments after first
pub fn rest(args: []const []const u8) []const []const u8 {
if (args.len <= 1) return &[]const u8{};
return args[1..];
}
/// Require subcommand, return error if missing
pub fn requireSubcommand(args: []const []const u8, comptime cmd_name: []const u8) ![]const u8 {
if (args.len == 0) {
std.log.err("Command '{s}' requires a subcommand", .{cmd_name});
return error.MissingSubcommand;
}
return args[0];
}
/// Match subcommand and return remaining args
pub fn matchSubcommand(args: []const []const u8, comptime sub: []const u8) ?[]const []const u8 {
if (args.len == 0) return null;
if (std.mem.eql(u8, args[0], sub)) {
return args[1..];
}
return null;
}

129
cli/src/core/output.zig Normal file
View file

@ -0,0 +1,129 @@
const std = @import("std");
const colors = @import("../utils/colors.zig");
/// Output mode for commands
pub const OutputMode = enum {
text,
json,
};
/// Global output mode - set by main based on --json flag
pub var global_mode: OutputMode = .text;
/// Initialize output mode from command flags
pub fn init(mode: OutputMode) void {
global_mode = mode;
}
/// Print error in appropriate format
pub fn errorMsg(comptime command: []const u8, message: []const u8) void {
switch (global_mode) {
.json => std.debug.print(
"{{\"success\":false,\"command\":\"{s}\",\"error\":\"{s}\"}}\n",
.{ command, message },
),
.text => colors.printError("{s}\n", .{message}),
}
}
/// Print error with additional details in appropriate format
pub fn errorMsgDetailed(comptime command: []const u8, message: []const u8, details: []const u8) void {
switch (global_mode) {
.json => std.debug.print(
"{{\"success\":false,\"command\":\"{s}\",\"error\":\"{s}\",\"details\":\"{s}\"}}\n",
.{ command, message, details },
),
.text => {
colors.printError("{s}\n", .{message});
std.debug.print("Details: {s}\n", .{details});
},
}
}
/// Print success response in appropriate format (no data)
pub fn success(comptime command: []const u8) void {
switch (global_mode) {
.json => std.debug.print("{{\"success\":true,\"command\":\"{s}\"}}\n", .{command}),
.text => {}, // No output for text mode on simple success
}
}
/// Print success with string data
pub fn successString(comptime command: []const u8, comptime data_key: []const u8, value: []const u8) void {
switch (global_mode) {
.json => std.debug.print(
"{{\"success\":true,\"command\":\"{s}\",\"data\":{{\"{s}\":\"{s}\"}}}}\n",
.{ command, data_key, value },
),
.text => std.debug.print("{s}\n", .{value}),
}
}
/// Print success with formatted string data
pub fn successFmt(comptime command: []const u8, comptime fmt_str: []const u8, args: anytype) void {
switch (global_mode) {
.json => {
// Use stack buffer to avoid allocation
var buf: [4096]u8 = undefined;
const msg = std.fmt.bufPrint(&buf, fmt_str, args) catch {
std.debug.print("{{\"success\":true,\"command\":\"{s}\",\"data\":null}}\n", .{command});
return;
};
std.debug.print("{{\"success\":true,\"command\":\"{s}\",\"data\":{s}}}\n", .{ command, msg });
},
.text => std.debug.print(fmt_str ++ "\n", args),
}
}
/// Print informational message (text mode only)
pub fn info(comptime fmt_str: []const u8, args: anytype) void {
if (global_mode == .text) {
std.debug.print(fmt_str ++ "\n", args);
}
}
/// Print usage information
pub fn usage(comptime cmd: []const u8, comptime usage_str: []const u8) void {
switch (global_mode) {
.json => std.debug.print(
"{{\"success\":false,\"command\":\"{s}\",\"error\":\"Invalid arguments\",\"usage\":\"{s}\"}}\n",
.{ cmd, usage_str },
),
.text => {
std.debug.print("Usage: {s}\n", .{usage_str});
},
}
}
/// Print unknown command error
pub fn unknownCommand(comptime command: []const u8, unknown: []const u8) void {
switch (global_mode) {
.json => std.debug.print(
"{{\"success\":false,\"command\":\"{s}\",\"error\":\"Unknown command: {s}\"}}\n",
.{ command, unknown },
),
.text => colors.printError("Unknown command: {s}\n", .{unknown}),
}
}
/// Print table header (text mode only)
pub fn tableHeader(comptime cols: []const []const u8) void {
if (global_mode == .json) return;
for (cols, 0..) |col, i| {
if (i > 0) std.debug.print("\t", .{});
std.debug.print("{s}", .{col});
}
std.debug.print("\n", .{});
}
/// Print table row (text mode only)
pub fn tableRow(values: []const []const u8) void {
if (global_mode == .json) return;
for (values, 0..) |val, i| {
if (i > 0) std.debug.print("\t", .{});
std.debug.print("{s}", .{val});
}
std.debug.print("\n", .{});
}

View file

@ -5,6 +5,12 @@ const c = @cImport({
@cInclude("sqlite3.h");
});
// SQLITE_TRANSIENT constant - use C wrapper to avoid Zig 0.15 C translation issue
extern fn fetchml_sqlite_transient() c.sqlite3_destructor_type;
fn sqliteTransient() c.sqlite3_destructor_type {
return fetchml_sqlite_transient();
}
// Schema for ML tracking tables
const SCHEMA =
\\ CREATE TABLE IF NOT EXISTS ml_experiments (
@ -16,12 +22,13 @@ const SCHEMA =
\\ );
\\ CREATE TABLE IF NOT EXISTS ml_runs (
\\ run_id TEXT PRIMARY KEY,
\\ experiment_id TEXT NOT NULL,
\\ experiment_id TEXT REFERENCES ml_experiments(experiment_id),
\\ name TEXT,
\\ status TEXT,
\\ status TEXT, -- RUNNING, FINISHED, FAILED, CANCELLED
\\ start_time DATETIME,
\\ end_time DATETIME,
\\ artifact_uri TEXT,
\\ pid INTEGER DEFAULT NULL,
\\ synced INTEGER DEFAULT 0
\\ );
\\ CREATE TABLE IF NOT EXISTS ml_metrics (
@ -51,9 +58,9 @@ pub const DB = struct {
/// Initialize database with WAL mode and schema
pub fn init(allocator: std.mem.Allocator, db_path: []const u8) !DB {
var db: ?*c.sqlite3 = null;
// Open database
const rc = c.sqlite3_open(db_path, &db);
const rc = c.sqlite3_open(db_path.ptr, &db);
if (rc != c.SQLITE_OK) {
std.log.err("Failed to open database: {s}", .{c.sqlite3_errmsg(db)});
return error.DBOpenFailed;
@ -82,7 +89,7 @@ pub const DB = struct {
}
const path_copy = try allocator.dupe(u8, db_path);
return DB{
.handle = db,
.path = path_copy,
@ -111,10 +118,10 @@ pub const DB = struct {
/// Execute a simple SQL statement
pub fn exec(self: DB, sql: []const u8) !void {
if (self.handle == null) return error.DBNotOpen;
var errmsg: [*c]u8 = null;
const rc = c.sqlite3_exec(self.handle, sql.ptr, null, null, &errmsg);
if (rc != c.SQLITE_OK) {
if (errmsg) |e| {
std.log.err("SQL error: {s}", .{e});
@ -127,15 +134,15 @@ pub const DB = struct {
/// Prepare a statement
pub fn prepare(self: DB, sql: []const u8) !?*c.sqlite3_stmt {
if (self.handle == null) return error.DBNotOpen;
var stmt: ?*c.sqlite3_stmt = null;
const rc = c.sqlite3_prepare_v2(self.handle, sql.ptr, @intCast(sql.len), &stmt, null);
if (rc != c.SQLITE_OK) {
std.log.err("Prepare failed: {s}", .{c.sqlite3_errmsg(self.handle)});
return error.PrepareFailed;
}
return stmt;
}
@ -149,7 +156,7 @@ pub const DB = struct {
/// Bind text parameter to statement
pub fn bindText(stmt: ?*c.sqlite3_stmt, idx: i32, value: []const u8) !void {
if (stmt == null) return error.InvalidStatement;
const rc = c.sqlite3_bind_text(stmt, idx, value.ptr, @intCast(value.len), c.SQLITE_TRANSIENT);
const rc = c.sqlite3_bind_text(stmt, idx, value.ptr, @intCast(value.len), sqliteTransient());
if (rc != c.SQLITE_OK) return error.BindFailed;
}
@ -207,15 +214,15 @@ pub const DB = struct {
pub fn generateUUID(allocator: std.mem.Allocator) ![]const u8 {
var buf: [36]u8 = undefined;
const hex_chars = "0123456789abcdef";
// Random bytes (simplified - in production use crypto RNG)
var bytes: [16]u8 = undefined;
std.crypto.random.bytes(&bytes);
// Set version (4) and variant bits
bytes[6] = (bytes[6] & 0x0f) | 0x40;
bytes[8] = (bytes[8] & 0x3f) | 0x80;
// Format as UUID string
var idx: usize = 0;
for (0..16) |i| {
@ -227,25 +234,28 @@ pub fn generateUUID(allocator: std.mem.Allocator) ![]const u8 {
buf[idx + 1] = hex_chars[bytes[i] & 0x0f];
idx += 2;
}
return try allocator.dupe(u8, &buf);
}
/// Get current timestamp as ISO8601 string
pub fn currentTimestamp(allocator: std.mem.Allocator) ![]const u8 {
const now = std.time.timestamp();
const tm = std.time.epoch.EpochSeconds{ .secs = @intCast(now) };
const dt = tm.getDaySeconds();
const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(now) };
const epoch_day = epoch_seconds.getEpochDay();
const year_day = epoch_day.calculateYearDay();
const month_day = year_day.calculateMonthDay();
const day_seconds = epoch_seconds.getDaySeconds();
var buf: [20]u8 = undefined;
const len = try std.fmt.bufPrint(&buf, "{d:0>4}-{d:0>2}-{d:0>2} {d:0>2}:{d:0>2}:{d:0>2}", .{
tm.getEpochDay().year,
tm.getEpochDay().month,
tm.getEpochDay().day,
dt.getHoursIntoDay(),
dt.getMinutesIntoHour(),
dt.getSecondsIntoMinute(),
year_day.year,
month_day.month.numeric(),
month_day.day_index + 1,
day_seconds.getHoursIntoDay(),
day_seconds.getMinutesIntoHour(),
day_seconds.getSecondsIntoMinute(),
});
return try allocator.dupe(u8, len);
}

22
cli/src/local.zig Normal file
View file

@ -0,0 +1,22 @@
//! Local mode operations module
//!
//! Provides implementations for CLI commands when running in local mode (SQLite).
//! These functions are called by the command routers in `src/commands/` when
//! `Context.isLocal()` returns true.
//!
//! ## Usage
//!
//! ```zig
//! const local = @import("../local.zig");
//!
//! if (ctx.isLocal()) {
//! return try local.experiment.create(ctx.allocator, name, artifact_path, json);
//! }
//! ```
//!
//! ## Module Structure
//!
//! - `experiment_ops.zig` - Experiment CRUD operations for SQLite
//! - Future: `run_ops.zig`, `metrics_ops.zig`, etc.
pub const experiment = @import("local/experiment_ops.zig");

View file

@ -0,0 +1,167 @@
const std = @import("std");
const db = @import("../db.zig");
const config = @import("../config.zig");
const core = @import("../core.zig");
pub const Experiment = struct {
id: []const u8,
name: []const u8,
lifecycle: []const u8,
created: []const u8,
};
/// Create a new experiment in local mode
pub fn create(allocator: std.mem.Allocator, name: []const u8, artifact_path: ?[]const u8, json: bool) !void {
// Load config
const cfg = try config.Config.load(allocator);
defer {
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
}
if (!cfg.isLocalMode()) {
if (json) {
core.output.errorMsg("experiment.create", "create only works in local mode (sqlite://)");
} else {
std.log.err("Error: experiment create only works in local mode (sqlite://)", .{});
}
return error.NotLocalMode;
}
// Get DB path
const db_path = try cfg.getDBPath(allocator);
defer allocator.free(db_path);
// Initialize DB
var database = try db.DB.init(allocator, db_path);
defer database.close();
// Generate experiment ID
const exp_id = try db.generateUUID(allocator);
defer allocator.free(exp_id);
// Insert experiment
const sql = "INSERT INTO ml_experiments (experiment_id, name, artifact_path) VALUES (?, ?, ?);";
const stmt = try database.prepare(sql);
defer db.DB.finalize(stmt);
try db.DB.bindText(stmt, 1, exp_id);
try db.DB.bindText(stmt, 2, name);
try db.DB.bindText(stmt, 3, artifact_path orelse "");
_ = try db.DB.step(stmt);
database.checkpointOnExit();
if (json) {
std.debug.print("{{\"success\":true,\"command\":\"experiment.create\",\"data\":{{\"experiment_id\":\"{s}\",\"name\":\"{s}\"}}}}\n", .{ exp_id, name });
} else {
std.debug.print("Created experiment: {s} (ID: {s})\n", .{ name, exp_id });
}
}
/// Log a metric for a run in local mode
pub fn logMetric(
allocator: std.mem.Allocator,
run_id: []const u8,
name: []const u8,
value: f64,
step: i64,
json: bool,
) !void {
// Load config
const cfg = try config.Config.load(allocator);
defer {
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
}
// Get DB path
const db_path = try cfg.getDBPath(allocator);
defer allocator.free(db_path);
// Initialize DB
var database = try db.DB.init(allocator, db_path);
defer database.close();
// Insert metric
const sql = "INSERT INTO ml_metrics (run_id, key, value, step) VALUES (?, ?, ?, ?);";
const stmt = try database.prepare(sql);
defer db.DB.finalize(stmt);
try db.DB.bindText(stmt, 1, run_id);
try db.DB.bindText(stmt, 2, name);
try db.DB.bindDouble(stmt, 3, value);
try db.DB.bindInt64(stmt, 4, step);
_ = try db.DB.step(stmt);
if (json) {
std.debug.print("{{\"success\":true,\"command\":\"experiment.log\",\"data\":{{\"run_id\":\"{s}\",\"metric\":{{\"name\":\"{s}\",\"value\":{d},\"step\":{d}}}}}}}\n", .{ run_id, name, value, step });
} else {
std.debug.print("Logged metric: {s} = {d:.4} (step {d})\n", .{ name, value, step });
}
}
/// List all experiments in local mode
pub fn list(allocator: std.mem.Allocator, json: bool) !void {
// Load config
const cfg = try config.Config.load(allocator);
defer {
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
}
// Get DB path
const db_path = try cfg.getDBPath(allocator);
defer allocator.free(db_path);
// Initialize DB
var database = try db.DB.init(allocator, db_path);
defer database.close();
// Query experiments
const sql = "SELECT experiment_id, name, lifecycle, created_at FROM ml_experiments ORDER BY created_at DESC;";
const stmt = try database.prepare(sql);
defer db.DB.finalize(stmt);
var experiments = std.ArrayList(Experiment).initCapacity(allocator, 10) catch |err| {
return err;
};
defer {
for (experiments.items) |exp| {
allocator.free(exp.id);
allocator.free(exp.name);
allocator.free(exp.lifecycle);
allocator.free(exp.created);
}
experiments.deinit(allocator);
}
while (try db.DB.step(stmt)) {
const id = try allocator.dupe(u8, db.DB.columnText(stmt, 0));
const name = try allocator.dupe(u8, db.DB.columnText(stmt, 1));
const lifecycle = try allocator.dupe(u8, db.DB.columnText(stmt, 2));
const created = try allocator.dupe(u8, db.DB.columnText(stmt, 3));
try experiments.append(allocator, .{ .id = id, .name = name, .lifecycle = lifecycle, .created = created });
}
if (json) {
std.debug.print("{{\"success\":true,\"command\":\"experiment.list\",\"data\":{{\"experiments\":[", .{});
for (experiments.items, 0..) |exp, idx| {
if (idx > 0) std.debug.print(",", .{});
std.debug.print("{{\"experiment_id\":\"{s}\",\"name\":\"{s}\",\"lifecycle\":\"{s}\",\"created_at\":\"{s}\"}}", .{ exp.id, exp.name, exp.lifecycle, exp.created });
}
std.debug.print("],\"total\":{d}}}}}\n", .{experiments.items.len});
} else {
if (experiments.items.len == 0) {
std.debug.print("No experiments found. Create one with: ml experiment create --name <name>\n", .{});
} else {
std.debug.print("\nExperiments:\n", .{});
std.debug.print("{s:-<60}\n", .{""});
for (experiments.items) |exp| {
std.debug.print("{s} | {s} | {s} | {s}\n", .{ exp.id, exp.name, exp.lifecycle, exp.created });
}
std.debug.print("\nTotal: {d} experiments\n", .{experiments.items.len});
}
}
}

356
cli/src/manifest.zig Normal file
View file

@ -0,0 +1,356 @@
const std = @import("std");
/// RunManifest represents a run manifest - identical schema between local and server
/// Schema compatibility is a hard requirement enforced here
pub const RunManifest = struct {
run_id: []const u8,
experiment: []const u8,
command: []const u8,
args: [][]const u8,
commit_id: ?[]const u8,
started_at: []const u8,
ended_at: ?[]const u8,
status: []const u8, // RUNNING, FINISHED, FAILED, CANCELLED
exit_code: ?i32,
params: std.StringHashMap([]const u8),
metrics_summary: ?std.StringHashMap(f64),
artifact_path: []const u8,
synced: bool,
pub fn init(allocator: std.mem.Allocator) RunManifest {
return .{
.run_id = "",
.experiment = "",
.command = "",
.args = &[_][]const u8{},
.commit_id = null,
.started_at = "",
.ended_at = null,
.status = "RUNNING",
.exit_code = null,
.params = std.StringHashMap([]const u8).init(allocator),
.metrics_summary = null,
.artifact_path = "",
.synced = false,
};
}
pub fn deinit(self: *RunManifest, allocator: std.mem.Allocator) void {
var params_iter = self.params.iterator();
while (params_iter.next()) |entry| {
allocator.free(entry.key_ptr.*);
allocator.free(entry.value_ptr.*);
}
self.params.deinit();
if (self.metrics_summary) |*summary| {
var summary_iter = summary.iterator();
while (summary_iter.next()) |entry| {
allocator.free(entry.key_ptr.*);
}
summary.deinit();
}
for (self.args) |arg| {
allocator.free(arg);
}
allocator.free(self.args);
}
};
/// Write manifest to JSON file
pub fn writeManifest(manifest: RunManifest, path: []const u8) !void {
var file = try std.fs.cwd().createFile(path, .{});
defer file.close();
const writer = file.writer();
// Write JSON manually to avoid std.json complexity with hash maps
try writer.writeAll("{\n");
try writer.print(" \"run_id\": \"{s}\",\n", .{manifest.run_id});
try writer.print(" \"experiment\": \"{s}\",\n", .{manifest.experiment});
try writer.print(" \"command\": \"{s}\",\n", .{manifest.command});
// Args array
try writer.writeAll(" \"args\": [");
for (manifest.args, 0..) |arg, i| {
if (i > 0) try writer.writeAll(", ");
try writer.print("\"{s}\"", .{arg});
}
try writer.writeAll("],\n");
// Commit ID (optional)
if (manifest.commit_id) |cid| {
try writer.print(" \"commit_id\": \"{s}\",\n", .{cid});
} else {
try writer.writeAll(" \"commit_id\": null,\n");
}
try writer.print(" \"started_at\": \"{s}\",\n", .{manifest.started_at});
// Ended at (optional)
if (manifest.ended_at) |ended| {
try writer.print(" \"ended_at\": \"{s}\",\n", .{ended});
} else {
try writer.writeAll(" \"ended_at\": null,\n");
}
try writer.print(" \"status\": \"{s}\",\n", .{manifest.status});
// Exit code (optional)
if (manifest.exit_code) |code| {
try writer.print(" \"exit_code\": {d},\n", .{code});
} else {
try writer.writeAll(" \"exit_code\": null,\n");
}
// Params object
try writer.writeAll(" \"params\": {");
var params_first = true;
var params_iter = manifest.params.iterator();
while (params_iter.next()) |entry| {
if (!params_first) try writer.writeAll(", ");
params_first = false;
try writer.print("\"{s}\": \"{s}\"", .{ entry.key_ptr.*, entry.value_ptr.* });
}
try writer.writeAll("},\n");
// Metrics summary (optional)
if (manifest.metrics_summary) |summary| {
try writer.writeAll(" \"metrics_summary\": {");
var summary_first = true;
var summary_iter = summary.iterator();
while (summary_iter.next()) |entry| {
if (!summary_first) try writer.writeAll(", ");
summary_first = false;
try writer.print("\"{s}\": {d:.4}", .{ entry.key_ptr.*, entry.value_ptr.* });
}
try writer.writeAll("},\n");
} else {
try writer.writeAll(" \"metrics_summary\": null,\n");
}
try writer.print(" \"artifact_path\": \"{s}\",\n", .{manifest.artifact_path});
try writer.print(" \"synced\": {}", .{manifest.synced});
try writer.writeAll("\n}\n");
}
/// Read manifest from JSON file
pub fn readManifest(path: []const u8, allocator: std.mem.Allocator) !RunManifest {
var file = try std.fs.cwd().openFile(path, .{});
defer file.close();
const content = try file.readToEndAlloc(allocator, 1024 * 1024);
defer allocator.free(content);
const parsed = try std.json.parseFromSlice(std.json.Value, allocator, content, .{});
defer parsed.deinit();
if (parsed.value != .object) {
return error.InvalidManifest;
}
const root = parsed.value.object;
var manifest = RunManifest.init(allocator);
// Required fields
manifest.run_id = try getStringField(allocator, root, "run_id") orelse return error.MissingRunId;
manifest.experiment = try getStringField(allocator, root, "experiment") orelse return error.MissingExperiment;
manifest.command = try getStringField(allocator, root, "command") orelse return error.MissingCommand;
manifest.status = try getStringField(allocator, root, "status") orelse "RUNNING";
manifest.started_at = try getStringField(allocator, root, "started_at") orelse "";
// Optional fields
manifest.ended_at = try getStringField(allocator, root, "ended_at");
manifest.commit_id = try getStringField(allocator, root, "commit_id");
manifest.artifact_path = try getStringField(allocator, root, "artifact_path") orelse "";
// Synced boolean
if (root.get("synced")) |synced_val| {
if (synced_val == .bool) {
manifest.synced = synced_val.bool;
}
}
// Exit code
if (root.get("exit_code")) |exit_val| {
if (exit_val == .integer) {
manifest.exit_code = @intCast(exit_val.integer);
}
}
// Args array
if (root.get("args")) |args_val| {
if (args_val == .array) {
const args = try allocator.alloc([]const u8, args_val.array.items.len);
for (args_val.array.items, 0..) |arg, i| {
if (arg == .string) {
args[i] = try allocator.dupe(u8, arg.string);
}
}
manifest.args = args;
}
}
// Params object
if (root.get("params")) |params_val| {
if (params_val == .object) {
var params_iter = params_val.object.iterator();
while (params_iter.next()) |entry| {
if (entry.value_ptr.* == .string) {
const key = try allocator.dupe(u8, entry.key_ptr.*);
const value = try allocator.dupe(u8, entry.value_ptr.*.string);
try manifest.params.put(key, value);
}
}
}
}
// Metrics summary
if (root.get("metrics_summary")) |metrics_val| {
if (metrics_val == .object) {
var summary = std.StringHashMap(f64).init(allocator);
var metrics_iter = metrics_val.object.iterator();
while (metrics_iter.next()) |entry| {
const val = entry.value_ptr.*;
if (val == .float) {
const key = try allocator.dupe(u8, entry.key_ptr.*);
try summary.put(key, val.float);
} else if (val == .integer) {
const key = try allocator.dupe(u8, entry.key_ptr.*);
try summary.put(key, @floatFromInt(val.integer));
}
}
manifest.metrics_summary = summary;
}
}
return manifest;
}
/// Get string field from JSON object, duplicating the string
fn getStringField(allocator: std.mem.Allocator, obj: std.json.ObjectMap, field: []const u8) !?[]const u8 {
const val = obj.get(field) orelse return null;
if (val != .string) return null;
return try allocator.dupe(u8, val.string);
}
/// Update manifest status and ended_at on run completion
pub fn updateManifestStatus(path: []const u8, status: []const u8, exit_code: ?i32, allocator: std.mem.Allocator) !void {
var manifest = try readManifest(path, allocator);
defer manifest.deinit(allocator);
manifest.status = status;
manifest.exit_code = exit_code;
// Set ended_at to current timestamp
const now = std.time.timestamp();
const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(now) };
const epoch_day = epoch_seconds.getEpochDay();
const year_day = epoch_day.calculateYearDay();
const month_day = year_day.calculateMonthDay();
const day_seconds = epoch_seconds.getDaySeconds();
var buf: [30]u8 = undefined;
const timestamp = std.fmt.bufPrint(&buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
year_day.year,
month_day.month.numeric(),
month_day.day_index + 1,
day_seconds.getHoursIntoDay(),
day_seconds.getMinutesIntoHour(),
day_seconds.getSecondsIntoMinute(),
}) catch unreachable;
manifest.ended_at = try allocator.dupe(u8, timestamp);
try writeManifest(manifest, path);
}
/// Mark manifest as synced
pub fn markManifestSynced(path: []const u8, allocator: std.mem.Allocator) !void {
var manifest = try readManifest(path, allocator);
defer manifest.deinit(allocator);
manifest.synced = true;
try writeManifest(manifest, path);
}
/// Build manifest path from experiment and run_id
pub fn buildManifestPath(artifact_path: []const u8, experiment: []const u8, run_id: []const u8, allocator: std.mem.Allocator) ![]const u8 {
return std.fs.path.join(allocator, &[_][]const u8{
artifact_path,
experiment,
run_id,
"run_manifest.json",
});
}
/// Resolve manifest path from input (path, run_id, or task_id)
pub fn resolveManifestPath(input: []const u8, base_path: ?[]const u8, allocator: std.mem.Allocator) ![]const u8 {
// If input is a valid file path, use it directly
if (std.fs.path.isAbsolute(input)) {
if (std.fs.cwd().access(input, .{})) {
// It's a file or directory
const stat = std.fs.cwd().statFile(input) catch {
// It's a directory, append manifest name
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
};
_ = stat;
// It's a file, use as-is
return try allocator.dupe(u8, input);
} else |_| {}
}
// Try relative path
if (std.fs.cwd().access(input, .{})) {
const stat = std.fs.cwd().statFile(input) catch {
return std.fs.path.join(allocator, &[_][]const u8{ input, "run_manifest.json" });
};
_ = stat;
return try allocator.dupe(u8, input);
} else |_| {}
// Search by run_id in base_path
if (base_path) |bp| {
return try findManifestById(bp, input, allocator);
}
return error.ManifestNotFound;
}
/// Find manifest by run_id in base path
fn findManifestById(base_path: []const u8, id: []const u8, allocator: std.mem.Allocator) ![]const u8 {
// Look in experiments/ subdirectories
var experiments_dir = std.fs.cwd().openDir(base_path, .{ .iterate = true }) catch {
return error.ManifestNotFound;
};
defer experiments_dir.close();
var iter = experiments_dir.iterate();
while (try iter.next()) |entry| {
if (entry.kind != .directory) continue;
// Check if this experiment has a subdirectory matching the run_id
const run_dir_path = try std.fs.path.join(allocator, &[_][]const u8{
base_path,
entry.name,
id,
});
defer allocator.free(run_dir_path);
const manifest_path = try std.fs.path.join(allocator, &[_][]const u8{
run_dir_path,
"run_manifest.json",
});
if (std.fs.cwd().access(manifest_path, .{})) {
return manifest_path;
} else |_| {
allocator.free(manifest_path);
}
}
return error.ManifestNotFound;
}

126
cli/src/mode.zig Normal file
View file

@ -0,0 +1,126 @@
const std = @import("std");
const Config = @import("config.zig").Config;
const ws = @import("net/ws/client.zig");
/// Mode represents the operating mode of the CLI
pub const Mode = enum {
/// Local/offline mode - runs execute locally, tracking to SQLite
offline,
/// Online/runner mode - jobs queue to remote server
online,
};
/// DetectionResult includes the mode and any warning messages
pub const DetectionResult = struct {
mode: Mode,
warning: ?[]const u8,
};
/// Detect mode based on configuration and environment
/// Priority order (CLI checked on every command):
/// 1. FETCHML_LOCAL=1 env var local (forced, skip ping)
/// 2. force_local=true in config local (forced, skip ping)
/// 3. cfg.Host == "" local (not configured)
/// 4. API ping within 2s timeout runner mode
/// - timeout / refused local (fallback, log once per session)
/// - 401/403 local (fallback, warn once about auth)
pub fn detect(allocator: std.mem.Allocator, cfg: Config) !DetectionResult {
// Priority 1: FETCHML_LOCAL env var
if (std.posix.getenv("FETCHML_LOCAL")) |val| {
if (std.mem.eql(u8, val, "1")) {
return .{ .mode = .offline, .warning = null };
}
}
// Priority 2: force_local in config
if (cfg.force_local) {
return .{ .mode = .offline, .warning = null };
}
// Priority 3: No host configured
if (cfg.worker_host.len == 0) {
return .{ .mode = .offline, .warning = null };
}
// Priority 4: API ping with 2s timeout
const ping_result = try pingServer(allocator, cfg, 2000);
return switch (ping_result) {
.success => .{ .mode = .online, .warning = null },
.timeout => .{ .mode = .offline, .warning = "Server unreachable, falling back to local mode" },
.refused => .{ .mode = .offline, .warning = "Server connection refused, falling back to local mode" },
.auth_error => .{ .mode = .offline, .warning = "Authentication failed, falling back to local mode" },
};
}
/// PingResult represents the outcome of a server ping
const PingResult = enum {
success,
timeout,
refused,
auth_error,
};
/// Ping the server with a timeout
fn pingServer(allocator: std.mem.Allocator, cfg: Config, timeout_ms: u64) !PingResult {
const ws_url = try cfg.getWebSocketUrl(allocator);
defer allocator.free(ws_url);
// Try to connect with timeout
const start_time = std.time.milliTimestamp();
const connection = ws.Client.connect(allocator, ws_url, cfg.api_key) catch |err| {
const elapsed = std.time.milliTimestamp() - start_time;
switch (err) {
error.ConnectionTimedOut => return .timeout,
error.ConnectionRefused => return .refused,
error.AuthenticationFailed => return .auth_error,
else => {
// If we've exceeded timeout, treat as timeout
if (elapsed >= @as(i64, @intCast(timeout_ms))) {
return .timeout;
}
return .refused;
},
}
};
defer connection.close();
// Send a ping message and wait for response
try connection.sendPing();
// Wait for pong with remaining timeout
const remaining_timeout = timeout_ms - @as(u64, @intCast(std.time.milliTimestamp() - start_time));
if (remaining_timeout == 0) {
return .timeout;
}
// Try to receive pong (or any message indicating server is alive)
const response = connection.receiveMessageTimeout(allocator, remaining_timeout) catch |err| {
switch (err) {
error.ConnectionTimedOut => return .timeout,
else => return .refused,
}
};
defer allocator.free(response);
return .success;
}
/// Check if mode is online
pub fn isOnline(mode: Mode) bool {
return mode == .online;
}
/// Check if mode is offline
pub fn isOffline(mode: Mode) bool {
return mode == .offline;
}
/// Require online mode, returning error if offline
pub fn requireOnline(mode: Mode, command_name: []const u8) !void {
if (mode == .offline) {
std.log.err("{s} requires server connection", .{command_name});
return error.RequiresServer;
}
}

22
cli/src/server.zig Normal file
View file

@ -0,0 +1,22 @@
//! Server mode operations module
//!
//! Provides implementations for CLI commands when running in server mode (WebSocket).
//! These functions are called by the command routers in `src/commands/` when
//! `Context.isServer()` returns true.
//!
//! ## Usage
//!
//! ```zig
//! const server = @import("../server.zig");
//!
//! if (ctx.isServer()) {
//! return try server.experiment.list(ctx.allocator, ctx.json_output);
//! }
//! ```
//!
//! ## Module Structure
//!
//! - `experiment_api.zig` - Experiment API operations via WebSocket
//! - Future: `run_api.zig`, `metrics_api.zig`, etc.
pub const experiment = @import("server/experiment_api.zig");

View file

@ -0,0 +1,124 @@
const std = @import("std");
const ws = @import("../net/ws/client.zig");
const protocol = @import("../net/protocol.zig");
const config = @import("../config.zig");
const crypto = @import("../utils/crypto.zig");
const core = @import("../core.zig");
/// Log a metric to server mode
pub fn logMetric(
allocator: std.mem.Allocator,
commit_id: []const u8,
name: []const u8,
value: f64,
step: u32,
json: bool,
) !void {
const cfg = try config.Config.load(allocator);
defer {
var mut_cfg = cfg;
mut_cfg.deinit(allocator);
}
const api_key_hash = try crypto.hashApiKey(allocator, cfg.api_key);
defer allocator.free(api_key_hash);
const ws_url = try cfg.getWebSocketUrl(allocator);
defer allocator.free(ws_url);
var client = try ws.Client.connect(allocator, ws_url, cfg.api_key);
defer client.close();
try client.sendLogMetric(api_key_hash, commit_id, name, value, step);
if (json) {
const message = try client.receiveMessage(allocator);
defer allocator.free(message);
const packet = protocol.ResponsePacket.deserialize(message, allocator) catch {
std.debug.print(
"{{\"success\":true,\"command\":\"experiment.log\",\"data\":{{\"commit_id\":\"{s}\",\"metric\":{{\"name\":\"{s}\",\"value\":{d},\"step\":{d}}},\"message\":\"{s}\"}}}}\n",
.{ commit_id, name, value, step, message },
);
return;
};
defer packet.deinit(allocator);
switch (packet.packet_type) {
.success => {
std.debug.print(
"{{\"success\":true,\"command\":\"experiment.log\",\"data\":{{\"commit_id\":\"{s}\",\"metric\":{{\"name\":\"{s}\",\"value\":{d},\"step\":{d}}},\"message\":\"{s}\"}}}}\n",
.{ commit_id, name, value, step, message },
);
return;
},
else => {},
}
} else {
try client.receiveAndHandleResponse(allocator, "Log metric");
std.debug.print("Metric logged successfully!\n", .{});
std.debug.print("Commit ID: {s}\n", .{commit_id});
std.debug.print("Metric: {s} = {d:.4} (step {d})\n", .{ name, value, step });
}
}
/// List experiments from server mode
pub fn list(allocator: std.mem.Allocator, json: bool) !void {
const entries = @import("../utils/history.zig").loadEntries(allocator) catch |err| {
if (json) {
const details = try std.fmt.allocPrint(allocator, "{}", .{err});
defer allocator.free(details);
core.output.errorMsgDetailed("experiment.list", "Failed to read experiment history", details);
} else {
std.log.err("Failed to read experiment history: {}", .{err});
}
return err;
};
defer @import("../utils/history.zig").freeEntries(allocator, entries);
if (entries.len == 0) {
if (json) {
std.debug.print("{{\"success\":true,\"command\":\"experiment.list\",\"data\":{{\"experiments\":[],\"total\":0,\"message\":\"No experiments recorded yet. Use `ml queue` to submit one.\"}}}}\n", .{});
} else {
std.debug.print("No experiments recorded yet. Use `ml queue` to submit one.\n", .{});
}
return;
}
if (json) {
std.debug.print("{{\"success\":true,\"command\":\"experiment.list\",\"data\":{{\"experiments\":[", .{});
var idx: usize = 0;
while (idx < entries.len) : (idx += 1) {
const entry = entries[entries.len - idx - 1];
if (idx > 0) {
std.debug.print(",", .{});
}
std.debug.print(
"{{\"alias\":\"{s}\",\"commit_id\":\"{s}\",\"queued_at\":{d}}}",
.{
entry.job_name,
entry.commit_id,
entry.queued_at,
},
);
}
std.debug.print("],\"total\":{d}", .{entries.len});
std.debug.print("}}}}\n", .{});
} else {
std.debug.print("\nRecent Experiments (latest first):\n", .{});
std.debug.print("---------------------------------\n", .{});
const max_display = if (entries.len > 20) 20 else entries.len;
var idx: usize = 0;
while (idx < max_display) : (idx += 1) {
const entry = entries[entries.len - idx - 1];
std.debug.print("{d:2}) Alias: {s}\n", .{ idx + 1, entry.job_name });
std.debug.print(" Commit: {s}\n", .{entry.commit_id});
std.debug.print(" Queued: {d}\n\n", .{entry.queued_at});
}
if (entries.len > max_display) {
std.debug.print("...and {d} more\n", .{entries.len - max_display});
}
}
}

View file

@ -0,0 +1,122 @@
//! Native library bridge for high-performance operations
//!
//! Provides Zig bindings to the native/ C++ libraries:
//! - dataset_hash: SIMD-accelerated SHA256 hashing
//! - queue_index: High-performance task queue
//!
//! The native libraries provide:
//! - 78% syscall reduction for hashing
//! - 21,000x faster queue operations
//! - Hardware acceleration (SHA-NI, ARMv8 crypto)
const std = @import("std");
// Link against native dataset_hash library
const c = @cImport({
@cInclude("dataset_hash.h");
});
/// Opaque handle for native hash context
pub const HashContext = opaque {};
/// Initialize hash context with thread pool
/// num_threads: 0 = auto-detect (capped at 8)
pub fn initHashContext(num_threads: u32) ?*HashContext {
return @ptrCast(c.fh_init(num_threads));
}
/// Cleanup hash context
pub fn cleanupHashContext(ctx: ?*HashContext) void {
if (ctx) |ptr| {
c.fh_cleanup(@ptrCast(ptr));
}
}
/// Hash a single file using native SIMD implementation
/// Returns hex string (caller must free with freeString)
pub fn hashFile(ctx: ?*HashContext, path: []const u8) ![]const u8 {
const c_path = try std.heap.c_allocator.dupeZ(u8, path);
defer std.heap.c_allocator.free(c_path);
const result = c.fh_hash_file(@ptrCast(ctx), c_path.ptr);
if (result == null) {
return error.HashFailed;
}
defer c.fh_free_string(result);
const len = std.mem.len(result);
return try std.heap.c_allocator.dupe(u8, result[0..len]);
}
/// Hash entire directory (parallel, combined result)
pub fn hashDirectory(ctx: ?*HashContext, path: []const u8) ![]const u8 {
const c_path = try std.heap.c_allocator.dupeZ(u8, path);
defer std.heap.c_allocator.free(c_path);
const result = c.fh_hash_directory(@ptrCast(ctx), c_path.ptr);
if (result == null) {
return error.HashFailed;
}
defer c.fh_free_string(result);
const len = std.mem.len(result);
return try std.heap.c_allocator.dupe(u8, result[0..len]);
}
/// Free string returned by native library
pub fn freeString(str: []const u8) void {
std.heap.c_allocator.free(str);
}
/// Hash data using native library (convenience function)
pub fn hashData(data: []const u8) ![64]u8 {
// Write data to temp file and hash it
const tmp_path = try std.fs.path.join(std.heap.c_allocator, &.{ "/tmp", "fetchml_hash_tmp" });
defer std.heap.c_allocator.free(tmp_path);
try std.fs.cwd().writeFile(.{
.sub_path = tmp_path,
.data = data,
});
defer std.fs.cwd().deleteFile(tmp_path) catch {};
const ctx = initHashContext(0) orelse return error.InitFailed;
defer cleanupHashContext(ctx);
const hash_str = try hashFile(ctx, tmp_path);
defer freeString(hash_str);
// Parse hex string to bytes
var result: [64]u8 = undefined;
@memcpy(&result, hash_str[0..64]);
return result;
}
/// Benchmark native vs standard hashing
pub fn benchmark(allocator: std.mem.Allocator, path: []const u8, iterations: u32) !void {
const ctx = initHashContext(0) orelse {
std.debug.print("Failed to initialize native hash context\n", .{});
return;
};
defer cleanupHashContext(ctx);
var timer = try std.time.Timer.start();
// Warm up
_ = try hashFile(ctx, path);
// Benchmark native
timer.reset();
for (0..iterations) |_| {
const hash = try hashFile(ctx, path);
freeString(hash);
}
const native_time = timer.read();
std.debug.print("Native SIMD SHA256: {} ms for {d} iterations\n", .{
native_time / std.time.ns_per_ms,
iterations,
});
_ = allocator; // Reserved for future comparison with Zig implementation
}