#!/usr/bin/env python3 """ Secure ML Experiment Runner Optimized for data scientists with maximum speed """ import argparse import json import os from pathlib import Path import subprocess import sys import time class SecurityPolicy: """Manages security policies for experiment execution""" def __init__( self, policy_file: str = "/etc/ml_runner/security_policy.json" ): self.policy_file = policy_file self.policy = self._load_policy() def _load_policy(self) -> dict: """Load security policy from file""" try: with open(self.policy_file, "r") as f: return json.load(f) except FileNotFoundError: # Default restrictive policy for Conda return { "allow_network": False, "blocked_packages": [ "requests", "urllib3", "httpx", "aiohttp", "socket", "telnetlib", "ftplib", "smtplib", "paramiko", "fabric", ], "max_execution_time": 3600, "max_memory_gb": 16, "gpu_devices": ["/dev/dri"], "allow_file_writes": True, "resource_limits": { "cpu_count": 4, "memory_gb": 16, "gpu_memory_gb": 12, }, # Conda-specific settings "conda_env": "ml_env", "package_manager": "mamba", "ds_friendly": True, } def check_package_safety(self, package_name: str) -> bool: """Check if a package is allowed""" # Always allow ML tools even if they might be in blocked list allowed_tools = self.policy.get("allowed_network_tools", []) if package_name in allowed_tools: return True if package_name in self.policy.get("blocked_packages", []): return False return True def check_network_access(self, domain: str | None = None) -> bool: """Check if network access is allowed""" if not self.policy.get("allow_network", False): return False # Check if domain is in whitelist if domain: whitelist = self.policy.get("network_whitelist", []) return any(allowed in domain for allowed in whitelist) return True def check_tool_allowed(self, tool_name: str) -> bool: """Check if a specific tool is allowed network access""" allowed_tools = self.policy.get("allowed_network_tools", []) return tool_name in allowed_tools class CondaRunner: """Secure experiment runner with Conda + Mamba""" def __init__(self, workspace_dir: str = "/workspace"): self.workspace_dir = Path(workspace_dir) self.security_policy = SecurityPolicy() self.conda_env = self.security_policy.policy.get("conda_env", "ml_env") self.package_manager = self.security_policy.policy.get( "package_manager", "mamba" ) self.results_dir = self.workspace_dir / "results" # Detect if running in conda environment self.is_conda = os.environ.get("CONDA_DEFAULT_ENV") is not None # Conda paths self.conda_prefix = os.environ.get("CONDA_PREFIX", "/opt/conda") self.env_path = f"{self.conda_prefix}/envs/{self.conda_env}" self.gpu_devices = self.security_policy.policy.get("gpu_devices", []) def setup_environment(self, deps_file: Path) -> bool: """Setup Conda environment based on a dependency manifest.""" try: name = deps_file.name print(f"[MANIFEST] Using dependency manifest: {name}") if name in ("environment.yml", "environment.yaml"): print(f"[SETUP] Applying conda environment file: {deps_file}") cmd = [ self.package_manager, "env", "update", "-n", self.conda_env, "-f", str(deps_file), "-y", ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=900) if result.returncode != 0: print(f"[ERROR] Failed to apply environment file: {result.stderr}") return False return True if name == "poetry.lock": pyproject = self.workspace_dir / "pyproject.toml" if not pyproject.exists(): print("[ERROR] poetry.lock provided but pyproject.toml is missing") return False print(f"[SETUP] Installing dependencies from Poetry lockfile: {deps_file}") env = os.environ.copy() env.update( { "POETRY_VIRTUALENVS_CREATE": "false", "POETRY_NO_INTERACTION": "1", } ) # Ensure Poetry is available in the conda env. check = subprocess.run( ["conda", "run", "-n", self.conda_env, "poetry", "--version"], capture_output=True, text=True, env=env, ) if check.returncode != 0: print("[ERROR] Poetry is not available in the container environment") print(check.stderr) return False # Install into the conda env (no separate venv). install = subprocess.run( [ "conda", "run", "-n", self.conda_env, "poetry", "install", "--no-ansi", ], capture_output=True, text=True, timeout=900, cwd=str(self.workspace_dir), env=env, ) if install.returncode != 0: print("[ERROR] Poetry install failed") print(install.stderr) return False return True if name == "pyproject.toml": # Use pip's PEP517/pyproject support (no Poetry required). # This installs the project itself; dependencies may be fetched as needed. print(f"[SETUP] Installing project from pyproject.toml: {deps_file}") cmd = [ "conda", "run", "-n", self.conda_env, "pip", "install", str(self.workspace_dir), "--no-cache-dir", ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=900) if result.returncode != 0: print(f"[ERROR] Failed to install project from pyproject.toml: {result.stderr}") return False return True if name == "requirements.txt": # Read requirements with open(deps_file, "r") as f: requirements = [ line.strip() for line in f if line.strip() and not line.startswith("#") ] # Check each package for security for req in requirements: package_name = ( req.split("==")[0].split(">=")[0].split("<=")[0].strip() ) if not self.security_policy.check_package_safety(package_name): print( f"[SECURITY] Package '{package_name}' is blocked for security reasons" ) return False # Install packages with mamba (super fast!) for req in requirements: package_name = ( req.split("==")[0].split(">=")[0].split("<=")[0].strip() ) # Check if already installed with conda check_cmd = [ "conda", "run", "-n", self.conda_env, "python", "-c", f"import {package_name.replace('-', '_')}", ] result = subprocess.run( check_cmd, capture_output=True, text=True ) if result.returncode == 0: print(f"[OK] {package_name} already installed in conda env") continue # Try conda-forge first (faster and more reliable) print( f"[INSTALL] Installing {req} with {self.package_manager}..." ) install_cmd = [ self.package_manager, "install", "-n", self.conda_env, req, "-c", "conda-forge", "-y", ] result = subprocess.run( install_cmd, capture_output=True, text=True, timeout=300 ) if result.returncode == 0: print(f"[OK] Installed {req} with {self.package_manager}") continue # Fallback to pip if conda fails print(f"[FALLBACK] Trying pip for {req}...") pip_cmd = [ "conda", "run", "-n", self.conda_env, "pip", "install", req, "--no-cache-dir", ] result = subprocess.run( pip_cmd, capture_output=True, text=True, timeout=300 ) if result.returncode != 0: print(f"[ERROR] Failed to install {req}: {result.stderr}") return False print(f"[OK] Installed {req} with pip") return True print(f"[ERROR] Unsupported dependency manifest: {deps_file}") print("Supported: environment.yml, environment.yaml, poetry.lock (requires pyproject.toml), pyproject.toml, requirements.txt") return False except Exception as e: print(f"[ERROR] Environment setup failed: {e}") return False def run_experiment(self, train_script: Path, args: list[str]) -> bool: """Run experiment in secure Conda environment""" try: if not train_script.exists(): print(f"[ERROR] Training script not found: {train_script}") return False # Create results directory self.results_dir.mkdir(exist_ok=True) # Setup environment variables for security env = os.environ.copy() env.update( { "CONDA_DEFAULT_ENV": self.conda_env, "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", ""), # Allow GPU access "SECURE_MODE": "1", "NETWORK_ACCESS": ( "1" if self.security_policy.check_network_access(None) else "0" ), "CONDA_MODE": "1", } ) # Prepare command cmd = [ "conda", "run", "-n", self.conda_env, "python", str(train_script), ] + (args or []) # Add default output directory if not provided if "--output_dir" not in " ".join(args or []): cmd.extend(["--output_dir", str(self.results_dir)]) print(f"[CMD] Running command: {' '.join(cmd)}") print(f"[ENV] Conda environment: {self.conda_env}") print(f"[PKG] Package manager: {self.package_manager}") # Run with timeout and resource limits start_time = time.time() max_time = self.security_policy.policy.get( "max_execution_time", 3600 ) print(f"[RUN] Starting experiment: {train_script.name}") print(f"[TIME] Time limit: {max_time}s") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env, cwd=str(self.workspace_dir), ) try: stdout, stderr = process.communicate(timeout=max_time) execution_time = time.time() - start_time if process.returncode == 0: print( f"[DONE] Experiment completed successfully in {execution_time:.1f}s" ) # Save execution results results = { "status": "success", "execution_time": execution_time, "stdout": stdout, "stderr": stderr, "return_code": process.returncode, "gpu_accessible": len(self.gpu_devices) > 0, "security_mode": "enabled", "container_type": "conda", "conda_env": self.conda_env, "package_manager": self.package_manager, "ds_friendly": True, } results_file = self.results_dir / "execution_results.json" with open(results_file, "w") as f: json.dump(results, f, indent=2) return True else: print( f"[ERROR] Experiment failed with return code {process.returncode}" ) print(f"STDERR: {stderr}") return False except subprocess.TimeoutExpired: process.kill() print(f"[TIMEOUT] Experiment timed out after {max_time}s") return False except Exception as e: print(f"[ERROR] Experiment execution failed: {e}") return False def check_gpu_access(self) -> bool: """Check if GPU is accessible""" try: # Check with conda environment result = subprocess.run( [ "conda", "run", "-n", self.conda_env, "python", "-c", "import torch; print('CUDA available:', torch.cuda.is_available())", ], capture_output=True, text=True, timeout=10, ) return result.returncode == 0 except Exception as e: print("[ERROR] GPU access check failed:", e) return False def main(): parser = argparse.ArgumentParser(description="Secure ML Experiment Runner") parser.add_argument( "--workspace", default="/workspace", help="Workspace directory" ) parser.add_argument("--deps", help="Dependency manifest path (environment.yml | poetry.lock | pyproject.toml | requirements.txt)") parser.add_argument("--requirements", help="Deprecated alias for --deps") parser.add_argument("--script", help="Training script path") parser.add_argument( "--prepare-only", action="store_true", help="Only prepare dependencies and exit" ) parser.add_argument( "--args", nargs=argparse.REMAINDER, default=[], help="Additional script arguments", ) parser.add_argument( "--check-gpu", action="store_true", help="Check GPU access" ) args = parser.parse_args() # Initialize secure runner runner = CondaRunner(args.workspace) # Check GPU access if requested if args.check_gpu: if runner.check_gpu_access(): print("[OK] GPU access available") # Show GPU info with conda result = subprocess.run( [ "conda", "run", "-n", runner.conda_env, "python", "-c", "import torch; print(f'GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else \"None\"}')", ], capture_output=True, text=True, ) if result.returncode == 0: print(f"GPU Info: {result.stdout.strip()}") else: print("[ERROR] No GPU access available") return 1 # If only checking GPU, exit here if args.check_gpu: return 0 deps_arg = args.deps or args.requirements if not deps_arg: print("[ERROR] Missing dependency manifest. Provide --deps.") return 1 # Setup environment deps_path = Path(deps_arg) if not deps_path.exists(): print(f"[ERROR] Dependency manifest not found: {deps_path}") return 1 print("[SETUP] Setting up secure environment...") if not runner.setup_environment(deps_path): print("[ERROR] Failed to setup secure environment") return 1 if args.prepare_only: print("[DONE] Environment prepared successfully") return 0 # Run experiment script_path = Path(args.script) if not script_path.exists(): print(f"[ERROR] Training script not found: {script_path}") return 1 print("[RUN] Running experiment in secure container...") if runner.run_experiment(script_path, args.args): print("[DONE] Experiment completed successfully!") return 0 else: print("[ERROR] Experiment failed!") return 1 if __name__ == "__main__": sys.exit(main())