$VI_MODE = True
$AUTO_PUSHD = True
$XONSH_AUTOPAIR = True
# tab selection: do not execute cmd when pressing enter
$COMPLETIONS_CONFIRM = True
$_Z_EXCLUDE_DIRS = ['/tmp']
# alias 'xontrib' wraps xontribs_load()
# xontrib load vox z
from xonsh.xontribs import xontribs_load
xontribs_load(['vox', 'z'])


def _last_exit_status():
    """Return the last return code recorded in the history.

    Returns None when the history is empty or the last return code was 0,
    so that the prompt field renders nothing in those cases.
    """
    try:
        exit_status = __xonsh_history__.rtns[-1]
    except IndexError:
        # no command has been recorded yet
        return None
    return exit_status if exit_status != 0 else None
$PROMPT_FIELDS['last_exit_status'] = _last_exit_status

$SHLVL = int($SHLVL) + 1 if 'SHLVL' in ${...} else 1

$XONSH_STDERR_PREFIX = '{RED}'
$XONSH_STDERR_POSTFIX = '{NO_COLOR}'

$DYNAMIC_CWD_WIDTH = '30%'
$DYNAMIC_CWD_ELISION_CHAR = '…'
$PROMPT = ''.join([
    '{RED}{last_exit_status:[{}] }',
    '{BOLD_GREEN}{user}@{hostname} ',
    '{YELLOW}{cwd} ',
    # one prompt_end character per shell nesting level
    '{{BLUE}}{} '.format('{prompt_end}' * $SHLVL),
    '{NO_COLOR}',
])
$RIGHT_PROMPT = '{gitstatus}{env_name: {}}'
$XONSH_APPEND_NEWLINE = True

import contextlib
import datetime as dt
import io
import os
import re
import shutil
import stat
import subprocess
import sys

os.umask(stat.S_IWGRP | stat.S_IRWXO)  # 027

# default locale
# will be used for all non-explicitly set LC_* variables
$LANG = 'en_US.UTF-8'
# fallback locales
# GNU gettext gives preference to LANGUAGE over LC_ALL and LANG
# for the purpose of message handling
# https://www.gnu.org/software/gettext/manual/html_node/The-LANGUAGE-variable.html
# caveat: if this list contains 'de(_.*)?' at any (sic!) position
# vim 7.4.1689 will switch to german
$LANGUAGE = ':'.join(['en_US', 'en'])
$LC_COLLATE = 'C.UTF-8'
# char classification, case conversion & other char attrs
$LC_CTYPE = 'de_AT.UTF-8'
# $ locale currency_symbol
$LC_MONETARY = 'de_AT.UTF-8'
# $ locale -k LC_NUMERIC | head -n 3
# decimal_point="."
# thousands_sep=""
# grouping=-1
$LC_NUMERIC = 'C.UTF-8'
# A4
$LC_PAPER = 'de_AT.UTF-8'

USER_BIN_PATH = os.path.join($HOME, '.local', 'bin')
if os.path.isdir(USER_BIN_PATH):
    $PATH.insert(0, USER_BIN_PATH)

$PAGER = 'less'
$EDITOR = 'vim'
# i3-sensible-terminal
$TERMINAL = 'termite'

if shutil.which('gpgconf'):
    # required by pinentry-tty when using gpg command
    # takes approx 100ms in termux on galaxy S7
    # rstrip: captured output may end in a newline, which is not part of the tty path
    $GPG_TTY = $(tty).rstrip()
    # required by scute
    $GPG_AGENT_INFO = $(gpgconf --list-dir agent-socket).rstrip() + ':0:1'
    if 'SSH_CLIENT' not in ${...}:
        # in gnupg 2.1.13 the location of agents socket changed
        $SSH_AUTH_SOCK = $(gpgconf --list-dir agent-ssh-socket).rstrip()

# wrapper for termite required when launching termite from ranger:
$TERMCMD = os.path.join(os.path.dirname(__file__), 'ranger-termite-termcmd')

# https://docs.docker.com/engine/security/trust/content_trust/
$DOCKER_CONTENT_TRUST = 1


