Keep only app_ssh_access essentials: generate keypairs, upload plaintext Actions secrets, optionally update vault public keys.
297 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from typing import List, Optional
|
|
from urllib.parse import urljoin
|
|
|
|
import requests
|
|
|
|
|
|
def _api_headers(token: str) -> dict:
    """Build the HTTP headers for an authenticated JSON API request.

    Args:
        token: Personal access token placed in the ``Authorization`` header.

    Returns:
        A new dict with ``Authorization``, ``Accept`` and ``Content-Type``.
    """
    json_mime = "application/json"
    # Gitea/Forgejo expects the "token <PAT>" scheme in Authorization.
    headers = {"Authorization": f"token {token}"}
    headers["Accept"] = json_mime
    headers["Content-Type"] = json_mime
    return headers
|
|
|
|
|
|
def _secret_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str], name: str) -> str:
    """Return the Actions-secret API endpoint for ``name`` in the given scope.

    Args:
        base_url: Instance root URL; trailing slashes are ignored.
        scope: One of ``"repo"``, ``"org"`` or ``"user"``.
        owner: Repository owner (needed for ``"repo"`` scope).
        repo: Repository name (needed for ``"repo"`` scope).
        org: Organization name (needed for ``"org"`` scope).
        name: Secret name, used as the last path segment.

    Raises:
        ValueError: If the scope is unknown or its required identifiers are
            missing/empty.
    """
    # Normalise to exactly one trailing slash so urljoin appends to the base
    # path instead of replacing its last segment.
    root = base_url.rstrip("/") + "/"

    if scope == "repo":
        if not (owner and repo):
            raise ValueError("owner/repo required for repo scope")
        resource = f"repos/{owner}/{repo}"
    elif scope == "org":
        if not org:
            raise ValueError("org required for org scope")
        resource = f"orgs/{org}"
    elif scope == "user":
        resource = "user"
    else:
        raise ValueError(f"unsupported scope: {scope}")

    return urljoin(root, f"api/v1/{resource}/actions/secrets/{name}")
|
|
|
|
|
|
def _ensure_ssh_keypair(private_key_path: str, comment: str, force: bool) -> None:
    """Generate a passphrase-less ed25519 keypair with ``ssh-keygen``.

    The public key is expected at ``private_key_path + ".pub"``.

    Args:
        private_key_path: Destination path of the private key.
        comment: Comment passed to ``ssh-keygen -C``.
        force: When true, existing key files are deleted first; when false,
            the presence of either file aborts the operation.

    Raises:
        RuntimeError: If key material already exists and ``force`` is false,
            if an existing file cannot be removed, if ``ssh-keygen`` is not
            installed, or if ``ssh-keygen`` exits non-zero (its stderr output
            is appended to the message when available).
    """
    pub_path = private_key_path + ".pub"
    key_files = (private_key_path, pub_path)

    if not force and any(os.path.exists(p) for p in key_files):
        raise RuntimeError(
            f"Refusing to overwrite existing key material: {private_key_path} / {pub_path} (use --force-generate-ssh-key)"
        )

    if force:
        for p in key_files:
            try:
                os.unlink(p)
            except FileNotFoundError:
                # Nothing to remove for this half of the pair.
                continue
            except OSError as e:
                raise RuntimeError(f"Failed to remove existing key material: {p}") from e

    cmd = [
        "ssh-keygen",
        "-t",
        "ed25519",
        "-N",
        "",  # empty passphrase: the key is consumed non-interactively
        "-f",
        private_key_path,
        "-C",
        comment,
    ]

    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError as e:
        raise RuntimeError("ssh-keygen not found in PATH") from e
    except subprocess.CalledProcessError as e:
        # Surface ssh-keygen's own diagnostics; without them the failure
        # (bad directory, permissions, ...) cannot be diagnosed.
        message = "ssh-keygen failed to generate keypair"
        detail = (e.stderr or "").strip()
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message) from e
|
|
|
|
|
|
def put_secret_plain(
    base_url: str,
    token: str,
    scope: str,
    owner: Optional[str],
    repo: Optional[str],
    org: Optional[str],
    name: str,
    value: str,
) -> None:
    """Create or update one Actions secret by PUT-ing its plaintext value.

    The endpoint is resolved by ``_secret_url`` and the request carries the
    headers from ``_api_headers``. The request uses a 30 second timeout and
    ``raise_for_status()`` is called on the response, so HTTP error statuses
    are raised as exceptions.
    """
    endpoint = _secret_url(base_url, scope, owner, repo, org, name)
    # Some Forgejo/Gitea instances accept a plaintext secret value and
    # encrypt it server-side.
    response = requests.put(
        endpoint,
        headers=_api_headers(token),
        json={"data": value},
        timeout=30,
    )
    response.raise_for_status()
