Refactor to scan-first config discovery

This commit is contained in:
Jeremie Fraeys 2026-01-23 13:51:52 -05:00
parent b0e531ad21
commit 7a8403e51c
No known key found for this signature in database
20 changed files with 1036 additions and 765 deletions

View file

@ -9,6 +9,49 @@ Services are expected to be managed as Docker Compose projects on the services s
- Python 3.11+
- Docker and Docker Compose installed on the services server
## Quick Start
1. Create a `.infra.toml` file in your app directory:
```toml
[requires]
services = ["postgres", "redis"]
```
That's it.
The controller will:
- Find your `.infra.toml` file
- Start `postgres` and `redis`
- Keep them running while your app is present
- Stop them **15 minutes** after your app is removed (configurable)
## Service Directory Structure
Services should be in `/opt/<service-name>/docker-compose.yml`:
```text
/opt/
postgres/
docker-compose.yml
redis/
docker-compose.yml
```
## Recommended Architecture
For a lightweight, user-friendly system:
```text
User Workflow:
1. Put .infra.toml in app directory (e.g., /home/user/myapp/.infra.toml)
2. Controller scans and finds it automatically
3. Services start/stop automatically
No manual registration needed!
```
## Config
Preferred config file:
@ -29,39 +72,28 @@ Optional YAML config:
## systemd (event-driven)
To avoid running a persistent daemon or a polling timer, you can trigger a one-shot run whenever a deployment changes the files the controller discovers:
If you want path-based triggering (no polling), you can run `infra-controller --once` whenever a `.infra.*` file changes under your configured scan paths.
- enable path trigger: `sudo systemctl enable --now infra-controller.path`
- view logs: `journalctl -u infra-controller-once.service -f`
Note: `systemd.path` does not support recursive watches, so for scan-based discovery the practical approach is an inotify watcher.
Example watcher (requires `inotify-tools`):
`/etc/systemd/system/infra-controller-watch.service`
```ini
[Unit]
Description=Watch for .infra.* changes and run infra-controller
[Service]
Type=simple
ExecStart=/bin/sh -lc 'inotifywait -m -r -e create,modify,delete,move --format "%w%f" /home /opt/apps | while read -r p; do case "$p" in *"/.infra."*) infra-controller --once ;; esac; done'
Restart=always
RestartSec=2
```
Enable it:
- `sudo systemctl enable --now infra-controller-watch.service`
- `journalctl -u infra-controller-watch.service -f`
Services that are no longer required are stopped after `grace_period_minutes` (see config) using `docker compose down`.
## Remote app registration
Run `infra-controller` on the service server. When you deploy, create/update a registration file in `/var/run/active-apps/` (this triggers the path unit; note that file-based discovery must be enabled in the config, as it is now off by default).
Recommended (Forgejo runner on the web/app server):
- deploy app locally on the web/app server (docker compose or bare-metal)
- register app on the service server by streaming `.infra.toml` over SSH (no scp)
Example (from web/app server runner):
```bash
APP_NAME=my-app
ssh infractl@service-host \
"cat > /var/run/active-apps/$APP_NAME.toml.tmp && mv /var/run/active-apps/$APP_NAME.toml.tmp /var/run/active-apps/$APP_NAME.toml" \
< .infra.toml
```
## Restricted SSH keys (recommended)
If you want to avoid giving CI a general shell on the services server, install the helper scripts to `/usr/local/sbin` (see `install.sh`) and restrict the runner key in `authorized_keys`.
Example (services server, `~infractl/.ssh/authorized_keys`):
```text
command="/usr/local/sbin/infra-register-stdin",no-pty,no-agent-forwarding,no-port-forwarding,no-X11-forwarding ssh-ed25519 AAAA... runner
```
For deregistration, use a separate key restricted to `/usr/local/sbin/infra-deregister`.

View file

@ -1,5 +1,5 @@
[discovery.file_based]
enabled = true
enabled = false
path = "/var/run/active-apps"
[discovery.scan]

View file

@ -1,6 +1,6 @@
discovery:
file_based:
enabled: true
enabled: false
path: /var/run/active-apps
scan:

View file

@ -33,11 +33,8 @@ dev = [
]
[project.scripts]
infra-controller = "infra_controller.__main__:main"
infra-register = "infra_controller.cli:register"
infra-deregister = "infra_controller.cli:deregister"
infra-status = "infra_controller.cli:status"
infra-ensure = "infra_controller.cli:ensure_service_cli"
infra-controller = "src.__main__:main"
[tool.setuptools]
package-dir = {"" = "src"}

View file

@ -5,8 +5,8 @@ import logging
import sys
from pathlib import Path
from infra_controller.config import load_config
from infra_controller.controller import InfraController
from src.config import load_config
from src.controller import InfraController
def main() -> None:

239
src/config.py Normal file
View file

@ -0,0 +1,239 @@
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import tomllib
import yaml
@dataclass
class DiscoveryConfig:
    """Configuration for application discovery methods."""

    # Legacy registration-file discovery: metadata files dropped into a
    # watch directory. Disabled by default in favour of scanning.
    file_based_enabled: bool = False
    file_based_path: Path = Path("/var/run/active-apps")
    # HTTP discovery settings — the consumer is not visible in this module;
    # TODO confirm where these are used.
    http_enabled: bool = True
    http_host: str = "127.0.0.1"
    http_port: int = 8080
    # Recursive filesystem scan for .infra.* files under scan_paths.
    scan_enabled: bool = True
    scan_paths: list[Path] = field(default_factory=lambda: [Path("/home"), Path("/opt/apps")])
    # Glob patterns (Path.match semantics) excluded from the scan.
    exclude_patterns: list[str] = field(
        default_factory=lambda: ["**/node_modules/**", "**/.venv/**", "**/venv/**"]
    )
@dataclass
class DockerComposeConfig:
    """Configuration for Docker Compose service management."""

    # Directory containing one sub-directory per managed service.
    base_dir: Path = Path("/opt")
    # Compose file name expected inside each service directory.
    compose_file: str = "docker-compose.yml"
@dataclass
class ServicesConfig:
    """Configuration for service lifecycle management."""

    # Minutes a service may remain unused before it is stopped.
    grace_period_minutes: int
    # Seconds between controller iterations in the run loop.
    check_interval_seconds: int
    # JSON file that persists unused-service tracking across restarts.
    state_file: Path
@dataclass
class LoggingConfig:
    """Configuration for logging."""

    level: str = "INFO"
    # Optional log file; None presumably means stderr-only — TODO confirm
    # against the logging setup code (not visible in this module).
    file: Path | None = None
    # Rotation settings (10 MiB per file, 5 backups).
    max_bytes: int = 10 * 1024 * 1024
    backup_count: int = 5
@dataclass
class ControllerConfig:
    """Main configuration for the infrastructure controller."""

    discovery: DiscoveryConfig = field(default_factory=DiscoveryConfig)
    docker: DockerComposeConfig = field(default_factory=DockerComposeConfig)
    # ServicesConfig declares no field defaults, so the defaults live here.
    services: ServicesConfig = field(
        default_factory=lambda: ServicesConfig(
            grace_period_minutes=15,
            check_interval_seconds=60,
            state_file=Path("/var/lib/infra-controller/state.json"),
        )
    )
    logging: LoggingConfig = field(default_factory=LoggingConfig)

    @classmethod
    def from_file(cls, path: Path | str) -> ControllerConfig:
        """Load configuration from a TOML or YAML file.

        A nonexistent path silently yields the defaults rather than raising.
        """
        path = Path(path)
        if not path.exists():
            return cls()
        data = load_config_file(path)
        return cls._from_dict(data)

    @classmethod
    def from_env(cls) -> ControllerConfig:
        """Load configuration from environment variables only."""
        config = cls()
        apply_env_overrides(config)
        return config

    @classmethod
    def _from_dict(cls, data: dict[str, Any]) -> ControllerConfig:
        """Parse configuration from a dictionary (one helper per section)."""
        config = cls()
        parse_discovery_config(config, data.get("discovery"))
        parse_docker_config(config, data.get("docker"))
        parse_services_config(config, data.get("services"))
        parse_logging_config(config, data.get("logging"))
        return config
def load_config_file(path: Path) -> dict[str, Any]:
"""Load configuration file based on extension."""
if path.suffix in {".toml", ".tml"}:
with open(path, "rb") as f:
return tomllib.load(f)
if path.suffix in {".yml", ".yaml"}:
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
raise ValueError(f"Unsupported config format: {path.suffix}")
def parse_discovery_config(config: ControllerConfig, data: Any) -> None:
    """Apply the ``discovery`` section of a parsed config file onto *config*.

    Silently ignores a missing or non-mapping section; each sub-section
    (file_based / http / scan) is likewise optional.
    """
    if not isinstance(data, dict):
        return
    disc = config.discovery

    file_based = data.get("file_based")
    if isinstance(file_based, dict):
        # Note: a section present without "enabled" turns the method on.
        disc.file_based_enabled = bool(file_based.get("enabled", True))
        fb_path = file_based.get("path")
        if fb_path:
            disc.file_based_path = Path(fb_path)

    http = data.get("http")
    if isinstance(http, dict):
        disc.http_enabled = bool(http.get("enabled", True))
        disc.http_host = str(http.get("host", disc.http_host))
        port = http.get("port")
        if port is not None:
            disc.http_port = int(port)

    scan = data.get("scan")
    if isinstance(scan, dict):
        disc.scan_enabled = bool(scan.get("enabled", True))
        paths = scan.get("paths")
        if isinstance(paths, list):
            disc.scan_paths = [Path(entry) for entry in paths]
        patterns = scan.get("exclude_patterns")
        if isinstance(patterns, list):
            disc.exclude_patterns = [str(entry) for entry in patterns]
def parse_docker_config(config: ControllerConfig, data: Any) -> None:
    """Apply the ``docker`` section of a parsed config file onto *config*.

    Non-mapping input and falsy values are ignored.
    """
    if not isinstance(data, dict):
        return
    base_dir = data.get("base_dir")
    if base_dir:
        config.docker.base_dir = Path(base_dir)
    compose_file = data.get("compose_file")
    if compose_file:
        config.docker.compose_file = str(compose_file)
def parse_services_config(config: ControllerConfig, data: Any) -> None:
    """Apply the ``services`` section of a parsed config file onto *config*.

    Numeric fields accept 0 (checked against None, not truthiness).
    """
    if not isinstance(data, dict):
        return
    grace = data.get("grace_period_minutes")
    if grace is not None:
        config.services.grace_period_minutes = int(grace)
    interval = data.get("check_interval_seconds")
    if interval is not None:
        config.services.check_interval_seconds = int(interval)
    state_file = data.get("state_file")
    if state_file:
        config.services.state_file = Path(state_file)
def parse_logging_config(config: ControllerConfig, data: Any) -> None:
    """Apply the ``logging`` section of a parsed config file onto *config*.

    Rotation fields accept 0 (checked against None, not truthiness).
    """
    if not isinstance(data, dict):
        return
    level = data.get("level")
    if level:
        config.logging.level = str(level)
    log_file = data.get("file")
    if log_file:
        config.logging.file = Path(log_file)
    max_bytes = data.get("max_bytes")
    if max_bytes is not None:
        config.logging.max_bytes = int(max_bytes)
    backup_count = data.get("backup_count")
    if backup_count is not None:
        config.logging.backup_count = int(backup_count)
def apply_env_overrides(config: ControllerConfig) -> None:
    """Overlay environment variables onto *config*.

    Unset or empty variables are ignored; numeric variables raise ValueError
    if not parseable as int.
    """
    env = os.getenv

    active_apps = env("ACTIVE_APPS_DIR")
    if active_apps:
        config.discovery.file_based_path = Path(active_apps)

    scan_paths = env("SCAN_PATHS")
    if scan_paths:
        # Comma-separated list; blank segments are dropped.
        config.discovery.scan_paths = [
            Path(part.strip()) for part in scan_paths.split(",") if part.strip()
        ]

    base_dir = env("DOCKER_BASE_DIR")
    if base_dir:
        config.docker.base_dir = Path(base_dir)

    compose_file = env("DOCKER_COMPOSE_FILE")
    if compose_file:
        config.docker.compose_file = compose_file

    interval = env("CHECK_INTERVAL")
    if interval:
        config.services.check_interval_seconds = int(interval)

    grace = env("GRACE_PERIOD_MINUTES")
    if grace:
        config.services.grace_period_minutes = int(grace)

    log_level = env("LOG_LEVEL")
    if log_level:
        config.logging.level = log_level
def load_config(config_path: str | os.PathLike[str] | None = None) -> ControllerConfig:
    """Resolve the controller configuration.

    Precedence:
      1. Explicit ``config_path`` parameter
      2. ``CONFIG_PATH`` environment variable
      3. Default paths (/etc/infra-controller/config.{toml,yml})
      4. Built-in defaults

    Environment variables always override file-based configuration.
    """
    effective_path = config_path if config_path is not None else os.getenv("CONFIG_PATH")
    cfg = _load_base_config(effective_path)
    # Env vars win over whatever the file provided.
    apply_env_overrides(cfg)
    return cfg
def _load_base_config(config_path: str | os.PathLike[str] | None) -> ControllerConfig:
    """Load base configuration from a file, falling back to defaults.

    NOTE(review): an explicitly supplied path that does not exist falls back
    to the defaults silently — confirm a warning is not wanted here.
    """
    if config_path:
        candidate = Path(str(config_path))
        if candidate.exists():
            return ControllerConfig.from_file(candidate)
        return ControllerConfig()
    # No explicit path: probe the conventional locations in order.
    fallbacks = (
        Path("/etc/infra-controller/config.toml"),
        Path("/etc/infra-controller/config.yml"),
    )
    for fallback in fallbacks:
        if fallback.exists():
            return ControllerConfig.from_file(fallback)
    return ControllerConfig()

192
src/controller.py Normal file
View file

@ -0,0 +1,192 @@
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from typing import Any
from src.config import ControllerConfig
from src.discovery import AppRegistration, DiscoveryManager
from src.manager import ServiceManager
from src.validation import extract_required_services
logger = logging.getLogger(__name__)
class InfraController:
    """
    Main controller for infrastructure service management.

    Discovers applications, determines required services, and manages
    their lifecycle with configurable grace periods.
    """

    def __init__(
        self,
        cfg: ControllerConfig,
        discovery: DiscoveryManager | None = None,
        services: ServiceManager | None = None,
    ):
        # Collaborators are injectable for testing; defaults derive from cfg.
        self._cfg = cfg
        self._discovery = discovery or DiscoveryManager(cfg.discovery)
        self._services = services or ServiceManager(cfg.docker)

    def run(self) -> None:
        """Run the controller in a continuous loop.

        NOTE(review): an exception escaping run_once() terminates the loop;
        confirm crash-and-restart (e.g. via systemd) is the intended
        supervision model.
        """
        while True:
            self.run_once()
            time.sleep(self._cfg.services.check_interval_seconds)

    def run_once(self) -> None:
        """Execute one iteration of the controller logic."""
        discovered = self._discovery.discover_all()
        required = self._collect_required_services(discovered)
        state = self._load_state()
        unused_since = _extract_unused_tracking(state)
        known_services = _extract_known_services(state)
        now = time.time()
        # Start/ensure required services
        self._handle_required_services(required, unused_since, known_services)
        # Stop unused services after grace period
        self._handle_unused_services(required, known_services, unused_since, now)
        # Save updated state
        self._save_state({
            "unused_since": unused_since,
            "known_services": sorted(known_services),
        })

    def ensure_service(self, service_name: str) -> None:
        """Ensure a service is running.

        Raises RuntimeError with the subprocess output if the service
        manager reports a non-zero return code.
        """
        res = self._services.apply_service(service_name)
        if res.returncode != 0:
            raise RuntimeError(res.stderr or res.stdout)

    def _collect_required_services(self, apps: dict[str, AppRegistration]) -> set[str]:
        """Collect all required services from discovered applications."""
        required: set[str] = set()
        for reg in apps.values():
            services = extract_required_services(reg.metadata.requires)
            required.update(services)
        return required

    def _handle_required_services(
        self,
        required: set[str],
        unused_since: dict[str, float],
        known_services: set[str],
    ) -> None:
        """Start required services and update tracking.

        Mutates *unused_since* (clears any grace timer for a service that is
        required again) and *known_services* in place.
        """
        for service in sorted(required):
            logger.info("Ensuring service: %s", service)
            self.ensure_service(service)
            unused_since.pop(service, None)
            known_services.add(service)

    def _handle_unused_services(
        self,
        required: set[str],
        known_services: set[str],
        unused_since: dict[str, float],
        now: float,
    ) -> None:
        """Stop services that have been unused beyond the grace period."""
        # Include all services that are being tracked as unused
        known_services |= set(unused_since.keys())
        grace_seconds = self._cfg.services.grace_period_minutes * 60
        unused_services = known_services - required
        for service in sorted(unused_services):
            self._process_unused_service(service, unused_since, known_services, now, grace_seconds)

    def _process_unused_service(
        self,
        service: str,
        unused_since: dict[str, float],
        known_services: set[str],
        now: float,
        grace_seconds: int,
    ) -> None:
        """Process a single unused service, starting grace period or stopping if expired."""
        since = unused_since.get(service)
        # Start grace period if not already tracking
        if since is None:
            unused_since[service] = now
            logger.info("Service no longer required (grace period started): %s", service)
            return
        # Validate and normalize timestamp
        try:
            since_ts = float(since)
        except (ValueError, TypeError):
            # Corrupt timestamp: restart the grace period rather than guess.
            unused_since[service] = now
            return
        # Check if grace period has elapsed
        if (now - since_ts) < grace_seconds:
            return
        # Stop the service
        logger.info("Stopping unused service: %s", service)
        res = self._services.stop_service(service)
        if res.returncode != 0:
            raise RuntimeError(res.stderr or res.stdout)
        unused_since.pop(service, None)
        known_services.discard(service)

    def _load_state(self) -> dict[str, Any]:
        """Load state from the configured state file.

        Any read/parse failure degrades to an empty state (logged as a
        warning) so a corrupt file cannot wedge the controller.
        """
        path = self._cfg.services.state_file
        try:
            if not path.exists():
                return {}
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            return data if isinstance(data, dict) else {}
        except Exception as e:
            logger.warning("Failed to load state from %s: %s", path, e)
            return {}

    def _save_state(self, state: dict[str, Any]) -> None:
        """Save state to the configured state file atomically.

        Writes to a sibling .tmp file and renames, so readers never observe
        a partially written file. Failures are logged and re-raised.
        """
        path = self._cfg.services.state_file
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            tmp = path.with_suffix(path.suffix + ".tmp")
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(state, f, indent=2)
            tmp.replace(path)
        except Exception as e:
            logger.error("Failed to save state to %s: %s", path, e)
            raise