class DockerImage:
    """Handle on a local docker image, driven through `sudo docker`."""

    def __init__(self, image):
        """Inspect `image` (name, tag or id) and remember its id and repo tags.

        Raises subprocess.CalledProcessError if `docker image inspect` fails.
        """
        import json
        inspect_output = subprocess.check_output(
            ['sudo', 'docker', 'image', 'inspect', image],
        )
        # inspect prints a JSON list; exactly one entry is expected
        attrs, = json.loads(inspect_output.decode(sys.stdout.encoding))
        self._id = attrs['Id']
        self._tags = attrs['RepoTags']

    def __repr__(self):
        return '{}(id={!r}, tags={!r})'.format(type(self).__name__, self._id, self._tags)

    @classmethod
    def build(cls, dockerfile_or_path, tag=None):
        """Run `docker build` and return the resulting image.

        dockerfile_or_path: an existing path is passed to docker as-is;
            anything else is treated as Dockerfile content and fed via stdin.
        tag: optional value for `--tag`.

        The build output is echoed to stdout and parsed for the image id.
        Raises AssertionError if docker build exits non-zero.
        """
        out = io.BytesIO()
        args = ['sudo', 'docker', 'build']
        if tag:
            args.extend(['--tag', tag])
        with StdoutTee(out) as tee:
            if os.path.exists(dockerfile_or_path):
                p = subprocess.Popen(args + [dockerfile_or_path], stdin=None, stdout=tee)
            else:
                p = subprocess.Popen(args + ['-'], stdin=subprocess.PIPE, stdout=tee)
                p.stdin.write(dockerfile_or_path.encode())
                p.stdin.close()
            # wait outside of any assert expression so the process is
            # always reaped, even when assertions are disabled
            returncode = p.wait()
            if returncode != 0:
                raise AssertionError('docker build failed')
        image_id, = re.search(rb'^Successfully built (\S+)$', out.getvalue(), re.MULTILINE).groups()
        return cls(image_id.decode(sys.stdout.encoding))

    @classmethod
    def pull(cls, image):
        """Run `docker image pull` and return the image pinned to the reported digest.

        Raises subprocess.CalledProcessError if the pull fails.
        """
        out = io.BytesIO()
        with StdoutTee(out) as tee:
            # check=True: fail here instead of on the digest lookup below
            subprocess.run(['sudo', 'docker', 'image', 'pull', image], stdout=tee, check=True)
        repo_digest, = re.search(rb'^Digest: (sha\S+:\S+)$', out.getvalue(), re.MULTILINE).groups()
        return cls('{}@{}'.format(image, repo_digest.decode()))

    def run(self, args=(), name=None, detach=False, env=None, network=None,
            publish_ports=(), volumes=(), caps=()):
        """Start a throw-away (`--rm`) container from this image.

        args: arguments appended after the image id.
        name: optional value for `--name`.
        detach: pass `--detach` if true, otherwise `--interactive --tty`.
        env: mapping turned into `--env KEY=VALUE` pairs.
        network: optional value for `--network`.
        publish_ports: iterable of sequences, each joined with ':' for `--publish=`.
        volumes: iterable of string sequences, each joined with ':' for `--volume`.
        caps: capabilities to re-add after `--cap-drop=all`.

        The command line is echoed to stderr before it is run.
        Raises subprocess.CalledProcessError on a non-zero exit status.
        """
        params = ['sudo', 'docker', 'run', '--rm']
        if name:
            params.extend(['--name', name])
        if detach:
            params.append('--detach')
        else:
            params.extend(['--interactive', '--tty'])
        for key, value in (env or {}).items():
            params.extend(['--env', '{}={}'.format(key, value)])
        if network:
            params.extend(['--network', network])
        for volume in volumes:
            params.extend(['--volume', ':'.join(volume)])
        params.extend('--publish=' + ':'.join(str(a) for a in p) for p in publish_ports)
        params.extend(['--security-opt=no-new-privileges', '--cap-drop=all'])
        params.extend('--cap-add={}'.format(c) for c in caps)
        params.append(self._id)
        params.extend(args)
        sys.stderr.write('{}\n'.format(shlex_join(params)))
        subprocess.run(params, check=True)


