Changeset: 07ac8508edec for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=07ac8508edec
Modified Files:
	testing/src/process.py
Branch: Jun2010
Log Message:

Fix for bug 2621.

When you start multiple clients/servers using our Python process
module, and for each of those processes you collect the output in
order to print the output in an orderly manner (i.e. avoiding
intermixing of output from multiple processes), there is a potential
deadlock (or rather, starvation): several processes may be hanging,
trying to write output, while the Python process orchestrating all of
this is reading from yet another process (usually one that is waiting
for some other event that isn't going to happen).

We solve this by starting reader threads for each of the outputs
(stdout and stderr) of each of the processes we start.  In order to be
able to read from the pipes in the Python program, we also provide a
read method to get data from the reader thread.

The upshot of all this is: no changes to the code that uses the
process module.


diffs (129 lines):

diff -r 5927f4a21c34 -r 07ac8508edec testing/src/process.py
--- a/testing/src/process.py	Tue Aug 17 15:02:15 2010 +0200
+++ b/testing/src/process.py	Tue Aug 17 15:10:01 2010 +0200
@@ -6,6 +6,8 @@
 import tempfile
 import copy
 import atexit
+import threading
+import Queue
 
 from subprocess import PIPE
 
@@ -54,6 +56,58 @@
 
 atexit.register(_delfiles)
 
+class _BufferedPipe:
+    def __init__(self, fd):
+        self._pipe = fd
+        self._queue = Queue.Queue()
+        self._eof = False
+        self._thread = threading.Thread(target = self._readerthread,
+                                        args = (fd, self._queue))
+        self._thread.setDaemon(True)
+        self._thread.start()
+
+    def _readerthread(self, fh, queue):
+        while True:
+            c = fh.read(1)
+            queue.put(c)                # put '' if at EOF
+            if not c:
+                break
+
+    def close(self):
+        if self._thread:
+            self._thread.join()
+        self._thread = None
+
+    def read(self, size = -1):
+        if self._eof:
+            return ''
+        if size < 0:
+            self.close()
+        ret = []
+        while size != 0:
+            c = self._queue.get()
+            if c == '\r':
+                c = self._queue.get()   # just ignore \r
+            ret.append(c)
+            if size > 0:
+                size -= 1
+            self._queue.task_done()
+            if not c:
+                self._eof = True
+                break                   # EOF
+        return ''.join(ret)
+
+    def readline(self, size = -1):
+        ret = []
+        while size != 0:
+            c = self.read(1)
+            ret.append(c)
+            if size > 0:
+                size -= 1
+            if c == '\n' or c == '':
+                break
+        return ''.join(ret)
+
 class Popen(subprocess.Popen):
     def __init__(self, *args, **kwargs):
         self.dotmonetdbfile = None
@@ -69,6 +123,23 @@
                 pass
         return ret
 
+    def communicate(self, input = None):
+        # since we always use threads for stdout/stderr, we can just read()
+        stdout = None
+        stderr = None
+        if self.stdin:
+            if input:
+                self.stdin.write(input)
+            self.stdin.close()
+        if self.stdout:
+            stdout = self.stdout.read()
+            self.stdout.close()
+        if self.stderr:
+            stderr = self.stderr.read()
+            self.stderr.close()
+        self.wait()
+        return stdout, stderr
+
 def client(lang, args = [], stdin = None, stdout = None, stderr = None,
            port = os.getenv('MAPIPORT'), host = None,
            user = 'monetdb', passwd = 'monetdb', log = False):
@@ -137,6 +208,10 @@
               env = env,
               universal_newlines = True)
     p.dotmonetdbfile = fnam
+    if stdout == PIPE:
+        p.stdout = _BufferedPipe(p.stdout)
+    if stderr == PIPE:
+        p.stderr = _BufferedPipe(p.stderr)
     return p
 
 def server(lang, args = [], stdin = None, stdout = None, stderr = None,
@@ -205,10 +280,15 @@
         print >> sys.stderr, prompt
         print >> sys.stderr
         sys.stderr.flush()
-    return Popen(cmd + args,
-                 stdin = stdin,
-                 stdout = stdout,
-                 stderr = stderr,
-                 shell = False,
-                 universal_newlines = True,
-                 bufsize = bufsize)
+    p = Popen(cmd + args,
+              stdin = stdin,
+              stdout = stdout,
+              stderr = stderr,
+              shell = False,
+              universal_newlines = True,
+              bufsize = bufsize)
+    if stdout == PIPE:
+        p.stdout = _BufferedPipe(p.stdout)
+    if stderr == PIPE:
+        p.stderr = _BufferedPipe(p.stderr)
+    return p
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list