fetch_ml/podman/secure_runner.py

526 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Secure ML Experiment Runner
Optimized for data scientists with maximum speed
"""
import argparse
import json
import os
from pathlib import Path
import subprocess
import sys
import time
class SecurityPolicy:
"""Manages security policies for experiment execution"""
def __init__(
self, policy_file: str = "/etc/ml_runner/security_policy.json"
):
self.policy_file = policy_file
self.policy = self._load_policy()
def _load_policy(self) -> dict:
"""Load security policy from file"""
try:
with open(self.policy_file, "r") as f:
return json.load(f)
except FileNotFoundError:
# Default restrictive policy for Conda
return {
"allow_network": False,
"blocked_packages": [
"requests",
"urllib3",
"httpx",
"aiohttp",
"socket",
"telnetlib",
"ftplib",
"smtplib",
"paramiko",
"fabric",
],
"max_execution_time": 3600,
"max_memory_gb": 16,
"gpu_devices": ["/dev/dri"],
"allow_file_writes": True,
"resource_limits": {
"cpu_count": 4,
"memory_gb": 16,
"gpu_memory_gb": 12,
},
# Conda-specific settings
"conda_env": "ml_env",
"package_manager": "mamba",
"ds_friendly": True,
}
def check_package_safety(self, package_name: str) -> bool:
"""Check if a package is allowed"""
# Always allow ML tools even if they might be in blocked list
allowed_tools = self.policy.get("allowed_network_tools", [])
if package_name in allowed_tools:
return True
if package_name in self.policy.get("blocked_packages", []):
return False
return True
def check_network_access(self, domain: str | None = None) -> bool:
"""Check if network access is allowed"""
if not self.policy.get("allow_network", False):
return False
# Check if domain is in whitelist
if domain:
whitelist = self.policy.get("network_whitelist", [])
return any(allowed in domain for allowed in whitelist)
return True
def check_tool_allowed(self, tool_name: str) -> bool:
"""Check if a specific tool is allowed network access"""
allowed_tools = self.policy.get("allowed_network_tools", [])
return tool_name in allowed_tools
class CondaRunner:
"""Secure experiment runner with Conda + Mamba"""
def __init__(self, workspace_dir: str = "/workspace"):
self.workspace_dir = Path(workspace_dir)
self.security_policy = SecurityPolicy()
self.conda_env = self.security_policy.policy.get("conda_env", "ml_env")
self.package_manager = self.security_policy.policy.get(
"package_manager", "mamba"
)
self.results_dir = self.workspace_dir / "results"
# Detect if running in conda environment
self.is_conda = os.environ.get("CONDA_DEFAULT_ENV") is not None
# Conda paths
self.conda_prefix = os.environ.get("CONDA_PREFIX", "/opt/conda")
self.env_path = f"{self.conda_prefix}/envs/{self.conda_env}"
self.gpu_devices = self.security_policy.policy.get("gpu_devices", [])
def setup_environment(self, deps_file: Path) -> bool:
"""Setup Conda environment based on a dependency manifest."""
try:
name = deps_file.name
print(f"[MANIFEST] Using dependency manifest: {name}")
if name in ("environment.yml", "environment.yaml"):
print(f"[SETUP] Applying conda environment file: {deps_file}")
cmd = [
self.package_manager,
"env",
"update",
"-n",
self.conda_env,
"-f",
str(deps_file),
"-y",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=900)
if result.returncode != 0:
print(f"[ERROR] Failed to apply environment file: {result.stderr}")
return False
return True
if name == "poetry.lock":
pyproject = self.workspace_dir / "pyproject.toml"
if not pyproject.exists():
print("[ERROR] poetry.lock provided but pyproject.toml is missing")
return False
print(f"[SETUP] Installing dependencies from Poetry lockfile: {deps_file}")
env = os.environ.copy()
env.update(
{
"POETRY_VIRTUALENVS_CREATE": "false",
"POETRY_NO_INTERACTION": "1",
}
)
# Ensure Poetry is available in the conda env.
check = subprocess.run(
["conda", "run", "-n", self.conda_env, "poetry", "--version"],
capture_output=True,
text=True,
env=env,
)
if check.returncode != 0:
print("[ERROR] Poetry is not available in the container environment")
print(check.stderr)
return False
# Install into the conda env (no separate venv).
install = subprocess.run(
[
"conda",
"run",
"-n",
self.conda_env,
"poetry",
"install",
"--no-ansi",
],
capture_output=True,
text=True,
timeout=900,
cwd=str(self.workspace_dir),
env=env,
)
if install.returncode != 0:
print("[ERROR] Poetry install failed")
print(install.stderr)
return False
return True
if name == "pyproject.toml":
# Use pip's PEP517/pyproject support (no Poetry required).
# This installs the project itself; dependencies may be fetched as needed.
print(f"[SETUP] Installing project from pyproject.toml: {deps_file}")
cmd = [
"conda",
"run",
"-n",
self.conda_env,
"pip",
"install",
str(self.workspace_dir),
"--no-cache-dir",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=900)
if result.returncode != 0:
print(f"[ERROR] Failed to install project from pyproject.toml: {result.stderr}")
return False
return True
if name == "requirements.txt":
# Read requirements
with open(deps_file, "r") as f:
requirements = [
line.strip()
for line in f
if line.strip() and not line.startswith("#")
]
# Check each package for security
for req in requirements:
package_name = (
req.split("==")[0].split(">=")[0].split("<=")[0].strip()
)
if not self.security_policy.check_package_safety(package_name):
print(
f"[SECURITY] Package '{package_name}' is blocked for security reasons"
)
return False
# Install packages with mamba (super fast!)
for req in requirements:
package_name = (
req.split("==")[0].split(">=")[0].split("<=")[0].strip()
)
# Check if already installed with conda
check_cmd = [
"conda",
"run",
"-n",
self.conda_env,
"python",
"-c",
f"import {package_name.replace('-', '_')}",
]
result = subprocess.run(
check_cmd, capture_output=True, text=True
)
if result.returncode == 0:
print(f"[OK] {package_name} already installed in conda env")
continue
# Try conda-forge first (faster and more reliable)
print(
f"[INSTALL] Installing {req} with {self.package_manager}..."
)
install_cmd = [
self.package_manager,
"install",
"-n",
self.conda_env,
req,
"-c",
"conda-forge",
"-y",
]
result = subprocess.run(
install_cmd, capture_output=True, text=True, timeout=300
)
if result.returncode == 0:
print(f"[OK] Installed {req} with {self.package_manager}")
continue
# Fallback to pip if conda fails
print(f"[FALLBACK] Trying pip for {req}...")
pip_cmd = [
"conda",
"run",
"-n",
self.conda_env,
"pip",
"install",
req,
"--no-cache-dir",
]
result = subprocess.run(
pip_cmd, capture_output=True, text=True, timeout=300
)
if result.returncode != 0:
print(f"[ERROR] Failed to install {req}: {result.stderr}")
return False
print(f"[OK] Installed {req} with pip")
return True
print(f"[ERROR] Unsupported dependency manifest: {deps_file}")
print("Supported: environment.yml, environment.yaml, poetry.lock (requires pyproject.toml), pyproject.toml, requirements.txt")
return False
except Exception as e:
print(f"[ERROR] Environment setup failed: {e}")
return False
def run_experiment(self, train_script: Path, args: list[str]) -> bool:
"""Run experiment in secure Conda environment"""
try:
if not train_script.exists():
print(f"[ERROR] Training script not found: {train_script}")
return False
# Create results directory
self.results_dir.mkdir(exist_ok=True)
# Setup environment variables for security
env = os.environ.copy()
env.update(
{
"CONDA_DEFAULT_ENV": self.conda_env,
"CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", ""), # Allow GPU access
"SECURE_MODE": "1",
"NETWORK_ACCESS": (
"1"
if self.security_policy.check_network_access(None)
else "0"
),
"CONDA_MODE": "1",
}
)
# Prepare command
cmd = [
"conda",
"run",
"-n",
self.conda_env,
"python",
str(train_script),
] + (args or [])
# Add default output directory if not provided
if "--output_dir" not in " ".join(args or []):
cmd.extend(["--output_dir", str(self.results_dir)])
print(f"[CMD] Running command: {' '.join(cmd)}")
print(f"[ENV] Conda environment: {self.conda_env}")
print(f"[PKG] Package manager: {self.package_manager}")
# Run with timeout and resource limits
start_time = time.time()
max_time = self.security_policy.policy.get(
"max_execution_time", 3600
)
print(f"[RUN] Starting experiment: {train_script.name}")
print(f"[TIME] Time limit: {max_time}s")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
cwd=str(self.workspace_dir),
)
try:
stdout, stderr = process.communicate(timeout=max_time)
execution_time = time.time() - start_time
if process.returncode == 0:
print(
f"[DONE] Experiment completed successfully in {execution_time:.1f}s"
)
# Save execution results
results = {
"status": "success",
"execution_time": execution_time,
"stdout": stdout,
"stderr": stderr,
"return_code": process.returncode,
"gpu_accessible": len(self.gpu_devices) > 0,
"security_mode": "enabled",
"container_type": "conda",
"conda_env": self.conda_env,
"package_manager": self.package_manager,
"ds_friendly": True,
}
results_file = self.results_dir / "execution_results.json"
with open(results_file, "w") as f:
json.dump(results, f, indent=2)
return True
else:
print(
f"[ERROR] Experiment failed with return code {process.returncode}"
)
print(f"STDERR: {stderr}")
return False
except subprocess.TimeoutExpired:
process.kill()
print(f"[TIMEOUT] Experiment timed out after {max_time}s")
return False
except Exception as e:
print(f"[ERROR] Experiment execution failed: {e}")
return False
def check_gpu_access(self) -> bool:
"""Check if GPU is accessible"""
try:
# Check with conda environment
result = subprocess.run(
[
"conda",
"run",
"-n",
self.conda_env,
"python",
"-c",
"import torch; print('CUDA available:', torch.cuda.is_available())",
],
capture_output=True,
text=True,
timeout=10,
)
return result.returncode == 0
except Exception as e:
print("[ERROR] GPU access check failed:", e)
return False
def main():
parser = argparse.ArgumentParser(description="Secure ML Experiment Runner")
parser.add_argument(
"--workspace", default="/workspace", help="Workspace directory"
)
parser.add_argument("--deps", help="Dependency manifest path (environment.yml | poetry.lock | pyproject.toml | requirements.txt)")
parser.add_argument("--requirements", help="Deprecated alias for --deps")
parser.add_argument("--script", help="Training script path")
parser.add_argument(
"--prepare-only", action="store_true", help="Only prepare dependencies and exit"
)
parser.add_argument(
"--args",
nargs=argparse.REMAINDER,
default=[],
help="Additional script arguments",
)
parser.add_argument(
"--check-gpu", action="store_true", help="Check GPU access"
)
args = parser.parse_args()
# Initialize secure runner
runner = CondaRunner(args.workspace)
# Check GPU access if requested
if args.check_gpu:
if runner.check_gpu_access():
print("[OK] GPU access available")
# Show GPU info with conda
result = subprocess.run(
[
"conda",
"run",
"-n",
runner.conda_env,
"python",
"-c",
"import torch; print(f'GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else \"None\"}')",
],
capture_output=True,
text=True,
)
if result.returncode == 0:
print(f"GPU Info: {result.stdout.strip()}")
else:
print("[ERROR] No GPU access available")
return 1
# If only checking GPU, exit here
if args.check_gpu:
return 0
deps_arg = args.deps or args.requirements
if not deps_arg:
print("[ERROR] Missing dependency manifest. Provide --deps.")
return 1
# Setup environment
deps_path = Path(deps_arg)
if not deps_path.exists():
print(f"[ERROR] Dependency manifest not found: {deps_path}")
return 1
print("[SETUP] Setting up secure environment...")
if not runner.setup_environment(deps_path):
print("[ERROR] Failed to setup secure environment")
return 1
if args.prepare_only:
print("[DONE] Environment prepared successfully")
return 0
# Run experiment
script_path = Path(args.script)
if not script_path.exists():
print(f"[ERROR] Training script not found: {script_path}")
return 1
print("[RUN] Running experiment in secure container...")
if runner.run_experiment(script_path, args.args):
print("[DONE] Experiment completed successfully!")
return 0
else:
print("[ERROR] Experiment failed!")
return 1
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())