#!/usr/bin/env python3 import argparse import base64 import json import os import re import subprocess import sys import tempfile from typing import Optional, Tuple from urllib.parse import urljoin import requests try: import yaml except Exception: # pragma: no cover yaml = None def _require_pynacl(): try: from nacl.public import PublicKey, SealedBox except Exception as e: raise RuntimeError( "PyNaCl is required for secrets encryption. Install with: pip install pynacl" ) from e return PublicKey, SealedBox def _api_headers(token: str) -> dict: # Gitea/Forgejo uses "token " in Authorization. return { "Authorization": f"token {token}", "Accept": "application/json", "Content-Type": "application/json", } def get_repo_public_key(base_url: str, token: str, owner: str, repo: str) -> dict: url = urljoin(base_url.rstrip("/") + "/", f"api/v1/repos/{owner}/{repo}/actions/secrets/public-key") r = requests.get(url, headers=_api_headers(token), timeout=30) # Some Forgejo/Gitea versions do not expose a public-key endpoint. # In that case, secrets are submitted in plaintext (over TLS) and encrypted server-side. if r.status_code in (404, 405): raise NotImplementedError("public-key endpoint not available") r.raise_for_status() data = r.json() # Expected: {"key_id": "...", "key": "base64..."} if "key" not in data or "key_id" not in data: raise RuntimeError(f"Unexpected response from {url}: {data}") return data def _put_repo_secret_auto(base_url: str, token: str, owner: str, repo: str, name: str, value: str) -> None: try: public = get_repo_public_key(base_url, token, owner, repo) encrypted = encrypt_secret(public["key"], value) put_repo_secret(base_url, token, owner, repo, name, encrypted, public["key_id"]) except NotImplementedError: put_repo_secret_plain(base_url, token, owner, repo, name, value) def encrypt_secret(public_key_b64: str, secret_value: str) -> str: PublicKey, SealedBox = _require_pynacl() pk_bytes = base64.b64decode(public_key_b64) pk = PublicKey(pk_bytes) box = SealedBox(pk) encrypted = box.encrypt(secret_value.encode("utf-8")) return base64.b64encode(encrypted).decode("ascii") def _ensure_ssh_keypair(private_key_path: str, comment: str, force: bool) -> None: pub_path = private_key_path + ".pub" if not force and (os.path.exists(private_key_path) or os.path.exists(pub_path)): raise RuntimeError( f"Refusing to overwrite existing key material: {private_key_path} / {pub_path} (use --force-generate-ssh-key)" ) if force: for p in (private_key_path, pub_path): try: if os.path.exists(p): os.unlink(p) except Exception as e: raise RuntimeError(f"Failed to remove existing key material: {p}") from e cmd = [ "ssh-keygen", "-t", "ed25519", "-N", "", "-f", private_key_path, "-C", comment, ] try: subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) except FileNotFoundError: raise RuntimeError("ssh-keygen not found in PATH") except subprocess.CalledProcessError as e: raise RuntimeError("ssh-keygen failed to generate keypair") from e def put_repo_secret(base_url: str, token: str, owner: str, repo: str, name: str, encrypted_value_b64: str, key_id: str) -> None: url = urljoin(base_url.rstrip("/") + "/", f"api/v1/repos/{owner}/{repo}/actions/secrets/{name}") payload = { "encrypted_value": encrypted_value_b64, "key_id": key_id, } r = requests.put(url, headers=_api_headers(token), json=payload, timeout=30) r.raise_for_status() def put_repo_secret_plain(base_url: str, token: str, owner: str, repo: str, name: str, value: str) -> None: url = urljoin(base_url.rstrip("/") + "/", f"api/v1/repos/{owner}/{repo}/actions/secrets/{name}") # Some Forgejo/Gitea instances accept a plaintext secret value and encrypt server-side. payload = {"data": value} r = requests.put(url, headers=_api_headers(token), json=payload, timeout=30) r.raise_for_status() def _read_ansible_vault_yaml(vault_file: str, vault_password_file: Optional[str]) -> dict: if yaml is None: raise RuntimeError("PyYAML is required to read tokens from ansible-vault. Install with: pip install pyyaml") cmd = ["ansible-vault", "view", vault_file] if vault_password_file: cmd.extend(["--vault-password-file", vault_password_file]) try: p = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) except FileNotFoundError: raise RuntimeError("ansible-vault not found in PATH") except subprocess.CalledProcessError as e: # Don't echo stderr; it may contain hints about vault location. raise RuntimeError("Failed to read vault file via ansible-vault") from e data = yaml.safe_load(p.stdout) or {} if not isinstance(data, dict): raise RuntimeError(f"Vault file {vault_file} did not parse to a YAML mapping") return data def _read_ansible_vault_text(vault_file: str, vault_password_file: Optional[str]) -> str: cmd = ["ansible-vault", "view", vault_file] if vault_password_file: cmd.extend(["--vault-password-file", vault_password_file]) try: p = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) except FileNotFoundError: raise RuntimeError("ansible-vault not found in PATH") except subprocess.CalledProcessError as e: raise RuntimeError("Failed to read vault file via ansible-vault") from e return p.stdout def _write_ansible_vault_text(vault_file: str, plaintext: str, vault_password_file: Optional[str]) -> None: with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8", prefix="vault_", suffix=".yml") as tf: tmp_path = tf.name tf.write(plaintext) try: cmd = ["ansible-vault", "encrypt", tmp_path, "--output", vault_file] if vault_password_file: cmd.extend(["--vault-password-file", vault_password_file]) subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) except FileNotFoundError: raise RuntimeError("ansible-vault not found in PATH") except subprocess.CalledProcessError as e: raise RuntimeError("Failed to write vault file via ansible-vault") from e finally: try: os.unlink(tmp_path) except Exception: pass def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vault_password_file: Optional[str]) -> None: plaintext = _read_ansible_vault_text(vault_file, vault_password_file) if plaintext and not plaintext.endswith("\n"): plaintext += "\n" # Quote the value safely (SSH public keys contain spaces). rendered_value = json.dumps(value) new_line = f"{key}: {rendered_value}" pattern = re.compile(rf"^(?P{re.escape(key)})\s*:\s*.*$", re.MULTILINE) if pattern.search(plaintext): plaintext = pattern.sub(new_line, plaintext, count=1) else: if plaintext and not plaintext.endswith("\n"): plaintext += "\n" plaintext += new_line + "\n" _write_ansible_vault_text(vault_file, plaintext, vault_password_file) def _load_token_from_ansible_vault(vault_file: str, token_var: str, vault_password_file: Optional[str]) -> Tuple[Optional[str], list[str]]: data = _read_ansible_vault_yaml(vault_file, vault_password_file) keys = sorted([str(k) for k in data.keys()]) val = data.get(token_var) if isinstance(val, str) and val.strip(): return val.strip(), keys return None, keys def args_parse(): p = argparse.ArgumentParser(description="Create/update a Forgejo Actions secret in a repository.") p.add_argument("--base-url", default=os.environ.get("FORGEJO_BASE_URL", "https://git.jfraeys.com")) p.add_argument("--token", default=os.environ.get("FORGEJO_API_TOKEN")) p.add_argument("--vault-file", default="./secrets/vault.yml") p.add_argument("--vault-password-file", default="./secrets/.vault_pass") p.add_argument("--token-var", default="FORGEJO_API_TOKEN") p.add_argument("--repo", required=True, help="owner/repo") p.add_argument("--force-generate-ssh-key", action="store_true") p.add_argument( "--generate-ssh-keys", action="store_true", help="Generate both register and deregister SSH key pairs.", ) p.add_argument("--register-key-path", default="./secrets/service_ssh_key_register") p.add_argument("--deregister-key-path", default="./secrets/service_ssh_key_deregister") p.add_argument( "--update-vault-both-public-keys", action="store_true", help="Set both SERVICE_SSH_REGISTER_PUBLIC_KEY and SERVICE_SSH_DEREGISTER_PUBLIC_KEY in vault", ) args = p.parse_args() return args def main() -> int: args = args_parse() token = args.token vault_keys: list[str] = [] if not token: token, vault_keys = _load_token_from_ansible_vault(args.vault_file, args.token_var, args.vault_password_file) if not token: print( "FORGEJO_API_TOKEN is required (pass --token, set env FORGEJO_API_TOKEN, or store it in ansible-vault secrets/vault.yml)", file=sys.stderr, ) if vault_keys: print( f"Vault parsed OK but key '{args.token_var}' not found. Available top-level keys: {', '.join(vault_keys)}", file=sys.stderr, ) return 2 if "/" not in args.repo: print("--repo must be in the form owner/repo", file=sys.stderr) return 2 owner, repo = args.repo.split("/", 1) if args.generate_ssh_keys: _ensure_ssh_keypair(args.register_key_path, "actions-register", args.force_generate_ssh_key) _ensure_ssh_keypair(args.deregister_key_path, "actions-deregister", args.force_generate_ssh_key) else: missing: list[str] = [] if not os.path.exists(args.register_key_path): missing.append(args.register_key_path) if not os.path.exists(args.deregister_key_path): missing.append(args.deregister_key_path) if missing: print( "Missing SSH private key(s): " + ", ".join(missing) + ". Use --generate-ssh-keys to create them.", file=sys.stderr, ) return 2 with open(args.register_key_path, "r", encoding="utf-8") as f: register_secret = f.read() with open(args.deregister_key_path, "r", encoding="utf-8") as f: deregister_secret = f.read() _put_repo_secret_auto(args.base_url, token, owner, repo, "SERVICE_SSH_KEY_REGISTER", register_secret) _put_repo_secret_auto(args.base_url, token, owner, repo, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret) if args.update_vault_both_public_keys: with open(args.register_key_path + ".pub", "r", encoding="utf-8") as f: _upsert_ansible_vault_scalar_line( args.vault_file, "SERVICE_SSH_REGISTER_PUBLIC_KEY", f.read().strip(), args.vault_password_file, ) with open(args.deregister_key_path + ".pub", "r", encoding="utf-8") as f: _upsert_ansible_vault_scalar_line( args.vault_file, "SERVICE_SSH_DEREGISTER_PUBLIC_KEY", f.read().strip(), args.vault_password_file, ) print(f"Updated Actions secrets SERVICE_SSH_KEY_REGISTER and SERVICE_SSH_KEY_DEREGISTER for {owner}/{repo}") return 0 if __name__ == "__main__": raise SystemExit(main())