Update queue and storage systems for scheduler integration: - Queue backend with scheduler coordination - Filesystem queue with batch operations - Deduplication with tenant-aware keys - Storage layer with audit logging hooks - Domain models (Task, Events, Errors) with scheduler fields - Database layer with tenant isolation - Dataset storage with integrity checks
182 lines
5.1 KiB
Go
182 lines
5.1 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// Metric represents a recorded metric from WebSocket connections
|
|
type Metric struct {
|
|
RecordedAt time.Time `json:"recorded_at"`
|
|
Name string `json:"name"`
|
|
User string `json:"user,omitempty"`
|
|
ID int64 `json:"id"`
|
|
Value float64 `json:"value"`
|
|
}
|
|
|
|
// MetricSummary represents aggregated metric statistics
|
|
type MetricSummary struct {
|
|
StartTime time.Time `json:"start_time"`
|
|
EndTime time.Time `json:"end_time"`
|
|
Name string `json:"name"`
|
|
Count int64 `json:"count"`
|
|
Avg float64 `json:"avg"`
|
|
Min float64 `json:"min"`
|
|
Max float64 `json:"max"`
|
|
Sum float64 `json:"sum"`
|
|
}
|
|
|
|
// RecordMetric records a metric to the database
|
|
func (db *DB) RecordMetric(ctx context.Context, name string, value float64, user string) error {
|
|
if name == "" {
|
|
return fmt.Errorf("metric name is required")
|
|
}
|
|
|
|
var query string
|
|
if db.dbType == DBTypeSQLite {
|
|
query = `INSERT INTO websocket_metrics (metric_name, metric_value, user, recorded_at)
|
|
VALUES (?, ?, ?, ?)`
|
|
} else {
|
|
query = `INSERT INTO websocket_metrics (metric_name, metric_value, user, recorded_at)
|
|
VALUES ($1, $2, $3, $4)`
|
|
}
|
|
|
|
_, err := db.conn.ExecContext(ctx, query, name, value, user, time.Now())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to record metric: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetMetrics retrieves metrics within a time range
|
|
func (db *DB) GetMetrics(ctx context.Context, start, end time.Time) ([]*Metric, error) {
|
|
var query string
|
|
var args []interface{}
|
|
|
|
if db.dbType == DBTypeSQLite {
|
|
query = `SELECT id, metric_name, metric_value, user, recorded_at
|
|
FROM websocket_metrics
|
|
WHERE recorded_at BETWEEN ? AND ?
|
|
ORDER BY recorded_at DESC`
|
|
args = []interface{}{start, end}
|
|
} else {
|
|
query = `SELECT id, metric_name, metric_value, user, recorded_at
|
|
FROM websocket_metrics
|
|
WHERE recorded_at BETWEEN $1 AND $2
|
|
ORDER BY recorded_at DESC`
|
|
args = []interface{}{start, end}
|
|
}
|
|
|
|
rows, err := db.conn.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get metrics: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
|
|
var metrics []*Metric
|
|
for rows.Next() {
|
|
var m Metric
|
|
if err := rows.Scan(&m.ID, &m.Name, &m.Value, &m.User, &m.RecordedAt); err != nil {
|
|
return nil, fmt.Errorf("failed to scan metric: %w", err)
|
|
}
|
|
metrics = append(metrics, &m)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating metrics: %w", err)
|
|
}
|
|
|
|
return metrics, nil
|
|
}
|
|
|
|
// GetMetricsByName retrieves metrics for a specific name within a time range
|
|
func (db *DB) GetMetricsByName(ctx context.Context, name string, start, end time.Time) ([]*Metric, error) {
|
|
if name == "" {
|
|
return nil, fmt.Errorf("metric name is required")
|
|
}
|
|
|
|
var query string
|
|
var args []interface{}
|
|
|
|
if db.dbType == DBTypeSQLite {
|
|
query = `SELECT id, metric_name, metric_value, user, recorded_at
|
|
FROM websocket_metrics
|
|
WHERE metric_name = ? AND recorded_at BETWEEN ? AND ?
|
|
ORDER BY recorded_at DESC`
|
|
args = []interface{}{name, start, end}
|
|
} else {
|
|
query = `SELECT id, metric_name, metric_value, user, recorded_at
|
|
FROM websocket_metrics
|
|
WHERE metric_name = $1 AND recorded_at BETWEEN $2 AND $3
|
|
ORDER BY recorded_at DESC`
|
|
args = []interface{}{name, start, end}
|
|
}
|
|
|
|
rows, err := db.conn.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get metrics: %w", err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
|
|
var metrics []*Metric
|
|
for rows.Next() {
|
|
var m Metric
|
|
if err := rows.Scan(&m.ID, &m.Name, &m.Value, &m.User, &m.RecordedAt); err != nil {
|
|
return nil, fmt.Errorf("failed to scan metric: %w", err)
|
|
}
|
|
metrics = append(metrics, &m)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating metrics: %w", err)
|
|
}
|
|
|
|
return metrics, nil
|
|
}
|
|
|
|
// GetMetricSummary retrieves aggregated statistics for a metric within a time window
|
|
func (db *DB) GetMetricSummary(ctx context.Context, name string, window time.Duration) (*MetricSummary, error) {
|
|
if name == "" {
|
|
return nil, fmt.Errorf("metric name is required")
|
|
}
|
|
|
|
end := time.Now()
|
|
start := end.Add(-window)
|
|
|
|
var query string
|
|
var args []interface{}
|
|
|
|
if db.dbType == DBTypeSQLite {
|
|
query = `SELECT
|
|
COUNT(*) as count,
|
|
AVG(metric_value) as avg,
|
|
MIN(metric_value) as min,
|
|
MAX(metric_value) as max,
|
|
SUM(metric_value) as sum
|
|
FROM websocket_metrics
|
|
WHERE metric_name = ? AND recorded_at BETWEEN ? AND ?`
|
|
args = []interface{}{name, start, end}
|
|
} else {
|
|
query = `SELECT
|
|
COUNT(*) as count,
|
|
AVG(metric_value) as avg,
|
|
MIN(metric_value) as min,
|
|
MAX(metric_value) as max,
|
|
SUM(metric_value) as sum
|
|
FROM websocket_metrics
|
|
WHERE metric_name = $1 AND recorded_at BETWEEN $2 AND $3`
|
|
args = []interface{}{name, start, end}
|
|
}
|
|
|
|
row := db.conn.QueryRowContext(ctx, query, args...)
|
|
|
|
var summary MetricSummary
|
|
summary.Name = name
|
|
summary.StartTime = start
|
|
summary.EndTime = end
|
|
|
|
if err := row.Scan(&summary.Count, &summary.Avg, &summary.Min, &summary.Max, &summary.Sum); err != nil {
|
|
return nil, fmt.Errorf("failed to get metric summary: %w", err)
|
|
}
|
|
|
|
return &summary, nil
|
|
}
|