Changeset: 8c72389e014a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8c72389e014a
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        common/stream/stream.c
        common/stream/stream.h
        sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:

merge


diffs (truncated from 527 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -806,7 +806,7 @@ CSVrenderer(MapiHdl hdl)
        char *s;
        char *sep = separator;
        int i;
-       char buffer[100000];
+       char buffer[BUFSIZ];
        char *buffer_ptr;
        if (csvheader) {
                fields = mapi_get_field_count(hdl);
@@ -825,7 +825,7 @@ CSVrenderer(MapiHdl hdl)
                        s = mapi_fetch_field(hdl, i);
                        buffer_ptr = stpcpy(buffer_ptr, s);
 
-                       if (i != 0) {
+                       if (i != fields - 1) {
                                *buffer_ptr++ = *sep;
                        }
 
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -5488,48 +5488,80 @@ mapi_fetch_row(MapiHdl hdl)
        struct MapiResultSet *result;
 
        if (hdl->mid->protocol == prot10 || hdl->mid->protocol == 
prot10compressed) {
+#ifdef PROT10_DEBUG
+               char *initbuf;
+#endif
                char* buf;
 
                result = hdl->result;
-               result->rows_read++;
+               // check if we have read the entire result set
                if (result->rows_read >= result->row_count) {
                        hdl->mid->active = NULL;
                        hdl->active = NULL;
+                       bs2_resetbuf(hdl->mid->from);
                        return 0;
                }
-               // do we have any rows in our cache
-               if (result->rows_read > result->tuple_count) {
-                       // read block from socket
+               // if not, check if our cache is empty
+               if (result->rows_read >= result->tuple_count) {
+                       // if our cache is empty, we read data from the socket
                        lng nrows = 0;
+                       // first we write a prompt to the server indicating 
that we want another block of the result set
                        if (!mnstr_writeChr(hdl->mid->to, 42) || 
mnstr_flush(hdl->mid->to)) {
-                               // FIXME: set hdl->mid to something
-
+                               hdl->mid->errorstr = strdup("Failed to write 
confirm message to server.");
+                               hdl->mid->error = 0;
+                               fprintf(stderr, "Failure 2.\n");
                                return hdl->mid->error;
                        }
+
                        bs2_resetbuf(hdl->mid->from); // kinda a bit evil
-                       // this actually triggers the read of the entire block 
from now we operate on buffer
+                       assert(bs2_buffer(hdl->mid->from).pos == 0);
+
+                       // this actually triggers the read of the entire block
+                       // after this point we operate on the buffer
                        if (!mnstr_readLng(hdl->mid->from, &nrows)) {
                                // FIXME: set hdl->mid to something
+                               hdl->mid->errorstr = strdup("Failed to read row 
response");
+                               hdl->mid->error = 0;
+                               fprintf(stderr, "Failure 3.\n");
                                return hdl->mid->error;
                        }
-                       bs2_resetbuf(hdl->mid->from);
-
-//                     fprintf(stderr, "nrows=%llu\n", nrows);
+
+
+                       //bs2_resetbuf(hdl->mid->from);
+
+#ifdef PROT10_DEBUG
+                       fprintf(stderr, "Read block: %llu - %llu (out of %lld, 
nrow=%lld)\n", result->rows_read, result->rows_read + nrows, result->row_count, 
nrows);
+                       initbuf = (char*) bs2_getbuf(hdl->mid->from);
+#endif
+
                        buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng);
 
                        // iterate over cols
                        for (i = 0; i < (size_t) result->fieldcnt; i++) {
+#ifdef PROT10_DEBUG
+                               fprintf(stderr, "Column %zu\n", i);
+#endif
                                result->fields[i].buffer_ptr = buf;
                                if (result->fields[i].columnlength < 0) {
                                        // variable-length column
                                        lng col_len = *((lng*) buf);
+#ifdef PROT10_DEBUG
+                                       fprintf(stderr, "Read lng %lld from 
position %zu\n", col_len, buf - initbuf);
+#endif
                                        assert((size_t) col_len < 
hdl->mid->blocksize && col_len > 0);
                                        result->fields[i].buffer_ptr += 
sizeof(lng);
                                        buf += col_len + sizeof(lng);
+#ifdef PROT10_DEBUG
+                                       fprintf(stderr, "Read strings from 
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
                                } else {
                                        buf += nrows * 
result->fields[i].columnlength;
+#ifdef PROT10_DEBUG
+                                       fprintf(stderr, "Read elements from 
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
                                }
                        }
+                       assert(result->fields[result->fieldcnt - 1].buffer_ptr 
- result->fields[0].buffer_ptr < (long) hdl->mid->blocksize);
                        result->tuple_count += nrows;
                } else {
                        for (i = 0; i < (size_t) result->fieldcnt; i++) {
@@ -5541,6 +5573,7 @@ mapi_fetch_row(MapiHdl hdl)
                                }
                        }
                }
+               result->rows_read++;
                return result->fieldcnt;
        }
 
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4233,34 +4233,41 @@ bs2_read(stream *ss, void *buf, size_t e
                /* store whether this was the last block or not */
                s->nr = blksize & 1;
 
-
-               if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
+               if (s->itotal > 0) {
                        // read everything into the comp buf
                        size_t uncompressed_length = s->bufsiz;
                        size_t m = 0;
+                       char *buf = s->buf;
+                       if (s->comp != COMPRESSION_NONE) {
+                               buf = s->compbuf;
+                       }
                        snappy_status ret;
 
                        while (m < s->itotal) {
                                ssize_t bytes_read = 0;
-                               bytes_read = s->s->read(s->s, s->compbuf + m, 
1, s->itotal - m);
+                               bytes_read = s->s->read(s->s, buf + m, 1, 
s->itotal - m);
                                if (bytes_read <= 0) {
                                        ss->errnr = s->s->errnr;
                                        return -1;
                                }
                                m += bytes_read;
                        }
-                       if ((ret = snappy_uncompress(s->compbuf, s->itotal, 
s->buf, &uncompressed_length)) != SNAPPY_OK) {
-                               ss->errnr = (int) ret;
-                               return -1;
+                       if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+                               if ((ret = snappy_uncompress(s->compbuf, 
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
+                                       ss->errnr = (int) ret;
+                                       return -1;
+                               }
+#else
+               assert(0);
+               return -1;
+#endif
+                       } else {
+                               uncompressed_length = m;
                        }
                        s->itotal = uncompressed_length;
                        s->readpos = 0;
                }
-#else
-               assert(0);
-               return 0;
-#endif
        }
 
        /* Fill the caller's buffer. */
@@ -4269,45 +4276,14 @@ bs2_read(stream *ss, void *buf, size_t e
                /* there is more data waiting in the current block, so
                 * read it */
                n = todo < s->itotal ? todo : s->itotal;
-               if (s->comp == COMPRESSION_SNAPPY) {
-                       memcpy(buf, s->buf + s->readpos, n);
-                       buf = (void *) ((char *) buf + n);
-                       cnt += n;
-                       todo -= n;
-                       s->readpos += n;
-                       s->itotal -= n;
-
-               } else {
-                       while (n > 0) {
-                               ssize_t m = s->s->read(s->s, buf, 1, n);
-
-                               if (m <= 0) {
-                                       ss->errnr = s->s->errnr;
-                                       return -1;
-                               }
-
-
-#ifdef BSTREAM_DEBUG
-                               {
-                                       ssize_t i;
-
-                                       fprintf(stderr, "R2 '%s' %zd \"", 
ss->name, m);
-                                       for (i = 0; i < m; i++)
-                                               if (' ' <= ((char *) buf)[i] && 
((char *) buf)[i] < 127)
-                                                       putc(((char *) buf)[i], 
stderr);
-                                               else
-                                                       fprintf(stderr, 
"\\%03o", ((char *) buf)[i]);
-                                       fprintf(stderr, "\"\n");
-                               }
-#endif
-
-                               buf = (void *) ((char *) buf + m);
-                               cnt += m;
-                               n -= m;
-                               s->itotal -= m;
-                               todo -= m;
-                       }
-               }
+               
+               memcpy(buf, s->buf + s->readpos, n);
+               buf = (void *) ((char *) buf + n);
+               cnt += n;
+               todo -= n;
+               s->readpos += n;
+               s->itotal -= n;
+
                if (s->itotal == 0) {
                        lng blksize = 0;
 
@@ -4339,33 +4315,41 @@ bs2_read(stream *ss, void *buf, size_t e
                        /* store whether this was the last block or not */
                        s->nr = blksize & 1;
 
-                       if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
+                       if (s->itotal > 0) {
                                // read everything into the comp buf
                                size_t uncompressed_length = s->bufsiz;
                                size_t m = 0;
+                               char *buf = s->buf;
+                               if (s->comp != COMPRESSION_NONE) {
+                                       buf = s->compbuf;
+                               }
                                snappy_status ret;
 
                                while (m < s->itotal) {
                                        ssize_t bytes_read = 0;
-                                       bytes_read = s->s->read(s->s, 
s->compbuf + m, 1, s->itotal - m);
+                                       bytes_read = s->s->read(s->s, buf + m, 
1, s->itotal - m);
                                        if (bytes_read <= 0) {
                                                ss->errnr = s->s->errnr;
                                                return -1;
                                        }
                                        m += bytes_read;
                                }
-                               if ((ret = snappy_uncompress(s->compbuf, 
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
-                                       ss->errnr = (int) ret;
-                                       return -1;
+                               if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+                                       if ((ret = 
snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != 
SNAPPY_OK) {
+                                               ss->errnr = (int) ret;
+                                               return -1;
+                                       }
+#else
+                       assert(0);
+                       return -1;
+#endif
+                               } else {
+                                       uncompressed_length = m;
                                }
                                s->itotal = uncompressed_length;
                                s->readpos = 0;
                        }
-#else
-                       assert(0);
-                       return -1;
-#endif
                }
        }
        /* if we got an empty block with the end-of-sequence marker
@@ -4387,7 +4371,6 @@ bs2_getbuf(stream *ss)
        return (void*) s->buf;
 }
 
-
 void
 bs2_resetbuf(stream *ss)
 {
@@ -4398,6 +4381,17 @@ bs2_resetbuf(stream *ss)
        s->readpos = 0;
 }
 
+buffer 
+bs2_buffer(stream *ss) {
+       bs2 *s = (bs2 *) ss->stream_data.p;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to