infra-controller/src/infra_controller/discovery.py
Jeremie Fraeys 4cd7e72e2b
Some checks failed
Deploy / deploy (push) Failing after 7s
initial infra commit
2026-01-19 16:27:09 -05:00

182 lines
6 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
import logging
from pathlib import Path
import subprocess
import tomllib
import yaml
from infra_controller.config import DiscoveryConfig
logger = logging.getLogger(__name__)
@dataclass
class InfraMetadata:
project: str
requires: dict[str, object]
metadata: dict[str, object] = field(default_factory=dict)
@classmethod
def from_file(cls, path: Path) -> "InfraMetadata":
if path.suffix == ".toml":
with open(path, "rb") as f:
data = tomllib.load(f)
elif path.suffix in {".yml", ".yaml"}:
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
else:
raise ValueError(f"Unsupported format: {path}")
project = data.get("project") or path.parent.name
requires = data.get("requires") or {}
metadata = {k: v for k, v in data.items() if k not in {"project", "requires"}}
return cls(project=str(project), requires=dict(requires), metadata=metadata)
@dataclass
class AppRegistration:
    """Record of one discovered application."""

    # Registration key; the discovery code sets it to the metadata's project.
    name: str
    # Parsed contents of the application's metadata file.
    metadata: InfraMetadata
    # Timestamp taken when the registration was created.
    last_seen: datetime
    # Strategy that found the app; this module uses "file_based" and "scan".
    discovery_method: str
