Antoine Pitrou <[EMAIL PROTECTED]> added the comment:

Attaching a new patch with better performance characteristics than my
previous one, and the non-blocking test rewritten in a sane way.

Some timeit runs:

-s "import io; f=io.open('/dev/null', 'wb'); s=b'a'*1" "for i in xrange(100): f.write(s)"

without patch: 533 usec per loop
with patch: 724 usec per loop
with builtin open(): 59.6 usec per loop

-s "import io; f=io.open('/dev/null', 'wb'); s=b'a'*100" "for i in xrange(100): f.write(s)"

without patch: 563 usec per loop
with patch: 768 usec per loop
with builtin open(): 67.8 usec per loop

-s "import io; f=io.open('/dev/null', 'wb'); s=b'a'*10000" "for i in xrange(100): f.write(s)"

without patch: 1.35 msec per loop
with patch: 1.34 msec per loop
with builtin open(): 194 usec per loop

-s "import io; f=io.open('/dev/null', 'wb'); s=b'a'*100000" "for i in xrange(100): f.write(s)"

without patch: 4.92 msec per loop
with patch: 1.34 msec per loop
with builtin open(): 199 usec per loop

-s "import io; f=io.open('/dev/null', 'wb'); s=b'a'*1000000" "for i in xrange(100): f.write(s)"

without patch: 173 msec per loop
with patch: 1.35 msec per loop
with builtin open(): 194 usec per loop

Added file: http://bugs.python.org/file11034/bufferedwriter2.patch

_______________________________________
Python tracker <[EMAIL PROTECTED]>
<http://bugs.python.org/issue3476>
_______________________________________
diff -r d73a32dd9bbc Lib/io.py
--- a/Lib/io.py Fri Aug 01 21:33:00 2008 +0200
+++ b/Lib/io.py Fri Aug 01 22:03:27 2008 +0200
@@ -1018,39 +1018,79 @@ class BufferedWriter(_BufferedIOMixin):
         raw._checkWritable()
         _BufferedIOMixin.__init__(self, raw)
         self.buffer_size = buffer_size
-        self.max_buffer_size = (2*buffer_size
-                                if max_buffer_size is None
-                                else max_buffer_size)
-        self._write_buf = bytearray()
+        self._write_buf = bytearray(b" " * self.buffer_size)
+        self._reset_write_buf()
+
+    def _reset_write_buf(self):
+        # Just after the last byte actually written
+        self._write_pos = 0
+        # Just after the last buffered byte
+        self._write_end = 0
 
     def write(self, b):
         if self.closed:
             raise ValueError("write to closed file")
         if isinstance(b, unicode):
             raise TypeError("can't write unicode to binary stream")
-        # XXX we can implement some more tricks to try and avoid partial writes
-        if len(self._write_buf) > self.buffer_size:
-            # We're full, so let's pre-flush the buffer
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                # We can't accept anything else.
-                # XXX Why not just let the exception pass through?
-                raise BlockingIOError(e.errno, e.strerror, 0)
-        before = len(self._write_buf)
-        self._write_buf.extend(b)
-        written = len(self._write_buf) - before
-        if len(self._write_buf) > self.buffer_size:
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                if (len(self._write_buf) > self.max_buffer_size):
-                    # We've hit max_buffer_size. We have to accept a partial
-                    # write and cut back our buffer.
-                    overage = len(self._write_buf) - self.max_buffer_size
-                    self._write_buf = self._write_buf[:self.max_buffer_size]
-                    raise BlockingIOError(e.errno, e.strerror, overage)
-        return written
+        end = self._write_end
+        free = self.buffer_size - end
+        # If b is a bytearray, ensure it won't be mutated
+        # (we must keep the internal buffer size constant)
+        b = bytes(b)
+        if len(b) <= free:
+            # Fast path: the data to write can be fully buffered
+            self._write_buf[end:end+len(b)] = b
+            self._write_end = end + len(b)
+            return len(b)
+
+        # First write the current buffer
+        try:
+            self.flush()
+        except BlockingIOError as e:
+            # Make some place by rotating the buffer
+            pos = self._write_pos
+            if pos > 0:
+                self._write_buf[:-pos] = self._write_buf[pos:]
+                self._write_pos = 0
+            end = self._write_end - pos
+            avail = self.buffer_size - end
+            # Buffer as much as possible
+            if avail < len(b):
+                self._write_buf[end:] = b[:avail]
+                self._write_end = self.buffer_size
+                # The remaining bytes don't fit inside the buffer
+                raise BlockingIOError(e.errno, e.strerror, avail)
+            self._write_buf[end:end+len(b)] = b
+            self._write_end = end + len(b)
+            return len(b)
+
+        # Then write b itself
+        remaining = len(b)
+        written = 0
+        try:
+            while remaining >= self.buffer_size:
+                # XXX if self.raw.write() doesn't want to write everything
+                # at once, this can be quadratic.
+                n = self.raw.write(b[written:])
+                written += n
+                remaining -= n
+        except BlockingIOError as e:
+            n = e.characters_written
+            written += n
+            remaining -= n
+            if remaining > self.buffer_size:
+                # Buffer as much as possible
+                self._write_buf[:] = b[written:written+self.buffer_size]
+                self._write_end = self.buffer_size
+                # The remaining bytes don't fit inside the buffer
+                raise BlockingIOError(e.errno, e.strerror,
+                    written + self.buffer_size)
+
+        if remaining > 0:
+            self._write_buf[:remaining] = b[-remaining:]
+        self._write_pos = 0
+        self._write_end = remaining
+        return written + remaining
 
     def truncate(self, pos=None):
         self.flush()