|
|
|
|
|
|
def _read_ansible_vault_text(vault_file: str, vault_password_file: Optional[str]) -> str:
    """Return the decrypted content of ``vault_file`` via ``ansible-vault view``.

    Args:
        vault_file: Path of the encrypted vault file.
        vault_password_file: Optional path passed as ``--vault-password-file``;
            omitted from the command when falsy.

    Returns:
        The standard output of ``ansible-vault view`` as text.

    Raises:
        RuntimeError: If ``ansible-vault`` is not installed or exits non-zero
            (its stderr output is appended to the message when available).
    """
    cmd = ["ansible-vault", "view", vault_file]
    if vault_password_file:
        cmd.extend(["--vault-password-file", vault_password_file])

    try:
        result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError as e:
        raise RuntimeError("ansible-vault not found in PATH") from e
    except subprocess.CalledProcessError as e:
        # Keep the tool's diagnostics (wrong password, missing file, ...)
        # instead of discarding the captured stderr.
        message = "Failed to read vault file via ansible-vault"
        detail = (e.stderr or "").strip()
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message) from e
    return result.stdout
|
|
|
|
|
|
def _write_ansible_vault_text(vault_file: str, plaintext: str, vault_password_file: Optional[str]) -> None:
    """Encrypt ``plaintext`` into ``vault_file`` via ``ansible-vault encrypt``.

    The plaintext is staged in a temporary file which is passed to
    ``ansible-vault encrypt ... --output vault_file``. The temporary file is
    removed afterwards on every path, including a failed write.

    Args:
        vault_file: Destination path of the encrypted vault.
        plaintext: Decrypted vault content to encrypt.
        vault_password_file: Optional path passed as ``--vault-password-file``;
            omitted from the command when falsy.

    Raises:
        RuntimeError: If ``ansible-vault`` is not installed or exits non-zero
            (its stderr output is appended to the message when available).
    """
    tmp_path: Optional[str] = None
    try:
        # Staging happens inside the try block so that a failed write cannot
        # leave a plaintext copy of the vault behind.
        with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8", prefix="vault_", suffix=".yml") as tf:
            tmp_path = tf.name
            tf.write(plaintext)

        cmd = ["ansible-vault", "encrypt", tmp_path, "--output", vault_file]
        if vault_password_file:
            cmd.extend(["--vault-password-file", vault_password_file])

        try:
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        except FileNotFoundError as e:
            raise RuntimeError("ansible-vault not found in PATH") from e
        except subprocess.CalledProcessError as e:
            message = "Failed to write vault file via ansible-vault"
            detail = (e.stderr or "").strip()
            if detail:
                message = f"{message}: {detail}"
            raise RuntimeError(message) from e
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except FileNotFoundError:
                pass
            except OSError as e:
                # Best-effort cleanup must not mask the real outcome, but a
                # leftover plaintext file should never go unnoticed.
                print(
                    f"WARNING: could not remove temporary plaintext file {tmp_path}: {e}",
                    file=sys.stderr,
                )
|
|
|
|
|
|
def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vault_password_file: Optional[str]) -> None:
    """Set ``key`` to ``value`` in an encrypted vault file, adding it if absent.

    The vault is decrypted with ``_read_ansible_vault_text``, edited as plain
    text and re-encrypted with ``_write_ansible_vault_text``. Only the first
    line that starts at column 0 with ``key`` followed by a colon is replaced;
    when no such line exists, a new ``key: value`` line is appended.

    The edit is line-based: it rewrites a single line and does not parse YAML,
    so a value that spans several lines is not handled.

    Args:
        vault_file: Path of the encrypted vault file.
        key: Top-level key to set.
        value: Scalar value; written JSON-quoted.
        vault_password_file: Optional vault password file for both steps.
    """
    plaintext = _read_ansible_vault_text(vault_file, vault_password_file)
    if plaintext and not plaintext.endswith("\n"):
        plaintext += "\n"

    # Quote the value safely (SSH public keys contain spaces).
    new_line = f"{key}: {json.dumps(value)}"

    # Only spaces/tabs may separate the key from the colon: ``\s`` would also
    # match a newline and let an empty ``key:`` line swallow the line after it.
    pattern = re.compile(rf"^{re.escape(key)}[ \t]*:.*$", re.MULTILINE)

    # A callable replacement inserts the text verbatim. Passing the string
    # itself would make ``re`` interpret the backslashes produced by
    # ``json.dumps`` (``\\``, ``\n``, ``\uXXXX``) as template escapes.
    updated, replaced = pattern.subn(lambda _match: new_line, plaintext, count=1)
    if not replaced:
        updated = plaintext + new_line + "\n"

    _write_ansible_vault_text(vault_file, updated, vault_password_file)
