Changeset: 8c72389e014a for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8c72389e014a Modified Files: clients/mapiclient/mclient.c clients/mapilib/mapi.c common/stream/stream.c common/stream/stream.h sql/backends/monet5/sql_result.c Branch: protocol Log Message:
merge diffs (truncated from 527 to 300 lines): diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -806,7 +806,7 @@ CSVrenderer(MapiHdl hdl) char *s; char *sep = separator; int i; - char buffer[100000]; + char buffer[BUFSIZ]; char *buffer_ptr; if (csvheader) { fields = mapi_get_field_count(hdl); @@ -825,7 +825,7 @@ CSVrenderer(MapiHdl hdl) s = mapi_fetch_field(hdl, i); buffer_ptr = stpcpy(buffer_ptr, s); - if (i != 0) { + if (i != fields - 1) { *buffer_ptr++ = *sep; } diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -5488,48 +5488,80 @@ mapi_fetch_row(MapiHdl hdl) struct MapiResultSet *result; if (hdl->mid->protocol == prot10 || hdl->mid->protocol == prot10compressed) { +#ifdef PROT10_DEBUG + char *initbuf; +#endif char* buf; result = hdl->result; - result->rows_read++; + // check if we have read the entire result set if (result->rows_read >= result->row_count) { hdl->mid->active = NULL; hdl->active = NULL; + bs2_resetbuf(hdl->mid->from); return 0; } - // do we have any rows in our cache - if (result->rows_read > result->tuple_count) { - // read block from socket + // if not, check if our cache is empty + if (result->rows_read >= result->tuple_count) { + // if our cache is empty, we read data from the socket lng nrows = 0; + // first we write a prompt to the server indicating that we want another block of the result set if (!mnstr_writeChr(hdl->mid->to, 42) || mnstr_flush(hdl->mid->to)) { - // FIXME: set hdl->mid to something - + hdl->mid->errorstr = strdup("Failed to write confirm message to server."); + hdl->mid->error = 0; + fprintf(stderr, "Failure 2.\n"); return hdl->mid->error; } + bs2_resetbuf(hdl->mid->from); // kinda a bit evil - // this actually triggers the read of the entire block from now we operate on buffer + assert(bs2_buffer(hdl->mid->from).pos == 0); + + // this actually triggers the read of the entire block + // after this point we operate on the buffer if (!mnstr_readLng(hdl->mid->from, &nrows)) { // FIXME: set hdl->mid to something + hdl->mid->errorstr = strdup("Failed to read row response"); + hdl->mid->error = 0; + fprintf(stderr, "Failure 3.\n"); return hdl->mid->error; } - bs2_resetbuf(hdl->mid->from); - -// fprintf(stderr, "nrows=%llu\n", nrows); + + + //bs2_resetbuf(hdl->mid->from); + +#ifdef PROT10_DEBUG + fprintf(stderr, "Read block: %llu - %llu (out of %lld, nrow=%lld)\n", result->rows_read, result->rows_read + nrows, result->row_count, nrows); + initbuf = (char*) bs2_getbuf(hdl->mid->from); +#endif + buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng); // iterate over cols for (i = 0; i < (size_t) result->fieldcnt; i++) { +#ifdef PROT10_DEBUG + fprintf(stderr, "Column %zu\n", i); +#endif result->fields[i].buffer_ptr = buf; if (result->fields[i].columnlength < 0) { // variable-length column lng col_len = *((lng*) buf); +#ifdef PROT10_DEBUG + fprintf(stderr, "Read lng %lld from position %zu\n", col_len, buf - initbuf); +#endif assert((size_t) col_len < hdl->mid->blocksize && col_len > 0); result->fields[i].buffer_ptr += sizeof(lng); buf += col_len + sizeof(lng); +#ifdef PROT10_DEBUG + fprintf(stderr, "Read strings from position %zu\n", result->fields[i].buffer_ptr - initbuf); +#endif } else { buf += nrows * result->fields[i].columnlength; +#ifdef PROT10_DEBUG + fprintf(stderr, "Read elements from position %zu\n", result->fields[i].buffer_ptr - initbuf); +#endif } } + assert(result->fields[result->fieldcnt - 1].buffer_ptr - result->fields[0].buffer_ptr < (long) hdl->mid->blocksize); result->tuple_count += nrows; } else { for (i = 0; i < (size_t) result->fieldcnt; i++) { @@ -5541,6 +5573,7 @@ mapi_fetch_row(MapiHdl hdl) } } } + result->rows_read++; return result->fieldcnt; } diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -4233,34 +4233,41 @@ bs2_read(stream *ss, void *buf, size_t e /* store whether this was the last block or not */ s->nr = blksize & 1; - - if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) { -#ifdef HAVE_LIBSNAPPY + if (s->itotal > 0) { // read everything into the comp buf size_t uncompressed_length = s->bufsiz; size_t m = 0; + char *buf = s->buf; + if (s->comp != COMPRESSION_NONE) { + buf = s->compbuf; + } snappy_status ret; while (m < s->itotal) { ssize_t bytes_read = 0; - bytes_read = s->s->read(s->s, s->compbuf + m, 1, s->itotal - m); + bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m); if (bytes_read <= 0) { ss->errnr = s->s->errnr; return -1; } m += bytes_read; } - if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { - ss->errnr = (int) ret; - return -1; + if (s->comp == COMPRESSION_SNAPPY) { +#ifdef HAVE_LIBSNAPPY + if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { + ss->errnr = (int) ret; + return -1; + } +#else + assert(0); + return -1; +#endif + } else { + uncompressed_length = m; } s->itotal = uncompressed_length; s->readpos = 0; } -#else - assert(0); - return 0; -#endif } /* Fill the caller's buffer. */ @@ -4269,45 +4276,14 @@ bs2_read(stream *ss, void *buf, size_t e /* there is more data waiting in the current block, so * read it */ n = todo < s->itotal ? todo : s->itotal; - if (s->comp == COMPRESSION_SNAPPY) { - memcpy(buf, s->buf + s->readpos, n); - buf = (void *) ((char *) buf + n); - cnt += n; - todo -= n; - s->readpos += n; - s->itotal -= n; - - } else { - while (n > 0) { - ssize_t m = s->s->read(s->s, buf, 1, n); - - if (m <= 0) { - ss->errnr = s->s->errnr; - return -1; - } - - -#ifdef BSTREAM_DEBUG - { - ssize_t i; - - fprintf(stderr, "R2 '%s' %zd \"", ss->name, m); - for (i = 0; i < m; i++) - if (' ' <= ((char *) buf)[i] && ((char *) buf)[i] < 127) - putc(((char *) buf)[i], stderr); - else - fprintf(stderr, "\\%03o", ((char *) buf)[i]); - fprintf(stderr, "\"\n"); - } -#endif - - buf = (void *) ((char *) buf + m); - cnt += m; - n -= m; - s->itotal -= m; - todo -= m; - } - } + + memcpy(buf, s->buf + s->readpos, n); + buf = (void *) ((char *) buf + n); + cnt += n; + todo -= n; + s->readpos += n; + s->itotal -= n; + if (s->itotal == 0) { lng blksize = 0; @@ -4339,33 +4315,41 @@ bs2_read(stream *ss, void *buf, size_t e /* store whether this was the last block or not */ s->nr = blksize & 1; - if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) { -#ifdef HAVE_LIBSNAPPY + if (s->itotal > 0) { // read everything into the comp buf size_t uncompressed_length = s->bufsiz; size_t m = 0; + char *buf = s->buf; + if (s->comp != COMPRESSION_NONE) { + buf = s->compbuf; + } snappy_status ret; while (m < s->itotal) { ssize_t bytes_read = 0; - bytes_read = s->s->read(s->s, s->compbuf + m, 1, s->itotal - m); + bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m); if (bytes_read <= 0) { ss->errnr = s->s->errnr; return -1; } m += bytes_read; } - if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { - ss->errnr = (int) ret; - return -1; + if (s->comp == COMPRESSION_SNAPPY) { +#ifdef HAVE_LIBSNAPPY + if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { + ss->errnr = (int) ret; + return -1; + } +#else + assert(0); + return -1; +#endif + } else { + uncompressed_length = m; } s->itotal = uncompressed_length; s->readpos = 0; } -#else - assert(0); - return -1; -#endif } } /* if we got an empty block with the end-of-sequence marker @@ -4387,7 +4371,6 @@ bs2_getbuf(stream *ss) return (void*) s->buf; } - void bs2_resetbuf(stream *ss) { @@ -4398,6 +4381,17 @@ bs2_resetbuf(stream *ss) s->readpos = 0; } +buffer +bs2_buffer(stream *ss) { + bs2 *s = (bs2 *) ss->stream_data.p; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list