from __future__ import annotations import json import logging import time from pathlib import Path from infra_controller.config import ControllerConfig from infra_controller.discovery import AppRegistration, DiscoveryManager from infra_controller.service_manager import ServiceManager logger = logging.getLogger(__name__) class InfraController: def __init__( self, cfg: ControllerConfig, discovery: DiscoveryManager | None = None, services: ServiceManager | None = None, ): self._cfg = cfg self._discovery = discovery or DiscoveryManager(cfg.discovery) self._services = services or ServiceManager(cfg.docker) def run(self) -> None: while True: self.run_once() time.sleep(self._cfg.services.check_interval_seconds) def run_once(self) -> None: discovered = self._discovery.discover_all() required = self._required_services(discovered) state = self._load_state(self._cfg.services.state_file) unused_since = state.get("unused_since") if not isinstance(unused_since, dict): unused_since = {} known_services_val = state.get("known_services") if isinstance(known_services_val, list): known_services = {str(s) for s in known_services_val if isinstance(s, str) and s.strip()} else: known_services = set() now = time.time() for service in sorted(required): logger.info("Ensuring service: %s", service) self.ensure_service(service) unused_since.pop(service, None) known_services.add(service) known_services |= set(unused_since.keys()) grace_seconds = int(self._cfg.services.grace_period_minutes) * 60 for service in sorted(known_services - set(required)): since = unused_since.get(service) if since is None: unused_since[service] = now logger.info("Service no longer required (grace period started): %s", service) continue try: since_ts = float(since) except Exception: since_ts = now unused_since[service] = now continue if (now - since_ts) < grace_seconds: continue logger.info("Stopping unused service: %s", service) res = self._services.stop_service(service) if res.returncode != 0: raise RuntimeError(res.stderr or res.stdout) unused_since.pop(service, None) known_services.discard(service) state["unused_since"] = unused_since state["known_services"] = sorted(known_services) self._save_state(self._cfg.services.state_file, state) def ensure_service(self, service_name: str) -> None: res = self._services.apply_service(service_name) if res.returncode != 0: raise RuntimeError(res.stderr or res.stdout) def _required_services(self, apps: dict[str, AppRegistration]) -> set[str]: required: set[str] = set() for reg in apps.values(): requires = reg.metadata.requires services = requires.get("services") if services is None: continue if isinstance(services, list): for s in services: if isinstance(s, str) and s.strip(): required.add(s.strip()) elif isinstance(services, str) and services.strip(): required.add(services.strip()) return required def _load_state(self, path: Path) -> dict: try: if not path.exists(): return {} with open(path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): return data return {} except Exception: return {} def _save_state(self, path: Path, state: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") with open(tmp, "w", encoding="utf-8") as f: json.dump(state, f) tmp.replace(path)