commit f0b80e7d35f8836abe3d347a9d82ad4691614856
Author: Oswald Buddenhagen <[email protected]>
Date:   Sat Nov 8 15:42:41 2014 +0100

    make socket writing buffered
    
    the primary objective is reducing the number of small SSL packets (which
    are always padded), but fewer syscalls in the non-SSL case should be
    good as well.

 src/socket.c |   97 ++++++++++++++++++++++++++++++++++---------------
 src/socket.h |    5 ++-
 2 files changed, 70 insertions(+), 32 deletions(-)

diff --git a/src/socket.c b/src/socket.c
index 5f6bb50..586c98c 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -281,6 +281,7 @@ static void start_tls_p3( conn_t *conn, int ok )
 #endif /* HAVE_LIBSSL */
 
 static void socket_fd_cb( int, void * );
+static void socket_fake_cb( void * );
 
 static void socket_connect_one( conn_t * );
 static void socket_connect_failed( conn_t * );
@@ -293,12 +294,14 @@ socket_open_internal( conn_t *sock, int fd )
        sock->fd = fd;
        fcntl( fd, F_SETFL, O_NONBLOCK );
        init_notifier( &sock->notify, fd, socket_fd_cb, sock );
+       init_wakeup( &sock->fd_fake, socket_fake_cb, sock );
 }
 
 static void
 socket_close_internal( conn_t *sock )
 {
        wipe_notifier( &sock->notify );
+       wipe_wakeup( &sock->fd_fake );
        close( sock->fd );
        sock->fd = -1;
 }
@@ -500,6 +503,8 @@ socket_close( conn_t *sock )
 #endif
        while (sock->write_buf)
                dispose_chunk( sock );
+       free( sock->append_buf );
+       sock->append_buf = 0;
 }
 
 static void
@@ -609,8 +614,6 @@ dispose_chunk( conn_t *conn )
        buff_chunk_t *bc = conn->write_buf;
        if (!(conn->write_buf = bc->next))
                conn->write_buf_append = &conn->write_buf;
-       if (bc->data != bc->buf)
-               free( bc->data );
        free( bc );
 }
 
@@ -641,48 +644,69 @@ do_queued_write( conn_t *conn )
 }
 
 static void
-do_append( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+do_append( conn_t *conn, buff_chunk_t *bc )
 {
-       buff_chunk_t *bc;
-
-       if (takeOwn == GiveOwn) {
-               bc = nfmalloc( offsetof(buff_chunk_t, buf) );
-               bc->data = buf;
-       } else {
-               bc = nfmalloc( offsetof(buff_chunk_t, buf) + len );
-               bc->data = bc->buf;
-               memcpy( bc->data, buf, len );
-       }
-       bc->len = len;
        bc->next = 0;
        *conn->write_buf_append = bc;
        conn->write_buf_append = &bc->next;
 }
 
+/* This is big enough to avoid excessive chunking, but is
+ * sufficiently small to keep SSL latency low with a slow uplink. */
+#define WRITE_CHUNK_SIZE 1024
+
 int
 socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
 {
-       for (; iovcnt; iovcnt--, iov++) {
-               if (conn->write_buf) {
-                       do_append( conn, iov->buf, iov->len, iov->takeOwn );
+       int i, buf_avail, len, offset = 0, total = 0;
+       buff_chunk_t *bc, *exwb = conn->write_buf;
+
+       for (i = 0; i < iovcnt; i++)
+               total += iov[i].len;
+       bc = conn->append_buf;
+       if (bc && total >= WRITE_CHUNK_SIZE) {
+               /* If the new data is too big, queue the pending buffer to 
avoid latency. */
+               do_append( conn, bc );
+               bc  = 0;
+       }
+       while (total) {
+               if (!bc) {
+                       buf_avail = total > WRITE_CHUNK_SIZE ? total : 
WRITE_CHUNK_SIZE;
+                       bc = nfmalloc( offsetof(buff_chunk_t, data) + buf_avail 
);
+                       bc->len = 0;
                } else {
-                       int n = do_write( conn, iov->buf, iov->len );
-                       if (n < 0) {
-                               do {
-                                       if (iov->takeOwn == GiveOwn)
-                                               free( iov->buf );
-                                       iovcnt--, iov++;
-                               } while (iovcnt);
-                               return -1;
+                       /* A pending buffer will always be of standard size - 
over-sized
+                        * buffers are immediately filled and queued. */
+                       buf_avail = WRITE_CHUNK_SIZE - bc->len;
+               }
+               while (total) {
+                       len = iov->len - offset;
+                       if (len > buf_avail)
+                               len = buf_avail;
+                       memcpy( bc->data + bc->len, iov->buf + offset, len );
+                       bc->len += len;
+                       buf_avail -= len;
+                       offset += len;
+                       total -= len;
+                       if (offset == iov->len) {
+                               if (iov->takeOwn == GiveOwn)
+                                       free( iov->buf );
+                               iov++;
+                               offset = 0;
                        }
-                       if (n != iov->len) {
-                               conn->write_offset = n;
-                               do_append( conn, iov->buf, iov->len, 
iov->takeOwn );
-                       } else if (iov->takeOwn == GiveOwn) {
-                               free( iov->buf );
+                       if (!buf_avail) {
+                               do_append( conn, bc );
+                               bc = 0;
+                               break;
                        }
                }
        }
+       conn->append_buf = bc;
+       /* Queue the pending write once the main loop goes idle. */
+       conf_wakeup( &conn->fd_fake, bc ? 0 : -1 );
+       /* If no writes were queued before, ensure that flushing commences. */
+       if (!exwb)
+               return do_queued_write( conn );
        return 0;
 }
 
@@ -733,6 +757,19 @@ socket_fd_cb( int events, void *aux )
                socket_fill( conn );
 }
 
+static void
+socket_fake_cb( void *aux )
+{
+       conn_t *conn = (conn_t *)aux;
+
+       buff_chunk_t *exwb = conn->write_buf;
+       do_append( conn, conn->append_buf );
+       conn->append_buf = 0;
+       /* If no writes were queued before, ensure that flushing commences. */
+       if (!exwb)
+               do_queued_write( conn );
+}
+
 #ifdef HAVE_LIBSSL
 static void
 ssl_fake_cb( void *aux )
diff --git a/src/socket.h b/src/socket.h
index 4d51a34..efecce6 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -57,9 +57,8 @@ typedef struct server_conf {
 
 typedef struct buff_chunk {
        struct buff_chunk *next;
-       char *data;
        int len;
-       char buf[1];
+       char data[1];
 } buff_chunk_t;
 
 typedef struct {
@@ -88,8 +87,10 @@ typedef struct {
        void *callback_aux;
 
        notifier_t notify;
+       wakeup_t fd_fake;
 
        /* writing */
+       buff_chunk_t *append_buf; /* accumulating buffer */
        buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
        int write_offset; /* offset into buffer head */
 

------------------------------------------------------------------------------
New Year. New Location. New Benefits. New Data Center in Ashburn, VA.
GigeNET is offering a free month of service with a new server in Ashburn.
Choose from 2 high performing configs, both with 100TB of bandwidth.
Higher redundancy.Lower latency.Increased capacity.Completely compliant.
http://p.sf.net/sfu/gigenet
_______________________________________________
isync-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to