Changeset: b60c634a07cd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b60c634a07cd
Modified Files:
    clients/mapilib/mapi.c
    common/stream/stream.c
    common/stream/stream.h
Branch: protocol
Log Message:
Additional changes to byte_stream. diffs (truncated from 483 to 300 lines): diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -2781,15 +2781,13 @@ mapi_reconnect(Mapi mid) check_stream(mid, mid->to, "Could not send initial byte sequence", "mapi_reconnect", mid->error); if (prot_version == prot10 || prot_version == prot10compressed) { + bstream *bs_to = (bstream*) mid->to; + bstream *bs_from = (bstream*) mid->from; + printf("Using protocol version %s.\n", prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED"); - // FIXME: destroy block streams and replace with appropriate streams -#if 0 assert(isa_block_stream(mid->to)); assert(isa_block_stream(mid->from)); - bs *bs_to = (bs*) mid->to; - bs *bs_from = (bs*) mid->from; - if (prot_version == prot10compressed) { #ifdef HAVE_LIBSNAPPY mid->to = compressed_stream(bs_to->s, COMPRESSION_SNAPPY); @@ -2798,13 +2796,16 @@ mapi_reconnect(Mapi mid) assert(0); #endif } else { - mid->to = byte_stream((bs_to->s); - mid->from = byte_stream(bs_from->s); + // FIXME: figure out proper stream sizes + mid->to = byte_stream(bs_to->s, 1024000); + mid->from = byte_stream(bs_from->s, 1024000); + } - - close_stream(bs_to); - close_stream(bs_from); -#endif + bs_to->s = NULL; + bs_from->s = NULL; + close_stream((stream*) bs_to); + close_stream((stream*) bs_from); + } /* consume the welcome message from the server */ diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -3885,20 +3885,6 @@ bs_isalive(stream *ss) } static void -bs_close(stream *ss) -{ - bs *s; - - s = (bs *) ss->stream_data.p; - assert(s); - if (s == NULL) - return; - assert(s->s); - if (s->s) - s->s->close(s->s); -} - -static void bs_destroy(stream *ss) { bs *s; @@ -3914,6 +3900,24 @@ bs_destroy(stream *ss) destroy(ss); } + + +static void +bs_close(stream *ss) +{ + bs *s; + + s = (bs *) ss->stream_data.p; + assert(s); + if (s == NULL) + 
return; + assert(s->s); + if (s->s) + s->s->close(s->s); + bs_destroy(ss); +} + + static void bs_clrerr(stream *s) { @@ -3984,8 +3988,9 @@ bytestream_create(stream *s, size_t bufs return NULL; ns->s = s; ns->bufsize = bufsize; - ns->bufpos = 0; + ns->bufpos = BYTESTREAM_OVERHEAD; ns->buf = malloc(bufsize); + ns->bufend = ns->bufpos; if (ns->buf == NULL) { free(ns); return NULL; @@ -4003,20 +4008,17 @@ bytestream_flush(stream *ss) return -1; assert(ss->access == ST_WRITE); assert(s->bufpos < s->bufsize); - if (ss->access == ST_WRITE) { - if (s->bufpos + BYTESTREAM_OVERHEAD <= s->bufsize) { - // if there is room in the buffer, end the buffer with a 0 so the reader knows that the stream has ended - lng data = 0; - memcpy(s->buf + s->bufpos, &data, sizeof(lng)); - s->bufpos += sizeof(lng); - } - // FIXME: for compressed stream we can compress the buffer here before writing it - if (!s->s->write(s->s, s->buf, 1, s->bufpos)) { - ss->errnr = MNSTR_WRITE_ERROR; - return -1; - } - s->bufpos = 0; + + memcpy(s->buf, &s->bufpos, sizeof(size_t)); + + + // FIXME: for compressed stream we can compress the buffer here before writing it + if (!s->s->write(s->s, s->buf, 1, s->bufpos)) { + ss->errnr = MNSTR_WRITE_ERROR; + return -1; } + s->bufpos = BYTESTREAM_OVERHEAD; + return 0; } @@ -4031,227 +4033,125 @@ bytestream_write(stream *ss, const void return -1; assert(ss->access == ST_WRITE); assert(s->bufpos < s->bufsize); - if (todo + BYTESTREAM_OVERHEAD > s->bufsize) + if (todo > s->bufsize) fprintf(stderr, "Content too big for buffer!\n"); ss->errnr = MNSTR_WRITE_ERROR; return -1; // content does not fit into buffer - if (todo + BYTESTREAM_OVERHEAD > s->bufsize - s->bufpos) { + if (todo > s->bufsize - s->bufpos) { // content does not fit into buffer currently, but will if we flush it first - bytestream_flush(s); + bytestream_flush(ss); } - - // write the length of the package - memcpy(s->buf + s->bufpos, &todo, sizeof(size_t)); - s->bufpos += sizeof(size_t); // write the actual data 
into the buffer memcpy(s->buf + s->bufpos, buf, todo); s->bufpos += todo; - if (s->bufpos + BYTESTREAM_OVERHEAD >= s->bufsize) { + if (s->bufpos >= s->bufsize) { // we cannot fit any more packages in here - bytestream_flush(s); + bytestream_flush(ss); } return (ssize_t) cnt; } -static ssize_t +ssize_t bytestream_read(stream *ss, void *buf, size_t elmsize, size_t cnt) { bytestream *s; size_t todo = cnt * elmsize; - size_t n; - - s = (bs *) ss->stream_data.p; + lng size; + + s = (bytestream *) ss->stream_data.p; if (s == NULL) return -1; - // FIXME: reading - -// assert(ss->access == ST_READ); - -// lng size = mnstr_readLng(ss, &size); -// s->s->read(s->s, buf, 1, size); -// if (s->itotal == 0) { -// short blksize = 0; - -// if (s->nr) { -// /* We read the closing block but hadn't -// * returned that yet. Return it now, and note -// * that we did by setting s->nr to 0. */ -// assert(s->nr == 1); -// s->nr = 0; -// return 0; -// } - -// assert(s->nr == 0); - -// /* There is nothing more to read in the current block, -// * so read the count for the next block */ -// switch (mnstr_readSht(s->s, &blksize)) { -// case -1: -// ss->errnr = s->s->errnr; -// return -1; -// case 0: -// return 0; -// case 1: -// break; -// } -// if (blksize < 0) { -// ss->errnr = MNSTR_READ_ERROR; -// return -1; -// } -// #ifdef BSTREAM_DEBUG -// fprintf(stderr, "RC size: %d, final: %s\n", blksize >> 1, blksize & 1 ? "true" : "false"); -// fprintf(stderr, "RC %s %d\n", ss->name, blksize); -// #endif -// s->itotal = (unsigned) (blksize >> 1); /* amount readable */ -// /* store whether this was the last block or not */ -// s->nr = blksize & 1; -// s->bytes += s->itotal; -// s->blks++; -// } - -// /* Fill the caller's buffer. */ -// cnt = 0; /* count how much we put into the buffer */ -// while (todo > 0) { -// /* there is more data waiting in the current block, so -// * read it */ -// n = todo < s->itotal ? 
todo : s->itotal; -// while (n > 0) { -// ssize_t m = s->s->read(s->s, buf, 1, n); - -// if (m <= 0) { -// ss->errnr = s->s->errnr; -// return -1; -// } -// #ifdef BSTREAM_DEBUG -// { -// ssize_t i; - -// fprintf(stderr, "RD %s %zd \"", ss->name, m); -// for (i = 0; i < m; i++) -// if (' ' <= ((char *) buf)[i] && ((char *) buf)[i] < 127) -// putc(((char *) buf)[i], stderr); -// else -// fprintf(stderr, "\\%03o", ((char *) buf)[i]); -// fprintf(stderr, "\"\n"); -// } -// #endif -// buf = (void *) ((char *) buf + m); -// cnt += m; -// n -= m; -// s->itotal -= (int) m; -// todo -= m; -// } - -// if (s->itotal == 0) { -// short blksize = 0; - -// /* The current block has been completely read, -// * so read the count for the next block, only -// * if the previous was not the last one */ -// if (s->nr) -// break; -// switch (mnstr_readSht(s->s, &blksize)) { -// case -1: -// ss->errnr = s->s->errnr; -// return -1; -// case 0: -// return 0; -// case 1: -// break; -// } -// if (blksize < 0) { -// ss->errnr = MNSTR_READ_ERROR; -// return -1; -// } -// #ifdef BSTREAM_DEBUG -// fprintf(stderr, "RC size: %d, final: %s\n", blksize >> 1, blksize & 1 ? "true" : "false"); -// fprintf(stderr, "RC %s %d\n", ss->name, s->nr); -// fprintf(stderr, "RC %s %d\n", ss->name, blksize); -// #endif -// s->itotal = (unsigned) (blksize >> 1); /* amount readable */ -// /* store whether this was the last block or not */ -// s->nr = blksize & 1; -// s->bytes += s->itotal; -// s->blks++; -// } -// } -// /* if we got an empty block with the end-of-sequence marker _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list