Organize the podman/ directory into logical subdirectories.

New structure:
- docs/ - ML_TOOLS_GUIDE.md, jupyter_workflow.md
- configs/ - environment*.yml, security_policy.json
- containers/ - *.dockerfile, *.podfile
- scripts/ - *.sh, *.py (secure_runner, cli_integration, etc.)
- jupyter/ - jupyter_cookie_secret (flattened from jupyter_runtime/runtime/)
- workspace/ - Example projects (cleaned of temp files)

Cleaned workspace:
- Removed .DS_Store, mlflow.db, cache/
- Removed duplicate cli_integration.py

Removed unnecessary nesting:
- Flattened jupyter_runtime/runtime/ to just jupyter/

Improves maintainability by grouping files by purpose and eliminating root directory clutter.
526 lines
18 KiB
Python
526 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Secure ML Experiment Runner
|
|
Optimized for data scientists with maximum speed
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
|
|
class SecurityPolicy:
|
|
"""Manages security policies for experiment execution"""
|
|
|
|
def __init__(
|
|
self, policy_file: str = "/etc/ml_runner/security_policy.json"
|
|
):
|
|
self.policy_file = policy_file
|
|
self.policy = self._load_policy()
|
|
|
|
def _load_policy(self) -> dict:
|
|
"""Load security policy from file"""
|
|
try:
|
|
with open(self.policy_file, "r") as f:
|
|
return json.load(f)
|
|
except FileNotFoundError:
|
|
# Default restrictive policy for Conda
|
|
return {
|
|
"allow_network": False,
|
|
"blocked_packages": [
|
|
"requests",
|
|
"urllib3",
|
|
"httpx",
|
|
"aiohttp",
|
|
"socket",
|
|
"telnetlib",
|
|
"ftplib",
|
|
"smtplib",
|
|
"paramiko",
|
|
"fabric",
|
|
],
|
|
"max_execution_time": 3600,
|
|
"max_memory_gb": 16,
|
|
"gpu_devices": ["/dev/dri"],
|
|
"allow_file_writes": True,
|
|
"resource_limits": {
|
|
"cpu_count": 4,
|
|
"memory_gb": 16,
|
|
"gpu_memory_gb": 12,
|
|
},
|
|
# Conda-specific settings
|
|
"conda_env": "ml_env",
|
|
"package_manager": "mamba",
|
|
"ds_friendly": True,
|
|
}
|
|
|
|
def check_package_safety(self, package_name: str) -> bool:
|
|
"""Check if a package is allowed"""
|
|
# Always allow ML tools even if they might be in blocked list
|
|
allowed_tools = self.policy.get("allowed_network_tools", [])
|
|
if package_name in allowed_tools:
|
|
return True
|
|
|
|
if package_name in self.policy.get("blocked_packages", []):
|
|
return False
|
|
return True
|
|
|
|
def check_network_access(self, domain: str | None = None) -> bool:
|
|
"""Check if network access is allowed"""
|
|
if not self.policy.get("allow_network", False):
|
|
return False
|
|
|
|
# Check if domain is in whitelist
|
|
if domain:
|
|
whitelist = self.policy.get("network_whitelist", [])
|
|
return any(allowed in domain for allowed in whitelist)
|
|
|
|
return True
|
|
|
|
def check_tool_allowed(self, tool_name: str) -> bool:
|
|
"""Check if a specific tool is allowed network access"""
|
|
allowed_tools = self.policy.get("allowed_network_tools", [])
|
|
return tool_name in allowed_tools
|
|
|
|
|
|
class CondaRunner:
|
|
"""Secure experiment runner with Conda + Mamba"""
|
|
|
|
def __init__(self, workspace_dir: str = "/workspace"):
|
|
self.workspace_dir = Path(workspace_dir)
|
|
self.security_policy = SecurityPolicy()
|
|
self.conda_env = self.security_policy.policy.get("conda_env", "ml_env")
|
|
self.package_manager = self.security_policy.policy.get(
|
|
"package_manager", "mamba"
|
|
)
|
|
self.results_dir = self.workspace_dir / "results"
|
|
|
|
# Detect if running in conda environment
|
|
self.is_conda = os.environ.get("CONDA_DEFAULT_ENV") is not None
|
|
|
|
# Conda paths
|
|
self.conda_prefix = os.environ.get("CONDA_PREFIX", "/opt/conda")
|
|
self.env_path = f"{self.conda_prefix}/envs/{self.conda_env}"
|
|
|
|
self.gpu_devices = self.security_policy.policy.get("gpu_devices", [])
|
|
|
|
def setup_environment(self, deps_file: Path) -> bool:
|
|
"""Setup Conda environment based on a dependency manifest."""
|
|
try:
|
|
name = deps_file.name
|
|
|
|
print(f"[MANIFEST] Using dependency manifest: {name}")
|
|
|
|
if name in ("environment.yml", "environment.yaml"):
|
|
print(f"[SETUP] Applying conda environment file: {deps_file}")
|
|
cmd = [
|
|
self.package_manager,
|
|
"env",
|
|
"update",
|
|
"-n",
|
|
self.conda_env,
|
|
"-f",
|
|
str(deps_file),
|
|
"-y",
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=900)
|
|
if result.returncode != 0:
|
|
print(f"[ERROR] Failed to apply environment file: {result.stderr}")
|
|
return False
|
|
return True
|
|
|
|
if name == "poetry.lock":
|
|
pyproject = self.workspace_dir / "pyproject.toml"
|
|
if not pyproject.exists():
|
|
print("[ERROR] poetry.lock provided but pyproject.toml is missing")
|
|
return False
|
|
|
|
print(f"[SETUP] Installing dependencies from Poetry lockfile: {deps_file}")
|
|
env = os.environ.copy()
|
|
env.update(
|
|
{
|
|
"POETRY_VIRTUALENVS_CREATE": "false",
|
|
"POETRY_NO_INTERACTION": "1",
|
|
}
|
|
)
|
|
|
|
# Ensure Poetry is available in the conda env.
|
|
check = subprocess.run(
|
|
["conda", "run", "-n", self.conda_env, "poetry", "--version"],
|
|
capture_output=True,
|
|
text=True,
|
|
env=env,
|
|
)
|
|
if check.returncode != 0:
|
|
print("[ERROR] Poetry is not available in the container environment")
|
|
print(check.stderr)
|
|
return False
|
|
|
|
# Install into the conda env (no separate venv).
|
|
install = subprocess.run(
|
|
[
|
|
"conda",
|
|
"run",
|
|
"-n",
|
|
self.conda_env,
|
|
"poetry",
|
|
"install",
|
|
"--no-ansi",
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=900,
|
|
cwd=str(self.workspace_dir),
|
|
env=env,
|
|
)
|
|
if install.returncode != 0:
|
|
print("[ERROR] Poetry install failed")
|
|
print(install.stderr)
|
|
return False
|
|
|
|
return True
|
|
|
|
if name == "pyproject.toml":
|
|
# Use pip's PEP517/pyproject support (no Poetry required).
|
|
# This installs the project itself; dependencies may be fetched as needed.
|
|
print(f"[SETUP] Installing project from pyproject.toml: {deps_file}")
|
|
cmd = [
|
|
"conda",
|
|
"run",
|
|
"-n",
|
|
self.conda_env,
|
|
"pip",
|
|
"install",
|
|
str(self.workspace_dir),
|
|
"--no-cache-dir",
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=900)
|
|
if result.returncode != 0:
|
|
print(f"[ERROR] Failed to install project from pyproject.toml: {result.stderr}")
|
|
return False
|
|
return True
|
|
|
|
if name == "requirements.txt":
|
|
# Read requirements
|
|
with open(deps_file, "r") as f:
|
|
requirements = [
|
|
line.strip()
|
|
for line in f
|
|
if line.strip() and not line.startswith("#")
|
|
]
|
|
|
|
# Check each package for security
|
|
for req in requirements:
|
|
package_name = (
|
|
req.split("==")[0].split(">=")[0].split("<=")[0].strip()
|
|
)
|
|
if not self.security_policy.check_package_safety(package_name):
|
|
print(
|
|
f"[SECURITY] Package '{package_name}' is blocked for security reasons"
|
|
)
|
|
return False
|
|
|
|
# Install packages with mamba (super fast!)
|
|
for req in requirements:
|
|
package_name = (
|
|
req.split("==")[0].split(">=")[0].split("<=")[0].strip()
|
|
)
|
|
|
|
# Check if already installed with conda
|
|
check_cmd = [
|
|
"conda",
|
|
"run",
|
|
"-n",
|
|
self.conda_env,
|
|
"python",
|
|
"-c",
|
|
f"import {package_name.replace('-', '_')}",
|
|
]
|
|
result = subprocess.run(
|
|
check_cmd, capture_output=True, text=True
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
print(f"[OK] {package_name} already installed in conda env")
|
|
continue
|
|
|
|
# Try conda-forge first (faster and more reliable)
|
|
print(
|
|
f"[INSTALL] Installing {req} with {self.package_manager}..."
|
|
)
|
|
install_cmd = [
|
|
self.package_manager,
|
|
"install",
|
|
"-n",
|
|
self.conda_env,
|
|
req,
|
|
"-c",
|
|
"conda-forge",
|
|
"-y",
|
|
]
|
|
result = subprocess.run(
|
|
install_cmd, capture_output=True, text=True, timeout=300
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
print(f"[OK] Installed {req} with {self.package_manager}")
|
|
continue
|
|
|
|
# Fallback to pip if conda fails
|
|
print(f"[FALLBACK] Trying pip for {req}...")
|
|
pip_cmd = [
|
|
"conda",
|
|
"run",
|
|
"-n",
|
|
self.conda_env,
|
|
"pip",
|
|
"install",
|
|
req,
|
|
"--no-cache-dir",
|
|
]
|
|
result = subprocess.run(
|
|
pip_cmd, capture_output=True, text=True, timeout=300
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
print(f"[ERROR] Failed to install {req}: {result.stderr}")
|
|
return False
|
|
|
|
print(f"[OK] Installed {req} with pip")
|
|
|
|
return True
|
|
|
|
print(f"[ERROR] Unsupported dependency manifest: {deps_file}")
|
|
print("Supported: environment.yml, environment.yaml, poetry.lock (requires pyproject.toml), pyproject.toml, requirements.txt")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Environment setup failed: {e}")
|
|
return False
|
|
|
|
def run_experiment(self, train_script: Path, args: list[str]) -> bool:
|
|
"""Run experiment in secure Conda environment"""
|
|
try:
|
|
if not train_script.exists():
|
|
print(f"[ERROR] Training script not found: {train_script}")
|
|
return False
|
|
|
|
# Create results directory
|
|
self.results_dir.mkdir(exist_ok=True)
|
|
|
|
# Setup environment variables for security
|
|
env = os.environ.copy()
|
|
env.update(
|
|
{
|
|
"CONDA_DEFAULT_ENV": self.conda_env,
|
|
"CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", ""), # Allow GPU access
|
|
"SECURE_MODE": "1",
|
|
"NETWORK_ACCESS": (
|
|
"1"
|
|
if self.security_policy.check_network_access(None)
|
|
else "0"
|
|
),
|
|
"CONDA_MODE": "1",
|
|
}
|
|
)
|
|
|
|
# Prepare command
|
|
cmd = [
|
|
"conda",
|
|
"run",
|
|
"-n",
|
|
self.conda_env,
|
|
"python",
|
|
str(train_script),
|
|
] + (args or [])
|
|
|
|
# Add default output directory if not provided
|
|
if "--output_dir" not in " ".join(args or []):
|
|
cmd.extend(["--output_dir", str(self.results_dir)])
|
|
|
|
print(f"[CMD] Running command: {' '.join(cmd)}")
|
|
print(f"[ENV] Conda environment: {self.conda_env}")
|
|
print(f"[PKG] Package manager: {self.package_manager}")
|
|
|
|
# Run with timeout and resource limits
|
|
start_time = time.time()
|
|
max_time = self.security_policy.policy.get(
|
|
"max_execution_time", 3600
|
|
)
|
|
|
|
print(f"[RUN] Starting experiment: {train_script.name}")
|
|
print(f"[TIME] Time limit: {max_time}s")
|
|
|
|
process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
env=env,
|
|
cwd=str(self.workspace_dir),
|
|
)
|
|
|
|
try:
|
|
stdout, stderr = process.communicate(timeout=max_time)
|
|
execution_time = time.time() - start_time
|
|
|
|
if process.returncode == 0:
|
|
print(
|
|
f"[DONE] Experiment completed successfully in {execution_time:.1f}s"
|
|
)
|
|
|
|
# Save execution results
|
|
results = {
|
|
"status": "success",
|
|
"execution_time": execution_time,
|
|
"stdout": stdout,
|
|
"stderr": stderr,
|
|
"return_code": process.returncode,
|
|
"gpu_accessible": len(self.gpu_devices) > 0,
|
|
"security_mode": "enabled",
|
|
"container_type": "conda",
|
|
"conda_env": self.conda_env,
|
|
"package_manager": self.package_manager,
|
|
"ds_friendly": True,
|
|
}
|
|
|
|
results_file = self.results_dir / "execution_results.json"
|
|
with open(results_file, "w") as f:
|
|
json.dump(results, f, indent=2)
|
|
|
|
return True
|
|
else:
|
|
print(
|
|
f"[ERROR] Experiment failed with return code {process.returncode}"
|
|
)
|
|
print(f"STDERR: {stderr}")
|
|
return False
|
|
|
|
except subprocess.TimeoutExpired:
|
|
process.kill()
|
|
print(f"[TIMEOUT] Experiment timed out after {max_time}s")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Experiment execution failed: {e}")
|
|
return False
|
|
|
|
def check_gpu_access(self) -> bool:
|
|
"""Check if GPU is accessible"""
|
|
try:
|
|
# Check with conda environment
|
|
result = subprocess.run(
|
|
[
|
|
"conda",
|
|
"run",
|
|
"-n",
|
|
self.conda_env,
|
|
"python",
|
|
"-c",
|
|
"import torch; print('CUDA available:', torch.cuda.is_available())",
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
)
|
|
return result.returncode == 0
|
|
except Exception as e:
|
|
print("[ERROR] GPU access check failed:", e)
|
|
return False
|
|
|
|
|
|
def main():
    """Command-line entry point for the secure ML experiment runner.

    Parses the command line, then does one of:

    * ``--check-gpu``: report GPU availability and exit.
    * ``--prepare-only``: install the dependency manifest and exit.
    * otherwise: install the dependency manifest, then run ``--script`` with
      the arguments given after ``--args``.

    Returns the process exit code: 0 on success, 1 on any failure.
    """
    parser = argparse.ArgumentParser(description="Secure ML Experiment Runner")
    parser.add_argument(
        "--workspace", default="/workspace", help="Workspace directory"
    )
    parser.add_argument("--deps", help="Dependency manifest path (environment.yml | poetry.lock | pyproject.toml | requirements.txt)")
    parser.add_argument("--requirements", help="Deprecated alias for --deps")
    parser.add_argument("--script", help="Training script path")
    parser.add_argument(
        "--prepare-only", action="store_true", help="Only prepare dependencies and exit"
    )
    parser.add_argument(
        "--args",
        nargs=argparse.REMAINDER,
        default=[],
        help="Additional script arguments",
    )
    parser.add_argument(
        "--check-gpu", action="store_true", help="Check GPU access"
    )

    args = parser.parse_args()

    # Initialize secure runner
    runner = CondaRunner(args.workspace)

    # Check GPU access if requested; this mode never installs or runs anything.
    if args.check_gpu:
        if not runner.check_gpu_access():
            print("[ERROR] No GPU access available")
            return 1

        print("[OK] GPU access available")
        # Show GPU info with conda
        result = subprocess.run(
            [
                "conda",
                "run",
                "-n",
                runner.conda_env,
                "python",
                "-c",
                "import torch; print(f'GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else \"None\"}')",
            ],
            capture_output=True,
            text=True,
        )
        if result.returncode == 0:
            print(f"GPU Info: {result.stdout.strip()}")
        return 0

    deps_arg = args.deps or args.requirements
    if not deps_arg:
        print("[ERROR] Missing dependency manifest. Provide --deps.")
        return 1

    deps_path = Path(deps_arg)
    if not deps_path.exists():
        print(f"[ERROR] Dependency manifest not found: {deps_path}")
        return 1

    # Validate --script before the (potentially long) dependency install:
    # Path(None) would otherwise raise TypeError only after setup finished.
    if not args.prepare_only and not args.script:
        print("[ERROR] Missing training script. Provide --script or use --prepare-only.")
        return 1

    # Setup environment
    print("[SETUP] Setting up secure environment...")
    if not runner.setup_environment(deps_path):
        print("[ERROR] Failed to setup secure environment")
        return 1

    if args.prepare_only:
        print("[DONE] Environment prepared successfully")
        return 0

    # Run experiment
    script_path = Path(args.script)
    if not script_path.exists():
        print(f"[ERROR] Training script not found: {script_path}")
        return 1

    print("[RUN] Running experiment in secure container...")
    if runner.run_experiment(script_path, args.args):
        print("[DONE] Experiment completed successfully!")
        return 0

    print("[ERROR] Experiment failed!")
    return 1
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|