def _extract_unused_tracking(state: dict[str, Any]) -> dict[str, float]:
"""Extract and validate unused service tracking from state."""
unused_since = state.get("unused_since")
if isinstance(unused_since, dict):
return unused_since
return {}
def _extract_known_services(state: dict[str, Any]) -> set[str]:
"""Extract and validate known services from state."""
known_services_val = state.get("known_services")
if isinstance(known_services_val, list):
return {str(s) for s in known_services_val if isinstance(s, str) and s.strip()}
return set()

190
src/discovery.py Normal file
View file

@ -0,0 +1,190 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
import logging
from pathlib import Path
import subprocess
from typing import Callable
from src.config import DiscoveryConfig
from src.metadata import InfraMetadata, infra_suffix_precedence
logger = logging.getLogger(__name__)
def _find_metadata_files(base_path: Path, exclude_patterns: list[str]) -> list[Path]:
    """Recursively collect .infra.* metadata files under *base_path*.

    NOTE(review): this helper is not referenced anywhere in this module —
    scan_for_apps() and DiscoveryManager perform their own rglob — so it is
    a candidate for removal. Unlike those callers it applies no per-directory
    suffix precedence: a directory holding several .infra.* variants yields
    all of them.
    """
    candidates: list[Path] = []
    # Suffixes come back in precedence order, but every match is kept.
    for suffix in infra_suffix_precedence():
        candidates.extend(base_path.rglob(f".infra{suffix}"))
    # Drop candidates matching any exclusion glob (Path.match semantics).
    return [
        p
        for p in candidates
        if not any(p.match(pattern) for pattern in exclude_patterns)
    ]
def scan_for_apps(
    paths: list[Path],
    exclude_patterns: list[str],
    is_active_project: Callable[[Path], bool],
) -> dict[str, "AppRegistration"]:
    """Scan *paths* recursively for .infra.* files and build registrations.

    Metadata suffixes are tried in precedence order (infra_suffix_precedence),
    and only the highest-precedence file per directory is used. Directories
    are considered only when *is_active_project* returns True for them.

    Fix: the first registration now wins for a given project name — a later
    metadata file declaring a duplicate project is skipped instead of
    silently overwriting the earlier registration, matching
    DiscoveryManager._discover_scan_by_suffix_precedence.
    """
    apps: dict[str, AppRegistration] = {}
    for base_path in paths:
        if not base_path.exists():
            continue
        # Directories already claimed by a higher-precedence suffix.
        chosen_dirs: set[Path] = set()
        for suffix in infra_suffix_precedence():
            for metadata_file in base_path.rglob(f".infra{suffix}"):
                if any(metadata_file.match(pattern) for pattern in exclude_patterns):
                    continue
                if metadata_file.parent in chosen_dirs:
                    continue
                if not is_active_project(metadata_file.parent):
                    continue
                try:
                    md = InfraMetadata.from_file(metadata_file)
                except Exception as e:
                    logger.warning("Failed to load %s: %s", metadata_file, e)
                    continue
                # First registration wins for a project name.
                if md.project in apps:
                    continue
                apps[md.project] = AppRegistration(
                    name=md.project,
                    metadata=md,
                    last_seen=datetime.now(),
                    discovery_method=f"scan:{metadata_file}",
                )
                chosen_dirs.add(metadata_file.parent)
                logger.info("Discovered app via scan: %s", md.project)
    return apps
@dataclass
class AppRegistration:
    """A discovered application: its metadata plus discovery provenance."""

    # Project name taken from the metadata's 'project' field.
    name: str
    # Parsed .infra.* metadata for the app.
    metadata: InfraMetadata
    # When this registration was (most recently) produced.
    last_seen: datetime
    # Human-readable provenance, e.g. "file_based" or "scan:<path>".
    discovery_method: str
class DiscoveryManager:
    """Aggregates application discovery across the configured mechanisms."""

    def __init__(self, config: DiscoveryConfig):
        self.config = config

    def discover_all(self) -> dict[str, AppRegistration]:
        """Run every enabled discovery method and merge the results.

        Scan results are merged last, so on a project-name collision the
        scan-discovered registration replaces the file-based one.
        """
        discovered: dict[str, AppRegistration] = {}
        if self.config.file_based_enabled:
            discovered.update(self._discover_file_based())
        if self.config.scan_enabled:
            discovered.update(self._discover_scan())
        return discovered

    def _discover_file_based_by_suffix_precedence(
        self, watch_path: Path, apps: dict[str, AppRegistration]
    ) -> None:
        """Populate *apps* from registration files in *watch_path*.

        Files are matched suffix-by-suffix in precedence order; only the
        first file per stem (registration name) is used, symlinks are
        resolved, dangling targets are skipped, and the first registration
        wins per project. Read errors are logged and skipped.
        """
        chosen_stems: set[str] = set()
        for suffix in infra_suffix_precedence():
            for link_file in watch_path.glob(f"*{suffix}"):
                try:
                    if link_file.stem in chosen_stems:
                        continue
                    # Follow symlinked registrations to the real metadata file.
                    target = link_file.resolve() if link_file.is_symlink() else link_file
                    if not target.exists():
                        continue
                    md = InfraMetadata.from_file(target)
                    if md.project in apps:
                        continue
                    apps[md.project] = AppRegistration(
                        name=md.project,
                        metadata=md,
                        last_seen=datetime.now(),
                        discovery_method="file_based",
                    )
                    chosen_stems.add(link_file.stem)
                    logger.info("Discovered app via file: %s", md.project)
                except Exception as e:
                    logger.error("Error reading %s: %s", link_file, e)

    def _discover_file_based(self) -> dict[str, AppRegistration]:
        """Discover apps registered as files in the configured watch dir."""
        apps: dict[str, AppRegistration] = {}
        watch_path = self.config.file_based_path
        if not watch_path.exists():
            # Create the watch dir on first run so registrations can land.
            watch_path.mkdir(parents=True, exist_ok=True)
            return apps
        self._discover_file_based_by_suffix_precedence(watch_path, apps)
        return apps

    def _discover_scan_by_suffix_precedence(
        self, base_path: Path, apps: dict[str, AppRegistration]
    ) -> None:
        """Populate *apps* by scanning *base_path* for .infra.* files.

        NOTE(review): this method is unused — _discover_scan delegates to the
        module-level scan_for_apps() instead — candidate for removal.
        """
        chosen_dirs: set[Path] = set()
        for suffix in infra_suffix_precedence():
            for infra_file in base_path.rglob(f".infra{suffix}"):
                if self._should_exclude(infra_file):
                    continue
                if infra_file.parent in chosen_dirs:
                    continue
                if not self._is_active_project(infra_file.parent):
                    continue
                try:
                    md = InfraMetadata.from_file(infra_file)
                except Exception as e:
                    logger.error("Error reading %s: %s", infra_file, e)
                    continue
                # First registration wins per project name.
                if md.project in apps:
                    continue
                apps[md.project] = AppRegistration(
                    name=md.project,
                    metadata=md,
                    last_seen=datetime.now(),
                    discovery_method="scan",
                )
                chosen_dirs.add(infra_file.parent)
                logger.info("Discovered app via scan: %s", md.project)

    def _discover_scan(self) -> dict[str, AppRegistration]:
        """Discover apps by scanning the configured paths."""
        return scan_for_apps(
            paths=self.config.scan_paths,
            exclude_patterns=self.config.exclude_patterns,
            is_active_project=self._is_active_project,
        )

    def _should_exclude(self, path: Path) -> bool:
        """Return True when *path* matches any configured exclusion glob."""
        return any(path.match(pattern) for pattern in self.config.exclude_patterns)

    def _is_active_project(self, project_dir: Path) -> bool:
        """Heuristically decide whether *project_dir* is an active project.

        Active means: the git HEAD file was modified within the last 24 hours,
        or a running process's command line mentions the directory path
        (pgrep -f). Any pgrep failure/timeout counts as inactive.
        """
        git_head = project_dir / ".git" / "HEAD"
        if git_head.exists():
            mtime = datetime.fromtimestamp(git_head.stat().st_mtime)
            if (datetime.now() - mtime).total_seconds() < 24 * 60 * 60:
                return True
        try:
            result = subprocess.run(
                ["pgrep", "-f", str(project_dir)],
                capture_output=True,
                timeout=5,
                text=True,
            )
            if result.returncode == 0:
                return True
        except Exception:
            return False
        return False