@@ -1063,18 +1103,20 @@ class BufferedWriter(_BufferedIOMixin):
             raise ValueError("flush of closed file")
         written = 0
         try:
-            while self._write_buf:
-                n = self.raw.write(self._write_buf)
-                del self._write_buf[:n]
+            end = self._write_end
+            while self._write_pos < end:
+                n = self.raw.write(self._write_buf[self._write_pos:end])
+                self._write_pos += n
                 written += n
         except BlockingIOError as e:
             n = e.characters_written
-            del self._write_buf[:n]
+            self._write_pos += n
             written += n
             raise BlockingIOError(e.errno, e.strerror, written)
+        self._reset_write_buf()
 
     def tell(self):
-        return self.raw.tell() + len(self._write_buf)
+        return self.raw.tell() + self._write_end - self._write_pos
 
     def seek(self, pos, whence=0):
         self.flush()
@@ -1107,7 +1149,7 @@ class BufferedRWPair(BufferedIOBase):
         reader._checkReadable()
         writer._checkWritable()
         self.reader = BufferedReader(reader, buffer_size)
-        self.writer = BufferedWriter(writer, buffer_size, max_buffer_size)
+        self.writer = BufferedWriter(writer, buffer_size)
 
     def read(self, n=None):
         if n is None:
@@ -1157,11 +1199,14 @@ class BufferedRandom(BufferedWriter, Buf
     writer) defaults to twice the buffer size.
     """
 
+    # XXX reusing Buffered{Reader, Writer} makes the implementation
+    # simple but inefficient (see flush() calls in read() etc.)
+
     def __init__(self, raw,
                  buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
         raw._checkSeekable()
         BufferedReader.__init__(self, raw, buffer_size)
-        BufferedWriter.__init__(self, raw, buffer_size, max_buffer_size)
+        BufferedWriter.__init__(self, raw, buffer_size)
 
     def seek(self, pos, whence=0):
         self.flush()
@@ -1169,11 +1214,12 @@ class BufferedRandom(BufferedWriter, Buf
         # if the raw seek fails, we don't lose buffered data forever.
         pos = self.raw.seek(pos, whence)
         self._reset_read_buf()
+        self._reset_write_buf()
         return pos
 
     def tell(self):
-        if self._write_buf:
-            return self.raw.tell() + len(self._write_buf)
+        if self._write_end > self._write_pos:
+            return BufferedWriter.tell(self)
         else:
             return BufferedReader.tell(self)
 
@@ -1207,6 +1253,7 @@ class BufferedRandom(BufferedWriter, Buf
             # Undo readahead
             self.raw.seek(self._read_pos - len(self._read_buf), 1)
             self._reset_read_buf()
+            self._reset_write_buf()
         return BufferedWriter.write(self, b)
 
 
diff -r d73a32dd9bbc Lib/test/test_io.py
--- a/Lib/test/test_io.py       Fri Aug 01 21:33:00 2008 +0200
+++ b/Lib/test/test_io.py       Fri Aug 01 22:03:27 2008 +0200
@@ -7,6 +7,7 @@ import time
 import time
 import array
 import unittest
+import threading
 from itertools import chain
 from test import test_support
 
@@ -19,6 +20,9 @@ class MockRawIO(io.RawIOBase):
     def __init__(self, read_stack=()):
         self._read_stack = list(read_stack)
         self._write_stack = []
+
+    def get_written(self):
+        return b"".join(self._write_stack)
 
     def read(self, n=None):
         try:
@@ -27,7 +31,7 @@ class MockRawIO(io.RawIOBase):
             return b""
 
     def write(self, b):
-        self._write_stack.append(b[:])
+        self._write_stack.append(bytes(b))
         return len(b)
 
     def writable(self):
@@ -63,17 +67,33 @@ class MockFileIO(io.BytesIO):
 
 class MockNonBlockWriterIO(io.RawIOBase):
 
-    def __init__(self, blocking_script):
-        self._blocking_script = list(blocking_script)
+    def __init__(self):
         self._write_stack = []
+        self._blocker_char = None
+
+    def pop_written(self):
+        s = b"".join(self._write_stack)
+        self._write_stack[:] = []
+        return s
+
+    def block_on(self, char):
+        """Block when a given char is encountered."""
+        self._blocker_char = char
 
     def write(self, b):
-        self._write_stack.append(b[:])
-        n = self._blocking_script.pop(0)
-        if (n < 0):
-            raise io.BlockingIOError(0, "test blocking", -n)
-        else:
-            return n
+        b = bytes(b)
+        n = -1
+        if self._blocker_char:
+            try:
+                n = b.index(self._blocker_char)
+            except ValueError:
+                pass
+            else:
+                self._blocker_char = None
+                self._write_stack.append(b[:n])
+                raise io.BlockingIOError(0, "test blocking", n)
+        self._write_stack.append(b)
+        return len(b)
 
     def writable(self):
         return True
@@ -398,7 +418,7 @@ class BufferedWriterTest(unittest.TestCa
         writer = MockRawIO()
         bufio = io.BufferedWriter(writer, 8)
 
-        bufio.write(b"abc")
+        self.assertEquals(bufio.write(b"abc"), 3)
 
         self.assertFalse(writer._write_stack)
 
@@ -406,30 +426,39 @@ class BufferedWriterTest(unittest.TestCa
         writer = MockRawIO()
         bufio = io.BufferedWriter(writer, 8)
 
-        bufio.write(b"abc")
-        bufio.write(b"defghijkl")
+        self.assertEquals(bufio.write(b"abc"), 3)
+        self.assertEquals(bufio.write(b"defghijkl"), 9)
 
-        self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
+        # The first 8 bytes were written
+        self.assertTrue(writer.get_written().startswith(b"abcdefgh"),
+            writer.get_written())
 
     def testWriteNonBlocking(self):
-        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
-        bufio = io.BufferedWriter(raw, 8, 16)
+        raw = MockNonBlockWriterIO()
+        bufio = io.BufferedWriter(raw, 8)
 
-        bufio.write(b"asdf")
-        bufio.write(b"asdfa")
-        self.assertEquals(b"asdfasdfa", raw._write_stack[0])
+        self.assertEquals(bufio.write(b"abcd"), 4)
+        self.assertEquals(bufio.write(b"efghi"), 5)
+        # 1 byte will be written, the rest will be buffered
+        raw.block_on(b"k")
+        self.assertEquals(bufio.write(b"jklmnopqr"), 9)
 
-        bufio.write(b"asdfasdfasdf")
-        self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
-        bufio.write(b"asdfasdfasdf")
-        self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
-        self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
+        # 4 bytes will be written, 8 will be buffered and the rest will be lost
+        raw.block_on(b"0")
+        try:
+            bufio.write(b"wxyz0123456789")
+        except io.BlockingIOError as e:
+            written = e.characters_written
+        else:
+            self.fail("BlockingIOError should have been raised")
+        self.assertEquals(written, 12)
+        self.assertEquals(raw.pop_written(),
+            b"abcdefghijklmnopqrwxyz")
 
-        bufio.write(b"asdfasdfasdf")
-
-        # XXX I don't like this test. It relies too heavily on how the
-        # algorithm actually works, which we might change. Refactor
-        # later.
+        self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
+        s = raw.pop_written()
+        # Previously buffered bytes were flushed
+        self.assertTrue(s.startswith(b"01234567A"), s)
 
     def testFileno(self):
         rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -445,6 +474,36 @@ class BufferedWriterTest(unittest.TestCa
         bufio.flush()
 
         self.assertEquals(b"abc", writer._write_stack[0])
+
+    def testThreads(self):
+        # BufferedWriter should not raise exceptions or crash
+        # when called from multiple threads.
+        try:
+            # We use a real file object because it allows us to
+            # exercise situations where the GIL is released before
+            # writing the buffer.
+            with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
+                bufio = io.BufferedWriter(raw, 8)
+                errors = []
+                def f():
+                    try:
+                        # Write enough bytes to flush the buffer
+                        s = b"a" * 19
+                        for i in range(50):
+                            bufio.write(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+        finally:
+            test_support.unlink(test_support.TESTFN)
 
 
 class BufferedRWPairTest(unittest.TestCase):
_______________________________________________
Python-bugs-list mailing list
Unsubscribe: 
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to