rc.xsh 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import contextlib
  2. import datetime as dt
  3. import io
  4. import os
  5. import re
  6. import shutil
  7. import stat
  8. import subprocess
  9. import sys
  10. TERMUX=shutil.which('termux-info') is not None
  11. $VI_MODE = True
  12. $AUTO_PUSHD = True
  13. $XONSH_AUTOPAIR = True
  14. # tab selection: do not execute cmd when pressing enter
  15. $COMPLETIONS_CONFIRM = True
  16. $_Z_EXCLUDE_DIRS = ['/tmp']
  17. # alias 'xontrib' wraps xontribs_load()
  18. # xontrib load vox z
  19. from xonsh.xontribs import xontribs_load
  20. xontribs_load(['vox', 'z'])
  21. def _last_exit_status():
  22. try:
  23. exit_status = __xonsh_history__.rtns[-1]
  24. return exit_status if exit_status != 0 else None
  25. except IndexError:
  26. return None
  27. $PROMPT_FIELDS['last_exit_status'] = _last_exit_status
  28. $SHLVL = int($SHLVL) + 1 if 'SHLVL' in ${...} else 1
  29. $XONSH_STDERR_PREFIX = '{RED}'
  30. $XONSH_STDERR_POSTFIX = '{NO_COLOR}'
  31. $DYNAMIC_CWD_WIDTH = '30%'
  32. $DYNAMIC_CWD_ELISION_CHAR = '…'
  33. $PROMPT = ''.join([
  34. '{RED}{last_exit_status:[{}] }',
  35. '{BOLD_GREEN}{user}@{hostname} ' if not TERMUX or 'SSH_CLIENT' in ${...} else '',
  36. '{YELLOW}{cwd} ',
  37. '{{BLUE}}{} '.format('{prompt_end}' * $SHLVL),
  38. '{NO_COLOR}',
  39. ])
  40. $RIGHT_PROMPT = '{gitstatus}{env_name: {}}'
  41. $XONSH_APPEND_NEWLINE = True
  42. os.umask(stat.S_IWGRP | stat.S_IRWXO) # 027
  43. # default locale
  44. # will be used for all non-explicitly set LC_* variables
  45. $LANG = 'en_US.UTF-8'
  46. # fallback locales
  47. # GNU gettext gives preference to LANGUAGE over LC_ALL and LANG
  48. # for the purpose of message handling
  49. # https://www.gnu.org/software/gettext/manual/html_node/The-LANGUAGE-variable.html
  50. # cave: if this list contains 'de(_.*)?' at any (sic!) position
  51. # vim 7.4.1689 will switch to german
  52. $LANGUAGE = ':'.join(['en_US', 'en'])
  53. $LC_COLLATE = 'C.UTF-8'
  54. # char classification, case conversion & other char attrs
  55. if not TERMUX: $LC_CTYPE = 'de_AT.UTF-8'
  56. # $ locale currency_symbol
  57. if not TERMUX: $LC_MONETARY = 'de_AT.UTF-8'
  58. # $ locale -k LC_NUMERIC | head -n 3
  59. # decimal_point="."
  60. # thousands_sep=""
  61. # grouping=-1
  62. $LC_NUMERIC = 'C.UTF-8'
  63. # A4
  64. $LC_PAPER = 'de_AT.UTF-8'
  65. USER_BIN_PATH = os.path.join($HOME, '.local', 'bin')
  66. if os.path.isdir(USER_BIN_PATH):
  67. $PATH.insert(0, USER_BIN_PATH)
  68. $PAGER = 'less'
  69. $EDITOR = 'vim'
  70. # i3-sensible-terminal
  71. $TERMINAL = 'termite'
  72. if shutil.which('gpgconf'):
  73. # required by pinentry-tty when using gpg command
  74. # takes approx 100ms in termux on galaxy S7
  75. $GPG_TTY = $(tty)
  76. # required by scute
  77. $GPG_AGENT_INFO = $(gpgconf --list-dir agent-socket).rstrip() + ':0:1'
  78. if not 'SSH_CLIENT' in ${...}:
  79. # in gnupg 2.1.13 the location of agents socket changed
  80. $SSH_AUTH_SOCK = $(gpgconf --list-dir agent-ssh-socket).rstrip()
  81. # wrapper for termite required when launching termite from ranger:
  82. $TERMCMD = os.path.join(os.path.dirname(__file__), 'ranger-termite-termcmd')
  83. # https://docs.docker.com/engine/security/trust/content_trust/
  84. $DOCKER_CONTENT_TRUST = 1
  85. class DockerImage:
  86. def __init__(self, image):
  87. import json
  88. attrs, = json.loads(subprocess.check_output(['sudo', 'docker', 'image', 'inspect', image])
  89. .decode(sys.stdout.encoding))
  90. self._id = attrs['Id']
  91. self._tags = attrs['RepoTags']
  92. def __repr__(self):
  93. return '{}(id={!r}, tags={!r})'.format(type(self).__name__, self._id, self._tags)
  94. @classmethod
  95. def build(cls, dockerfile_or_path, tag=None):
  96. out = io.BytesIO()
  97. args = ['sudo', 'docker', 'build']
  98. if tag:
  99. args.extend(['--tag', tag])
  100. with StdoutTee(out) as tee:
  101. if os.path.exists(dockerfile_or_path):
  102. p = subprocess.Popen(args + [dockerfile_or_path],
  103. stdin=None, stdout=tee)
  104. else:
  105. p = subprocess.Popen(args + ['-'],
  106. stdin=subprocess.PIPE, stdout=tee)
  107. p.stdin.write(dockerfile_or_path.encode())
  108. p.stdin.close()
  109. assert p.wait() == 0, 'docker build failed'
  110. image_id, = re.search(rb'^Successfully built (\S+)$', out.getvalue(), re.MULTILINE).groups()
  111. return cls(image_id.decode(sys.stdout.encoding))
  112. @classmethod
  113. def pull(cls, image):
  114. out = io.BytesIO()
  115. with StdoutTee(out) as tee:
  116. subprocess.run(['sudo', 'docker', 'image', 'pull', image], stdout=tee)
  117. repo_digest, = re.search(rb'^Digest: (sha\S+:\S+)$', out.getvalue(), re.MULTILINE).groups()
  118. return cls('{}@{}'.format(image, repo_digest.decode()))
  119. def run(self, args=[], name=None, detach=False, env={},
  120. network=None, publish_ports=[], volumes=[], caps=[]):
  121. params = ['sudo', 'docker', 'run', '--rm']
  122. if name:
  123. params.extend(['--name', name])
  124. if detach:
  125. params.append('--detach')
  126. else:
  127. params.extend(['--interactive', '--tty'])
  128. params.extend([a for k, v in env.items() for a in ['--env', '{}={}'.format(k, v)]])
  129. if network:
  130. params.extend(['--network', network])
  131. params.extend([a for v in volumes for a in ['--volume', ':'.join(v)]])
  132. params.extend('--publish=' + ':'.join([str(a) for a in p]) for p in publish_ports)
  133. params.extend(['--security-opt=no-new-privileges', '--cap-drop=all'])
  134. params.extend(['--cap-add={}'.format(c) for c in caps])
  135. params.append(self._id)
  136. params.extend(args)
  137. sys.stderr.write('{}\n'.format(shlex_join(params)))
  138. subprocess.run(params, check=True)
  139. class StdoutTee:
  140. def __init__(self, sink):
  141. self._sink = sink
  142. def __enter__(self):
  143. self._read_fd, self._write_fd = os.pipe()
  144. import threading
  145. self._thread = threading.Thread(target=self._loop)
  146. self._thread.start()
  147. return self
  148. def _loop(self):
  149. while True:
  150. try:
  151. data = os_read_non_blocking(self._read_fd)
  152. except OSError: # fd closed
  153. return
  154. if data:
  155. self._sink.write(data)
  156. sys.stdout.buffer.write(data)
  157. sys.stdout.flush()
  158. def fileno(self):
  159. return self._write_fd
  160. def __exit__(self, exc_type, exc_value, traceback):
  161. os.close(self._read_fd)
  162. os.close(self._write_fd)
  163. self._thread.join()
  164. @contextlib.contextmanager
  165. def chdir(path):
  166. import shlex
  167. previous = os.getcwd()
  168. try:
  169. sys.stderr.write('cd {}\n'.format(shlex.quote(path)))
  170. os.chdir(path)
  171. yield path
  172. finally:
  173. os.chdir(previous)
  174. sys.stderr.write('cd {}\n'.format(shlex.quote(previous)))
  175. def dpkg_listfiles(pkg_name):
  176. assert isinstance(pkg_name, str)
  177. paths = $(dpkg --listfiles @(pkg_name)).split('\n')[:-1]
  178. assert len(paths) > 0, 'pkg {!r} not installed'.format(pkg_name)
  179. return paths
  180. def dpkg_search(path_search_pattern):
  181. assert isinstance(path_search_pattern, str)
  182. return re.findall(
  183. '^(\S+): (.*)$\n',
  184. $(dpkg --search @(path_search_pattern)),
  185. flags=re.MULTILINE,
  186. )
  187. def dpkg_welse(cmd):
  188. pkg_name, cmd_path = dpkg_which(cmd)
  189. return dpkg_listfiles(pkg_name)
  190. def dpkg_which(cmd):
  191. cmd_path = shutil.which(cmd)
  192. assert cmd_path, 'cmd {!r} not found'.format(cmd)
  193. matches = dpkg_search(cmd_path)
  194. assert len(matches) != 0, '{!r} not installed via dpkg'.format(cmd_path)
  195. assert len(matches) == 1
  196. return matches[0]
  197. @contextlib.contextmanager
  198. def encfs_mount(root_dir_path, mount_point_path, extpass=None):
  199. mount_arg_patterns = ['encfs', root_dir_path, mount_point_path]
  200. if extpass:
  201. mount_arg_patterns.extend(['--extpass', shlex_join(extpass)])
  202. with fuse_mount(mount_arg_patterns=mount_arg_patterns,
  203. mount_point_path=mount_point_path):
  204. yield mount_point_path
  205. @contextlib.contextmanager
  206. def fuse_mount(mount_arg_patterns, mount_point_path):
  207. import shlex
  208. mount_args = [a.format(mp=shlex.quote(mount_point_path))
  209. for a in mount_arg_patterns]
  210. sys.stderr.write('{}\n'.format(shlex_join(mount_args)))
  211. subprocess.check_call(mount_args)
  212. try:
  213. yield mount_point_path
  214. finally:
  215. umount_args = ['fusermount', '-u', '-z', mount_point_path]
  216. sys.stderr.write('{}\n'.format(shlex_join(umount_args)))
  217. subprocess.check_call(umount_args)
  218. def gpg_decrypt(path, verify=False):
  219. import gpg
  220. with gpg.Context() as gpg_ctx:
  221. with open(path, 'rb') as f:
  222. data, decrypt_result, verify_result = gpg_ctx.decrypt(f, verify=verify)
  223. return data
  224. def locate(*patterns, match_all=True, ignore_case=True):
  225. params = []
  226. if match_all:
  227. params.insert(0, '--all')
  228. if ignore_case:
  229. params.insert(0, '--ignore-case')
  230. return $(locate @(params) -- @(patterns)).split('\n')[:-1]
  231. def os_read_non_blocking(fd, buffer_size_bytes=8*1024, timeout_seconds=0.1):
  232. import select
  233. if fd in select.select([fd], [], [], timeout_seconds)[0]:
  234. return os.read(fd, buffer_size_bytes)
  235. else:
  236. return None
  237. def shlex_join(params):
  238. import shlex
  239. assert isinstance(params, list) or isinstance(params, tuple), params
  240. return ' '.join(shlex.quote(p) for p in params)
  241. def timestamp_now_utc():
  242. return dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
  243. def timestamp_now_local():
  244. # if called without tz argument astimezone() assumes
  245. # the system local timezone for the target timezone
  246. return timestamp_now_utc().astimezone()
  247. def timestamp_iso_local():
  248. # if called without tz argument astimezone() assumes
  249. # the system local timezone for the target timezone
  250. return timestamp_now_local().strftime('%Y%m%dT%H%M%S%z')
  251. def yaml_load(path):
  252. import yaml
  253. with open(path, 'r') as f:
  254. return yaml.load(f.read())
  255. def yaml_write(path, data):
  256. import yaml
  257. with open(path, 'w') as f:
  258. f.write(yaml.dump(data, default_flow_style=False))
  259. aliases['d'] = ['sudo', 'docker']
  260. aliases['dpkg-welse'] = lambda args: '\n'.join(dpkg_welse(args[0]))
  261. aliases['dpkg-which'] = lambda args: '\t'.join(dpkg_which(args[0]))
  262. aliases['g'] = ['git']
  263. aliases['j'] = ['journalctl', '--output=short-iso']
  264. aliases['less'] = ['less', '--jump-target=.2']
  265. aliases['ll'] = ['ls', '-l', '--all', '--indicator-style=slash',
  266. '--human-readable', '--time-style=long-iso', '--color=auto']
  267. if shutil.which('startx') and $(tty).rstrip() == '/dev/tty1':
  268. startx
  269. # vim: filetype=python