#!/usr/bin/env python3 """ > sstream = io.StringIO() > subprocess.run(['hostname'], stdout=sstream, check=True) io.UnsupportedOperation: fileno """ import os import select import threading def rselect(fd, timeout_seconds=None): """ Wait until file descriptor is ready for reading. Return True if ready. Return False if timeout was reached. select.select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist) """ rlist, wlist, xlist = select.select([fd], [], [], timeout_seconds) return fd in rlist class SubprocessTee: # compare with libc's setvbuf / BUFSIZ BUFFER_SIZE_BYTES = 8196 READ_TIMEOUT_SECONDS = 0.01 def __init__(self, sinks): self._sinks = sinks def __enter__(self): self._read_fd, self._write_fd = os.pipe() self._thread = threading.Thread(target=self._loop) self._thread.start() return self def _write(self, data): for sink in self._sinks: if isinstance(sink, io.TextIOWrapper): sink.buffer.write(data) else: sink.write(data) def _loop(self): while True: try: if rselect(self._read_fd, self.READ_TIMEOUT_SECONDS): self._write(os.read(self._read_fd, self.BUFFER_SIZE_BYTES)) except OSError: # fd closed return def fileno(self): return self._write_fd def __exit__(self, exc_type, exc_value, traceback): os.close(self._read_fd) os.close(self._write_fd) # wait for writes to stop before sinks are being closed self._thread.join() # usage / example import io import subprocess import sys with open('hostname', 'bw') as file_out: bstream = io.BytesIO() with SubprocessTee([sys.stdout, sys.stderr, file_out, bstream]) as tee: subprocess.run(['hostname'], stdout=tee, check=True) print('hostname bstream: {!r}'.format(bstream.getvalue())) with open('hostname', 'r') as file_in: print('hostname file: {!r}'.format(file_in.read()))