rc.xsh 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import contextlib
  2. import datetime as dt
  3. import io
  4. import os
  5. import re
  6. import shutil
  7. import stat
  8. import subprocess
  9. import sys
  10. TERMUX=shutil.which('termux-info') is not None
  11. $VI_MODE = True
  12. $AUTO_PUSHD = True
  13. $XONSH_AUTOPAIR = True
  14. # tab selection: do not execute cmd when pressing enter
  15. $COMPLETIONS_CONFIRM = True
  16. $_Z_EXCLUDE_DIRS = ['/tmp']
  17. # alias 'xontrib' wraps xontribs_load()
  18. # xontrib load vox z
  19. from xonsh.xontribs import xontribs_load
  20. xontribs_load(['vox', 'z'])
  21. def _last_exit_status():
  22. try:
  23. exit_status = __xonsh__.history.rtns[-1]
  24. return exit_status if exit_status != 0 else None
  25. except IndexError:
  26. return None
  27. $PROMPT_FIELDS['last_exit_status'] = _last_exit_status
  28. $SHLVL = int($SHLVL) + 1 if 'SHLVL' in ${...} else 1
  29. $XONSH_STDERR_PREFIX = '{RED}'
  30. $XONSH_STDERR_POSTFIX = '{NO_COLOR}'
  31. $DYNAMIC_CWD_WIDTH = '30%'
  32. $DYNAMIC_CWD_ELISION_CHAR = '…'
  33. $PROMPT = ''.join([
  34. '{RED}{last_exit_status:[{}] }',
  35. '{BOLD_GREEN}{user}@{hostname} ' if not TERMUX or 'SSH_CLIENT' in ${...} else '',
  36. '{YELLOW}{cwd} ',
  37. '{{BLUE}}{} '.format('{prompt_end}' * $SHLVL),
  38. '{NO_COLOR}',
  39. ])
  40. $RIGHT_PROMPT = '{gitstatus}{env_name: {}}'
  41. $XONSH_APPEND_NEWLINE = True
  42. os.umask(stat.S_IWGRP | stat.S_IRWXO) # 027
  43. # default locale
  44. # will be used for all non-explicitly set LC_* variables
  45. $LANG = 'en_US.UTF-8'
  46. # fallback locales
  47. # GNU gettext gives preference to LANGUAGE over LC_ALL and LANG
  48. # for the purpose of message handling
  49. # https://www.gnu.org/software/gettext/manual/html_node/The-LANGUAGE-variable.html
  50. # cave: if this list contains 'de(_.*)?' at any (sic!) position
  51. # vim 7.4.1689 will switch to german
  52. $LANGUAGE = ':'.join(['en_US', 'en'])
  53. $LC_COLLATE = 'C.UTF-8'
  54. # char classification, case conversion & other char attrs
  55. if not TERMUX: $LC_CTYPE = 'de_AT.UTF-8'
  56. # $ locale currency_symbol
  57. if not TERMUX: $LC_MONETARY = 'de_AT.UTF-8'
  58. # $ locale -k LC_NUMERIC | head -n 3
  59. # decimal_point="."
  60. # thousands_sep=""
  61. # grouping=-1
  62. $LC_NUMERIC = 'C.UTF-8'
  63. # A4
  64. $LC_PAPER = 'de_AT.UTF-8'
  65. USER_BIN_PATH = os.path.join($HOME, '.local', 'bin')
  66. if os.path.isdir(USER_BIN_PATH):
  67. $PATH.insert(0, USER_BIN_PATH)
  68. $PAGER = 'less'
  69. $EDITOR = 'vim'
  70. # i3-sensible-terminal
  71. $TERMINAL = 'termite'
  72. if shutil.which('gpgconf'):
  73. # required by pinentry-tty when using gpg command
  74. # takes approx 100ms in termux on galaxy S7
  75. $GPG_TTY = $(tty)
  76. # required by scute
  77. $GPG_AGENT_INFO = $(gpgconf --list-dir agent-socket).rstrip() + ':0:1'
  78. if not 'SSH_CLIENT' in ${...}:
  79. # in gnupg 2.1.13 the location of agents socket changed
  80. $SSH_AUTH_SOCK = $(gpgconf --list-dir agent-ssh-socket).rstrip()
  81. # wrapper for termite required when launching termite from ranger:
  82. $TERMCMD = os.path.join(os.path.dirname(__file__), 'ranger-termite-termcmd')
  83. # https://docs.docker.com/engine/security/trust/content_trust/
  84. $DOCKER_CONTENT_TRUST = 1
  85. $PASSWORD_STORE_UMASK = '027'
  86. $PASSWORD_STORE_CHARACTER_SET = '1-9a-km-zA-HJKLMNPR-Z*+!&#@%.\-_'
  87. class DockerImage:
  88. def __init__(self, image):
  89. import json
  90. attrs, = json.loads(subprocess.check_output(['sudo', 'docker', 'image', 'inspect', image])
  91. .decode(sys.stdout.encoding))
  92. self._id = attrs['Id']
  93. self._tags = attrs['RepoTags']
  94. def __repr__(self):
  95. return '{}(id={!r}, tags={!r})'.format(type(self).__name__, self._id, self._tags)
  96. @classmethod
  97. def build(cls, dockerfile_or_path, tag=None):
  98. out = io.BytesIO()
  99. args = ['sudo', 'docker', 'build']
  100. if tag:
  101. args.extend(['--tag', tag])
  102. with StdoutTee(out) as tee:
  103. if os.path.exists(dockerfile_or_path):
  104. p = subprocess.Popen(args + [dockerfile_or_path],
  105. stdin=None, stdout=tee)
  106. else:
  107. p = subprocess.Popen(args + ['-'],
  108. stdin=subprocess.PIPE, stdout=tee)
  109. p.stdin.write(dockerfile_or_path.encode())
  110. p.stdin.close()
  111. assert p.wait() == 0, 'docker build failed'
  112. image_id, = re.search(rb'^Successfully built (\S+)$', out.getvalue(), re.MULTILINE).groups()
  113. return cls(image_id.decode(sys.stdout.encoding))
  114. @classmethod
  115. def pull(cls, image):
  116. out = io.BytesIO()
  117. with StdoutTee(out) as tee:
  118. subprocess.run(['sudo', 'docker', 'image', 'pull', image], stdout=tee)
  119. repo_digest, = re.search(rb'^Digest: (sha\S+:\S+)$', out.getvalue(), re.MULTILINE).groups()
  120. return cls('{}@{}'.format(image, repo_digest.decode()))
  121. def run(self, args=[], name=None, detach=False, env={},
  122. network=None, publish_ports=[], volumes=[], caps=[]):
  123. params = ['sudo', 'docker', 'run', '--rm']
  124. if name:
  125. params.extend(['--name', name])
  126. if detach:
  127. params.append('--detach')
  128. else:
  129. params.extend(['--interactive', '--tty'])
  130. params.extend([a for k, v in env.items() for a in ['--env', '{}={}'.format(k, v)]])
  131. if network:
  132. params.extend(['--network', network])
  133. params.extend([a for v in volumes for a in ['--volume', ':'.join(v)]])
  134. params.extend('--publish=' + ':'.join([str(a) for a in p]) for p in publish_ports)
  135. params.extend(['--security-opt=no-new-privileges', '--cap-drop=all'])
  136. params.extend(['--cap-add={}'.format(c) for c in caps])
  137. params.append(self._id)
  138. params.extend(args)
  139. sys.stderr.write('{}\n'.format(shlex_join(params)))
  140. subprocess.run(params, check=True)
  141. class StdoutTee:
  142. def __init__(self, sink):
  143. self._sink = sink
  144. def __enter__(self):
  145. self._read_fd, self._write_fd = os.pipe()
  146. import threading
  147. self._thread = threading.Thread(target=self._loop)
  148. self._thread.start()
  149. return self
  150. def _loop(self):
  151. while True:
  152. try:
  153. data = os_read_non_blocking(self._read_fd)
  154. except OSError: # fd closed
  155. return
  156. if data:
  157. self._sink.write(data)
  158. sys.stdout.buffer.write(data)
  159. sys.stdout.flush()
  160. def fileno(self):
  161. return self._write_fd
  162. def __exit__(self, exc_type, exc_value, traceback):
  163. os.close(self._read_fd)
  164. os.close(self._write_fd)
  165. self._thread.join()
  166. @contextlib.contextmanager
  167. def chdir(path):
  168. import shlex
  169. previous = os.getcwd()
  170. try:
  171. sys.stderr.write('cd {}\n'.format(shlex.quote(path)))
  172. os.chdir(path)
  173. yield path
  174. finally:
  175. os.chdir(previous)
  176. sys.stderr.write('cd {}\n'.format(shlex.quote(previous)))
  177. def dpkg_listfiles(pkg_name):
  178. assert isinstance(pkg_name, str)
  179. paths = $(dpkg --listfiles @(pkg_name)).split('\n')[:-1]
  180. assert len(paths) > 0, 'pkg {!r} not installed'.format(pkg_name)
  181. return paths
  182. def dpkg_search(path_search_pattern):
  183. assert isinstance(path_search_pattern, str)
  184. return re.findall(
  185. '^(\S+): (.*)$\n',
  186. $(dpkg --search @(path_search_pattern)),
  187. flags=re.MULTILINE,
  188. )
  189. def dpkg_welse(cmd):
  190. pkg_name, cmd_path = dpkg_which(cmd)
  191. return dpkg_listfiles(pkg_name)
  192. def dpkg_which(cmd):
  193. cmd_path = shutil.which(cmd)
  194. assert cmd_path, 'cmd {!r} not found'.format(cmd)
  195. matches = dpkg_search(cmd_path)
  196. assert len(matches) != 0, '{!r} not installed via dpkg'.format(cmd_path)
  197. assert len(matches) == 1
  198. return matches[0]
  199. @contextlib.contextmanager
  200. def encfs_mount(root_dir_path, mount_point_path, extpass=None):
  201. mount_arg_patterns = ['encfs', root_dir_path, mount_point_path]
  202. if extpass:
  203. mount_arg_patterns.extend(['--extpass', shlex_join(extpass)])
  204. with fuse_mount(mount_arg_patterns=mount_arg_patterns,
  205. mount_point_path=mount_point_path):
  206. yield mount_point_path
  207. @contextlib.contextmanager
  208. def fuse_mount(mount_arg_patterns, mount_point_path):
  209. import shlex
  210. mount_args = [a.format(mp=shlex.quote(mount_point_path))
  211. for a in mount_arg_patterns]
  212. sys.stderr.write('{}\n'.format(shlex_join(mount_args)))
  213. subprocess.check_call(mount_args)
  214. try:
  215. yield mount_point_path
  216. finally:
  217. umount_args = ['fusermount', '-u', '-z', mount_point_path]
  218. sys.stderr.write('{}\n'.format(shlex_join(umount_args)))
  219. subprocess.check_call(umount_args)
  220. def gpg_decrypt(path, verify=False):
  221. import gpg
  222. with gpg.Context() as gpg_ctx:
  223. with open(path, 'rb') as f:
  224. data, decrypt_result, verify_result = gpg_ctx.decrypt(f, verify=verify)
  225. return data
  226. def locate(*patterns, match_all=True, ignore_case=True):
  227. params = []
  228. if match_all:
  229. params.insert(0, '--all')
  230. if ignore_case:
  231. params.insert(0, '--ignore-case')
  232. return $(locate @(params) -- @(patterns)).split('\n')[:-1]
  233. def os_read_non_blocking(fd, buffer_size_bytes=8*1024, timeout_seconds=0.1):
  234. import select
  235. if fd in select.select([fd], [], [], timeout_seconds)[0]:
  236. return os.read(fd, buffer_size_bytes)
  237. else:
  238. return None
  239. def shlex_join(params):
  240. import shlex
  241. assert isinstance(params, list) or isinstance(params, tuple), params
  242. return ' '.join(shlex.quote(p) for p in params)
  243. def timestamp_now_utc():
  244. return dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
  245. def timestamp_now_local():
  246. # if called without tz argument astimezone() assumes
  247. # the system local timezone for the target timezone
  248. return timestamp_now_utc().astimezone()
  249. def timestamp_iso_local():
  250. # if called without tz argument astimezone() assumes
  251. # the system local timezone for the target timezone
  252. return timestamp_now_local().strftime('%Y%m%dT%H%M%S%z')
  253. def yaml_load(path):
  254. import yaml
  255. with open(path, 'r') as f:
  256. return yaml.load(f.read())
  257. def yaml_write(path, data):
  258. import yaml
  259. with open(path, 'w') as f:
  260. f.write(yaml.dump(data, default_flow_style=False))
  261. aliases['d'] = ['sudo', 'docker']
  262. aliases['dpkg-welse'] = lambda args: '\n'.join(dpkg_welse(args[0]))
  263. aliases['dpkg-which'] = lambda args: '\t'.join(dpkg_which(args[0]))
  264. aliases['g'] = ['git']
  265. aliases['j'] = ['journalctl', '--output=short-iso']
  266. aliases['less'] = ['less', '--jump-target=.2']
  267. aliases['ll'] = ['ls', '-l', '--all', '--indicator-style=slash',
  268. '--human-readable', '--time-style=long-iso', '--color=auto']
  269. aliases['p'] = ['pass']
  270. if shutil.which('startx') and $(tty).rstrip() == '/dev/tty1':
  271. startx
  272. # vim: filetype=python