View file

@ -1,72 +0,0 @@
from __future__ import annotations
import argparse
from pathlib import Path
import shutil
import sys
from infra_controller.config import load_config
from infra_controller.controller import InfraController
from infra_controller.discovery import InfraMetadata
def register() -> None:
    """CLI entry point: atomically publish a metadata file into the watch dir."""
    parser = argparse.ArgumentParser(prog="infra-register")
    parser.add_argument("metadata_file", help="Path to .infra.toml/.infra.yml (or any metadata file)")
    parser.add_argument(
        "--name",
        help="Override project name (otherwise read from metadata_file)",
    )
    args = parser.parse_args(sys.argv[1:])
    cfg = load_config()

    source = Path(args.metadata_file)
    if not source.exists():
        print(f"metadata file not found: {source}", file=sys.stderr)
        raise SystemExit(2)

    metadata = InfraMetadata.from_file(source)
    project_name = args.name or metadata.project

    target_dir = cfg.discovery.file_based_path
    target_dir.mkdir(parents=True, exist_ok=True)
    # Preserve a recognised extension; default anything else to .toml.
    extension = source.suffix if source.suffix in {".toml", ".yml", ".yaml"} else ".toml"
    final_path = target_dir / f"{project_name}{extension}"
    staging = final_path.with_suffix(final_path.suffix + ".tmp")
    # Copy then rename so watchers never observe a half-written file.
    shutil.copyfile(source, staging)
    staging.replace(final_path)
def deregister() -> None:
    """CLI entry point: remove a project's registration file(s), if any."""
    parser = argparse.ArgumentParser(prog="infra-deregister")
    parser.add_argument("name", help="Project name to deregister")
    args = parser.parse_args(sys.argv[1:])
    cfg = load_config()
    registration_dir = cfg.discovery.file_based_path
    # A project may have been registered under any supported extension.
    existing = [
        registration_dir / f"{args.name}{ext}"
        for ext in (".toml", ".yml", ".yaml")
        if (registration_dir / f"{args.name}{ext}").exists()
    ]
    for registration in existing:
        registration.unlink()
    if not existing:
        print(f"no registration found for: {args.name}", file=sys.stderr)
        raise SystemExit(1)
def status() -> None:
    """CLI entry point: print the resolved controller configuration."""
    print(f"config={load_config()}")
def ensure_service_cli(argv: list[str] | None = None) -> None:
    """CLI entry point: ensure a single service is up via the controller.

    *argv* defaults to None so argparse falls back to sys.argv.
    """
    parser = argparse.ArgumentParser(prog="infra-ensure")
    parser.add_argument("service")
    parsed = parser.parse_args(argv)
    InfraController(load_config()).ensure_service(parsed.service)

View file

@ -1,189 +0,0 @@
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import tomllib
import yaml
@dataclass
class DiscoveryConfig:
    """Settings for the available app-discovery mechanisms."""

    # Registration-file discovery in a watch directory (on by default here).
    file_based_enabled: bool = True
    file_based_path: Path = Path("/var/run/active-apps")
    # HTTP discovery settings — consumer not visible in this file; TODO confirm usage.
    http_enabled: bool = True
    http_host: str = "127.0.0.1"
    http_port: int = 8080
    # Recursive filesystem scan for .infra.* files.
    scan_enabled: bool = True
    scan_paths: list[Path] = field(default_factory=lambda: [Path("/home"), Path("/opt/apps")])
    # Glob patterns (Path.match semantics) excluded from scanning.
    exclude_patterns: list[str] = field(
        default_factory=lambda: ["**/node_modules/**", "**/.venv/**", "**/venv/**"]
    )
@dataclass
class DockerComposeConfig:
    """Settings for locating Docker Compose projects."""

    # Directory containing one sub-directory per managed service.
    base_dir: Path = Path("/opt")
    # Compose file name expected inside each service directory.
    compose_file: str = "docker-compose.yml"
@dataclass
class ServicesConfig:
    """Settings for service lifecycle management (no field defaults)."""

    # Minutes a service may remain unused before it is stopped.
    grace_period_minutes: int
    # Seconds between controller iterations.
    check_interval_seconds: int
    # JSON file persisting unused-service tracking across restarts.
    state_file: Path
@dataclass
class LoggingConfig:
    """Settings for logging output and rotation."""

    level: str = "INFO"
    # Optional log file; None presumably means stderr-only — TODO confirm.
    file: Path | None = None
    # Rotation: 10 MiB per file, 5 backups.
    max_bytes: int = 10 * 1024 * 1024
    backup_count: int = 5
