subprocess_tee_stream_multiplier.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. #!/usr/bin/env python3
  2. """
  3. > sstream = io.StringIO()
  4. > subprocess.run(['hostname'], stdout=sstream, check=True)
  5. io.UnsupportedOperation: fileno
  6. """
  7. import os
  8. import select
  9. import threading
  10. def rselect(fd, timeout_seconds=None):
  11. """ Wait until file descriptor is ready for reading.
  12. Return True if ready. Return False if timeout was reached.
  13. select.select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
  14. """
  15. rlist, wlist, xlist = select.select([fd], [], [], timeout_seconds)
  16. return fd in rlist
  17. class SubprocessTee:
  18. # compare with libc's setvbuf / BUFSIZ
  19. BUFFER_SIZE_BYTES = 8196
  20. READ_TIMEOUT_SECONDS = 0.01
  21. def __init__(self, sinks):
  22. self._sinks = sinks
  23. def __enter__(self):
  24. self._read_fd, self._write_fd = os.pipe()
  25. self._thread = threading.Thread(target=self._loop)
  26. self._thread.start()
  27. return self
  28. def _write(self, data):
  29. for sink in self._sinks:
  30. if isinstance(sink, io.TextIOWrapper):
  31. sink.buffer.write(data)
  32. else:
  33. sink.write(data)
  34. def _loop(self):
  35. while True:
  36. try:
  37. if rselect(self._read_fd, self.READ_TIMEOUT_SECONDS):
  38. self._write(os.read(self._read_fd, self.BUFFER_SIZE_BYTES))
  39. except OSError: # fd closed
  40. return
  41. def fileno(self):
  42. return self._write_fd
  43. def __exit__(self, exc_type, exc_value, traceback):
  44. os.close(self._read_fd)
  45. os.close(self._write_fd)
  46. # wait for writes to stop before sinks are being closed
  47. self._thread.join()
  48. # usage / example
  49. import io
  50. import subprocess
  51. import sys
  52. with open('hostname', 'bw') as file_out:
  53. bstream = io.BytesIO()
  54. with SubprocessTee([sys.stdout, sys.stderr, file_out, bstream]) as tee:
  55. subprocess.run(['hostname'], stdout=tee, check=True)
  56. print('hostname bstream: {!r}'.format(bstream.getvalue()))
  57. with open('hostname', 'r') as file_in:
  58. print('hostname file: {!r}'.format(file_in.read()))