146 lines
4.2 KiB
Go
146 lines
4.2 KiB
Go
// Package storage provides database abstraction and job management.
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/lib/pq" // PostgreSQL driver
|
|
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
|
)
|
|
|
|
// DBConfig describes which database backend to open and how to reach it.
//
// When Connection is non-empty it is used as the data source name as-is.
// The discrete Host/Port/Username/Password/Database fields are only
// consulted for PostgreSQL, and only when Connection is empty.
type DBConfig struct {
	// Type selects the backend; NewDB compares it case-insensitively
	// against "sqlite", "postgres" and "postgresql".
	Type string
	// Connection is a ready-made data source name (for SQLite, the value
	// handed to the driver).
	Connection string
	// Host is the PostgreSQL server host; "localhost" is used when empty.
	Host string
	// Port is the PostgreSQL server port; 5432 is used when not positive.
	Port int
	// Username is the PostgreSQL user; omitted from the DSN when empty.
	Username string
	// Password is the PostgreSQL password; omitted from the DSN when empty.
	Password string
	// Database is the PostgreSQL database name; "fetch_ml" is used when empty.
	Database string
}
|
|
|
|
// DB pairs an open database/sql handle with the name of the backend it
// was opened for.
type DB struct {
	// conn is the underlying database/sql handle.
	conn *sql.DB
	// dbType is the lower-cased backend type the handle was created with.
	dbType string
}
|
|
|
|
// DBTypeSQLite is the DBConfig.Type value that selects the SQLite backend.
const DBTypeSQLite = "sqlite"
|
|
|
|
// NewDB creates a new database connection.
|
|
func NewDB(config DBConfig) (*DB, error) {
|
|
var conn *sql.DB
|
|
var err error
|
|
|
|
switch strings.ToLower(config.Type) {
|
|
case DBTypeSQLite:
|
|
conn, err = sql.Open("sqlite3", config.Connection)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open SQLite database: %w", err)
|
|
}
|
|
// Enable foreign keys
|
|
if _, err := conn.ExecContext(context.Background(), "PRAGMA foreign_keys = ON"); err != nil {
|
|
return nil, fmt.Errorf("failed to enable foreign keys: %w", err)
|
|
}
|
|
// Enable WAL mode for better concurrency
|
|
if _, err := conn.ExecContext(context.Background(), "PRAGMA journal_mode = WAL"); err != nil {
|
|
return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
|
|
}
|
|
// Additional SQLite optimizations for throughput
|
|
if _, err := conn.ExecContext(context.Background(), "PRAGMA synchronous = NORMAL"); err != nil {
|
|
return nil, fmt.Errorf("failed to set synchronous mode: %w", err)
|
|
}
|
|
if _, err := conn.ExecContext(context.Background(), "PRAGMA cache_size = 10000"); err != nil {
|
|
return nil, fmt.Errorf("failed to set cache size: %w", err)
|
|
}
|
|
if _, err := conn.ExecContext(context.Background(), "PRAGMA temp_store = MEMORY"); err != nil {
|
|
return nil, fmt.Errorf("failed to set temp store: %w", err)
|
|
}
|
|
case "postgres":
|
|
connStr := buildPostgresConnectionString(config)
|
|
conn, err = sql.Open("postgres", connStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open PostgreSQL database: %w", err)
|
|
}
|
|
case "postgresql":
|
|
// Handle "postgresql" as alias for "postgres"
|
|
connStr := buildPostgresConnectionString(config)
|
|
conn, err = sql.Open("postgres", connStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open PostgreSQL database: %w", err)
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported database type: %s", config.Type)
|
|
}
|
|
|
|
// Optimize connection pool for better throughput
|
|
conn.SetMaxOpenConns(50) // Increase max open connections
|
|
conn.SetMaxIdleConns(25) // Maintain idle connections
|
|
conn.SetConnMaxLifetime(5 * time.Minute) // Connection lifetime
|
|
conn.SetConnMaxIdleTime(2 * time.Minute) // Idle connection timeout
|
|
|
|
return &DB{conn: conn, dbType: strings.ToLower(config.Type)}, nil
|
|
}
|
|
|
|
func buildPostgresConnectionString(config DBConfig) string {
|
|
if config.Connection != "" {
|
|
return config.Connection
|
|
}
|
|
|
|
var connStr strings.Builder
|
|
connStr.WriteString("host=")
|
|
if config.Host != "" {
|
|
connStr.WriteString(config.Host)
|
|
} else {
|
|
connStr.WriteString("localhost")
|
|
}
|
|
|
|
if config.Port > 0 {
|
|
connStr.WriteString(fmt.Sprintf(" port=%d", config.Port))
|
|
} else {
|
|
connStr.WriteString(" port=5432")
|
|
}
|
|
|
|
if config.Username != "" {
|
|
connStr.WriteString(fmt.Sprintf(" user=%s", config.Username))
|
|
}
|
|
|
|
if config.Password != "" {
|
|
connStr.WriteString(fmt.Sprintf(" password=%s", config.Password))
|
|
}
|
|
|
|
if config.Database != "" {
|
|
connStr.WriteString(fmt.Sprintf(" dbname=%s", config.Database))
|
|
} else {
|
|
connStr.WriteString(" dbname=fetch_ml")
|
|
}
|
|
|
|
connStr.WriteString(" sslmode=disable")
|
|
return connStr.String()
|
|
}
|
|
|
|
// NewDBFromPath creates a new database from a file path (legacy constructor).
|
|
func NewDBFromPath(dbPath string) (*DB, error) {
|
|
return NewDB(DBConfig{
|
|
Type: DBTypeSQLite,
|
|
Connection: dbPath,
|
|
})
|
|
}
|
|
|
|
// Initialize creates database schema.
|
|
func (db *DB) Initialize(schema string) error {
|
|
if _, err := db.conn.ExecContext(context.Background(), schema); err != nil {
|
|
return fmt.Errorf("failed to initialize database: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close closes the database connection.
|
|
func (db *DB) Close() error {
|
|
return db.conn.Close()
|
|
}
|