.local-bin/scripts/aactivator.py
2025-06-21 13:39:18 -04:00

358 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""\
Usage: eval "$(aactivator init)"
aactivator is a script for automatically sourcing environments in an interactive shell.
The interface for using this is two files:
- .activate.sh: when sourced, this file activates your environment
- .deactivate.sh: when sourced, this file deactivates your environment
A typical setup in a python project:
$ ln -vs venv/bin/activate .activate.sh
$ echo deactivate > .deactivate.sh
If an environment is already active it will not be re-activated.
If a different project is activated, the previous project will be deactivated beforehand.
aactivator will ask before automatically sourcing environments, and optionally
remember your answer. You can later adjust your per-project preferences in the
~/.cache/aactivator/ directory.
see also: https://github.com/Yelp/aactivator
"""
from __future__ import absolute_import, print_function, unicode_literals
import io
import os.path
import stat
import sys
from os.path import relpath
from shlex import quote
ENVIRONMENT_VARIABLE = "AACTIVATOR_ACTIVE"
ACTIVATE = ".activate.sh"
DEACTIVATE = ".deactivate.sh"
__version__ = "2.0.0"
def init(arg0):
arg0 = os.path.realpath(arg0)
cmd = 'if [ -x {exe} ]; then eval "`{exe}`"; fi'.format(exe=arg0)
return """\
export AACTIVATOR_VERSION={version}
alias aactivator={arg0}
unset {varname}
if [ "$ZSH_VERSION" ]; then
precmd_aactivator() {{ {cmd}; }}
if ! [ "${{precmd_functions[(r)precmd_aactivator]}}" ]; then
precmd_functions=(precmd_aactivator $precmd_functions)
fi
else
if ! ( echo "$PROMPT_COMMAND" | grep -Fq '{cmd}' ); then
PROMPT_COMMAND='{cmd}; '"$PROMPT_COMMAND"
fi
fi""".format(version=__version__, arg0=arg0, cmd=cmd, varname=ENVIRONMENT_VARIABLE)
def get_filesystem_id(path):
try:
return os.stat(path).st_dev
except OSError as error:
if error.errno == 2: # no such file
return None
else:
raise
def insecure_inode(path):
"""This particular inode can be altered by someone other than the owner"""
pathstat = os.stat(path).st_mode
# Directories with a sticky bit are always acceptable.
if os.path.isdir(path) and pathstat & stat.S_ISVTX:
return False
# The path is writable by someone who is not us.
elif pathstat & (stat.S_IWGRP | stat.S_IWOTH):
return True
else:
return False
def first(iterable, predicate):
for x in iterable:
if predicate(x):
return x
def insecure(path):
"""Find an insecure path, at or above this one"""
return first(search_parent_paths(path), insecure_inode)
def search_parent_paths(path):
path = os.path.abspath(path)
original_fs_id = fs_id = get_filesystem_id(path)
previous_path = None
while original_fs_id == fs_id and path != previous_path:
yield path
previous_path = path
path = os.path.dirname(path)
fs_id = get_filesystem_id(path)
def error_command(message):
return "echo %s >&2" % quote("aactivator: " + message)
def mkdirp(path):
try:
os.makedirs(path)
except OSError:
if os.path.isdir(path):
return
else:
raise
def _get_lines_if_there(path):
if os.path.exists(path):
return io.open(path).read().splitlines()
else:
return []
class ConfigFile(object):
def __init__(self, directory, name):
self.path = os.path.join(directory, name)
self.lines = frozenset(_get_lines_if_there(self.path))
def write(self, mode, value):
mkdirp(os.path.dirname(self.path))
with io.open(self.path, mode) as file_obj:
file_obj.write(value)
def append(self, value):
self.write("a", value + "\n")
def path_is_under(path, under):
relpath = os.path.relpath(path, under).split("/")
return not relpath[:1] == [".."]
def user_cache_dir(env):
# stolen from pip.utils.appdirs.user_cache_dir
# expanduser doesn't take an env argument -.-
from os.path import expanduser
orig, os.environ = os.environ, env
try:
return expanduser(env.get("XDG_CACHE_HOME", "~/.cache"))
finally:
os.environ = orig
class ActivateConfig(object):
def __init__(self, env, get_input):
self.env = env
self.get_input = get_input
self.path = os.path.join(user_cache_dir(self.env), "aactivator")
self.allowed = ConfigFile(self.path, "allowed")
self.not_now = ConfigFile(self.path, "not-now")
self.disallowed = ConfigFile(self.path, "disallowed")
def refresh_not_now(self, pwd):
result = []
for path in self.not_now.lines:
dirname = os.path.dirname(path)
if path_is_under(pwd, dirname):
result.append(path)
self.not_now.write("w", "\n".join(result))
def _prompt_user(self, path):
print(
"aactivator will source {0} and {1} at {2}.".format(
ACTIVATE,
DEACTIVATE,
path,
),
file=sys.stderr,
)
while True:
print("Acceptable? (y)es (n)o (N)ever: ", file=sys.stderr, end="")
sys.stderr.flush()
try:
response = self.get_input()
# Allow ^D to be "no"
except EOFError:
response = "n"
if response.startswith("N"):
self.disallowed.append(path)
print(
"aactivator will remember this: ~/.cache/aactivator/disallowed",
file=sys.stderr,
)
return False
response = response.lower()
if response.startswith("n"):
self.not_now.append(path)
return False
elif response.startswith("y"):
self.allowed.append(path)
print(
"aactivator will remember this: ~/.cache/aactivator/allowed",
file=sys.stderr,
)
return True
else:
print("I didn't understand your response.", file=sys.stderr)
print(file=sys.stderr)
def find_allowed(self, path):
self.refresh_not_now(path)
return first(search_parent_paths(path), self.is_allowed)
def is_allowed(self, path, _getuid=os.getuid):
activate = os.path.join(path, ACTIVATE)
if not os.path.exists(activate):
return False
elif os.stat(activate).st_uid != _getuid():
# If we do not own this path, short circuit on activating
return False
elif path in self.disallowed.lines or path in self.not_now.lines:
return False
elif path in self.allowed.lines:
return True
else:
return self._prompt_user(path)
def security_check(path):
if not os.path.exists(path):
return "aactivator: File does not exist: " + path
insecure_path = insecure(path)
if insecure_path is not None:
return "aactivator: Cowardly refusing to source {0} because writeable by others: {1}".format(
relpath(path), relpath(insecure_path)
)
def command_for_path(cmd, path, pwd):
if path == pwd:
return cmd
else:
return " &&\n".join(
(
'OLDPWD_bak="$OLDPWD"',
"cd " + quote(path),
cmd,
'cd "$OLDPWD_bak"',
"cd " + quote(pwd),
"unset OLDPWD_bak",
)
)
def aactivate(path, pwd):
return command_for_path(
" &&\n".join(
(
"aactivator security-check " + ACTIVATE,
"source ./" + ACTIVATE,
"export %s=%s" % (ENVIRONMENT_VARIABLE, quote(path)),
)
),
path,
pwd,
)
def deaactivate(path, pwd):
unset = "unset " + ENVIRONMENT_VARIABLE
deactivate_path = os.path.join(path, DEACTIVATE)
if os.path.exists(deactivate_path):
return command_for_path(
" &&\n".join(
(
"aactivator security-check " + DEACTIVATE,
"source ./" + DEACTIVATE,
)
)
+ "\n"
+ unset,
path,
pwd,
)
else:
return " &&\n".join(
(
unset,
error_command(
"Cannot deactivate. File missing: {0}".format(deactivate_path)
),
)
)
def get_output(
environ, pwd=".", get_input=sys.stdin.readline, arg0="/path/to/aactivator"
):
try:
pwd = os.path.realpath(pwd)
except OSError as error:
if error.errno == 2: # no such file
return ""
else:
raise
config = ActivateConfig(environ, get_input)
activate_path = config.find_allowed(pwd)
result = []
if environ.get("AACTIVATOR_VERSION") == __version__:
activated_env = environ.get(ENVIRONMENT_VARIABLE)
else:
result.append(init(arg0))
activated_env = None
if (
activated_env != activate_path
): # did we already activate the current environment?
if activated_env: # deactivate it
result.append(deaactivate(activated_env, pwd))
if activate_path:
result.append(aactivate(activate_path, pwd))
return " &&\n".join(result)
def aactivator(args, env):
if len(args) == 1:
return get_output(env, arg0=args[0])
elif len(args) == 2 and args[1] == "init":
return init(args[0])
elif len(args) == 3 and args[1] == "security-check":
exit(security_check(args[2]))
else:
return __doc__ + "\nVersion: " + __version__
def main():
try:
print(aactivator(tuple(sys.argv), os.environ.copy()))
except KeyboardInterrupt: # pragma: no cover
# Silence ^C
pass
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
# Silence ^C
pass