@dataclass
class ControllerConfig:
    """Top-level controller configuration aggregating all sections."""

    discovery: DiscoveryConfig = field(default_factory=DiscoveryConfig)
    docker: DockerComposeConfig = field(default_factory=DockerComposeConfig)
    # ServicesConfig declares no field defaults, so the defaults live here.
    services: ServicesConfig = field(
        default_factory=lambda: ServicesConfig(
            grace_period_minutes=15,
            check_interval_seconds=60,
            state_file=Path("/var/lib/infra-controller/state.json"),
        )
    )
    logging: LoggingConfig = field(default_factory=LoggingConfig)

    @classmethod
    def from_file(cls, path: Path | str) -> "ControllerConfig":
        """Load configuration from a TOML or YAML file.

        A nonexistent path silently yields the defaults rather than raising.
        """
        path = Path(path)
        if not path.exists():
            return cls()
        if path.suffix in {".toml", ".tml"}:
            # tomllib requires binary mode.
            with open(path, "rb") as f:
                data = tomllib.load(f)
        elif path.suffix in {".yml", ".yaml"}:
            with open(path, "r", encoding="utf-8") as f:
                # Empty YAML parses to None; normalise to {}.
                data = yaml.safe_load(f) or {}
        else:
            raise ValueError(f"Unsupported config format: {path.suffix}")
        return cls._from_dict(data)

    @classmethod
    def from_env(cls) -> "ControllerConfig":
        """Build a configuration from environment variables only.

        Unset/empty variables leave the corresponding defaults in place.
        """
        config = cls()
        if val := os.getenv("ACTIVE_APPS_DIR"):
            config.discovery.file_based_path = Path(val)
        if val := os.getenv("SCAN_PATHS"):
            # Comma-separated list; blank segments are dropped.
            config.discovery.scan_paths = [Path(p.strip()) for p in val.split(",") if p.strip()]
        if val := os.getenv("DOCKER_BASE_DIR"):
            config.docker.base_dir = Path(val)
        if val := os.getenv("DOCKER_COMPOSE_FILE"):
            config.docker.compose_file = val
        if val := os.getenv("CHECK_INTERVAL"):
            config.services.check_interval_seconds = int(val)
        if val := os.getenv("GRACE_PERIOD_MINUTES"):
            config.services.grace_period_minutes = int(val)
        if val := os.getenv("LOG_LEVEL"):
            config.logging.level = val
        return config

    @classmethod
    def _from_dict(cls, data: dict[str, Any]) -> "ControllerConfig":
        """Parse a configuration mapping section by section.

        Each section and sub-section is optional; non-mapping values are
        ignored, leaving the defaults in place.
        """
        config = cls()
        if isinstance(data.get("discovery"), dict):
            disc = data["discovery"]
            if isinstance(disc.get("file_based"), dict):
                fb = disc["file_based"]
                # A present section without "enabled" turns the method on.
                config.discovery.file_based_enabled = bool(fb.get("enabled", True))
                if fb.get("path"):
                    config.discovery.file_based_path = Path(fb["path"])
            if isinstance(disc.get("http"), dict):
                http = disc["http"]
                config.discovery.http_enabled = bool(http.get("enabled", True))
                config.discovery.http_host = str(http.get("host", config.discovery.http_host))
                if http.get("port") is not None:
                    config.discovery.http_port = int(http["port"])
            if isinstance(disc.get("scan"), dict):
                scan = disc["scan"]
                config.discovery.scan_enabled = bool(scan.get("enabled", True))
                if isinstance(scan.get("paths"), list):
                    config.discovery.scan_paths = [Path(p) for p in scan.get("paths") or []]
                if isinstance(scan.get("exclude_patterns"), list):
                    config.discovery.exclude_patterns = [str(p) for p in scan.get("exclude_patterns") or []]
        if isinstance(data.get("docker"), dict):
            dk = data["docker"]
            if dk.get("base_dir"):
                config.docker.base_dir = Path(dk["base_dir"])
            if dk.get("compose_file"):
                config.docker.compose_file = str(dk["compose_file"])
        if isinstance(data.get("services"), dict):
            svc = data["services"]
            # Numeric fields accept 0 (compared against None, not truthiness).
            if svc.get("grace_period_minutes") is not None:
                config.services.grace_period_minutes = int(svc["grace_period_minutes"])
            if svc.get("check_interval_seconds") is not None:
                config.services.check_interval_seconds = int(svc["check_interval_seconds"])
            if svc.get("state_file"):
                config.services.state_file = Path(svc["state_file"])
        if isinstance(data.get("logging"), dict):
            lg = data["logging"]
            if lg.get("level"):
                config.logging.level = str(lg["level"])
            if lg.get("file"):
                config.logging.file = Path(lg["file"])
            if lg.get("max_bytes") is not None:
                config.logging.max_bytes = int(lg["max_bytes"])
            if lg.get("backup_count") is not None:
                config.logging.backup_count = int(lg["backup_count"])
        return config
def load_config(config_path: str | os.PathLike[str] | None = None) -> ControllerConfig:
    """Resolve configuration: explicit path, then CONFIG_PATH, then /etc defaults.

    Environment variables are applied last and win over file values.
    NOTE(review): the env-override block below duplicates from_env(); when
    from_env() is used as the fallback, the overrides run twice (harmless,
    but redundant).
    """
    if config_path is None:
        config_path = os.getenv("CONFIG_PATH")
    if config_path:
        p = Path(str(config_path))
        if p.exists():
            cfg = ControllerConfig.from_file(p)
        else:
            # Nonexistent explicit path: fall back to env-only configuration.
            cfg = ControllerConfig.from_env()
    else:
        default_toml = Path("/etc/infra-controller/config.toml")
        default_yaml = Path("/etc/infra-controller/config.yml")
        if default_toml.exists():
            cfg = ControllerConfig.from_file(default_toml)
        elif default_yaml.exists():
            cfg = ControllerConfig.from_file(default_yaml)
        else:
            cfg = ControllerConfig.from_env()
    # Env vars override whatever source was loaded above.
    if val := os.getenv("ACTIVE_APPS_DIR"):
        cfg.discovery.file_based_path = Path(val)
    if val := os.getenv("SCAN_PATHS"):
        cfg.discovery.scan_paths = [Path(p.strip()) for p in val.split(",") if p.strip()]
    if val := os.getenv("DOCKER_BASE_DIR"):
        cfg.docker.base_dir = Path(val)
    if val := os.getenv("DOCKER_COMPOSE_FILE"):
        cfg.docker.compose_file = val
    if val := os.getenv("CHECK_INTERVAL"):
        cfg.services.check_interval_seconds = int(val)
    if val := os.getenv("GRACE_PERIOD_MINUTES"):
        cfg.services.grace_period_minutes = int(val)
    if val := os.getenv("LOG_LEVEL"):
        cfg.logging.level = val
    return cfg

View file

@ -1,124 +0,0 @@
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from infra_controller.config import ControllerConfig
from infra_controller.discovery import AppRegistration, DiscoveryManager
from infra_controller.service_manager import ServiceManager
logger = logging.getLogger(__name__)
class InfraController:
    """Pre-refactor controller: discovers apps, starts required services,
    and stops services unused past the configured grace period."""

    def __init__(
        self,
        cfg: ControllerConfig,
        discovery: DiscoveryManager | None = None,
        services: ServiceManager | None = None,
    ):
        # Collaborators are injectable for testing; defaults derive from cfg.
        self._cfg = cfg
        self._discovery = discovery or DiscoveryManager(cfg.discovery)
        self._services = services or ServiceManager(cfg.docker)

    def run(self) -> None:
        """Loop forever, running one iteration per check interval.

        NOTE(review): an exception escaping run_once() terminates the loop.
        """
        while True:
            self.run_once()
            time.sleep(self._cfg.services.check_interval_seconds)

    def run_once(self) -> None:
        """One iteration: discover apps, ensure required services, and stop
        services unused longer than the grace period; persist tracking state."""
        discovered = self._discovery.discover_all()
        required = self._required_services(discovered)
        state = self._load_state(self._cfg.services.state_file)
        # service name -> timestamp when first seen unused.
        unused_since = state.get("unused_since")
        if not isinstance(unused_since, dict):
            unused_since = {}
        known_services_val = state.get("known_services")
        if isinstance(known_services_val, list):
            known_services = {str(s) for s in known_services_val if isinstance(s, str) and s.strip()}
        else:
            known_services = set()
        now = time.time()
        for service in sorted(required):
            logger.info("Ensuring service: %s", service)
            self.ensure_service(service)
            # Required again: clear any running grace timer.
            unused_since.pop(service, None)
            known_services.add(service)
        # Anything tracked as unused is by definition known.
        known_services |= set(unused_since.keys())
        grace_seconds = int(self._cfg.services.grace_period_minutes) * 60
        for service in sorted(known_services - set(required)):
            since = unused_since.get(service)
            if since is None:
                # Not yet tracked: start the grace period now.
                unused_since[service] = now
                logger.info("Service no longer required (grace period started): %s", service)
                continue
            try:
                since_ts = float(since)
            except Exception:
                # Corrupt timestamp: restart the grace period.
                since_ts = now
                unused_since[service] = now
                continue
            if (now - since_ts) < grace_seconds:
                continue
            logger.info("Stopping unused service: %s", service)
            res = self._services.stop_service(service)
            if res.returncode != 0:
                raise RuntimeError(res.stderr or res.stdout)
            unused_since.pop(service, None)
            known_services.discard(service)
        state["unused_since"] = unused_since
        state["known_services"] = sorted(known_services)
        self._save_state(self._cfg.services.state_file, state)

    def ensure_service(self, service_name: str) -> None:
        """Start/refresh a service; raise RuntimeError on non-zero exit."""
        res = self._services.apply_service(service_name)
        if res.returncode != 0:
            raise RuntimeError(res.stderr or res.stdout)

    def _required_services(self, apps: dict[str, AppRegistration]) -> set[str]:
        """Union of required service names across all discovered apps.

        'requires.services' may be a list of strings or a single string;
        blank entries are ignored.
        """
        required: set[str] = set()
        for reg in apps.values():
            requires = reg.metadata.requires
            services = requires.get("services")
            if services is None:
                continue
            if isinstance(services, list):
                for s in services:
                    if isinstance(s, str) and s.strip():
                        required.add(s.strip())
            elif isinstance(services, str) and services.strip():
                required.add(services.strip())
        return required

    def _load_state(self, path: Path) -> dict:
        """Load persisted state; any failure degrades to an empty dict."""
        try:
            if not path.exists():
                return {}
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
            return {}
        except Exception:
            return {}

    def _save_state(self, path: Path, state: dict) -> None:
        """Atomically persist state: write a .tmp sibling, then rename."""
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(path.suffix + ".tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(state, f)
        tmp.replace(path)

View file

@ -1,241 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
import logging
from pathlib import Path
import subprocess
import re
import tomllib
import yaml
from infra_controller.config import DiscoveryConfig
logger = logging.getLogger(__name__)
@dataclass
class InfraMetadata:
    """Parsed contents of a project's .infra.{toml,yml,yaml} file."""

    # Project identifier; defaults to the config file's parent directory name.
    project: str
    # Raw 'requires' mapping (e.g. {"services": [...]}) after validation.
    requires: dict[str, object]
    # Any remaining top-level keys from the config file.
    metadata: dict[str, object] = field(default_factory=dict)

    # Allowed characters for a project name (safe for filesystem paths
    # and docker compose project names).
    _PROJECT_RE = re.compile(r"^[A-Za-z0-9._-]+$")

    @classmethod
    def _validate_project(cls, project: object, path: Path) -> str:
        """Return the stripped project name or raise ValueError naming the file."""
        if not isinstance(project, str):
            raise ValueError(f"Invalid 'project' (expected string) in {path}")
        if not project.strip():
            raise ValueError(f"Invalid 'project' (empty) in {path}")
        if cls._PROJECT_RE.fullmatch(project.strip()) is None:
            raise ValueError(
                f"Invalid 'project' (must match {cls._PROJECT_RE.pattern}) in {path}"
            )
        return project.strip()

    @classmethod
    def _validate_requires(cls, requires: object, path: Path) -> dict[str, object]:
        """Validate the 'requires' section and return a shallow copy of it.

        Raises ValueError on a non-mapping section or a malformed
        'services' entry (must be a non-empty string or a non-empty
        list of non-empty strings).
        """
        if requires is None:
            requires = {}
        if not isinstance(requires, dict):
            raise ValueError(f"Invalid 'requires' (expected mapping) in {path}")
        services = requires.get("services")
        if services is None:
            # No services requested; nothing further to validate.
            return dict(requires)
        if isinstance(services, str):
            if not services.strip():
                raise ValueError(f"Invalid 'requires.services' (empty string) in {path}")
            return dict(requires)
        if isinstance(services, list):
            if not services:
                raise ValueError(f"Invalid 'requires.services' (empty list) in {path}")
            for idx, item in enumerate(services):
                if not isinstance(item, str) or not item.strip():
                    raise ValueError(
                        f"Invalid 'requires.services[{idx}]' (expected non-empty string) in {path}"
                    )
            return dict(requires)
        raise ValueError(
            f"Invalid 'requires.services' (expected string or list of strings) in {path}"
        )

    @classmethod
    def from_file(cls, path: Path) -> "InfraMetadata":
        """Load metadata from a TOML or YAML file chosen by suffix.

        Raises ValueError for unsupported suffixes, a non-mapping top
        level, or validation failures in 'project'/'requires'.
        """
        if path.suffix == ".toml":
            with open(path, "rb") as f:
                data = tomllib.load(f)
        elif path.suffix in {".yml", ".yaml"}:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
        else:
            raise ValueError(f"Unsupported format: {path}")
        if not isinstance(data, dict):
            raise ValueError(f"Invalid config (expected mapping at top level) in {path}")
        project_val = data.get("project")
        if project_val is None:
            # Fall back to the directory name when 'project' is absent or null.
            project_val = path.parent.name
        project = cls._validate_project(project_val, path)
        requires = cls._validate_requires(data.get("requires"), path)
        metadata = {k: v for k, v in data.items() if k not in {"project", "requires"}}
        return cls(project=str(project), requires=dict(requires), metadata=metadata)
@dataclass
class AppRegistration:
    """A discovered application and how/when it was found."""

    # Project name (unique key across discovery methods).
    name: str
    # Parsed .infra.* metadata for the app.
    metadata: InfraMetadata
    # Timestamp of the discovery pass that last saw this app.
    last_seen: datetime
    # Which mechanism found it: "file_based" or "scan".
    discovery_method: str
class DiscoveryManager:
    """Discovers apps by reading .infra.* metadata files.

    Two mechanisms, each toggled via DiscoveryConfig:
    - file_based: read *.toml/*.yml/*.yaml registration files (possibly
      symlinks) dropped into a single watch directory.
    - scan: recursively search configured base paths for .infra.toml /
      .infra.yml / .infra.yaml files in "active" project directories.
    For the same app, TOML takes precedence over YAML.
    """

    def __init__(self, config: DiscoveryConfig):
        self.config = config

    def discover_all(self) -> dict[str, AppRegistration]:
        """Run all enabled discovery mechanisms, keyed by project name.

        Scan results are merged last, so they overwrite file_based
        entries that share a project name.
        """
        discovered: dict[str, AppRegistration] = {}
        if self.config.file_based_enabled:
            discovered.update(self._discover_file_based())
        if self.config.scan_enabled:
            discovered.update(self._discover_scan())
        return discovered

    def _discover_file_based(self) -> dict[str, AppRegistration]:
        """Read registration files from the watch directory.

        Creates the directory on first use and returns empty. Symlinks
        are resolved; dangling links are skipped. Per-file parse errors
        are logged rather than raised.
        """
        apps: dict[str, AppRegistration] = {}
        watch_path = self.config.file_based_path
        if not watch_path.exists():
            watch_path.mkdir(parents=True, exist_ok=True)
            return apps
        for link_file in watch_path.glob("*.toml"):
            try:
                target = link_file.resolve() if link_file.is_symlink() else link_file
                if not target.exists():
                    continue
                md = InfraMetadata.from_file(target)
                apps[md.project] = AppRegistration(
                    name=md.project,
                    metadata=md,
                    last_seen=datetime.now(),
                    discovery_method="file_based",
                )
                logger.info("Discovered app via file: %s", md.project)
            except Exception as e:
                logger.error("Error reading %s: %s", link_file, e)
        for link_file in list(watch_path.glob("*.yml")) + list(watch_path.glob("*.yaml")):
            try:
                # A sibling TOML file with the same stem takes precedence.
                if (watch_path / f"{link_file.stem}.toml").exists():
                    continue
                # NOTE(review): this compares the file *stem* against project
                # names already registered; a YAML file whose stem equals a
                # different app's project name is skipped — confirm intended.
                if link_file.stem in apps:
                    continue
                target = link_file.resolve() if link_file.is_symlink() else link_file
                if not target.exists():
                    continue
                md = InfraMetadata.from_file(target)
                apps[md.project] = AppRegistration(
                    name=md.project,
                    metadata=md,
                    last_seen=datetime.now(),
                    discovery_method="file_based",
                )
                logger.info("Discovered app via file (YAML): %s", md.project)
            except Exception as e:
                logger.error("Error reading %s: %s", link_file, e)
        return apps

    def _discover_scan(self) -> dict[str, AppRegistration]:
        """Recursively scan base paths for .infra.* files in active projects.

        TOML files are processed first; a YAML file is skipped when its
        directory already yielded a TOML file or its project name is
        already registered. Per-file errors are logged, not raised.
        """
        apps: dict[str, AppRegistration] = {}
        for base_path in self.config.scan_paths:
            if not base_path.exists():
                continue
            # Directories that already contributed a .infra.toml this pass.
            toml_dirs: set[Path] = set()
            for infra_file in base_path.rglob(".infra.toml"):
                if self._should_exclude(infra_file):
                    continue
                if not self._is_active_project(infra_file.parent):
                    continue
                try:
                    md = InfraMetadata.from_file(infra_file)
                    apps[md.project] = AppRegistration(
                        name=md.project,
                        metadata=md,
                        last_seen=datetime.now(),
                        discovery_method="scan",
                    )
                    toml_dirs.add(infra_file.parent)
                    logger.info("Discovered app via scan: %s", md.project)
                except Exception as e:
                    logger.error("Error reading %s: %s", infra_file, e)
            for infra_file in list(base_path.rglob(".infra.yml")) + list(base_path.rglob(".infra.yaml")):
                if self._should_exclude(infra_file):
                    continue
                if infra_file.parent in toml_dirs:
                    continue
                if not self._is_active_project(infra_file.parent):
                    continue
                try:
                    md = InfraMetadata.from_file(infra_file)
                except Exception as e:
                    logger.error("Error reading %s: %s", infra_file, e)
                    continue
                if md.project in apps:
                    continue
                apps[md.project] = AppRegistration(
                    name=md.project,
                    metadata=md,
                    last_seen=datetime.now(),
                    discovery_method="scan",
                )
                logger.info("Discovered app via scan (YAML): %s", md.project)
        return apps

    def _should_exclude(self, path: Path) -> bool:
        """True when the path matches any configured exclude glob pattern."""
        return any(path.match(pattern) for pattern in self.config.exclude_patterns)

    def _is_active_project(self, project_dir: Path) -> bool:
        """Heuristic liveness check for a project directory.

        Active if the git HEAD file was modified within the last 24 hours,
        or if any running process's command line mentions the directory.
        """
        git_head = project_dir / ".git" / "HEAD"
        if git_head.exists():
            mtime = datetime.fromtimestamp(git_head.stat().st_mtime)
            if (datetime.now() - mtime).total_seconds() < 24 * 60 * 60:
                return True
        try:
            # NOTE(review): pgrep -f treats the argument as a regex, so dots in
            # the path match any character — may over-match; confirm acceptable.
            result = subprocess.run(
                ["pgrep", "-f", str(project_dir)],
                capture_output=True,
                timeout=5,
                text=True,
            )
            if result.returncode == 0:
                return True
        except Exception:
            # pgrep unavailable or timed out: treat the project as inactive.
            return False
        return False

View file

@ -1,84 +0,0 @@
from __future__ import annotations
import subprocess
from dataclasses import dataclass
from pathlib import Path
from infra_controller.config import DockerComposeConfig
@dataclass(frozen=True)
class ServiceResult:
    """Captured outcome of a docker compose invocation."""

    # Process exit code; 0 means the compose command succeeded.
    returncode: int
    stdout: str
    stderr: str
class ServiceManager:
    """Starts and stops services managed as Docker Compose projects.

    Each service lives in <base_dir>/<service_name> containing a
    compose file.
    """

    def __init__(self, docker: DockerComposeConfig):
        self._docker = docker

    def service_dir_for_service(self, service_name: str) -> Path:
        """Return the directory that holds the service's compose project."""
        return self._docker.base_dir / service_name

    def _resolve_compose_file(self, service_dir: Path) -> Path:
        """Pick the compose file for a service directory.

        The configured file name wins when present; otherwise the
        best-ranked docker-compose*.yml/.yaml candidate is chosen.
        Raises FileNotFoundError when no candidate exists.
        """
        configured = service_dir / self._docker.compose_file
        if configured.exists():
            return configured
        candidates = list(service_dir.glob("docker-compose*.yml")) + list(service_dir.glob("docker-compose*.yaml"))
        candidates = [p for p in candidates if p.is_file()]
        if not candidates:
            raise FileNotFoundError(
                f"Compose file not found in {service_dir} (expected {self._docker.compose_file} or docker-compose*.yml/.yaml)"
            )

        def rank(p: Path) -> tuple[int, str]:
            # Canonical names first, then any *.yml, then any *.yaml;
            # ties broken alphabetically by name.
            name = p.name
            if name == "docker-compose.yml":
                return (0, name)
            if name == "docker-compose.yaml":
                return (1, name)
            if name.endswith(".yml"):
                return (2, name)
            return (3, name)

        return sorted(candidates, key=rank)[0]

    def apply_service(self, service_name: str) -> ServiceResult:
        """Run `docker compose up -d` for the service; returns the raw result."""
        service_dir = self.service_dir_for_service(service_name)
        if not service_dir.exists():
            raise FileNotFoundError(f"Service directory not found: {service_dir}")
        compose_file = self._resolve_compose_file(service_dir)
        cmd = [
            "docker",
            "compose",
            "-f",
            str(compose_file),
            "up",
            "-d",
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, cwd=str(service_dir))
        return ServiceResult(returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)

    def stop_service(self, service_name: str) -> ServiceResult:
        """Run `docker compose down` for the service; returns the raw result."""
        service_dir = self.service_dir_for_service(service_name)
        if not service_dir.exists():
            raise FileNotFoundError(f"Service directory not found: {service_dir}")
        compose_file = self._resolve_compose_file(service_dir)
        cmd = [
            "docker",
            "compose",
            "-f",
            str(compose_file),
            "down",
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, cwd=str(service_dir))
        return ServiceResult(returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)

149
src/manager.py Normal file
View file

@ -0,0 +1,149 @@
from __future__ import annotations
import logging
import subprocess
from dataclasses import dataclass
from pathlib import Path
from src.config import DockerComposeConfig
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class ServiceResult:
    """Result from a Docker Compose operation."""

    returncode: int
    stdout: str
    stderr: str

    @property
    def success(self) -> bool:
        """True when the compose command exited with code 0."""
        return self.returncode == 0

    def raise_for_status(self) -> None:
        """Raise RuntimeError (stderr preferred, else stdout) on failure."""
        if self.success:
            return
        raise RuntimeError(self.stderr or self.stdout)
class ServiceManager:
    """
    Manages Docker Compose services.

    Handles starting, stopping, and locating Docker Compose files for services.
    """

    def __init__(self, docker: DockerComposeConfig):
        self._docker = docker

    def apply_service(self, service_name: str) -> ServiceResult:
        """Start a service using docker compose up -d."""
        service_dir = self._get_service_dir(service_name)
        compose_file = self._resolve_compose_file(service_dir)
        logger.debug("Starting service %s with compose file %s", service_name, compose_file)
        return self._run_docker_compose(
            ["docker", "compose", "-f", str(compose_file), "up", "-d"], service_dir
        )

    def stop_service(self, service_name: str) -> ServiceResult:
        """Stop a service using docker compose down."""
        service_dir = self._get_service_dir(service_name)
        compose_file = self._resolve_compose_file(service_dir)
        logger.debug("Stopping service %s with compose file %s", service_name, compose_file)
        return self._run_docker_compose(
            ["docker", "compose", "-f", str(compose_file), "down"], service_dir
        )

    def service_dir_for_service(self, service_name: str) -> Path:
        """Get the directory path for a service (public API for compatibility)."""
        return self._get_service_dir(service_name)

    def _get_service_dir(self, service_name: str) -> Path:
        """Get and validate the service directory path."""
        service_dir = self._docker.base_dir / service_name
        if not service_dir.exists():
            raise FileNotFoundError(f"Service directory not found: {service_dir}")
        if not service_dir.is_dir():
            raise NotADirectoryError(f"Service path is not a directory: {service_dir}")
        return service_dir

    def _resolve_compose_file(self, service_dir: Path) -> Path:
        """
        Resolve the Docker Compose file for a service directory.

        Priority:
        1. Configured compose file name (e.g., docker-compose.yml)
        2. docker-compose.yml
        3. docker-compose.yaml
        4. docker-compose*.yml (sorted)
        5. docker-compose*.yaml (sorted)
        """
        # The explicitly configured file always wins when present.
        preferred = service_dir / self._docker.compose_file
        if preferred.exists():
            return preferred

        # Otherwise gather every docker-compose candidate on disk.
        matches = [
            candidate
            for pattern in ("docker-compose*.yml", "docker-compose*.yaml")
            for candidate in service_dir.glob(pattern)
            if candidate.is_file()
        ]
        if not matches:
            raise FileNotFoundError(
                f"Compose file not found in {service_dir} "
                f"(expected {self._docker.compose_file} or docker-compose*.yml/.yaml)"
            )
        # min() with the shared priority key picks the same file as sorting
        # and taking the first element.
        return min(matches, key=_compose_file_priority)

    def _run_docker_compose(self, cmd: list[str], cwd: Path) -> ServiceResult:
        """Execute a docker compose command."""
        try:
            completed = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=str(cwd),
                check=False,
            )
        except Exception as e:
            logger.error("Failed to run docker compose: %s", e)
            return ServiceResult(returncode=1, stdout="", stderr=str(e))
        return ServiceResult(
            returncode=completed.returncode,
            stdout=completed.stdout,
            stderr=completed.stderr,
        )
def _compose_file_priority(path: Path) -> tuple[int, str]:
"""
Determine priority for compose file selection.
Lower values have higher priority.
"""
name = path.name
if name == "docker-compose.yml":
return (0, name)
if name == "docker-compose.yaml":
return (1, name)
if name.endswith(".yml"):
return (2, name)
return (3, name)

103
src/metadata.py Normal file
View file

@ -0,0 +1,103 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
import re
from typing import Callable, Any
import tomllib
import yaml
from src.validation import validate_project, validate_requires
Loader = Callable[[Path], dict[str, Any]]
def _load_toml(path: Path) -> dict[str, Any]:
"""Load a TOML configuration file."""
with open(path, "rb") as f:
data = tomllib.load(f)
if not isinstance(data, dict):
raise ValueError(f"Invalid config (expected mapping at top level) in {path}")
return data
def _load_yaml(path: Path) -> dict[str, Any]:
    """Load a YAML configuration file."""
    with open(path, "r", encoding="utf-8") as handle:
        parsed = yaml.safe_load(handle) or {}
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"Invalid config (expected mapping at top level) in {path}")
# Maps a config-file suffix to its loader. Extendable at runtime via
# register_infra_loader()/unregister_infra_loader().
_LOADERS: dict[str, Loader] = {
    ".toml": _load_toml,
    ".yml": _load_yaml,
    ".yaml": _load_yaml,
}

