refactor(scripts): simplify forgejo actions secret helper

Keep only app_ssh_access essentials: generate keypairs, upload plaintext Actions secrets, optionally update vault public keys.
This commit is contained in:
Jeremie Fraeys 2026-01-21 23:15:38 -05:00
parent 872d0cbe49
commit 67eb2227dd
No known key found for this signature in database

View file

@ -1,33 +1,17 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import tempfile
from typing import List, Optional, Tuple
from typing import List, Optional
from urllib.parse import urljoin
import requests
try:
import yaml
except Exception: # pragma: no cover
yaml = None
def _require_pynacl():
try:
from nacl.public import PublicKey, SealedBox
except Exception as e:
raise RuntimeError(
"PyNaCl is required for secrets encryption. Install with: pip install pynacl"
) from e
return PublicKey, SealedBox
def _api_headers(token: str) -> dict:
# Gitea/Forgejo uses "token <PAT>" in Authorization.
@ -38,21 +22,6 @@ def _api_headers(token: str) -> dict:
}
def _public_key_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str]) -> str:
base = base_url.rstrip("/") + "/"
if scope == "repo":
if not owner or not repo:
raise ValueError("owner/repo required for repo scope")
return urljoin(base, f"api/v1/repos/{owner}/{repo}/actions/secrets/public-key")
if scope == "org":
if not org:
raise ValueError("org required for org scope")
return urljoin(base, f"api/v1/orgs/{org}/actions/secrets/public-key")
if scope == "user":
return urljoin(base, "api/v1/user/actions/secrets/public-key")
raise ValueError(f"unsupported scope: {scope}")
def _secret_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str], name: str) -> str:
base = base_url.rstrip("/") + "/"
if scope == "repo":
@ -68,52 +37,6 @@ def _secret_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[
raise ValueError(f"unsupported scope: {scope}")
def get_public_key(base_url: str, token: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str]) -> dict:
url = _public_key_url(base_url, scope, owner, repo, org)
r = requests.get(url, headers=_api_headers(token), timeout=30)
# Some Forgejo/Gitea versions do not expose a public-key endpoint.
# In that case, secrets are submitted in plaintext (over TLS) and encrypted server-side.
if r.status_code in (404, 405):
raise NotImplementedError("public-key endpoint not available")
r.raise_for_status()
data = r.json()
# Expected: {"key_id": "...", "key": "base64..."}
if "key" not in data or "key_id" not in data:
raise RuntimeError(f"Unexpected response from {url}: {data}")
return data
def _put_secret_auto(
base_url: str,
token: str,
scope: str,
owner: Optional[str],
repo: Optional[str],
org: Optional[str],
name: str,
value: str,
) -> None:
try:
public = get_public_key(base_url, token, scope, owner, repo, org)
encrypted = encrypt_secret(public["key"], value)
put_secret(base_url, token, scope, owner, repo, org, name, encrypted, public["key_id"])
except NotImplementedError:
put_secret_plain(base_url, token, scope, owner, repo, org, name, value)
def encrypt_secret(public_key_b64: str, secret_value: str) -> str:
PublicKey, SealedBox = _require_pynacl()
pk_bytes = base64.b64decode(public_key_b64)
pk = PublicKey(pk_bytes)
box = SealedBox(pk)
encrypted = box.encrypt(secret_value.encode("utf-8"))
return base64.b64encode(encrypted).decode("ascii")
def _ensure_ssh_keypair(private_key_path: str, comment: str, force: bool) -> None:
pub_path = private_key_path + ".pub"
if not force and (os.path.exists(private_key_path) or os.path.exists(pub_path)):
@ -149,26 +72,6 @@ def _ensure_ssh_keypair(private_key_path: str, comment: str, force: bool) -> Non
raise RuntimeError("ssh-keygen failed to generate keypair") from e
def put_secret(
base_url: str,
token: str,
scope: str,
owner: Optional[str],
repo: Optional[str],
org: Optional[str],
name: str,
encrypted_value_b64: str,
key_id: str,
) -> None:
url = _secret_url(base_url, scope, owner, repo, org, name)
payload = {
"encrypted_value": encrypted_value_b64,
"key_id": key_id,
}
r = requests.put(url, headers=_api_headers(token), json=payload, timeout=30)
r.raise_for_status()
def put_secret_plain(
base_url: str,
token: str,
@ -186,28 +89,6 @@ def put_secret_plain(
r.raise_for_status()
def _read_ansible_vault_yaml(vault_file: str, vault_password_file: Optional[str]) -> dict:
if yaml is None:
raise RuntimeError("PyYAML is required to read tokens from ansible-vault. Install with: pip install pyyaml")
cmd = ["ansible-vault", "view", vault_file]
if vault_password_file:
cmd.extend(["--vault-password-file", vault_password_file])
try:
p = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
except FileNotFoundError:
raise RuntimeError("ansible-vault not found in PATH")
except subprocess.CalledProcessError as e:
# Don't echo stderr; it may contain hints about vault location.
raise RuntimeError("Failed to read vault file via ansible-vault") from e
data = yaml.safe_load(p.stdout) or {}
if not isinstance(data, dict):
raise RuntimeError(f"Vault file {vault_file} did not parse to a YAML mapping")
return data
def _read_ansible_vault_text(vault_file: str, vault_password_file: Optional[str]) -> str:
cmd = ["ansible-vault", "view", vault_file]
if vault_password_file:
@ -263,15 +144,6 @@ def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vau
_write_ansible_vault_text(vault_file, plaintext, vault_password_file)
def _load_token_from_ansible_vault(vault_file: str, token_var: str, vault_password_file: Optional[str]) -> Tuple[Optional[str], List[str]]:
data = _read_ansible_vault_yaml(vault_file, vault_password_file)
keys = sorted([str(k) for k in data.keys()])
val = data.get(token_var)
if isinstance(val, str) and val.strip():
return val.strip(), keys
return None, keys
def args_parse():
p = argparse.ArgumentParser(
description="Create/update Forgejo Actions secrets for app_ssh_access (SSH register/deregister keys).",
@ -284,7 +156,6 @@ def args_parse():
p.add_argument("--token", default=os.environ.get("FORGEJO_API_TOKEN"))
p.add_argument("--vault-file", default="./secrets/vault.yml")
p.add_argument("--vault-password-file", default="./secrets/.vault_pass")
p.add_argument("--token-var", default="FORGEJO_API_TOKEN")
p.add_argument("--scope", choices=["user", "org", "repo"], default="user")
p.add_argument("--org", help="Organization name (required when --scope org)")
p.add_argument("--repo", help="owner/repo (required when --scope repo)")
@ -309,20 +180,11 @@ def main() -> int:
args = args_parse()
token = args.token
vault_keys: List[str] = []
if not token:
token, vault_keys = _load_token_from_ansible_vault(args.vault_file, args.token_var, args.vault_password_file)
if not token:
print(
"FORGEJO_API_TOKEN is required (pass --token, set env FORGEJO_API_TOKEN, or store it in ansible-vault secrets/vault.yml)",
"FORGEJO_API_TOKEN is required (pass --token or set env FORGEJO_API_TOKEN)",
file=sys.stderr,
)
if vault_keys:
print(
f"Vault parsed OK but key '{args.token_var}' not found. Available top-level keys: {', '.join(vault_keys)}",
file=sys.stderr,
)
return 2
owner: Optional[str] = None
@ -368,8 +230,8 @@ def main() -> int:
deregister_secret = f.read()
try:
_put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret)
_put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret)
put_secret_plain(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret)
put_secret_plain(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret)
except requests.exceptions.HTTPError as e:
status = getattr(e.response, "status_code", None)
url = getattr(e.response, "url", "")