fetch_ml/internal/api/adapter.go
Jeremie Fraeys 1f495dfbb7
api: regenerate OpenAPI types and server code
- Update openapi.yaml spec
- Regenerate server_gen.go with oapi-codegen
- Update adapter, routes, and server configuration
2026-03-04 13:23:34 -05:00

372 lines
12 KiB
Go

// Package api provides HTTP handlers and OpenAPI-generated server interface implementations.
package api
import (
"net/http"
"github.com/jfraeys/fetch_ml/internal/api/audit"
"github.com/jfraeys/fetch_ml/internal/api/datasets"
"github.com/jfraeys/fetch_ml/internal/api/jobs"
"github.com/jfraeys/fetch_ml/internal/api/jupyter"
"github.com/jfraeys/fetch_ml/internal/api/plugins"
sch "github.com/jfraeys/fetch_ml/internal/api/scheduler"
"github.com/labstack/echo/v4"
)
// HandlerAdapter satisfies the generated ServerInterface by forwarding each
// operation to one of the domain handlers it holds. Any handler may be nil;
// the methods that depend on it check for nil before delegating.
type HandlerAdapter struct {
	// jobsHandler serves the task listing endpoint.
	jobsHandler *jobs.Handler
	// jupyterHandler serves the Jupyter service endpoints.
	jupyterHandler *jupyter.Handler
	// datasetsHandler is stored by the constructor.
	// NOTE(review): no adapter method in this file reads it - confirm whether
	// dataset routes are still to be wired up.
	datasetsHandler *datasets.Handler
	// pluginsHandler serves the plugin endpoints.
	pluginsHandler *plugins.Handler
	// schedulerHandler serves the scheduler endpoints.
	schedulerHandler *sch.APIHandler
	// auditHandler serves the audit endpoints.
	auditHandler *audit.Handler
}
// NewHandlerAdapter builds a HandlerAdapter around the supplied domain
// handlers. The arguments are stored as given, without validation, so a nil
// handler is accepted here and only checked by the methods that need it.
func NewHandlerAdapter(
	jobsHandler *jobs.Handler,
	jupyterHandler *jupyter.Handler,
	datasetsHandler *datasets.Handler,
	pluginsHandler *plugins.Handler,
	schedulerHandler *sch.APIHandler,
	auditHandler *audit.Handler,
) *HandlerAdapter {
	adapter := new(HandlerAdapter)

	adapter.jobsHandler = jobsHandler
	adapter.jupyterHandler = jupyterHandler
	adapter.datasetsHandler = datasetsHandler
	adapter.pluginsHandler = pluginsHandler
	adapter.schedulerHandler = schedulerHandler
	adapter.auditHandler = auditHandler

	return adapter
}
// Compile-time assertion that *HandlerAdapter satisfies the generated
// ServerInterface; the build fails if a method is missing or its signature
// does not match.
var _ ServerInterface = (*HandlerAdapter)(nil)
// toHTTPHandler adapts a standard net/http handler function to an
// echo.HandlerFunc so that existing handlers can back the generated
// interface.
//
// The handler is given Echo's own response wrapper (c.Response()) rather than
// the raw writer underneath it. Writing through the wrapper lets Echo record
// the status code, the byte count and the committed flag, which middleware and
// the error handler consult. This matches what echo.WrapHandler does.
//
// The returned function always reports a nil error; h is expected to write
// any error response itself.
func toHTTPHandler(h func(http.ResponseWriter, *http.Request)) echo.HandlerFunc {
	return func(c echo.Context) error {
		h(c.Response(), c.Request())
		return nil
	}
}
// GetHealth answers the health check with a plain-text 200 response.
// It does not consult any of the adapter's handlers.
func (a *HandlerAdapter) GetHealth(ctx echo.Context) error {
	const body = "OK\n"
	return ctx.String(http.StatusOK, body)
}
// GetV1Experiments is a placeholder for listing experiments: it always
// responds 200 with an empty "experiments" array and a "Not yet implemented"
// message.
func (a *HandlerAdapter) GetV1Experiments(ctx echo.Context) error {
	placeholder := map[string]any{
		"message":     "Not yet implemented",
		"experiments": []any{},
	}
	return ctx.JSON(http.StatusOK, placeholder)
}
// PostV1Experiments is the endpoint for creating an experiment. Creation is
// not implemented, so it responds 501 Not Implemented with the same
// error/code/message payload shape used by the other unimplemented endpoints
// in this adapter.
//
// It previously answered 201 Created, which told clients a resource had been
// created when nothing was.
func (a *HandlerAdapter) PostV1Experiments(ctx echo.Context) error {
	return ctx.JSON(http.StatusNotImplemented, map[string]any{
		"error":   "Not implemented",
		"code":    "NOT_IMPLEMENTED",
		"message": "Experiment creation via REST API not yet implemented",
	})
}
// GetV1JupyterServices lists Jupyter services by delegating to
// jupyterHandler.ListServicesHTTP. When no Jupyter handler is configured it
// responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) GetV1JupyterServices(ctx echo.Context) error {
	if h := a.jupyterHandler; h != nil {
		return toHTTPHandler(h.ListServicesHTTP)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Jupyter service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// PostV1JupyterServices starts a Jupyter service by delegating to
// jupyterHandler.StartServiceHTTP. When no Jupyter handler is configured it
// responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) PostV1JupyterServices(ctx echo.Context) error {
	if h := a.jupyterHandler; h != nil {
		return toHTTPHandler(h.StartServiceHTTP)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Jupyter service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// DeleteV1JupyterServicesServiceId is the endpoint for stopping a Jupyter
// service. Stopping is not implemented over REST: with a Jupyter handler
// configured it responds 501, and without one it responds 503. serviceId is
// currently unused.
func (a *HandlerAdapter) DeleteV1JupyterServicesServiceId(ctx echo.Context, serviceId string) error {
	if a.jupyterHandler != nil {
		// TODO: Implement when StopServiceHTTP is available
		return ctx.JSON(http.StatusNotImplemented, map[string]any{
			"error":   "Not implemented",
			"code":    "NOT_IMPLEMENTED",
			"message": "Jupyter service stop not yet implemented via REST API",
		})
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Jupyter service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1Queue reports queue status with a 200 JSON response.
// NOTE(review): the status and both counters are hard-coded here; no handler
// is queried, so the response does not reflect the real queue.
func (a *HandlerAdapter) GetV1Queue(ctx echo.Context) error {
	snapshot := map[string]any{
		"status":  "healthy",
		"pending": 0,
		"running": 0,
	}
	return ctx.JSON(http.StatusOK, snapshot)
}
// GetV1Tasks lists tasks by delegating to jobsHandler.ListAllJobsHTTP. When no
// jobs handler is configured it responds 503 with a SERVICE_UNAVAILABLE
// payload instead. params is not inspected here; the wrapped handler receives
// the original request.
func (a *HandlerAdapter) GetV1Tasks(ctx echo.Context, params GetV1TasksParams) error {
	if h := a.jobsHandler; h != nil {
		return toHTTPHandler(h.ListAllJobsHTTP)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Jobs handler not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// PostV1Tasks is the endpoint for creating a task. Creation is not available
// over REST, so it always responds 501 and points the caller at WebSocket.
func (a *HandlerAdapter) PostV1Tasks(ctx echo.Context) error {
	notImplemented := map[string]any{
		"error":   "Not implemented",
		"code":    "NOT_IMPLEMENTED",
		"message": "Task creation via REST API not yet implemented - use WebSocket",
	}
	return ctx.JSON(http.StatusNotImplemented, notImplemented)
}
// DeleteV1TasksTaskId is the endpoint for cancelling or deleting a task.
// Cancellation is not available over REST, so it always responds 501 and
// points the caller at WebSocket. taskId is currently unused.
func (a *HandlerAdapter) DeleteV1TasksTaskId(ctx echo.Context, taskId string) error {
	notImplemented := map[string]any{
		"error":   "Not implemented",
		"code":    "NOT_IMPLEMENTED",
		"message": "Task cancellation via REST API not yet implemented - use WebSocket",
	}
	return ctx.JSON(http.StatusNotImplemented, notImplemented)
}
// GetV1TasksTaskId is the endpoint for fetching one task's details. Task
// details are not available over REST, so it always responds 501 and points
// the caller at WebSocket. taskId is currently unused.
func (a *HandlerAdapter) GetV1TasksTaskId(ctx echo.Context, taskId string) error {
	notImplemented := map[string]any{
		"error":   "Not implemented",
		"code":    "NOT_IMPLEMENTED",
		"message": "Task details via REST API not yet implemented - use WebSocket",
	}
	return ctx.JSON(http.StatusNotImplemented, notImplemented)
}
// GetWs answers plain HTTP requests to the WebSocket endpoint with
// 426 Upgrade Required and a JSON explanation. It performs no upgrade itself.
// NOTE(review): presumably the real WebSocket handler is registered on a
// separate route - confirm.
//
// A 426 response must name the protocol the client should switch to, so the
// Upgrade header (with the matching Connection header) is set before the body
// is written.
func (a *HandlerAdapter) GetWs(ctx echo.Context) error {
	header := ctx.Response().Header()
	header.Set("Upgrade", "websocket")
	header.Set("Connection", "Upgrade")

	return ctx.JSON(http.StatusUpgradeRequired, map[string]any{
		"error":   "WebSocket connection required",
		"code":    "UPGRADE_REQUIRED",
		"message": "Use WebSocket protocol to connect to this endpoint",
	})
}
// Plugin handlers

// GetV1Plugins lists plugins by delegating to pluginsHandler.GetV1Plugins.
// When no plugins handler is configured it responds 503 with a
// SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) GetV1Plugins(ctx echo.Context) error {
	if h := a.pluginsHandler; h != nil {
		return toHTTPHandler(h.GetV1Plugins)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Plugin service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1PluginsPluginName returns one plugin's details by delegating to
// pluginsHandler.GetV1PluginsPluginName. When no plugins handler is configured
// it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): pluginName is not forwarded explicitly; the wrapped handler
// only sees the *http.Request - confirm it can recover the name from there.
func (a *HandlerAdapter) GetV1PluginsPluginName(ctx echo.Context, pluginName string) error {
	if h := a.pluginsHandler; h != nil {
		return toHTTPHandler(h.GetV1PluginsPluginName)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Plugin service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1PluginsPluginNameConfig returns a plugin's configuration by delegating
// to pluginsHandler.GetV1PluginsPluginNameConfig. When no plugins handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): pluginName is not forwarded explicitly; the wrapped handler
// only sees the *http.Request - confirm it can recover the name from there.
func (a *HandlerAdapter) GetV1PluginsPluginNameConfig(ctx echo.Context, pluginName string) error {
	if h := a.pluginsHandler; h != nil {
		return toHTTPHandler(h.GetV1PluginsPluginNameConfig)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Plugin service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// PutV1PluginsPluginNameConfig updates a plugin's configuration by delegating
// to pluginsHandler.PutV1PluginsPluginNameConfig. When no plugins handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): pluginName is not forwarded explicitly; the wrapped handler
// only sees the *http.Request - confirm it can recover the name from there.
func (a *HandlerAdapter) PutV1PluginsPluginNameConfig(ctx echo.Context, pluginName string) error {
	if h := a.pluginsHandler; h != nil {
		return toHTTPHandler(h.PutV1PluginsPluginNameConfig)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Plugin service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// DeleteV1PluginsPluginNameConfig disables or unloads a plugin by delegating
// to pluginsHandler.DeleteV1PluginsPluginNameConfig. When no plugins handler
// is configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): pluginName is not forwarded explicitly; the wrapped handler
// only sees the *http.Request - confirm it can recover the name from there.
func (a *HandlerAdapter) DeleteV1PluginsPluginNameConfig(ctx echo.Context, pluginName string) error {
	if h := a.pluginsHandler; h != nil {
		return toHTTPHandler(h.DeleteV1PluginsPluginNameConfig)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Plugin service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1PluginsPluginNameHealth checks a plugin's health by delegating to
// pluginsHandler.GetV1PluginsPluginNameHealth. When no plugins handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): pluginName is not forwarded explicitly; the wrapped handler
// only sees the *http.Request - confirm it can recover the name from there.
func (a *HandlerAdapter) GetV1PluginsPluginNameHealth(ctx echo.Context, pluginName string) error {
	if h := a.pluginsHandler; h != nil {
		return toHTTPHandler(h.GetV1PluginsPluginNameHealth)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Plugin service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// Scheduler handlers

// GetV1SchedulerStatus returns scheduler status by delegating to
// schedulerHandler.GetV1SchedulerStatus. When no scheduler handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) GetV1SchedulerStatus(ctx echo.Context) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.GetV1SchedulerStatus)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1SchedulerStatusStream serves the scheduler status stream (SSE) by
// delegating to schedulerHandler.GetV1SchedulerStatusStream. When no scheduler
// handler is configured it responds 503 with a SERVICE_UNAVAILABLE payload
// instead.
func (a *HandlerAdapter) GetV1SchedulerStatusStream(ctx echo.Context) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.GetV1SchedulerStatusStream)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1SchedulerWorkers lists connected workers by delegating to
// schedulerHandler.GetV1SchedulerWorkers. When no scheduler handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) GetV1SchedulerWorkers(ctx echo.Context) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.GetV1SchedulerWorkers)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1SchedulerWorkersWorkerId returns one worker's details by delegating to
// schedulerHandler.GetV1SchedulerWorkersWorkerID. When no scheduler handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): workerId is not forwarded explicitly; the wrapped handler only
// sees the *http.Request - confirm it can recover the ID from there.
func (a *HandlerAdapter) GetV1SchedulerWorkersWorkerId(ctx echo.Context, workerId string) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.GetV1SchedulerWorkersWorkerID)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// DeleteV1SchedulerWorkersWorkerId drains a worker by delegating to
// schedulerHandler.DeleteV1SchedulerWorkersWorkerID. When no scheduler handler
// is configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
//
// NOTE(review): workerId is not forwarded explicitly; the wrapped handler only
// sees the *http.Request - confirm it can recover the ID from there.
func (a *HandlerAdapter) DeleteV1SchedulerWorkersWorkerId(ctx echo.Context, workerId string) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.DeleteV1SchedulerWorkersWorkerID)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1SchedulerReservations lists active reservations by delegating to
// schedulerHandler.GetV1SchedulerReservations. When no scheduler handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) GetV1SchedulerReservations(ctx echo.Context) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.GetV1SchedulerReservations)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// PostV1SchedulerReservations creates a reservation by delegating to
// schedulerHandler.PostV1SchedulerReservations. When no scheduler handler is
// configured it responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) PostV1SchedulerReservations(ctx echo.Context) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.PostV1SchedulerReservations)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// PatchV1SchedulerJobsJobIdPriority updates a job's priority by delegating to
// schedulerHandler.PatchV1SchedulerJobsJobIDPriority. When no scheduler
// handler is configured it responds 503 with a SERVICE_UNAVAILABLE payload
// instead.
//
// NOTE(review): jobId is not forwarded explicitly; the wrapped handler only
// sees the *http.Request - confirm it can recover the ID from there.
func (a *HandlerAdapter) PatchV1SchedulerJobsJobIdPriority(ctx echo.Context, jobId string) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.PatchV1SchedulerJobsJobIDPriority)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1SchedulerJobsJobIdStream serves a job's progress stream (SSE) by
// delegating to schedulerHandler.GetV1SchedulerJobsJobIDStream. When no
// scheduler handler is configured it responds 503 with a SERVICE_UNAVAILABLE
// payload instead.
//
// NOTE(review): jobId is not forwarded explicitly; the wrapped handler only
// sees the *http.Request - confirm it can recover the ID from there.
func (a *HandlerAdapter) GetV1SchedulerJobsJobIdStream(ctx echo.Context, jobId string) error {
	if h := a.schedulerHandler; h != nil {
		return toHTTPHandler(h.GetV1SchedulerJobsJobIDStream)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Scheduler service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// Audit handlers

// GetV1AuditEvents queries audit events by delegating to
// auditHandler.GetV1AuditEvents. When no audit handler is configured it
// responds 503 with a SERVICE_UNAVAILABLE payload instead. params is not
// inspected here; the wrapped handler receives the original request.
func (a *HandlerAdapter) GetV1AuditEvents(ctx echo.Context, params GetV1AuditEventsParams) error {
	if h := a.auditHandler; h != nil {
		return toHTTPHandler(h.GetV1AuditEvents)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Audit service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// PostV1AuditVerify verifies audit chain integrity by delegating to
// auditHandler.PostV1AuditVerify. When no audit handler is configured it
// responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) PostV1AuditVerify(ctx echo.Context) error {
	if h := a.auditHandler; h != nil {
		return toHTTPHandler(h.PostV1AuditVerify)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Audit service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}
// GetV1AuditChainRoot returns the audit chain root hash by delegating to
// auditHandler.GetV1AuditChainRoot. When no audit handler is configured it
// responds 503 with a SERVICE_UNAVAILABLE payload instead.
func (a *HandlerAdapter) GetV1AuditChainRoot(ctx echo.Context) error {
	if h := a.auditHandler; h != nil {
		return toHTTPHandler(h.GetV1AuditChainRoot)(ctx)
	}

	return ctx.JSON(http.StatusServiceUnavailable, map[string]any{
		"error": "Audit service not available",
		"code":  "SERVICE_UNAVAILABLE",
	})
}