const deps = @import("deps.zig"); const std = deps.std; const crypto = deps.crypto; const io = deps.io; const log = deps.log; const protocol = deps.protocol; const resolve = @import("resolve.zig"); const handshake = @import("handshake.zig"); const frame = @import("frame.zig"); const response = @import("response.zig"); const response_handlers = @import("response_handlers.zig"); const opcode = @import("opcode.zig"); const utils = @import("utils.zig"); /// Helper for building WebSocket binary messages const MessageBuilder = struct { buffer: []u8, offset: usize, allocator: std.mem.Allocator, pub fn init(allocator: std.mem.Allocator, total_len: usize) !MessageBuilder { const buffer = try allocator.alloc(u8, total_len); return MessageBuilder{ .buffer = buffer, .offset = 0, .allocator = allocator, }; } pub fn deinit(self: *MessageBuilder) void { self.allocator.free(self.buffer); } pub fn writeOpcode(self: *MessageBuilder, op: opcode.Opcode) void { self.buffer[self.offset] = @intFromEnum(op); self.offset += 1; } pub fn writeBytes(self: *MessageBuilder, data: []const u8) void { @memcpy(self.buffer[self.offset .. self.offset + data.len], data); self.offset += data.len; } pub fn writeU8(self: *MessageBuilder, value: u8) void { self.buffer[self.offset] = value; self.offset += 1; } pub fn writeU16(self: *MessageBuilder, value: u16) void { std.mem.writeInt(u16, self.buffer[self.offset .. self.offset + 2][0..2], value, .big); self.offset += 2; } pub fn writeU32(self: *MessageBuilder, value: u32) void { std.mem.writeInt(u32, self.buffer[self.offset .. self.offset + 4][0..4], value, .big); self.offset += 4; } pub fn writeU64(self: *MessageBuilder, value: u64) void { std.mem.writeInt(u64, self.buffer[self.offset .. self.offset + 8][0..8], value, .big); self.offset += 8; } pub fn writeStringU8(self: *MessageBuilder, str: []const u8) void { self.writeU8(@intCast(str.len)); if (str.len > 0) { self.writeBytes(str); } } pub fn writeStringU16(self: *MessageBuilder, str: []const u8) void { self.writeU16(@intCast(str.len)); if (str.len > 0) { self.writeBytes(str); } } pub fn send(self: *MessageBuilder, stream: std.net.Stream) !void { try frame.sendWebSocketFrame(stream, self.buffer); } }; /// WebSocket client for binary protocol communication pub const Client = struct { allocator: std.mem.Allocator, stream: ?std.net.Stream, host: []const u8, port: u16, is_tls: bool = false, pub fn formatPrewarmFromStatusRoot(allocator: std.mem.Allocator, root: std.json.ObjectMap) !?[]u8 { return response.formatPrewarmFromStatusRoot(allocator, root); } pub fn connect(allocator: std.mem.Allocator, url: []const u8, api_key: []const u8) !Client { // Detect TLS const is_tls = std.mem.startsWith(u8, url, "wss://"); // Parse URL (simplified - assumes ws://host:port/path or wss://host:port/path) const host_start = std.mem.indexOf(u8, url, "//") orelse return error.InvalidURL; const host_port_start = host_start + 2; const path_start = std.mem.indexOfPos(u8, url, host_port_start, "/") orelse url.len; const colon_pos = std.mem.indexOfPos(u8, url, host_port_start, ":"); const host_end = blk: { if (colon_pos) |pos| { if (pos < path_start) break :blk pos; } break :blk path_start; }; const host = url[host_port_start..host_end]; var port: u16 = if (is_tls) 9101 else 9100; // default ports if (colon_pos) |pos| { if (pos < path_start) { const port_start = pos + 1; const port_end = std.mem.indexOfPos(u8, url, port_start, "/") orelse url.len; const port_str = url[port_start..port_end]; port = try std.fmt.parseInt(u16, port_str, 10); } } // Connect to server const stream = try std.net.tcpConnectToAddress(try resolve.resolveHostAddress(allocator, host, port)); // For TLS, we'd need to wrap the stream with TLS // For now, we'll just support ws:// and document wss:// requires additional setup if (is_tls) { // TODO(context): Implement native wss:// support by introducing a transport abstraction // (raw TCP vs TLS client stream), performing TLS handshake + certificate verification, and updating // handshake/frame read+write helpers to operate on the chosen transport. std.log.warn("TLS (wss://) support requires additional TLS library integration", .{}); return error.TLSNotSupported; } // Perform WebSocket handshake try handshake.handshake(allocator, stream, host, url, api_key); return Client{ .allocator = allocator, .stream = stream, .host = try allocator.dupe(u8, host), .port = port, .is_tls = is_tls, }; } /// Connect to WebSocket server with retry logic pub fn connectWithRetry(allocator: std.mem.Allocator, url: []const u8, api_key: []const u8, max_retries: u32) !Client { var retry_count: u32 = 0; var last_error: anyerror = error.ConnectionFailed; while (retry_count < max_retries) { const client = connect(allocator, url, api_key) catch |err| { last_error = err; retry_count += 1; if (retry_count < max_retries) { const delay_ms = @min(1000 * retry_count, 5000); // Exponential backoff, max 5s log.warn("Connection failed (attempt {d}/{d}), retrying in {d}s...\n", .{ retry_count, max_retries, delay_ms / 1000 }); std.Thread.sleep(@as(u64, delay_ms) * std.time.ns_per_ms); } continue; }; if (retry_count > 0) { log.success("Connected successfully after {d} attempts\n", .{retry_count + 1}); } return client; } return last_error; } /// Disconnect from WebSocket server (closes stream only) pub fn disconnect(self: *Client) void { if (self.stream) |stream| { stream.close(); self.stream = null; } } /// Fully close client - disconnects stream and frees host memory pub fn close(self: *Client) void { self.disconnect(); if (self.host.len > 0) { self.allocator.free(self.host); } } // Validation helpers fn validateApiKeyHash(api_key_hash: []const u8) error{InvalidApiKeyHash}!void { if (api_key_hash.len != 16) return error.InvalidApiKeyHash; } fn validateCommitId(commit_id: []const u8) error{InvalidCommitId}!void { if (commit_id.len != 20) return error.InvalidCommitId; } fn validateJobName(job_name: []const u8) error{JobNameTooLong}!void { if (job_name.len == 0 or job_name.len > 255) return error.JobNameTooLong; } fn getStream(self: *Client) error{NotConnected}!std.net.Stream { return self.stream orelse error.NotConnected; } pub fn sendValidateRequestCommit(self: *Client, api_key_hash: []const u8, commit_id: []const u8) !void { const stream = try self.getStream(); try validateApiKeyHash(api_key_hash); try validateCommitId(commit_id); var builder = try MessageBuilder.init(self.allocator, 1 + 16 + 1 + 1 + 20); defer builder.deinit(); builder.writeOpcode(opcode.Opcode.validate_request); builder.writeBytes(api_key_hash); builder.writeU8(@intFromEnum(opcode.ValidateTargetType.commit_id)); builder.writeU8(20); builder.writeBytes(commit_id); try builder.send(stream); } pub fn sendQueryJobByCommit(self: *Client, job_name: []const u8, commit_id: []const u8, api_key_hash: []const u8) !void { const stream = try self.getStream(); try validateApiKeyHash(api_key_hash); try validateCommitId(commit_id); try validateJobName(job_name); // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [job_name_len: u8] [job_name: var] [commit_id: 20 bytes] const total_len = 1 + 16 + 1 + job_name.len + 20; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.query_job); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; @memcpy(buffer[offset .. offset + 20], commit_id); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendListJupyterPackages(self: *Client, name: []const u8, api_key_hash: []const u8) !void { const stream = try self.getStream(); try validateApiKeyHash(api_key_hash); if (name.len > 255) return error.NameTooLong; var builder = try MessageBuilder.init(self.allocator, 1 + 16 + 1 + name.len); defer builder.deinit(); builder.writeOpcode(opcode.list_jupyter_packages); builder.writeBytes(api_key_hash); builder.writeStringU8(name); try builder.send(stream); } pub fn sendSetRunNarrative( self: *Client, job_name: []const u8, patch_json: []const u8, api_key_hash: []const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (job_name.len == 0 or job_name.len > 255) return error.JobNameTooLong; if (patch_json.len == 0 or patch_json.len > 0xFFFF) return error.PayloadTooLarge; // [opcode] // [api_key_hash:16] // [job_name_len:1][job_name] // [patch_len:2][patch_json] const total_len = 1 + 16 + 1 + job_name.len + 2 + patch_json.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.set_run_narrative); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @as(u8, @intCast(job_name.len)); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; std.mem.writeInt(u16, buffer[offset .. offset + 2][0..2], @as(u16, @intCast(patch_json.len)), .big); offset += 2; @memcpy(buffer[offset .. offset + patch_json.len], patch_json); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendSetRunPrivacy( self: *Client, job_name: []const u8, patch_json: []const u8, api_key_hash: []const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (job_name.len == 0 or job_name.len > 255) return error.JobNameTooLong; if (patch_json.len == 0 or patch_json.len > 0xFFFF) return error.PayloadTooLarge; // [opcode] // [api_key_hash:16] // [job_name_len:1][job_name] // [patch_len:2][patch_json] const total_len = 1 + 16 + 1 + job_name.len + 2 + patch_json.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.set_run_privacy); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @as(u8, @intCast(job_name.len)); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; std.mem.writeInt(u16, buffer[offset .. offset + 2][0..2], @as(u16, @intCast(patch_json.len)), .big); offset += 2; @memcpy(buffer[offset .. offset + patch_json.len], patch_json); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendAnnotateRun( self: *Client, job_name: []const u8, author: []const u8, note: []const u8, api_key_hash: []const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (job_name.len == 0 or job_name.len > 255) return error.JobNameTooLong; if (author.len > 255) return error.PayloadTooLarge; if (note.len == 0 or note.len > 0xFFFF) return error.PayloadTooLarge; // [opcode] // [api_key_hash:16] // [job_name_len:1][job_name] // [author_len:1][author] // [note_len:2][note] const total_len = 1 + 16 + 1 + job_name.len + 1 + author.len + 2 + note.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.annotate_run); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @as(u8, @intCast(job_name.len)); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; buffer[offset] = @as(u8, @intCast(author.len)); offset += 1; if (author.len > 0) { @memcpy(buffer[offset .. offset + author.len], author); } offset += author.len; std.mem.writeInt(u16, buffer[offset .. offset + 2][0..2], @as(u16, @intCast(note.len)), .big); offset += 2; @memcpy(buffer[offset .. offset + note.len], note); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJobWithArgsNoteAndResources( self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8, args: []const u8, note: []const u8, force: bool, cpu: u8, memory_gb: u8, gpu: u8, gpu_memory: ?[]const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; if (args.len > 0xFFFF) return error.PayloadTooLarge; if (note.len > 0xFFFF) return error.PayloadTooLarge; const gpu_mem = gpu_memory orelse ""; if (gpu_mem.len > 255) return error.PayloadTooLarge; // [opcode] // [api_key_hash] // [commit_id] // [priority] // [job_name_len][job_name] // [args_len:2][args] // [note_len:2][note] // [force:1] // [cpu][memory_gb][gpu][gpu_mem_len][gpu_mem] const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len + 2 + args.len + 2 + note.len + 1 + 4 + gpu_mem.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job_with_note); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; buffer[offset] = @intCast((args.len >> 8) & 0xFF); buffer[offset + 1] = @intCast(args.len & 0xFF); offset += 2; if (args.len > 0) { @memcpy(buffer[offset .. offset + args.len], args); offset += args.len; } buffer[offset] = @intCast((note.len >> 8) & 0xFF); buffer[offset + 1] = @intCast(note.len & 0xFF); offset += 2; if (note.len > 0) { @memcpy(buffer[offset .. offset + note.len], note); offset += note.len; } // Force flag buffer[offset] = if (force) 0x01 else 0x00; offset += 1; buffer[offset] = cpu; buffer[offset + 1] = memory_gb; buffer[offset + 2] = gpu; buffer[offset + 3] = @intCast(gpu_mem.len); offset += 4; if (gpu_mem.len > 0) { @memcpy(buffer[offset .. offset + gpu_mem.len], gpu_mem); } try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJobWithArgsAndResources( self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8, args: []const u8, force: bool, cpu: u8, memory_gb: u8, gpu: u8, gpu_memory: ?[]const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; if (args.len > 0xFFFF) return error.PayloadTooLarge; const gpu_mem = gpu_memory orelse ""; if (gpu_mem.len > 255) return error.PayloadTooLarge; // [opcode] // [api_key_hash] // [commit_id] // [priority] // [job_name_len][job_name] // [args_len:2][args] // [force:1] // [cpu][memory_gb][gpu][gpu_mem_len][gpu_mem] const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len + 2 + args.len + 1 + 4 + gpu_mem.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job_with_args); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; buffer[offset] = @intCast((args.len >> 8) & 0xFF); buffer[offset + 1] = @intCast(args.len & 0xFF); offset += 2; if (args.len > 0) { @memcpy(buffer[offset .. offset + args.len], args); offset += args.len; } // Force flag buffer[offset] = if (force) 0x01 else 0x00; offset += 1; buffer[offset] = cpu; buffer[offset + 1] = memory_gb; buffer[offset + 2] = gpu; buffer[offset + 3] = @intCast(gpu_mem.len); offset += 4; if (gpu_mem.len > 0) { @memcpy(buffer[offset .. offset + gpu_mem.len], gpu_mem); } try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJobWithSnapshotAndResources( self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8, snapshot_id: []const u8, snapshot_sha256: []const u8, cpu: u8, memory_gb: u8, gpu: u8, gpu_memory: ?[]const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; if (snapshot_id.len == 0 or snapshot_id.len > 255) return error.PayloadTooLarge; if (snapshot_sha256.len == 0 or snapshot_sha256.len > 255) return error.PayloadTooLarge; const gpu_mem = gpu_memory orelse ""; if (gpu_mem.len > 255) return error.PayloadTooLarge; const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len + 1 + snapshot_id.len + 1 + snapshot_sha256.len + 4 + gpu_mem.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job_with_snapshot); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; buffer[offset] = @intCast(snapshot_id.len); offset += 1; @memcpy(buffer[offset .. offset + snapshot_id.len], snapshot_id); offset += snapshot_id.len; buffer[offset] = @intCast(snapshot_sha256.len); offset += 1; @memcpy(buffer[offset .. offset + snapshot_sha256.len], snapshot_sha256); offset += snapshot_sha256.len; buffer[offset] = cpu; buffer[offset + 1] = memory_gb; buffer[offset + 2] = gpu; buffer[offset + 3] = @intCast(gpu_mem.len); offset += 4; if (gpu_mem.len > 0) { @memcpy(buffer[offset .. offset + gpu_mem.len], gpu_mem); } try frame.sendWebSocketFrame(stream, buffer); } pub fn sendValidateRequestTask(self: *Client, api_key_hash: []const u8, task_id: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (task_id.len == 0 or task_id.len > 255) return error.PayloadTooLarge; const total_len = 1 + 16 + 1 + 1 + task_id.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.validate_request); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intFromEnum(opcode.ValidateTargetType.task_id); offset += 1; buffer[offset] = @intCast(task_id.len); offset += 1; @memcpy(buffer[offset .. offset + task_id.len], task_id); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJob(self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; // Validate input lengths if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [commit_id: 20 bytes] [priority: u8] [job_name_len: u8] [job_name: var] const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset..], job_name); // Send as WebSocket binary frame try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJobWithResources( self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8, cpu: u8, memory_gb: u8, gpu: u8, gpu_memory: ?[]const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; const gpu_mem = gpu_memory orelse ""; if (gpu_mem.len > 255) return error.PayloadTooLarge; // Tail encoding: [cpu:1][memory_gb:1][gpu:1][gpu_mem_len:1][gpu_mem:var] const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len + 4 + gpu_mem.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; buffer[offset] = cpu; buffer[offset + 1] = memory_gb; buffer[offset + 2] = gpu; buffer[offset + 3] = @intCast(gpu_mem.len); offset += 4; if (gpu_mem.len > 0) { @memcpy(buffer[offset .. offset + gpu_mem.len], gpu_mem); } try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJobWithTracking( self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8, tracking_json: []const u8, ) !void { const stream = self.stream orelse return error.NotConnected; // Validate input lengths if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; if (tracking_json.len > 0xFFFF) return error.PayloadTooLarge; // Build binary message: // [opcode: u8] // [api_key_hash: 16] // [commit_id: 20] // [priority: u8] // [job_name_len: u8] // [job_name: var] // [tracking_json_len: u16] // [tracking_json: var] const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len + 2 + tracking_json.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job_with_tracking); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; // tracking_json length (big-endian) buffer[offset] = @intCast((tracking_json.len >> 8) & 0xFF); buffer[offset + 1] = @intCast(tracking_json.len & 0xFF); offset += 2; if (tracking_json.len > 0) { @memcpy(buffer[offset .. offset + tracking_json.len], tracking_json); } // Single WebSocket frame for throughput try frame.sendWebSocketFrame(stream, buffer); } pub fn sendQueueJobWithTrackingAndResources( self: *Client, job_name: []const u8, commit_id: []const u8, priority: u8, api_key_hash: []const u8, tracking_json: []const u8, cpu: u8, memory_gb: u8, gpu: u8, gpu_memory: ?[]const u8, ) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (job_name.len > 255) return error.JobNameTooLong; if (tracking_json.len > 0xFFFF) return error.PayloadTooLarge; const gpu_mem = gpu_memory orelse ""; if (gpu_mem.len > 255) return error.PayloadTooLarge; // [opcode] // [api_key_hash] // [commit_id] // [priority] // [job_name_len][job_name] // [tracking_json_len:2][tracking_json] // [cpu][memory_gb][gpu][gpu_mem_len][gpu_mem] const total_len = 1 + 16 + 20 + 1 + 1 + job_name.len + 2 + tracking_json.len + 4 + gpu_mem.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.queue_job_with_tracking); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; buffer[offset] = priority; offset += 1; buffer[offset] = @intCast(job_name.len); offset += 1; @memcpy(buffer[offset .. offset + job_name.len], job_name); offset += job_name.len; buffer[offset] = @intCast((tracking_json.len >> 8) & 0xFF); buffer[offset + 1] = @intCast(tracking_json.len & 0xFF); offset += 2; if (tracking_json.len > 0) { @memcpy(buffer[offset .. offset + tracking_json.len], tracking_json); offset += tracking_json.len; } buffer[offset] = cpu; buffer[offset + 1] = memory_gb; buffer[offset + 2] = gpu; buffer[offset + 3] = @intCast(gpu_mem.len); offset += 4; if (gpu_mem.len > 0) { @memcpy(buffer[offset .. offset + gpu_mem.len], gpu_mem); } try frame.sendWebSocketFrame(stream, buffer); } pub fn sendCancelJob(self: *Client, job_name: []const u8, api_key_hash: []const u8) !void { const stream = try self.getStream(); try validateApiKeyHash(api_key_hash); try validateJobName(job_name); var builder = try MessageBuilder.init(self.allocator, 1 + 16 + 1 + job_name.len); defer builder.deinit(); builder.writeOpcode(opcode.cancel_job); builder.writeBytes(api_key_hash); builder.writeStringU8(job_name); try builder.send(stream); } pub fn sendPrune(self: *Client, api_key_hash: []const u8, prune_type: u8, value: u32) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [prune_type: u8] [value: u4] const total_len = 1 + 16 + 1 + 4; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.prune); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = prune_type; offset += 1; // Store value in big-endian format buffer[offset] = @intCast((value >> 24) & 0xFF); buffer[offset + 1] = @intCast((value >> 16) & 0xFF); buffer[offset + 2] = @intCast((value >> 8) & 0xFF); buffer[offset + 3] = @intCast(value & 0xFF); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendStatusRequest(self: *Client, api_key_hash: []const u8) !void { const stream = try self.getStream(); try validateApiKeyHash(api_key_hash); var builder = try MessageBuilder.init(self.allocator, 1 + 16); defer builder.deinit(); builder.writeOpcode(opcode.status_request); builder.writeBytes(api_key_hash); try builder.send(stream); } pub fn receiveMessage(self: *Client, allocator: std.mem.Allocator) ![]u8 { const stream = self.stream orelse return error.NotConnected; return frame.receiveBinaryMessage(stream, allocator); } /// Receive and handle response with automatic display pub fn receiveAndHandleResponse(self: *Client, allocator: std.mem.Allocator, operation: []const u8) !void { const message = try self.receiveMessage(allocator); defer allocator.free(message); const packet = protocol.ResponsePacket.deserialize(message, allocator) catch { // Fallback: treat as plain response. std.debug.print("Server response: {s}\n", .{message}); return; }; defer packet.deinit(allocator); try response_handlers.handleResponsePacket(self, packet, operation); } pub fn receiveAndHandleStatusResponse(self: *Client, allocator: std.mem.Allocator, user_context: anytype, options: anytype) !void { return response_handlers.receiveAndHandleStatusResponse(self, allocator, user_context, options); } pub fn receiveAndHandleCancelResponse(self: *Client, allocator: std.mem.Allocator, user_context: anytype, job_name: []const u8, options: anytype) !void { return response_handlers.receiveAndHandleCancelResponse(self, allocator, user_context, job_name, options); } pub fn handleResponsePacket(self: *Client, packet: protocol.ResponsePacket, operation: []const u8) !void { return response_handlers.handleResponsePacket(self, packet, operation); } fn convertServerError(self: *Client, server_error: protocol.ErrorCode) anyerror { _ = self; return switch (server_error) { .authentication_failed => error.AuthenticationFailed, .permission_denied => error.PermissionDenied, .resource_not_found => error.JobNotFound, .resource_already_exists => error.ResourceExists, .timeout => error.RequestTimeout, .server_overloaded, .service_unavailable => error.ServerUnreachable, .invalid_request => error.InvalidArguments, .job_not_found => error.JobNotFound, .job_already_running => error.JobAlreadyRunning, .job_failed_to_start, .job_execution_failed => error.CommandFailed, .job_cancelled => error.JobCancelled, else => error.ServerError, }; } pub fn sendCrashReport(self: *Client, api_key_hash: []const u8, error_type: []const u8, error_message: []const u8, command: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; // Build binary message: [opcode:1][api_key_hash:16][error_type_len:2][error_type][error_message_len:2][error_message][command_len:2][command] const total_len = 1 + 16 + 2 + error_type.len + 2 + error_message.len + 2 + command.len; const message = try self.allocator.alloc(u8, total_len); defer self.allocator.free(message); var offset: usize = 0; // opcode message[offset] = @intFromEnum(opcode.crash_report); offset += 1; // API key hash @memcpy(message[offset .. offset + 16], api_key_hash); offset += 16; // Error type length and data std.mem.writeInt(u16, message[offset .. offset + 2][0..2], @intCast(error_type.len), .big); offset += 2; @memcpy(message[offset .. offset + error_type.len], error_type); offset += error_type.len; // Error message length and data std.mem.writeInt(u16, message[offset .. offset + 2][0..2], @intCast(error_message.len), .big); offset += 2; @memcpy(message[offset .. offset + error_message.len], error_message); offset += error_message.len; // Command length and data std.mem.writeInt(u16, message[offset .. offset + 2][0..2], @intCast(command.len), .big); offset += 2; @memcpy(message[offset .. offset + command.len], command); // Send WebSocket frame try frame.sendWebSocketFrame(stream, message); } // Dataset management methods pub fn sendDatasetList(self: *Client, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; // Build binary message: [opcode: u8] [api_key_hash: 16 bytes] const total_len = 1 + 16; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); buffer[0] = @intFromEnum(opcode.dataset_list); @memcpy(buffer[1..17], api_key_hash); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendDatasetRegister(self: *Client, name: []const u8, url: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (name.len > 255) return error.NameTooLong; if (url.len > 1023) return error.URLTooLong; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [name_len: u8] [name: var] [url_len: u16] [url: var] const total_len = 1 + 16 + 1 + name.len + 2 + url.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.dataset_register); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(name.len); offset += 1; @memcpy(buffer[offset .. offset + name.len], name); offset += name.len; std.mem.writeInt(u16, buffer[offset .. offset + 2][0..2], @intCast(url.len), .big); offset += 2; @memcpy(buffer[offset .. offset + url.len], url); try frame.sendWebSocketFrame(stream, buffer); } // Jupyter management methods pub fn sendStartJupyter(self: *Client, name: []const u8, workspace: []const u8, password: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (name.len > 255) return error.NameTooLong; if (workspace.len > 65535) return error.WorkspacePathTooLong; if (password.len > 255) return error.PasswordTooLong; // Build binary message: // [opcode:1][api_key_hash:16][name_len:1][name:var][workspace_len:2][workspace:var][password_len:1][password:var] const total_len = 1 + 16 + 1 + name.len + 2 + workspace.len + 1 + password.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.start_jupyter); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(name.len); offset += 1; @memcpy(buffer[offset .. offset + name.len], name); offset += name.len; std.mem.writeInt(u16, buffer[offset .. offset + 2][0..2], @intCast(workspace.len), .big); offset += 2; @memcpy(buffer[offset .. offset + workspace.len], workspace); offset += workspace.len; buffer[offset] = @intCast(password.len); offset += 1; @memcpy(buffer[offset .. offset + password.len], password); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendStopJupyter(self: *Client, service_id: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (service_id.len > 255) return error.InvalidServiceId; // Build binary message: [opcode:1][api_key_hash:16][service_id_len:1][service_id:var] const total_len = 1 + 16 + 1 + service_id.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.stop_jupyter); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(service_id.len); offset += 1; @memcpy(buffer[offset .. offset + service_id.len], service_id); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendRemoveJupyter(self: *Client, service_id: []const u8, api_key_hash: []const u8, purge: bool) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (service_id.len > 255) return error.InvalidServiceId; // Build binary message: [opcode:1][api_key_hash:16][service_id_len:1][service_id:var][purge:1] const total_len = 1 + 16 + 1 + service_id.len + 1; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.remove_jupyter); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(service_id.len); offset += 1; @memcpy(buffer[offset .. offset + service_id.len], service_id); offset += service_id.len; buffer[offset] = if (purge) 0x01 else 0x00; try frame.sendWebSocketFrame(stream, buffer); } pub fn sendRestoreJupyter(self: *Client, name: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (name.len > 255) return error.NameTooLong; // Build binary message: [opcode:1][api_key_hash:16][name_len:1][name:var] const total_len = 1 + 16 + 1 + name.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.restore_jupyter); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(name.len); offset += 1; @memcpy(buffer[offset .. offset + name.len], name); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendListJupyter(self: *Client, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; // Build binary message: [opcode:1][api_key_hash:16] const total_len = 1 + 16; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); buffer[0] = @intFromEnum(opcode.list_jupyter); @memcpy(buffer[1..17], api_key_hash); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendDatasetInfo(self: *Client, name: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (name.len > 255) return error.NameTooLong; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [name_len: u8] [name: var] const total_len = 1 + 16 + 1 + name.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.dataset_info); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(name.len); offset += 1; @memcpy(buffer[offset..], name); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendDatasetSearch(self: *Client, term: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; // Build binary message: [opcode: u8] [api_key_hash: 16 bytes] [term_len: u8] [term: var] const total_len = 1 + 16 + 1 + term.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.dataset_search); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(term.len); offset += 1; @memcpy(buffer[offset..], term); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendLogMetric(self: *Client, api_key_hash: []const u8, commit_id: []const u8, name: []const u8, value: f64, step: u32) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; if (name.len > 255) return error.NameTooLong; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [commit_id: 20 bytes] [step: u32] [value: f64] [name_len: u8] [name: var] const total_len = 1 + 16 + 20 + 4 + 8 + 1 + name.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.log_metric); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; std.mem.writeInt(u32, buffer[offset .. offset + 4][0..4], step, .big); offset += 4; std.mem.writeInt(u64, buffer[offset .. offset + 8][0..8], @as(u64, @bitCast(value)), .big); offset += 8; buffer[offset] = @intCast(name.len); offset += 1; @memcpy(buffer[offset..], name); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendGetExperiment(self: *Client, api_key_hash: []const u8, commit_id: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (commit_id.len != 20) return error.InvalidCommitId; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [commit_id: 20 bytes] const total_len = 1 + 16 + 20; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.get_experiment); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; @memcpy(buffer[offset .. offset + 20], commit_id); offset += 20; try frame.sendWebSocketFrame(stream, buffer); } pub fn sendCreateExperiment(self: *Client, api_key_hash: []const u8, name: []const u8, description: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (name.len == 0 or name.len > 255) return error.NameTooLong; if (description.len > 1023) return error.DescriptionTooLong; // Build binary message: // [opcode: u8] [api_key_hash: 16 bytes] [name_len: u8] [name: var] [desc_len: u16] [description: var] const total_len = 1 + 16 + 1 + name.len + 2 + description.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.create_experiment); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(name.len); offset += 1; @memcpy(buffer[offset .. offset + name.len], name); offset += name.len; std.mem.writeInt(u16, buffer[offset .. offset + 2][0..2], @intCast(description.len), .big); offset += 2; if (description.len > 0) { @memcpy(buffer[offset .. offset + description.len], description); } try frame.sendWebSocketFrame(stream, buffer); } pub fn sendListExperiments(self: *Client, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; // Build binary message: [opcode: u8] [api_key_hash: 16 bytes] const total_len = 1 + 16; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); buffer[0] = @intFromEnum(opcode.list_experiments); @memcpy(buffer[1..17], api_key_hash); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendGetExperimentByID(self: *Client, api_key_hash: []const u8, experiment_id: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (experiment_id.len == 0 or experiment_id.len > 255) return error.InvalidExperimentId; // Build binary message: [opcode: u8] [api_key_hash: 16 bytes] [exp_id_len: u8] [experiment_id: var] const total_len = 1 + 16 + 1 + experiment_id.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.get_experiment); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(experiment_id.len); offset += 1; @memcpy(buffer[offset .. offset + experiment_id.len], experiment_id); try frame.sendWebSocketFrame(stream, buffer); } // Logs and debug methods pub fn sendGetLogs(self: *Client, target_id: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (target_id.len == 0 or target_id.len > 255) return error.InvalidTargetId; // Build binary message: [opcode:1][api_key_hash:16][target_id_len:1][target_id:var] const total_len = 1 + 16 + 1 + target_id.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.get_logs); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(target_id.len); offset += 1; @memcpy(buffer[offset .. offset + target_id.len], target_id); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendStreamLogs(self: *Client, target_id: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (target_id.len == 0 or target_id.len > 255) return error.InvalidTargetId; // Build binary message: [opcode:1][api_key_hash:16][target_id_len:1][target_id:var] const total_len = 1 + 16 + 1 + target_id.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.stream_logs); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(target_id.len); offset += 1; @memcpy(buffer[offset .. offset + target_id.len], target_id); try frame.sendWebSocketFrame(stream, buffer); } pub fn sendAttachDebug(self: *Client, target_id: []const u8, debug_type: []const u8, api_key_hash: []const u8) !void { const stream = self.stream orelse return error.NotConnected; if (api_key_hash.len != 16) return error.InvalidApiKeyHash; if (target_id.len == 0 or target_id.len > 255) return error.InvalidTargetId; if (debug_type.len > 255) return error.InvalidDebugType; // Build binary message: [opcode:1][api_key_hash:16][target_id_len:1][target_id:var][debug_type:var] const total_len = 1 + 16 + 1 + target_id.len + debug_type.len; var buffer = try self.allocator.alloc(u8, total_len); defer self.allocator.free(buffer); var offset: usize = 0; buffer[offset] = @intFromEnum(opcode.attach_debug); offset += 1; @memcpy(buffer[offset .. offset + 16], api_key_hash); offset += 16; buffer[offset] = @intCast(target_id.len); offset += 1; @memcpy(buffer[offset .. offset + target_id.len], target_id); offset += target_id.len; if (debug_type.len > 0) { @memcpy(buffer[offset .. offset + debug_type.len], debug_type); } try frame.sendWebSocketFrame(stream, buffer); } /// Receive and handle dataset response pub fn receiveAndHandleDatasetResponse(self: *Client, allocator: std.mem.Allocator) ![]const u8 { const message = try self.receiveMessage(allocator); defer allocator.free(message); const packet = protocol.ResponsePacket.deserialize(message, allocator) catch { // Fallback: treat as plain response. return allocator.dupe(u8, message); }; defer packet.deinit(allocator); switch (packet.packet_type) { .data => { if (packet.data_payload) |payload| { return allocator.dupe(u8, payload); } return allocator.dupe(u8, ""); }, .success => { if (packet.success_message) |msg| { return allocator.dupe(u8, msg); } return allocator.dupe(u8, ""); }, .error_packet => { // Print details and raise appropriate CLI error. _ = response_handlers.handleResponsePacket(self, packet, "Dataset") catch {}; return self.convertServerError(packet.error_code.?); }, else => { // Unexpected packet type. return error.UnexpectedResponse; }, } } };