fetch_ml/internal/worker/executor/adapter.go
Jeremie Fraeys a7360869f8
refactor: Implement TaskExecutorAdapter and Worker.runningCount()
- Created executor/TaskExecutorAdapter implementing lifecycle.TaskExecutor
- Properly wires LocalExecutor and ContainerExecutor through adapter
- Worker.runningCount() now delegates to runLoop.RunningCount()
- Added lifecycle.RunLoop.RunningCount() public method
- factory.go creates proper executor chain instead of placeholder

Build status: Compiles successfully
2026-02-17 16:15:41 -05:00

56 lines
1.6 KiB
Go

// Package executor provides job execution implementations and the adapter
// that exposes them through the lifecycle.TaskExecutor interface.
package executor
import (
	"context"
	"errors"
	"fmt"

	"github.com/jfraeys/fetch_ml/internal/queue"
	"github.com/jfraeys/fetch_ml/internal/worker/interfaces"
	"github.com/jfraeys/fetch_ml/internal/worker/lifecycle"
)
// TaskExecutorAdapter wraps a LocalExecutor and a ContainerExecutor so that
// either can be driven through the lifecycle.TaskExecutor interface.
type TaskExecutorAdapter struct {
	// local runs tasks when localMode is set; it may be nil.
	local *LocalExecutor
	// container runs tasks when the local executor is not selected; it may
	// be nil.
	container *ContainerExecutor
	// localMode selects the local executor in preference to the container
	// executor.
	localMode bool
}
// NewTaskExecutorAdapter builds a TaskExecutorAdapter around the given
// executors and returns it as a lifecycle.TaskExecutor.
//
// No validation is performed: local, container, and localMode are stored
// exactly as supplied.
func NewTaskExecutorAdapter(
	local *LocalExecutor,
	container *ContainerExecutor,
	localMode bool,
) lifecycle.TaskExecutor {
	adapter := new(TaskExecutorAdapter)
	adapter.local = local
	adapter.container = container
	adapter.localMode = localMode
	return adapter
}
// Execute runs task with the executor selected by the adapter's mode.
//
// The local executor is used when localMode is set and a local executor was
// provided; otherwise the container executor is used if one was provided
// (this also covers localMode with a nil local executor). Both executors
// receive the same per-task execution environment, whose paths live under
// /tmp and are keyed by the task ID.
//
// Execute returns an error if task is nil or if no usable executor is
// configured, instead of reporting success for a task that never ran.
// Otherwise it returns whatever the selected executor returns.
func (a *TaskExecutorAdapter) Execute(ctx context.Context, task *queue.Task) error {
	// Guard before touching task.ID below; a nil task would otherwise panic.
	if task == nil {
		return errors.New("executing task: task is nil")
	}

	// Simplified, hard-coded layout shared by both executors: each task gets
	// its own job directory, output directory, and log file.
	env := interfaces.ExecutionEnv{
		JobDir:    "/tmp/jobs/" + task.ID,
		OutputDir: "/tmp/output/" + task.ID,
		LogFile:   "/tmp/logs/" + task.ID + ".log",
	}

	switch {
	case a.localMode && a.local != nil:
		return a.local.Execute(ctx, task, env)
	case a.container != nil:
		return a.container.Execute(ctx, task, env)
	default:
		// Returning nil here would make the caller treat an unexecuted task
		// as completed successfully.
		return fmt.Errorf("executing task %q: no executor available (localMode=%t)", task.ID, a.localMode)
	}
}