Changeset: f14ab769cd97 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f14ab769cd97
Modified Files:
        clients/mapilib/mapi.c
        common/stream/stream.c
        common/stream/stream.h
        sql/include/sql_query.h
Branch: protocol
Log Message:

Use old version of read_line for protocol v9.

This is needed because regular block streams do not buffer reads for some reason.


diffs (252 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -876,7 +876,9 @@ struct MapiRowBuf {
 struct BlockCache {
        char *buf;
        int lim;
+       int nxt;
        int end;
+       int eos;
 };
 
 
@@ -1886,18 +1888,24 @@ mapi_new(void)
        mid->password = NULL;
 
        mid->protocol = protauto;
-       mid->blocksize = 128*BLOCK; // 1 MB
+       mid->blocksize = 128 * BLOCK; // 1 MB
 
        mid->cachelimit = 100;
        mid->redircnt = 0;
        mid->redirmax = 10;
        mid->tracelog = NULL;
-       mid->blk.buf = malloc(BLOCK);
+       mid->blk.eos = 0;
+       mid->blk.buf = malloc(BLOCK + 1);
        if (mid->blk.buf == NULL) {
                mapi_destroy(mid);
                return NULL;
        }
+       mid->blk.buf[BLOCK] = 0;
+       mid->blk.buf[0] = 0;
+       mid->blk.nxt = 0;
+       mid->blk.end = 0;
        mid->blk.lim = BLOCK;
+
        mid->first = NULL;
 
        return mid;
@@ -2535,8 +2543,6 @@ mapi_reconnect(Mapi mid)
 
        mid->connected = 1;
 
-       // FIXME: these need to become snappy streams (framing2)
-
        if (!isa_block_stream(mid->to)) {
                mid->to = block_stream(mid->to);
                check_stream(mid, mid->to, mnstr_error(mid->to), 
"mapi_reconnect", mid->error);
@@ -2550,7 +2556,7 @@ mapi_reconnect(Mapi mid)
        /* consume server challenge */
        len = mnstr_read_block(mid->from, buf, 1, BLOCK);
 
-       check_stream(mid, mid->from, "Connection terminated while starting", 
"mapi_reconnect", mid->error);
+       check_stream(mid, mid->from, "Connection terminated while starting", 
"mapi_reconnect", (mid->blk.eos = 1, mid->error));
 
        assert(len < BLOCK);
        buf[len] = 0;
@@ -3483,32 +3489,113 @@ mapi_param_store(MapiHdl hdl)
 static char *
 read_line(Mapi mid)
 {
-       int ret = 0;
-       if (mid->active == NULL)
-               return 0;
-       mid->blk.end = 0;
-       do {
-               if ((mid->blk.end + 1) == mid->blk.lim) {
-                       REALLOC(mid->blk.buf, mid->blk.lim + BLOCK);
-                       if (!mid->blk.buf) {
+       if (isa_fixed_block_stream(mid->from)) {
+               // fixed blockstreams are not buffered, so we roll our own 
buffer
+               char *reply;
+               char *nl;
+               char *s;                /* from where to search for newline */
+
+               if (mid->active == NULL)
+                       return 0;
+
+               /* check if we need to read more blocks to get a new line */
+               mid->blk.eos = 0;
+               s = mid->blk.buf + mid->blk.nxt;
+               while ((nl = strchr(s, '\n')) == NULL && !mid->blk.eos) {
+                       ssize_t len;
+
+                       if (mid->blk.lim - mid->blk.end < BLOCK) {
+                               int len;
+
+                               len = mid->blk.lim;
+                               if (mid->blk.nxt <= BLOCK) {
+                                       /* extend space */
+                                       len += BLOCK;
+                               }
+                               REALLOC(mid->blk.buf, len + 1);
+                               if (mid->blk.nxt > 0) {
+                                       memmove(mid->blk.buf, mid->blk.buf + 
mid->blk.nxt, mid->blk.end - mid->blk.nxt + 1);
+                                       mid->blk.end -= mid->blk.nxt;
+                                       mid->blk.nxt = 0;
+                               }
+                               mid->blk.lim = len;
+                       }
+
+                       s = mid->blk.buf + mid->blk.end;
+
+                       /* fetch one more block */
+                       if (mid->trace == MAPI_TRACE)
+                               printf("fetch next block: start at:%d\n", 
mid->blk.end);
+                       len = mnstr_read(mid->from, mid->blk.buf + 
mid->blk.end, 1, BLOCK);
+                       check_stream(mid, mid->from, "Connection terminated 
during read line", "read_line", (mid->blk.eos = 1, (char *) 0));
+                       if (mid->tracelog) {
+                               mapi_log_header(mid, "R");
+                               mnstr_write(mid->tracelog, mid->blk.buf + 
mid->blk.end, 1, len);
+                               mnstr_flush(mid->tracelog);
+                       }
+                       mid->blk.buf[mid->blk.end + len] = 0;
+                       if (mid->trace == MAPI_TRACE) {
+                               printf("got next block: length:" SSZFMT "\n", 
len);
+                               printf("text:%s\n", mid->blk.buf + 
mid->blk.end);
+                       }
+                       if (len == 0) { /* add prompt */
+                               if (mid->blk.end > mid->blk.nxt) {
+                                       /* add fake newline since newline was
+                                        * missing from server */
+                                       nl = mid->blk.buf + mid->blk.end;
+                                       *nl = '\n';
+                                       mid->blk.end++;
+                               }
+                               len = 2;
+                               mid->blk.buf[mid->blk.end] = PROMPTBEG;
+                               mid->blk.buf[mid->blk.end + 1] = '\n';
+                               mid->blk.buf[mid->blk.end + 2] = 0;
+                       }
+                       mid->blk.end += (int) len;
+               }
+               if (mid->trace == MAPI_TRACE) {
+                       printf("got complete block: \n");
+                       printf("text:%s\n", mid->blk.buf + mid->blk.nxt);
+               }
+
+               /* we have a complete line in the buffer */
+               assert(nl);
+               *nl++ = 0;
+               reply = mid->blk.buf + mid->blk.nxt;
+               mid->blk.nxt = (int) (nl - mid->blk.buf);
+
+               if (mid->trace == MAPI_TRACE)
+                       printf("read_line:%s\n", reply);
+               return reply;
+       } else {
+               int ret = 0;
+               if (mid->active == NULL)
+                       return 0;
+               mid->blk.end = 0;
+
+               do {
+                       if ((mid->blk.end + 1) == mid->blk.lim) {
+                               REALLOC(mid->blk.buf, mid->blk.lim + BLOCK);
+                               if (!mid->blk.buf) {
+                                       return 0;
+                               }
+                               mid->blk.lim += BLOCK;
+                       }
+                       /* mid->from is **always** buffered, so no point in 
rolling an additional cache on top */
+                       if ((ret = mnstr_readChr(mid->from, mid->blk.buf + 
mid->blk.end)) != 1) {
+                               if (ret == 0) {
+                                       mid->blk.buf[0] = PROMPTBEG;
+                                       mid->blk.buf[1] = '\n';
+                                       mid->blk.buf[2] = 0;
+                                       return mid->blk.buf;
+                               }
                                return 0;
                        }
-                       mid->blk.lim += BLOCK;
-               }
-               /* mid->from is **always** buffered, so no point in rolling an 
additional cache on top */
-               if ((ret = mnstr_readChr(mid->from, mid->blk.buf + 
mid->blk.end)) != 1) {
-                       if (ret == 0) {
-                               mid->blk.buf[0] = PROMPTBEG;
-                               mid->blk.buf[1] = '\n';
-                               mid->blk.buf[2] = 0;
-                               return mid->blk.buf;
-                       }
-                       return 0;
-               }
-       } while (mid->blk.buf[mid->blk.end++] != '\n');
-
-       mid->blk.buf[mid->blk.end-1] = 0;
-       return mid->blk.buf;
+               } while (mid->blk.buf[mid->blk.end++] != '\n');
+
+               mid->blk.buf[mid->blk.end-1] = 0;
+               return mid->blk.buf;
+       }
 }
 
 /* set or unset the autocommit flag in the server */
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4385,6 +4385,11 @@ isa_block_stream(stream *s)
        return s && ((s->read == bs_read || s->write == bs_write) || (s->read 
== bs2_read || s->write == bs2_write));
 }
 
+int 
+isa_fixed_block_stream(stream *s) {
+       assert(s != NULL);
+       return s && ((s->read == bs_read || s->write == bs_write));
+}
 
 stream *
 block_stream2(stream *s, size_t bufsiz, compression_method comp)
@@ -4443,8 +4448,6 @@ mnstr_read_block(stream *s, void *buf, s
 int
 mnstr_readChr(stream *s, char *val)
 {
-       if (s == NULL || val == NULL)
-               return -1;
        return (int) s->read(s, (void *) val, sizeof(*val), 1);
 }
 
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -29,7 +29,6 @@
 #include <signal.h>
 #include <limits.h>
 
-#define STREAM_DEBUG 1
 /* avoid using "#ifdef WIN32" so that this file does not need our config.h */
 #if defined(_MSC_VER) || defined(__CYGWIN__) || defined(__MINGW32__)
 # ifndef LIBSTREAM
@@ -227,6 +226,7 @@ stream_export buffer *mnstr_get_buffer(s
 stream_export stream *wbstream(stream *s, size_t buflen);
 stream_export stream *block_stream(stream *s);
 stream_export int isa_block_stream(stream *s);
+stream_export int isa_fixed_block_stream(stream *s);
 stream_export stream* bs_stream(stream *s);
 
 typedef enum {
diff --git a/sql/include/sql_query.h b/sql/include/sql_query.h
--- a/sql/include/sql_query.h
+++ b/sql/include/sql_query.h
@@ -16,8 +16,7 @@ typedef enum sql_query_t {
        Q_SCHEMA = 3,
        Q_TRANS = 4,
        Q_PREPARE = 5,
-       Q_BLOCK = 6,
-       Q_BINARYTABLE = 7
+       Q_BLOCK = 6
 } sql_query_t;
 
 #endif /* _SQL_QUERY_H_ */
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to