Changeset: f14ab769cd97 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f14ab769cd97 Modified Files: clients/mapilib/mapi.c common/stream/stream.c common/stream/stream.h sql/include/sql_query.h Branch: protocol Log Message:
Use the old version of read_line for protocol v9, because regular block streams do not buffer reads for some reason. diffs (252 lines): diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -876,7 +876,9 @@ struct MapiRowBuf { struct BlockCache { char *buf; int lim; + int nxt; int end; + int eos; }; @@ -1886,18 +1888,24 @@ mapi_new(void) mid->password = NULL; mid->protocol = protauto; - mid->blocksize = 128*BLOCK; // 1 MB + mid->blocksize = 128 * BLOCK; // 1 MB mid->cachelimit = 100; mid->redircnt = 0; mid->redirmax = 10; mid->tracelog = NULL; - mid->blk.buf = malloc(BLOCK); + mid->blk.eos = 0; + mid->blk.buf = malloc(BLOCK + 1); if (mid->blk.buf == NULL) { mapi_destroy(mid); return NULL; } + mid->blk.buf[BLOCK] = 0; + mid->blk.buf[0] = 0; + mid->blk.nxt = 0; + mid->blk.end = 0; mid->blk.lim = BLOCK; + mid->first = NULL; return mid; @@ -2535,8 +2543,6 @@ mapi_reconnect(Mapi mid) mid->connected = 1; - // FIXME: these need to become snappy streams (framing2) - if (!isa_block_stream(mid->to)) { mid->to = block_stream(mid->to); check_stream(mid, mid->to, mnstr_error(mid->to), "mapi_reconnect", mid->error); @@ -2550,7 +2556,7 @@ mapi_reconnect(Mapi mid) /* consume server challenge */ len = mnstr_read_block(mid->from, buf, 1, BLOCK); - check_stream(mid, mid->from, "Connection terminated while starting", "mapi_reconnect", mid->error); + check_stream(mid, mid->from, "Connection terminated while starting", "mapi_reconnect", (mid->blk.eos = 1, mid->error)); assert(len < BLOCK); buf[len] = 0; @@ -3483,32 +3489,113 @@ mapi_param_store(MapiHdl hdl) static char * read_line(Mapi mid) { - int ret = 0; - if (mid->active == NULL) - return 0; - mid->blk.end = 0; - do { - if ((mid->blk.end + 1) == mid->blk.lim) { - REALLOC(mid->blk.buf, mid->blk.lim + BLOCK); - if (!mid->blk.buf) { + if (isa_fixed_block_stream(mid->from)) { + // fixed blockstreams are not buffered, so we roll our own buffer + char *reply; + char *nl; +
char *s; /* from where to search for newline */ + + if (mid->active == NULL) + return 0; + + /* check if we need to read more blocks to get a new line */ + mid->blk.eos = 0; + s = mid->blk.buf + mid->blk.nxt; + while ((nl = strchr(s, '\n')) == NULL && !mid->blk.eos) { + ssize_t len; + + if (mid->blk.lim - mid->blk.end < BLOCK) { + int len; + + len = mid->blk.lim; + if (mid->blk.nxt <= BLOCK) { + /* extend space */ + len += BLOCK; + } + REALLOC(mid->blk.buf, len + 1); + if (mid->blk.nxt > 0) { + memmove(mid->blk.buf, mid->blk.buf + mid->blk.nxt, mid->blk.end - mid->blk.nxt + 1); + mid->blk.end -= mid->blk.nxt; + mid->blk.nxt = 0; + } + mid->blk.lim = len; + } + + s = mid->blk.buf + mid->blk.end; + + /* fetch one more block */ + if (mid->trace == MAPI_TRACE) + printf("fetch next block: start at:%d\n", mid->blk.end); + len = mnstr_read(mid->from, mid->blk.buf + mid->blk.end, 1, BLOCK); + check_stream(mid, mid->from, "Connection terminated during read line", "read_line", (mid->blk.eos = 1, (char *) 0)); + if (mid->tracelog) { + mapi_log_header(mid, "R"); + mnstr_write(mid->tracelog, mid->blk.buf + mid->blk.end, 1, len); + mnstr_flush(mid->tracelog); + } + mid->blk.buf[mid->blk.end + len] = 0; + if (mid->trace == MAPI_TRACE) { + printf("got next block: length:" SSZFMT "\n", len); + printf("text:%s\n", mid->blk.buf + mid->blk.end); + } + if (len == 0) { /* add prompt */ + if (mid->blk.end > mid->blk.nxt) { + /* add fake newline since newline was + * missing from server */ + nl = mid->blk.buf + mid->blk.end; + *nl = '\n'; + mid->blk.end++; + } + len = 2; + mid->blk.buf[mid->blk.end] = PROMPTBEG; + mid->blk.buf[mid->blk.end + 1] = '\n'; + mid->blk.buf[mid->blk.end + 2] = 0; + } + mid->blk.end += (int) len; + } + if (mid->trace == MAPI_TRACE) { + printf("got complete block: \n"); + printf("text:%s\n", mid->blk.buf + mid->blk.nxt); + } + + /* we have a complete line in the buffer */ + assert(nl); + *nl++ = 0; + reply = mid->blk.buf + mid->blk.nxt; + mid->blk.nxt = (int) (nl 
- mid->blk.buf); + + if (mid->trace == MAPI_TRACE) + printf("read_line:%s\n", reply); + return reply; + } else { + int ret = 0; + if (mid->active == NULL) + return 0; + mid->blk.end = 0; + + do { + if ((mid->blk.end + 1) == mid->blk.lim) { + REALLOC(mid->blk.buf, mid->blk.lim + BLOCK); + if (!mid->blk.buf) { + return 0; + } + mid->blk.lim += BLOCK; + } + /* mid->from is **always** buffered, so no point in rolling an additional cache on top */ + if ((ret = mnstr_readChr(mid->from, mid->blk.buf + mid->blk.end)) != 1) { + if (ret == 0) { + mid->blk.buf[0] = PROMPTBEG; + mid->blk.buf[1] = '\n'; + mid->blk.buf[2] = 0; + return mid->blk.buf; + } return 0; } - mid->blk.lim += BLOCK; - } - /* mid->from is **always** buffered, so no point in rolling an additional cache on top */ - if ((ret = mnstr_readChr(mid->from, mid->blk.buf + mid->blk.end)) != 1) { - if (ret == 0) { - mid->blk.buf[0] = PROMPTBEG; - mid->blk.buf[1] = '\n'; - mid->blk.buf[2] = 0; - return mid->blk.buf; - } - return 0; - } - } while (mid->blk.buf[mid->blk.end++] != '\n'); - - mid->blk.buf[mid->blk.end-1] = 0; - return mid->blk.buf; + } while (mid->blk.buf[mid->blk.end++] != '\n'); + + mid->blk.buf[mid->blk.end-1] = 0; + return mid->blk.buf; + } } /* set or unset the autocommit flag in the server */ diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -4385,6 +4385,11 @@ isa_block_stream(stream *s) return s && ((s->read == bs_read || s->write == bs_write) || (s->read == bs2_read || s->write == bs2_write)); } +int +isa_fixed_block_stream(stream *s) { + assert(s != NULL); + return s && ((s->read == bs_read || s->write == bs_write)); +} stream * block_stream2(stream *s, size_t bufsiz, compression_method comp) @@ -4443,8 +4448,6 @@ mnstr_read_block(stream *s, void *buf, s int mnstr_readChr(stream *s, char *val) { - if (s == NULL || val == NULL) - return -1; return (int) s->read(s, (void *) val, sizeof(*val), 1); } diff --git 
a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -29,7 +29,6 @@ #include <signal.h> #include <limits.h> -#define STREAM_DEBUG 1 /* avoid using "#ifdef WIN32" so that this file does not need our config.h */ #if defined(_MSC_VER) || defined(__CYGWIN__) || defined(__MINGW32__) # ifndef LIBSTREAM @@ -227,6 +226,7 @@ stream_export buffer *mnstr_get_buffer(s stream_export stream *wbstream(stream *s, size_t buflen); stream_export stream *block_stream(stream *s); stream_export int isa_block_stream(stream *s); +stream_export int isa_fixed_block_stream(stream *s); stream_export stream* bs_stream(stream *s); typedef enum { diff --git a/sql/include/sql_query.h b/sql/include/sql_query.h --- a/sql/include/sql_query.h +++ b/sql/include/sql_query.h @@ -16,8 +16,7 @@ typedef enum sql_query_t { Q_SCHEMA = 3, Q_TRANS = 4, Q_PREPARE = 5, - Q_BLOCK = 6, - Q_BINARYTABLE = 7 + Q_BLOCK = 6 } sql_query_t; #endif /* _SQL_QUERY_H_ */ _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list