class StdoutTee:
    """Context manager usable as a subprocess `stdout=` target.

    Everything written to its file descriptor is copied both to `sink`
    (a binary file-like object) and to this process' stdout.
    """

    def __init__(self, sink):
        self._sink = sink

    def __enter__(self):
        import threading
        self._read_fd, self._write_fd = os.pipe()
        # set in __exit__ to tell the reader that no more writers are expected
        self._closing = threading.Event()
        self._thread = threading.Thread(target=self._loop)
        self._thread.start()
        return self

    def _loop(self):
        """Forward pipe data until EOF, or until closing and the pipe is idle."""
        while True:
            data = os_read_non_blocking(self._read_fd)
            if data is None:
                # read timed out; stop only once __exit__ was requested,
                # so pending output is drained before leaving
                if self._closing.is_set():
                    return
                continue
            if not data:
                # EOF: all write ends are closed
                return
            self._sink.write(data)
            sys.stdout.buffer.write(data)
            sys.stdout.flush()

    def fileno(self):
        return self._write_fd

    def __exit__(self, exc_type, exc_value, traceback):
        # close our write end first so the reader can see EOF,
        # and keep the read end open until the reader has finished
        os.close(self._write_fd)
        self._closing.set()
        self._thread.join()
        os.close(self._read_fd)


@contextlib.contextmanager
def chdir(path):
    """Change into `path` for the duration of the with-block, then change back.

    Both directory changes are echoed to stderr as `cd` commands.
    """
    import shlex
    previous = os.getcwd()
    try:
        sys.stderr.write('cd {}\n'.format(shlex.quote(path)))
        os.chdir(path)
        yield path
    finally:
        os.chdir(previous)
        sys.stderr.write('cd {}\n'.format(shlex.quote(previous)))


def dpkg_listfiles(pkg_name):
    """Return the paths printed by `dpkg --listfiles` for `pkg_name`.

    Raises AssertionError if nothing is listed.
    """
    assert isinstance(pkg_name, str)
    # splitlines() copes with output both with and without a trailing newline
    paths = $(dpkg --listfiles @(pkg_name)).splitlines()
    assert len(paths) > 0, 'pkg {!r} not installed'.format(pkg_name)
    return paths
def dpkg_search(path_search_pattern):
    """Return (package, path) tuples parsed from `dpkg --search` output."""
    assert isinstance(path_search_pattern, str)
    return re.findall(
        # raw string; with MULTILINE `$` matches at every line end, so the
        # last line is found whether or not the output ends in a newline
        r'^(\S+): (.*)$',
        $(dpkg --search @(path_search_pattern)),
        flags=re.MULTILINE,
    )


def dpkg_welse(cmd):
    """Return all files of the dpkg package that provides `cmd`."""
    pkg_name, _ = dpkg_which(cmd)
    return dpkg_listfiles(pkg_name)


def dpkg_which(cmd):
    """Return the (package, path) tuple for the executable `cmd` resolves to.

    Raises AssertionError if `cmd` is not on the PATH or dpkg does not
    report exactly one match for its path.
    """
    cmd_path = shutil.which(cmd)
    assert cmd_path, 'cmd {!r} not found'.format(cmd)
    matches = dpkg_search(cmd_path)
    assert len(matches) != 0, '{!r} not installed via dpkg'.format(cmd_path)
    assert len(matches) == 1
    return matches[0]


def _escape_format_braces(text):
    """Double curly braces so `text` survives a later str.format() unchanged."""
    return text.replace('{', '{{').replace('}', '}}')


@contextlib.contextmanager
def encfs_mount(root_dir_path, mount_point_path, extpass=None):
    """Mount `root_dir_path` via encfs at `mount_point_path` for the with-block.

    extpass: optional command (list or tuple of strings) passed as `--extpass`.
    """
    mount_args = ['encfs', root_dir_path, mount_point_path]
    if extpass:
        mount_args.extend(['--extpass', shlex_join(extpass)])
    # fuse_mount() runs str.format() on every argument; these are literal
    # values, so braces in paths or commands must be escaped
    mount_arg_patterns = [_escape_format_braces(a) for a in mount_args]
    with fuse_mount(mount_arg_patterns=mount_arg_patterns, mount_point_path=mount_point_path):
        yield mount_point_path