class DiscoveryManager:
    """Discover applications that declare infrastructure requirements.

    Two strategies exist, each switched on by a flag on the config:

    * file-based: metadata files (or symlinks to them) placed directly in
      ``config.file_based_path``;
    * scan: ``.infra.toml`` / ``.infra.yml`` files found recursively under
      each of ``config.scan_paths``.

    Within a single strategy a TOML file takes precedence over a YAML file
    that declares the same project.
    """

    # A project whose ``.git/HEAD`` was modified within this many seconds is
    # considered active.
    _ACTIVE_WINDOW_SECONDS = 24 * 60 * 60

    def __init__(self, config: DiscoveryConfig):
        """Store the discovery configuration; no I/O happens here."""
        self.config = config

    def discover_all(self) -> dict[str, AppRegistration]:
        """Run every enabled strategy and merge the results by project name.

        Scan results are merged last, so they replace file-based results for
        the same project name.
        """
        discovered: dict[str, AppRegistration] = {}
        if self.config.file_based_enabled:
            discovered.update(self._discover_file_based())
        if self.config.scan_enabled:
            discovered.update(self._discover_scan())
        return discovered

    @staticmethod
    def _register(
        apps: dict[str, AppRegistration], md: InfraMetadata, method: str
    ) -> None:
        """Store a registration for ``md`` in ``apps`` under its project name."""
        apps[md.project] = AppRegistration(
            name=md.project,
            metadata=md,
            last_seen=datetime.now(),
            discovery_method=method,
        )

    @staticmethod
    def _load_linked(link_file: Path) -> InfraMetadata | None:
        """Parse ``link_file``, following it first when it is a symlink.

        Returns ``None`` when the (resolved) target does not exist, e.g. for
        a dangling symlink.
        """
        target = link_file.resolve() if link_file.is_symlink() else link_file
        if not target.exists():
            return None
        return InfraMetadata.from_file(target)

    def _discover_file_based(self) -> dict[str, AppRegistration]:
        """Register every metadata file found in the watch directory.

        The watch directory is created when missing (and nothing is
        discovered on that call). Unreadable files are logged and skipped.
        """
        apps: dict[str, AppRegistration] = {}
        watch_path = self.config.file_based_path
        if not watch_path.exists():
            watch_path.mkdir(parents=True, exist_ok=True)
            return apps

        for link_file in watch_path.glob("*.toml"):
            try:
                md = self._load_linked(link_file)
                if md is None:
                    continue
                self._register(apps, md, "file_based")
                logger.info("Discovered app via file: %s", md.project)
            except Exception as e:
                logger.error("Error reading %s: %s", link_file, e)

        yaml_files = [*watch_path.glob("*.yml"), *watch_path.glob("*.yaml")]
        for link_file in yaml_files:
            try:
                # Cheap pre-filter: skip without parsing when the file name
                # already matches a registered project.
                if link_file.stem in apps:
                    continue
                md = self._load_linked(link_file)
                if md is None:
                    continue
                # The declared project may differ from the file name, so
                # check it too; otherwise a YAML file could silently replace
                # an already registered entry.
                if md.project in apps:
                    continue
                self._register(apps, md, "file_based")
                logger.info("Discovered app via file (YAML): %s", md.project)
            except Exception as e:
                logger.error("Error reading %s: %s", link_file, e)
        return apps

    def _discover_scan(self) -> dict[str, AppRegistration]:
        """Register active projects found by walking the scan paths.

        Files matching an exclude pattern, and projects that are not
        considered active, are skipped. Unreadable files are logged and
        skipped.
        """
        apps: dict[str, AppRegistration] = {}
        for base_path in self.config.scan_paths:
            if not base_path.exists():
                continue

            for infra_file in base_path.rglob(".infra.toml"):
                if self._should_exclude(infra_file):
                    continue
                if not self._is_active_project(infra_file.parent):
                    continue
                try:
                    md = InfraMetadata.from_file(infra_file)
                    self._register(apps, md, "scan")
                    logger.info("Discovered app via scan: %s", md.project)
                except Exception as e:
                    logger.error("Error reading %s: %s", infra_file, e)

            # NOTE(review): only ``.infra.yml`` is scanned; ``.infra.yaml`` is
            # not, although file-based discovery accepts both suffixes.
            for infra_file in base_path.rglob(".infra.yml"):
                if self._should_exclude(infra_file):
                    continue
                try:
                    md = InfraMetadata.from_file(infra_file)
                except Exception as e:
                    logger.error("Error reading %s: %s", infra_file, e)
                    continue
                if md.project in apps:
                    continue
                # Checked last because it may spawn a subprocess.
                if not self._is_active_project(infra_file.parent):
                    continue
                self._register(apps, md, "scan")
                logger.info("Discovered app via scan (YAML): %s", md.project)
        return apps

    def _should_exclude(self, path: Path) -> bool:
        """Return True when ``path`` matches any configured exclude pattern."""
        return any(path.match(pattern) for pattern in self.config.exclude_patterns)

    def _is_active_project(self, project_dir: Path) -> bool:
        """Return True when ``project_dir`` shows signs of recent use.

        A project is active when its ``.git/HEAD`` was modified within the
        last 24 hours, or when ``pgrep -f`` finds a process whose command
        line matches the directory path. Any failure while running ``pgrep``
        counts as "not active".
        """
        git_head = project_dir / ".git" / "HEAD"
        try:
            head_mtime = git_head.stat().st_mtime
        except OSError:
            # Missing or unreadable HEAD: fall through to the process check.
            head_mtime = None

        if head_mtime is not None:
            # Compare epoch seconds so DST changes cannot skew the age.
            age_seconds = datetime.now().timestamp() - head_mtime
            if age_seconds < self._ACTIVE_WINDOW_SECONDS:
                return True

        try:
            # NOTE(review): pgrep interprets the pattern as a regular
            # expression, so regex metacharacters in the path are not
            # matched literally.
            result = subprocess.run(
                ["pgrep", "-f", str(project_dir)],
                capture_output=True,
                timeout=5,
                text=True,
            )
        except Exception as e:
            # Best effort: pgrep may be missing or time out.
            logger.debug("pgrep check failed for %s: %s", project_dir, e)
            return False
        return result.returncode == 0