diff --git a/scripts/forgejo_set_actions_secret.py b/scripts/forgejo_set_actions_secret.py index d3a0538..c478763 100644 --- a/scripts/forgejo_set_actions_secret.py +++ b/scripts/forgejo_set_actions_secret.py @@ -1,33 +1,17 @@ #!/usr/bin/env python3 import argparse -import base64 import json import os import re import subprocess import sys import tempfile -from typing import List, Optional, Tuple +from typing import List, Optional from urllib.parse import urljoin import requests -try: - import yaml -except Exception: # pragma: no cover - yaml = None - - -def _require_pynacl(): - try: - from nacl.public import PublicKey, SealedBox - except Exception as e: - raise RuntimeError( - "PyNaCl is required for secrets encryption. Install with: pip install pynacl" - ) from e - return PublicKey, SealedBox - def _api_headers(token: str) -> dict: # Gitea/Forgejo uses "token " in Authorization. @@ -38,21 +22,6 @@ def _api_headers(token: str) -> dict: } -def _public_key_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str]) -> str: - base = base_url.rstrip("/") + "/" - if scope == "repo": - if not owner or not repo: - raise ValueError("owner/repo required for repo scope") - return urljoin(base, f"api/v1/repos/{owner}/{repo}/actions/secrets/public-key") - if scope == "org": - if not org: - raise ValueError("org required for org scope") - return urljoin(base, f"api/v1/orgs/{org}/actions/secrets/public-key") - if scope == "user": - return urljoin(base, "api/v1/user/actions/secrets/public-key") - raise ValueError(f"unsupported scope: {scope}") - - def _secret_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str], name: str) -> str: base = base_url.rstrip("/") + "/" if scope == "repo": @@ -68,52 +37,6 @@ def _secret_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[ raise ValueError(f"unsupported scope: {scope}") -def get_public_key(base_url: str, token: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str]) -> dict: - url = _public_key_url(base_url, scope, owner, repo, org) - r = requests.get(url, headers=_api_headers(token), timeout=30) - - # Some Forgejo/Gitea versions do not expose a public-key endpoint. - # In that case, secrets are submitted in plaintext (over TLS) and encrypted server-side. - if r.status_code in (404, 405): - raise NotImplementedError("public-key endpoint not available") - - r.raise_for_status() - - data = r.json() - # Expected: {"key_id": "...", "key": "base64..."} - if "key" not in data or "key_id" not in data: - raise RuntimeError(f"Unexpected response from {url}: {data}") - return data - - -def _put_secret_auto( - base_url: str, - token: str, - scope: str, - owner: Optional[str], - repo: Optional[str], - org: Optional[str], - name: str, - value: str, -) -> None: - try: - public = get_public_key(base_url, token, scope, owner, repo, org) - encrypted = encrypt_secret(public["key"], value) - put_secret(base_url, token, scope, owner, repo, org, name, encrypted, public["key_id"]) - except NotImplementedError: - put_secret_plain(base_url, token, scope, owner, repo, org, name, value) - - -def encrypt_secret(public_key_b64: str, secret_value: str) -> str: - PublicKey, SealedBox = _require_pynacl() - - pk_bytes = base64.b64decode(public_key_b64) - pk = PublicKey(pk_bytes) - box = SealedBox(pk) - encrypted = box.encrypt(secret_value.encode("utf-8")) - return base64.b64encode(encrypted).decode("ascii") - - def _ensure_ssh_keypair(private_key_path: str, comment: str, force: bool) -> None: pub_path = private_key_path + ".pub" if not force and (os.path.exists(private_key_path) or os.path.exists(pub_path)): @@ -149,26 +72,6 @@ def _ensure_ssh_keypair(private_key_path: str, comment: str, force: bool) -> Non raise RuntimeError("ssh-keygen failed to generate keypair") from e -def put_secret( - base_url: str, - token: str, - scope: str, - owner: Optional[str], - repo: Optional[str], - org: Optional[str], - name: str, - encrypted_value_b64: str, - key_id: str, -) -> None: - url = _secret_url(base_url, scope, owner, repo, org, name) - payload = { - "encrypted_value": encrypted_value_b64, - "key_id": key_id, - } - r = requests.put(url, headers=_api_headers(token), json=payload, timeout=30) - r.raise_for_status() - - def put_secret_plain( base_url: str, token: str, @@ -186,28 +89,6 @@ def put_secret_plain( r.raise_for_status() -def _read_ansible_vault_yaml(vault_file: str, vault_password_file: Optional[str]) -> dict: - if yaml is None: - raise RuntimeError("PyYAML is required to read tokens from ansible-vault. Install with: pip install pyyaml") - - cmd = ["ansible-vault", "view", vault_file] - if vault_password_file: - cmd.extend(["--vault-password-file", vault_password_file]) - - try: - p = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - except FileNotFoundError: - raise RuntimeError("ansible-vault not found in PATH") - except subprocess.CalledProcessError as e: - # Don't echo stderr; it may contain hints about vault location. - raise RuntimeError("Failed to read vault file via ansible-vault") from e - - data = yaml.safe_load(p.stdout) or {} - if not isinstance(data, dict): - raise RuntimeError(f"Vault file {vault_file} did not parse to a YAML mapping") - return data - - def _read_ansible_vault_text(vault_file: str, vault_password_file: Optional[str]) -> str: cmd = ["ansible-vault", "view", vault_file] if vault_password_file: @@ -263,15 +144,6 @@ def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vau _write_ansible_vault_text(vault_file, plaintext, vault_password_file) -def _load_token_from_ansible_vault(vault_file: str, token_var: str, vault_password_file: Optional[str]) -> Tuple[Optional[str], List[str]]: - data = _read_ansible_vault_yaml(vault_file, vault_password_file) - keys = sorted([str(k) for k in data.keys()]) - val = data.get(token_var) - if isinstance(val, str) and val.strip(): - return val.strip(), keys - return None, keys - - def args_parse(): p = argparse.ArgumentParser( description="Create/update Forgejo Actions secrets for app_ssh_access (SSH register/deregister keys).", @@ -284,7 +156,6 @@ def args_parse(): p.add_argument("--token", default=os.environ.get("FORGEJO_API_TOKEN")) p.add_argument("--vault-file", default="./secrets/vault.yml") p.add_argument("--vault-password-file", default="./secrets/.vault_pass") - p.add_argument("--token-var", default="FORGEJO_API_TOKEN") p.add_argument("--scope", choices=["user", "org", "repo"], default="user") p.add_argument("--org", help="Organization name (required when --scope org)") p.add_argument("--repo", help="owner/repo (required when --scope repo)") @@ -309,20 +180,11 @@ def main() -> int: args = args_parse() token = args.token - vault_keys: List[str] = [] - if not token: - token, vault_keys = _load_token_from_ansible_vault(args.vault_file, args.token_var, args.vault_password_file) - if not token: print( - "FORGEJO_API_TOKEN is required (pass --token, set env FORGEJO_API_TOKEN, or store it in ansible-vault secrets/vault.yml)", + "FORGEJO_API_TOKEN is required (pass --token or set env FORGEJO_API_TOKEN)", file=sys.stderr, ) - if vault_keys: - print( - f"Vault parsed OK but key '{args.token_var}' not found. Available top-level keys: {', '.join(vault_keys)}", - file=sys.stderr, - ) return 2 owner: Optional[str] = None @@ -368,8 +230,8 @@ def main() -> int: deregister_secret = f.read() try: - _put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret) - _put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret) + put_secret_plain(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret) + put_secret_plain(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret) except requests.exceptions.HTTPError as e: status = getattr(e.response, "status_code", None) url = getattr(e.response, "url", "")