# Preferred suffix order when several .infra.* variants could apply.
_DEFAULT_SUFFIX_PRECEDENCE: tuple[str, ...] = (".toml", ".yml", ".yaml")
def infra_suffix_precedence() -> tuple[str, ...]:
    """Get the precedence order for infrastructure config file suffixes.

    The default precedence comes first; any additionally registered
    loader suffixes follow in sorted order.
    """
    ordered = list(_DEFAULT_SUFFIX_PRECEDENCE)
    for suffix in sorted(_LOADERS.keys()):
        if suffix in ordered:
            continue
        ordered.append(suffix)
    return tuple(ordered)
def register_infra_loader(suffix: str, loader: Loader) -> None:
    """Register a custom loader for a file suffix (leading dot optional)."""
    key = suffix if suffix.startswith(".") else f".{suffix}"
    _LOADERS[key] = loader
def unregister_infra_loader(suffix: str) -> None:
    """Remove the loader for a file suffix, if any (leading dot optional)."""
    key = suffix if suffix.startswith(".") else f".{suffix}"
    _LOADERS.pop(key, None)
def read_infra_metadata(path: Path) -> InfraMetadata:
    """
    Read and parse infrastructure metadata from a file.

    Supports TOML and YAML formats. Project defaults to parent directory name
    if not specified in the file.

    Raises:
        ValueError: For an unsupported suffix, or when 'project'/'requires'
            fail validation.
    """
    loader = _LOADERS.get(path.suffix)
    if loader is None:
        raise ValueError(f"Unsupported format: {path.suffix}")
    data = loader(path)

    # Extract and validate project. Treat an explicit null the same as an
    # absent key so YAML `project:` (null) still falls back to the directory
    # name, matching the legacy InfraMetadata.from_file behavior.
    project_val = data.get("project")
    if project_val is None:
        project_val = path.parent.name
    project = validate_project(project_val, path, InfraMetadata._PROJECT_RE)

    # Extract and validate requires
    requires = validate_requires(data.get("requires"), path)

    # Extract remaining metadata
    metadata = {k: v for k, v in data.items() if k not in {"project", "requires"}}

    return InfraMetadata(project=project, requires=requires, metadata=metadata)
@dataclass
class InfraMetadata:
    """Infrastructure metadata for an application."""

    # Project identifier (validated against _PROJECT_RE).
    project: str
    # Validated 'requires' section from the config file.
    requires: dict[str, Any]
    # Remaining top-level keys not consumed by project/requires.
    metadata: dict[str, Any] = field(default_factory=dict)

    # Allowed project-name characters; used during parsing/validation.
    _PROJECT_RE = re.compile(r"^[A-Za-z0-9._-]+$")

    @classmethod
    def from_file(cls, path: Path) -> InfraMetadata:
        """Load infrastructure metadata from a file."""
        return read_infra_metadata(path)

78
src/validation.py Normal file
View file

@ -0,0 +1,78 @@
from __future__ import annotations
from pathlib import Path
import re
from typing import Any
def validate_project(project: Any, path: Path, project_re: re.Pattern[str]) -> str:
    """Validate and normalize a project name.

    Returns the stripped name; raises ValueError naming the offending file.
    """
    if not isinstance(project, str):
        raise ValueError(f"Invalid 'project' (expected string) in {path}")
    cleaned = project.strip()
    if not cleaned:
        raise ValueError(f"Invalid 'project' (empty) in {path}")
    if project_re.fullmatch(cleaned):
        return cleaned
    raise ValueError(f"Invalid 'project' (must match {project_re.pattern}) in {path}")
def validate_requires(requires: Any, path: Path) -> dict[str, Any]:
    """Validate the 'requires' section and return a shallow copy of it."""
    if requires is None:
        return {}
    if isinstance(requires, dict):
        declared = requires.get("services")
        if declared is not None:
            # Per-entry checks are delegated to validate_services.
            validate_services(declared, path)
        return dict(requires)
    raise ValueError(f"Invalid 'requires' (expected mapping) in {path}")
def validate_services(services: Any, path: Path) -> None:
    """Validate the services field within requires.

    Accepts a non-empty string or a non-empty list of non-empty strings;
    raises ValueError otherwise.
    """
    if isinstance(services, str):
        if not services.strip():
            raise ValueError(f"Invalid 'requires.services' (empty string) in {path}")
        return
    if not isinstance(services, list):
        raise ValueError(
            f"Invalid 'requires.services' (expected string or list of strings) in {path}"
        )
    if not services:
        raise ValueError(f"Invalid 'requires.services' (empty list) in {path}")
    for idx, item in enumerate(services):
        if isinstance(item, str) and item.strip():
            continue
        raise ValueError(
            f"Invalid 'requires.services[{idx}]' (expected non-empty string) in {path}"
        )
def extract_required_services(requires: dict[str, Any]) -> set[str]:
    """
    Extract normalized service names from a requires dict.

    Returns a set of service names, handling both string and list formats.
    """
    declared = requires.get("services")
    if isinstance(declared, str):
        stripped = declared.strip()
        return {stripped} if stripped else set()
    if isinstance(declared, list):
        return {
            entry.strip()
            for entry in declared
            if isinstance(entry, str) and entry.strip()
        }
    # None (absent) or any unrecognized type contributes no services.
    return set()

View file

@ -1,4 +1,4 @@
from infra_controller.config import load_config
from src.config import load_config
def test_load_config_defaults(tmp_path, monkeypatch):

View file

@ -4,9 +4,10 @@ from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from infra_controller.config import ControllerConfig
from infra_controller.controller import InfraController
from infra_controller.discovery import AppRegistration, InfraMetadata
from src.config import ControllerConfig
from src.controller import InfraController
from src.discovery import AppRegistration
from src.metadata import InfraMetadata
@dataclass
@ -56,17 +57,17 @@ def test_controller_stops_unused_services_after_grace_period(tmp_path: Path, mon
c = InfraController(cfg, discovery=discovery, services=services)
monkeypatch.setattr("infra_controller.controller.time.time", lambda: 0.0)
monkeypatch.setattr("src.controller.time.time", lambda: 0.0)
c.run_once()
assert services.applied == ["svc1"]
assert services.stopped == []
discovery.set_apps({})
monkeypatch.setattr("infra_controller.controller.time.time", lambda: 10.0)
monkeypatch.setattr("src.controller.time.time", lambda: 10.0)
c.run_once()
assert services.stopped == []
monkeypatch.setattr("infra_controller.controller.time.time", lambda: 20.0)
monkeypatch.setattr("src.controller.time.time", lambda: 20.0)
c.run_once()
assert services.stopped == ["svc1"]
@ -80,13 +81,13 @@ def test_controller_does_not_stop_service_within_grace_period(tmp_path: Path, mo
services = FakeServiceManager()
c = InfraController(cfg, discovery=discovery, services=services)
monkeypatch.setattr("infra_controller.controller.time.time", lambda: 0.0)
monkeypatch.setattr("src.controller.time.time", lambda: 0.0)
c.run_once()
discovery.set_apps({})
monkeypatch.setattr("infra_controller.controller.time.time", lambda: 10.0)
monkeypatch.setattr("src.controller.time.time", lambda: 10.0)
c.run_once()
monkeypatch.setattr("infra_controller.controller.time.time", lambda: 20.0)
monkeypatch.setattr("src.controller.time.time", lambda: 20.0)
c.run_once()
assert services.stopped == []

View file

@ -4,9 +4,9 @@ from pathlib import Path
import pytest
from infra_controller.config import DiscoveryConfig
from infra_controller.discovery import InfraMetadata
from infra_controller.discovery import DiscoveryManager
from src.config import DiscoveryConfig
from src.discovery import DiscoveryManager
from src.metadata import InfraMetadata
def _write(path: Path, content: str) -> Path: