Browse Source

poll stdout of subprocess without infinitely blocking main thread

Fabian Peter Hammerle 5 years ago
parent
commit
d4eb98b337
2 changed files with 45 additions and 77 deletions
  1. 45 0
      non_blocking_os_read.py
  2. 0 77
      subprocess_tee_stream_multiplier.py

+ 45 - 0
non_blocking_os_read.py

@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+import os
+import select
+
+DEFAULT_BUFFER_SIZE_BYTES = 8196
+DEFAULT_READ_TIMEOUT_SECONDS = 1.0
+
+
+def rselect(fd, timeout_seconds=None):
+    """ Wait until file descriptor is ready for reading.
+    Return True if ready. Return False if timeout was reached.
+
+    select.select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
+    """
+    rlist, wlist, xlist = select.select([fd], [], [], timeout_seconds)
+    return fd in rlist
+
+
+def non_blocking_os_read(fd, buffer_size_bytes=DEFAULT_BUFFER_SIZE_BYTES, timeout_seconds=DEFAULT_READ_TIMEOUT_SECONDS):
+    if rselect(fd, timeout_seconds=timeout_seconds):
+        return os.read(fd, buffer_size_bytes)
+    else:
+        return None
+
+
+# usage / example
+
+import logging
+import subprocess
+import sys
+
+logging.basicConfig(level=logging.INFO)
+
+read_fd, write_fd = os.pipe()
+p = subprocess.Popen(['ping', '1.1.1.1'], stdout=write_fd)
+while p.poll() is None:
+    logging.debug('waiting...')
+    stdout_data = non_blocking_os_read(read_fd)
+    if stdout_data:
+        logging.info('read {} bytes'.format(len(stdout_data)))
+        sys.stdout.buffer.write(stdout_data)
+        sys.stdout.flush()
+    else:
+        logging.info('timeout')

+ 0 - 77
subprocess_tee_stream_multiplier.py

@@ -1,77 +0,0 @@
-#!/usr/bin/env python3
-
-"""
-> sstream = io.StringIO()
-> subprocess.run(['hostname'], stdout=sstream, check=True)
-io.UnsupportedOperation: fileno
-"""
-
-import os
-import select
-import threading
-
-
-def rselect(fd, timeout_seconds=None):
-    """ Wait until file descriptor is ready for reading.
-    Return True if ready. Return False if timeout was reached.
-
-    select.select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
-    """
-    rlist, wlist, xlist = select.select([fd], [], [], timeout_seconds)
-    return fd in rlist
-
-
-class SubprocessTee:
-
-    # compare with libc's setvbuf / BUFSIZ
-    BUFFER_SIZE_BYTES = 8196
-    READ_TIMEOUT_SECONDS = 0.01
-
-    def __init__(self, sinks):
-        self._sinks = sinks
-
-    def __enter__(self):
-        self._read_fd, self._write_fd = os.pipe()
-        self._thread = threading.Thread(target=self._loop)
-        self._thread.start()
-        return self
-
-    def _write(self, data):
-        for sink in self._sinks:
-            if isinstance(sink, io.TextIOWrapper):
-                sink.buffer.write(data)
-            else:
-                sink.write(data)
-
-    def _loop(self):
-        while True:
-            try:
-                if rselect(self._read_fd, self.READ_TIMEOUT_SECONDS):
-                    self._write(os.read(self._read_fd, self.BUFFER_SIZE_BYTES))
-            except OSError:  # fd closed
-                return
-
-    def fileno(self):
-        return self._write_fd
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        os.close(self._read_fd)
-        os.close(self._write_fd)
-        # wait for writes to stop before sinks are being closed
-        self._thread.join()
-
-
-# usage / example
-
-import io
-import subprocess
-import sys
-
-with open('hostname', 'bw') as file_out:
-    bstream = io.BytesIO()
-    with SubprocessTee([sys.stdout, sys.stderr, file_out, bstream]) as tee:
-        subprocess.run(['hostname'], stdout=tee, check=True)
-
-print('hostname bstream: {!r}'.format(bstream.getvalue()))
-with open('hostname', 'r') as file_in:
-    print('hostname file: {!r}'.format(file_in.read()))