|
|
|
|
|
|
def args_parse():
    """Parse the command line (``sys.argv``) and return the argparse namespace.

    ``--base-url`` and ``--token`` default to the ``FORGEJO_BASE_URL`` and
    ``FORGEJO_API_TOKEN`` environment variables, read at call time.
    """
    default_base_url = os.environ.get("FORGEJO_BASE_URL", "https://git.jfraeys.com")
    default_token = os.environ.get("FORGEJO_API_TOKEN")

    security_note = (
        "Security caveat: user/org scoped secrets are available to workflows in all repos under that user/org. "
        "Only store deploy keys here if you trust the workflows/repositories that can access them."
    )
    parser = argparse.ArgumentParser(
        description="Create/update Forgejo Actions secrets for app_ssh_access (SSH register/deregister keys).",
        epilog=security_note,
    )

    # Connection and vault locations.
    parser.add_argument("--base-url", default=default_base_url)
    parser.add_argument("--token", default=default_token)
    parser.add_argument("--vault-file", default="./secrets/vault.yml")
    parser.add_argument("--vault-password-file", default="./secrets/.vault_pass")

    # Where the secrets are stored.
    parser.add_argument("--scope", choices=["user", "org", "repo"], default="user")
    parser.add_argument("--org", help="Organization name (required when --scope org)")
    parser.add_argument("--repo", help="owner/repo (required when --scope repo)")

    # SSH key handling.
    parser.add_argument("--force-generate-ssh-key", action="store_true")
    parser.add_argument(
        "--generate-ssh-keys",
        action="store_true",
        help="Generate both register and deregister SSH key pairs.",
    )
    parser.add_argument("--register-key-path", default="./secrets/service_ssh_key_register")
    parser.add_argument("--deregister-key-path", default="./secrets/service_ssh_key_deregister")
    parser.add_argument(
        "--update-vault-both-public-keys",
        action="store_true",
        help="Set both SERVICE_SSH_REGISTER_PUBLIC_KEY and SERVICE_SSH_DEREGISTER_PUBLIC_KEY in vault",
    )

    return parser.parse_args()
|
|
|
|
def main() -> int:
    """Upload the register/deregister SSH private keys as Forgejo Actions secrets.

    Steps, in order:
      1. Parse arguments and validate the token and scope-specific options.
      2. Either generate both keypairs (``--generate-ssh-keys``) or verify that
         the private key files exist.
      3. When ``--update-vault-both-public-keys`` is set, verify that both
         ``.pub`` files exist *before* anything is uploaded.
      4. PUT both private keys as ``SERVICE_SSH_KEY_REGISTER`` and
         ``SERVICE_SSH_KEY_DEREGISTER``.
      5. Optionally write both public keys into the ansible vault.

    Returns:
        0 on success; 2 for a missing token, invalid scope arguments, missing
        key files, or a 403 response for user/org scoped secrets; 1 when key
        generation or the vault update fails with a ``RuntimeError``.

    Raises:
        requests.exceptions.HTTPError: For any HTTP error other than the
            handled 403-on-user/org case.
    """

    def describe(exc: BaseException) -> str:
        # Include the chained cause so no detail is lost now that the failure
        # is reported as a message instead of a traceback.
        cause = exc.__cause__
        return f"{exc} ({cause})" if cause is not None else str(exc)

    args = args_parse()

    token = args.token
    if not token:
        print(
            "FORGEJO_API_TOKEN is required (pass --token or set env FORGEJO_API_TOKEN)",
            file=sys.stderr,
        )
        return 2

    owner: Optional[str] = None
    repo: Optional[str] = None
    org: Optional[str] = None

    if args.scope == "repo":
        # Both halves must be non-empty; "owner/" or "/repo" would otherwise
        # slip through and fail later while building the API URL.
        owner, _, repo = (args.repo or "").partition("/")
        if not owner or not repo:
            print("--repo must be in the form owner/repo when --scope repo", file=sys.stderr)
            return 2
    elif args.scope == "org":
        if not args.org:
            print("--org is required when --scope org", file=sys.stderr)
            return 2
        org = args.org

    if args.scope in ("user", "org"):
        print(
            "WARNING: Using user/org scoped secrets. Ensure only trusted workflows/repositories can access them.",
            file=sys.stderr,
        )

    if args.generate_ssh_keys:
        try:
            _ensure_ssh_keypair(args.register_key_path, "actions-register", args.force_generate_ssh_key)
            _ensure_ssh_keypair(args.deregister_key_path, "actions-deregister", args.force_generate_ssh_key)
        except RuntimeError as e:
            print(f"ERROR: {describe(e)}", file=sys.stderr)
            return 1
    else:
        missing = [
            path
            for path in (args.register_key_path, args.deregister_key_path)
            if not os.path.exists(path)
        ]
        if missing:
            print(
                "Missing SSH private key(s): " + ", ".join(missing) + ". Use --generate-ssh-keys to create them.",
                file=sys.stderr,
            )
            return 2

    register_pub = args.register_key_path + ".pub"
    deregister_pub = args.deregister_key_path + ".pub"
    if args.update_vault_both_public_keys:
        # Check the public keys up front so a missing file is reported before
        # the remote secrets have been modified.
        missing_pub = [path for path in (register_pub, deregister_pub) if not os.path.exists(path)]
        if missing_pub:
            print(
                "Missing SSH public key(s): "
                + ", ".join(missing_pub)
                + ". They are required by --update-vault-both-public-keys.",
                file=sys.stderr,
            )
            return 2

    with open(args.register_key_path, "r", encoding="utf-8") as f:
        register_secret = f.read()
    with open(args.deregister_key_path, "r", encoding="utf-8") as f:
        deregister_secret = f.read()

    try:
        put_secret_plain(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret)
        put_secret_plain(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret)
    except requests.exceptions.HTTPError as e:
        status = getattr(e.response, "status_code", None)
        url = getattr(e.response, "url", "")
        body = ""
        required_scopes: List[str] = []
        try:
            body = (e.response.text or "").strip()
            if body:
                parsed = e.response.json()
                scopes = parsed.get("message", "")
                m = re.search(r"required scope\(s\):\s*\[(?P<scopes>[^\]]+)\]", str(scopes))
                if m:
                    required_scopes = [s.strip() for s in m.group("scopes").split(",") if s.strip()]
        except Exception:
            # Deliberately best-effort: the response body only enriches the
            # error report, so an unparsable body must not hide the 403.
            pass

        if status == 403 and args.scope in ("user", "org"):
            print(
                f"ERROR: Forgejo API returned 403 Forbidden when setting {args.scope}-scoped Actions secrets: {url}",
                file=sys.stderr,
            )
            if body:
                print(f"Response body: {body}", file=sys.stderr)
            if required_scopes:
                print(
                    "Missing token scope(s): " + ", ".join(required_scopes),
                    file=sys.stderr,
                )
            print(
                "This usually means your Forgejo personal access token does not have sufficient permissions, "
                "or your Forgejo instance does not allow managing user/org Actions secrets via the API.\n"
                "Workarounds:\n"
                "- Use the web UI to add secrets at /user/settings/actions/secrets (or /org/<org>/settings/actions/secrets)\n"
                "- Or use repo-scoped secrets with: --scope repo --repo owner/repo\n",
                file=sys.stderr,
            )
            return 2
        raise

    if args.update_vault_both_public_keys:
        vault_entries = (
            ("SERVICE_SSH_REGISTER_PUBLIC_KEY", register_pub),
            ("SERVICE_SSH_DEREGISTER_PUBLIC_KEY", deregister_pub),
        )
        try:
            for vault_key, pub_path in vault_entries:
                with open(pub_path, "r", encoding="utf-8") as f:
                    public_key = f.read().strip()
                _upsert_ansible_vault_scalar_line(
                    args.vault_file,
                    vault_key,
                    public_key,
                    args.vault_password_file,
                )
        except RuntimeError as e:
            print(
                f"ERROR: Actions secrets were uploaded, but updating the vault failed: {describe(e)}",
                file=sys.stderr,
            )
            return 1

    target = "user" if args.scope == "user" else (f"org:{org}" if args.scope == "org" else f"repo:{owner}/{repo}")
    print(f"Updated Actions secrets SERVICE_SSH_KEY_REGISTER and SERVICE_SSH_KEY_DEREGISTER for {target}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|