fix(scripts): python3.9 compatibility + better Forgejo secret errors

- Replace PEP604 unions with typing.Optional for broader Python compatibility
- Print actionable guidance when user/org-scoped secret API calls return 403
This commit is contained in:
Jeremie Fraeys 2026-01-21 23:09:44 -05:00
parent 35796b1069
commit 0814900598
No known key found for this signature in database

View file

@ -8,6 +8,7 @@ import re
import subprocess
import sys
import tempfile
from typing import List, Optional, Tuple
from urllib.parse import urljoin
import requests
@ -37,7 +38,7 @@ def _api_headers(token: str) -> dict:
}
def _public_key_url(base_url: str, scope: str, owner: str | None, repo: str | None, org: str | None) -> str:
def _public_key_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str]) -> str:
base = base_url.rstrip("/") + "/"
if scope == "repo":
if not owner or not repo:
@ -52,7 +53,7 @@ def _public_key_url(base_url: str, scope: str, owner: str | None, repo: str | No
raise ValueError(f"unsupported scope: {scope}")
def _secret_url(base_url: str, scope: str, owner: str | None, repo: str | None, org: str | None, name: str) -> str:
def _secret_url(base_url: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str], name: str) -> str:
base = base_url.rstrip("/") + "/"
if scope == "repo":
if not owner or not repo:
@ -67,7 +68,7 @@ def _secret_url(base_url: str, scope: str, owner: str | None, repo: str | None,
raise ValueError(f"unsupported scope: {scope}")
def get_public_key(base_url: str, token: str, scope: str, owner: str | None, repo: str | None, org: str | None) -> dict:
def get_public_key(base_url: str, token: str, scope: str, owner: Optional[str], repo: Optional[str], org: Optional[str]) -> dict:
url = _public_key_url(base_url, scope, owner, repo, org)
r = requests.get(url, headers=_api_headers(token), timeout=30)
@ -89,9 +90,9 @@ def _put_secret_auto(
base_url: str,
token: str,
scope: str,
owner: str | None,
repo: str | None,
org: str | None,
owner: Optional[str],
repo: Optional[str],
org: Optional[str],
name: str,
value: str,
) -> None:
@ -152,9 +153,9 @@ def put_secret(
base_url: str,
token: str,
scope: str,
owner: str | None,
repo: str | None,
org: str | None,
owner: Optional[str],
repo: Optional[str],
org: Optional[str],
name: str,
encrypted_value_b64: str,
key_id: str,
@ -172,9 +173,9 @@ def put_secret_plain(
base_url: str,
token: str,
scope: str,
owner: str | None,
repo: str | None,
org: str | None,
owner: Optional[str],
repo: Optional[str],
org: Optional[str],
name: str,
value: str,
) -> None:
@ -185,7 +186,7 @@ def put_secret_plain(
r.raise_for_status()
def _read_ansible_vault_yaml(vault_file: str, vault_password_file: str | None) -> dict:
def _read_ansible_vault_yaml(vault_file: str, vault_password_file: Optional[str]) -> dict:
if yaml is None:
raise RuntimeError("PyYAML is required to read tokens from ansible-vault. Install with: pip install pyyaml")
@ -207,7 +208,7 @@ def _read_ansible_vault_yaml(vault_file: str, vault_password_file: str | None) -
return data
def _read_ansible_vault_text(vault_file: str, vault_password_file: str | None) -> str:
def _read_ansible_vault_text(vault_file: str, vault_password_file: Optional[str]) -> str:
cmd = ["ansible-vault", "view", vault_file]
if vault_password_file:
cmd.extend(["--vault-password-file", vault_password_file])
@ -221,7 +222,7 @@ def _read_ansible_vault_text(vault_file: str, vault_password_file: str | None) -
return p.stdout
def _write_ansible_vault_text(vault_file: str, plaintext: str, vault_password_file: str | None) -> None:
def _write_ansible_vault_text(vault_file: str, plaintext: str, vault_password_file: Optional[str]) -> None:
with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8", prefix="vault_", suffix=".yml") as tf:
tmp_path = tf.name
tf.write(plaintext)
@ -242,7 +243,7 @@ def _write_ansible_vault_text(vault_file: str, plaintext: str, vault_password_fi
pass
def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vault_password_file: str | None) -> None:
def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vault_password_file: Optional[str]) -> None:
plaintext = _read_ansible_vault_text(vault_file, vault_password_file)
if plaintext and not plaintext.endswith("\n"):
plaintext += "\n"
@ -262,7 +263,7 @@ def _upsert_ansible_vault_scalar_line(vault_file: str, key: str, value: str, vau
_write_ansible_vault_text(vault_file, plaintext, vault_password_file)
def _load_token_from_ansible_vault(vault_file: str, token_var: str, vault_password_file: str | None) -> tuple[str | None, list[str]]:
def _load_token_from_ansible_vault(vault_file: str, token_var: str, vault_password_file: Optional[str]) -> Tuple[Optional[str], List[str]]:
data = _read_ansible_vault_yaml(vault_file, vault_password_file)
keys = sorted([str(k) for k in data.keys()])
val = data.get(token_var)
@ -308,7 +309,7 @@ def main() -> int:
args = args_parse()
token = args.token
vault_keys: list[str] = []
vault_keys: List[str] = []
if not token:
token, vault_keys = _load_token_from_ansible_vault(args.vault_file, args.token_var, args.vault_password_file)
@ -324,9 +325,9 @@ def main() -> int:
)
return 2
owner: str | None = None
repo: str | None = None
org: str | None = None
owner: Optional[str] = None
repo: Optional[str] = None
org: Optional[str] = None
if args.scope == "repo":
if not args.repo or "/" not in args.repo:
@ -366,8 +367,35 @@ def main() -> int:
with open(args.deregister_key_path, "r", encoding="utf-8") as f:
deregister_secret = f.read()
_put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret)
_put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret)
try:
_put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_REGISTER", register_secret)
_put_secret_auto(args.base_url, token, args.scope, owner, repo, org, "SERVICE_SSH_KEY_DEREGISTER", deregister_secret)
except requests.exceptions.HTTPError as e:
status = getattr(e.response, "status_code", None)
url = getattr(e.response, "url", "")
body = ""
try:
body = (e.response.text or "").strip()
except Exception:
pass
if status == 403 and args.scope in ("user", "org"):
print(
f"ERROR: Forgejo API returned 403 Forbidden when setting {args.scope}-scoped Actions secrets: {url}",
file=sys.stderr,
)
if body:
print(f"Response body: {body}", file=sys.stderr)
print(
"This usually means your Forgejo personal access token does not have sufficient permissions, "
"or your Forgejo instance does not allow managing user/org Actions secrets via the API.\n"
"Workarounds:\n"
"- Use the web UI to add secrets at /user/settings/actions/secrets (or /org/<org>/settings/actions/secrets)\n"
"- Or use repo-scoped secrets with: --scope repo --repo owner/repo\n",
file=sys.stderr,
)
raise
raise
if args.update_vault_both_public_keys:
with open(args.register_key_path + ".pub", "r", encoding="utf-8") as f: