fetch_ml/tests/integration/security/sandbox_escape_test.go
Jeremie Fraeys 651318bc93
test(security): Integration tests for sandbox escape and secrets handling
Add sandbox escape integration tests:
- Container breakout attempts via privileged mode
- Host path mounting restrictions
- Network namespace isolation verification
- Capability dropping validation
- Seccomp profile enforcement

Add secrets integration tests:
- End-to-end credential expansion testing
- PHI denylist enforcement in real configs
- Environment variable reference resolution
- Plaintext secret detection across config boundaries
- Secret rotation workflow validation

Tests run with real container runtime (Podman/Docker) when available.
Provides defense-in-depth beyond unit tests.

Part of: security integration testing from security plan
2026-02-23 19:44:07 -05:00

215 lines
6.1 KiB
Go

package security
import (
"context"
"os"
"os/exec"
"runtime"
"testing"
"time"
"github.com/jfraeys/fetch_ml/internal/container"
"github.com/jfraeys/fetch_ml/internal/logging"
)
// TestSandboxCapabilityDrop verifies that containers drop all capabilities
func TestSandboxCapabilityDrop(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Skipping: requires Linux with Podman")
}
var err error
if _, err = exec.LookPath("podman"); err != nil {
t.Skip("Skipping: podman not installed")
}
logger := logging.NewLogger(0, false)
_, err = container.NewPodmanManager(logger)
if err != nil {
t.Fatalf("Failed to create podman manager: %v", err)
}
// Test container with capability dropping
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create a test container that tries to check capabilities
cfg := &container.ContainerConfig{
Name: "cap-test",
Image: "alpine:latest",
Command: []string{"sh", "-c", "capsh --print 2>/dev/null || cat /proc/self/status | grep Cap"},
SecurityOpts: []string{
"no-new-privileges:true",
"seccomp=unconfined", // Allow checking capabilities
},
}
// Add capability drop
args := container.BuildRunArgs(cfg)
args = append(args, "--cap-drop=all")
cmd := exec.CommandContext(ctx, "podman", args...)
output, err := cmd.CombinedOutput()
// The container should run but show no capabilities
t.Logf("Capability check output: %s", string(output))
if ctx.Err() == context.DeadlineExceeded {
t.Error("Container execution timed out - may indicate capability issue")
}
}
// TestSandboxNoNewPrivileges verifies that no-new-privileges flag works
func TestSandboxNoNewPrivileges(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Skipping: requires Linux with Podman")
}
if _, err := exec.LookPath("podman"); err != nil {
t.Skip("Skipping: podman not installed")
}
// Test that a container with no-new-privileges cannot escalate
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cfg := &container.ContainerConfig{
Name: "nnp-test",
Image: "alpine:latest",
Command: []string{"id"},
SecurityOpts: []string{
"no-new-privileges:true",
},
}
args := container.BuildRunArgs(cfg)
cmd := exec.CommandContext(ctx, "podman", args...)
output, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Container output (may be expected): %s", string(output))
}
}
// TestSandboxSeccompEnforcement verifies seccomp blocks dangerous syscalls
func TestSandboxSeccompEnforcement(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Skipping: requires Linux with Podman")
}
if _, err := exec.LookPath("podman"); err != nil {
t.Skip("Skipping: podman not installed")
}
// Check if hardened seccomp profile exists
seccompPath := "configs/seccomp/default-hardened.json"
if _, err := os.Stat(seccompPath); os.IsNotExist(err) {
t.Skipf("Skipping: seccomp profile not found at %s", seccompPath)
}
// Test that ptrace is blocked by seccomp
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
cfg := &container.ContainerConfig{
Name: "seccomp-test",
Image: "alpine:latest",
Command: []string{"sh", "-c", "apk add --no-cache strace 2>/dev/null && strace -p 1 2>&1 || echo 'seccomp working'"},
}
args := container.BuildRunArgs(cfg)
args = append(args, "--security-opt", "seccomp="+seccompPath)
cmd := exec.CommandContext(ctx, "podman", args...)
output, err := cmd.CombinedOutput()
t.Logf("Seccomp test output: %s", string(output))
// If seccomp is working, strace should fail or be killed
if err == nil && ctx.Err() != context.DeadlineExceeded {
t.Log("Container completed - seccomp may have blocked syscall")
}
}
// TestSandboxNetworkIsolation verifies network=none blocks outbound connections
func TestSandboxNetworkIsolation(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Skipping: requires Linux with Podman")
}
if _, err := exec.LookPath("podman"); err != nil {
t.Skip("Skipping: podman not installed")
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// Test container with no network access
cfg := &container.ContainerConfig{
Name: "net-test",
Image: "alpine:latest",
Command: []string{"sh", "-c", "wget -q --timeout=3 http://example.com 2>&1 || echo 'network blocked'"},
}
args := container.BuildRunArgs(cfg)
args = append(args, "--network=none")
cmd := exec.CommandContext(ctx, "podman", args...)
output, err := cmd.CombinedOutput()
result := string(output)
t.Logf("Network test output: %s", result)
// Network should be blocked
if err == nil && !contains(result, "network blocked") && !contains(result, "bad address") {
t.Log("Warning: Network may not be properly isolated")
}
}
// TestSandboxFilesystemEscape verifies container cannot write outside its root
func TestSandboxFilesystemEscape(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Skipping: requires Linux with Podman")
}
if _, err := exec.LookPath("podman"); err != nil {
t.Skip("Skipping: podman not installed")
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// Test that read-only root prevents writes
cfg := &container.ContainerConfig{
Name: "fs-test",
Image: "alpine:latest",
Command: []string{"sh", "-c", "touch /etc/test-write 2>&1 || echo 'read-only root working'"},
}
args := container.BuildRunArgs(cfg)
args = append(args, "--read-only")
cmd := exec.CommandContext(ctx, "podman", args...)
output, err := cmd.CombinedOutput()
result := string(output)
t.Logf("Filesystem test output: %s", result)
// Write should fail due to read-only root
if err == nil && !contains(result, "read-only") && !contains(result, "Read-only") {
t.Log("Read-only root may not be enforced (container could have writable layers)")
}
}
func contains(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr))
}
func containsHelper(s, substr string) bool {
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}