@contextlib.contextmanager
def fuse_mount(mount_arg_patterns, mount_point_path):
    """Run a mount command, yield the mount point, and unmount afterwards.

    mount_arg_patterns: command arguments; each is passed through
        str.format() with `mp` set to the shell-quoted mount point.
    Both commands are echoed to stderr. Unmounting uses `fusermount -u -z`.
    Raises subprocess.CalledProcessError if mounting or unmounting fails.
    """
    import shlex
    mount_args = [a.format(mp=shlex.quote(mount_point_path)) for a in mount_arg_patterns]
    sys.stderr.write('{}\n'.format(shlex_join(mount_args)))
    subprocess.check_call(mount_args)
    try:
        yield mount_point_path
    finally:
        umount_args = ['fusermount', '-u', '-z', mount_point_path]
        sys.stderr.write('{}\n'.format(shlex_join(umount_args)))
        subprocess.check_call(umount_args)


def gpg_decrypt(path, verify=False):
    """Decrypt the file at `path` with the `gpg` module and return the plaintext."""
    import gpg
    with gpg.Context() as gpg_ctx:
        with open(path, 'rb') as f:
            # decrypt and verify results are not used
            data, _, _ = gpg_ctx.decrypt(f, verify=verify)
    return data


def locate(*patterns, match_all=True, ignore_case=True):
    """Run `locate` for `patterns` and return the printed paths as a list.

    match_all: pass `--all`.
    ignore_case: pass `--ignore-case`.
    """
    params = []
    if ignore_case:
        params.append('--ignore-case')
    if match_all:
        params.append('--all')
    # splitlines() copes with output both with and without a trailing newline
    return $(locate @(params) -- @(patterns)).splitlines()


def os_read_non_blocking(fd, buffer_size_bytes=8*1024, timeout_seconds=0.1):
    """Read up to `buffer_size_bytes` from `fd`, waiting at most `timeout_seconds`.

    Returns the bytes read (empty at end of file), or None if `fd` did not
    become readable within the timeout.
    """
    import select
    readable, _, _ = select.select([fd], [], [], timeout_seconds)
    if fd in readable:
        return os.read(fd, buffer_size_bytes)
    return None


def shlex_join(params):
    """Join a list or tuple of strings into one shell-quoted command line."""
    import shlex
    assert isinstance(params, (list, tuple)), params
    return ' '.join(shlex.quote(p) for p in params)


def timestamp_now_utc():
    """Return the current time as a timezone-aware datetime in UTC."""
    return dt.datetime.now(dt.timezone.utc)


def timestamp_now_local():
    """Return the current time as a timezone-aware datetime in local time."""
    # if called without tz argument astimezone() assumes
    # the system local timezone for the target timezone
    return timestamp_now_utc().astimezone()


def timestamp_iso_local():
    """Return the current local time formatted like 20240131T235959+0100."""
    return timestamp_now_local().strftime('%Y%m%dT%H%M%S%z')


def yaml_load(path):
    """Parse the YAML file at `path` and return the resulting object.

    Uses the safe loader: yaml.load() without an explicit Loader can
    construct arbitrary objects and is rejected by current PyYAML.
    """
    import yaml
    with open(path, 'r') as f:
        return yaml.safe_load(f)


def yaml_write(path, data):
    """Serialize `data` as block-style YAML into the file at `path`."""
    import yaml
    with open(path, 'w') as f:
        f.write(yaml.dump(data, default_flow_style=False))


aliases['d'] = ['sudo', 'docker']
aliases['dpkg-welse'] = lambda args: '\n'.join(dpkg_welse(args[0]))
aliases['dpkg-which'] = lambda args: '\t'.join(dpkg_which(args[0]))
aliases['g'] = ['git']
aliases['ll'] = ['ls', '-l', '--all', '--indicator-style=slash',
                 '--human-readable', '--time-style=long-iso', '--color=auto']

# start the X session automatically when logging in on the first virtual console
if shutil.which('startx') and $(tty).rstrip() == '/dev/tty1':
    startx

# vim: filetype=python