-- PostgreSQL schema for Fetch ML job persistence
-- Complements Redis for task queuing
--
-- The script is written to be re-runnable: tables and indexes use
-- IF NOT EXISTS, the trigger function uses CREATE OR REPLACE, and the
-- trigger is dropped (if present) before being created.

-- One row per submitted job.
CREATE TABLE IF NOT EXISTS jobs (
    id          TEXT PRIMARY KEY,
    job_name    TEXT NOT NULL,
    args        TEXT,
    status      TEXT NOT NULL DEFAULT 'pending',
    priority    INTEGER DEFAULT 0,
    created_at  TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    started_at  TIMESTAMP WITH TIME ZONE,
    ended_at    TIMESTAMP WITH TIME ZONE,
    worker_id   TEXT,
    error       TEXT,
    datasets    TEXT, -- JSON array
    metadata    TEXT, -- JSON object
    -- Refreshed on every UPDATE by the update_jobs_timestamp trigger below.
    updated_at  TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Per-job metric samples; rows are removed together with their job.
CREATE TABLE IF NOT EXISTS job_metrics (
    job_id        TEXT,
    metric_name   TEXT,
    metric_value  TEXT,
    timestamp     TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (job_id, metric_name, timestamp),
    FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
);

-- Worker registry with heartbeat and capacity counters.
CREATE TABLE IF NOT EXISTS workers (
    id              TEXT PRIMARY KEY,
    hostname        TEXT,
    last_heartbeat  TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    status          TEXT DEFAULT 'active',
    current_jobs    INTEGER DEFAULT 0,
    max_jobs        INTEGER DEFAULT 1,
    metadata        TEXT -- JSON object
);

-- System-wide metric samples keyed by (metric_name, timestamp).
CREATE TABLE IF NOT EXISTS system_metrics (
    metric_name   TEXT,
    metric_value  TEXT,
    timestamp     TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (metric_name, timestamp)
);

-- Dataset name -> URL mapping.
-- NOTE(review): updated_at here is only set by its DEFAULT on insert; no
-- trigger in this script refreshes it on UPDATE (only jobs has one) --
-- confirm whether that is intended.
CREATE TABLE IF NOT EXISTS datasets (
    name        TEXT PRIMARY KEY,
    url         TEXT NOT NULL,
    created_at  TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at);
CREATE INDEX IF NOT EXISTS idx_jobs_worker_id ON jobs(worker_id);
-- NOTE(review): job_id is the leading column of the job_metrics primary key,
-- so the primary-key index can already serve job_id lookups; this index is
-- kept only so its name continues to exist -- consider dropping it.
CREATE INDEX IF NOT EXISTS idx_job_metrics_job_id ON job_metrics(job_id);
CREATE INDEX IF NOT EXISTS idx_job_metrics_timestamp ON job_metrics(timestamp);
CREATE INDEX IF NOT EXISTS idx_workers_heartbeat ON workers(last_heartbeat);
CREATE INDEX IF NOT EXISTS idx_system_metrics_timestamp ON system_metrics(timestamp);
-- NOTE(review): datasets.name is the primary key, which PostgreSQL already
-- backs with a unique index; this index duplicates it and is kept only so
-- its name continues to exist -- consider dropping it.
CREATE INDEX IF NOT EXISTS idx_datasets_name ON datasets(name);

-- WebSocket metrics table for tracking real-time metrics
CREATE TABLE IF NOT EXISTS websocket_metrics (
    id            SERIAL PRIMARY KEY,
    metric_name   TEXT NOT NULL,
    metric_value  REAL NOT NULL,
    -- USER is a reserved key word in PostgreSQL, so the column name must be
    -- double-quoted here and in every statement that references it.
    "user"        TEXT,
    recorded_at   TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_websocket_metrics_name_time
    ON websocket_metrics(metric_name, recorded_at);

-- Function to update updated_at timestamp
-- Row-level trigger function: overwrites NEW.updated_at with the current
-- transaction timestamp and returns the modified row.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger to update timestamps
-- CREATE TRIGGER has no IF NOT EXISTS form, so drop any existing trigger
-- first to keep the script re-runnable.
DROP TRIGGER IF EXISTS update_jobs_timestamp ON jobs;
CREATE TRIGGER update_jobs_timestamp
    BEFORE UPDATE ON jobs
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();