Changeset: 5059095a5538 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5059095a5538 Modified Files: common/stream/stream.c common/stream/stream.h Branch: protocol Log Message:
Add bytestream (unfinished). diffs (truncated from 383 to 300 lines): diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -3970,6 +3970,335 @@ isa_block_stream(stream *s) } /* ------------------------------------------------------------------ */ +/* Byte stream */ + + +bytestream * +bytestream_create(stream *s, size_t bufsize) +{ + /* should be a binary stream */ + bytestream *ns; + assert(bufsize > BYTESTREAM_OVERHEAD); + + if ((ns = malloc(sizeof(*ns))) == NULL) + return NULL; + ns->s = s; + ns->bufsize = bufsize; + ns->bufpos = 0; + ns->buf = malloc(bufsize); + if (ns->buf == NULL) { + free(ns); + return NULL; + } + return ns; +} + +static int +bytestream_flush(stream *ss) +{ + bytestream *s; + + s = (bytestream *) ss->stream_data.p; + if (s == NULL) + return -1; + assert(ss->access == ST_WRITE); + assert(s->bufpos < s->bufsize); + if (ss->access == ST_WRITE) { + if (s->bufpos + BYTESTREAM_OVERHEAD <= s->bufsize) { + // if there is room in the buffer, end the buffer with a 0 so the reader knows that the stream has ended + lng data = 0; + memcpy(s->buf + s->bufpos, &data, sizeof(lng)); + s->bufpos += sizeof(lng); + } + // FIXME: for compressed stream we can compress the buffer here before writing it + if (!s->s->write(s->s, s->buf, 1, s->bufpos)) { + ss->errnr = MNSTR_WRITE_ERROR; + return -1; + } + s->bufpos = 0; + } + return 0; +} + +static ssize_t +bytestream_write(stream *ss, const void *buf, size_t elmsize, size_t cnt) +{ + bytestream *s; + size_t todo = cnt * elmsize; + + s = (bytestream *) ss->stream_data.p; + if (s == NULL) + return -1; + assert(ss->access == ST_WRITE); + assert(s->bufpos < s->bufsize); + if (todo + BYTESTREAM_OVERHEAD > s->bufsize) + fprintf(stderr, "Content too big for buffer!\n"); + ss->errnr = MNSTR_WRITE_ERROR; + return -1; // content does not fit into buffer + + if (todo + BYTESTREAM_OVERHEAD > s->bufsize - s->bufpos) { + // content does not fit into buffer currently, but will if we flush it first + bytestream_flush(s); + } + + + // write the length of the package + memcpy(s->buf + s->bufpos, &todo, sizeof(size_t)); + s->bufpos += sizeof(size_t); + // write the actual data into the buffer + memcpy(s->buf + s->bufpos, buf, todo); + s->bufpos += todo; + + if (s->bufpos + BYTESTREAM_OVERHEAD >= s->bufsize) { + // we cannot fit any more packages in here + bytestream_flush(s); + } + return (ssize_t) cnt; +} + + + +static ssize_t +bytestream_read(stream *ss, void *buf, size_t elmsize, size_t cnt) +{ + bytestream *s; + size_t todo = cnt * elmsize; + size_t n; + + s = (bs *) ss->stream_data.p; + if (s == NULL) + return -1; + + // FIXME: reading + +// assert(ss->access == ST_READ); + +// lng size = mnstr_readLng(ss, &size); +// s->s->read(s->s, buf, 1, size); +// if (s->itotal == 0) { +// short blksize = 0; + +// if (s->nr) { +// /* We read the closing block but hadn't +// * returned that yet. Return it now, and note +// * that we did by setting s->nr to 0. */ +// assert(s->nr == 1); +// s->nr = 0; +// return 0; +// } + +// assert(s->nr == 0); + +// /* There is nothing more to read in the current block, +// * so read the count for the next block */ +// switch (mnstr_readSht(s->s, &blksize)) { +// case -1: +// ss->errnr = s->s->errnr; +// return -1; +// case 0: +// return 0; +// case 1: +// break; +// } +// if (blksize < 0) { +// ss->errnr = MNSTR_READ_ERROR; +// return -1; +// } +// #ifdef BSTREAM_DEBUG +// fprintf(stderr, "RC size: %d, final: %s\n", blksize >> 1, blksize & 1 ? "true" : "false"); +// fprintf(stderr, "RC %s %d\n", ss->name, blksize); +// #endif +// s->itotal = (unsigned) (blksize >> 1); /* amount readable */ +// /* store whether this was the last block or not */ +// s->nr = blksize & 1; +// s->bytes += s->itotal; +// s->blks++; +// } + +// /* Fill the caller's buffer. */ +// cnt = 0; /* count how much we put into the buffer */ +// while (todo > 0) { +// /* there is more data waiting in the current block, so +// * read it */ +// n = todo < s->itotal ? todo : s->itotal; +// while (n > 0) { +// ssize_t m = s->s->read(s->s, buf, 1, n); + +// if (m <= 0) { +// ss->errnr = s->s->errnr; +// return -1; +// } +// #ifdef BSTREAM_DEBUG +// { +// ssize_t i; + +// fprintf(stderr, "RD %s %zd \"", ss->name, m); +// for (i = 0; i < m; i++) +// if (' ' <= ((char *) buf)[i] && ((char *) buf)[i] < 127) +// putc(((char *) buf)[i], stderr); +// else +// fprintf(stderr, "\\%03o", ((char *) buf)[i]); +// fprintf(stderr, "\"\n"); +// } +// #endif +// buf = (void *) ((char *) buf + m); +// cnt += m; +// n -= m; +// s->itotal -= (int) m; +// todo -= m; +// } + +// if (s->itotal == 0) { +// short blksize = 0; + +// /* The current block has been completely read, +// * so read the count for the next block, only +// * if the previous was not the last one */ +// if (s->nr) +// break; +// switch (mnstr_readSht(s->s, &blksize)) { +// case -1: +// ss->errnr = s->s->errnr; +// return -1; +// case 0: +// return 0; +// case 1: +// break; +// } +// if (blksize < 0) { +// ss->errnr = MNSTR_READ_ERROR; +// return -1; +// } +// #ifdef BSTREAM_DEBUG +// fprintf(stderr, "RC size: %d, final: %s\n", blksize >> 1, blksize & 1 ? "true" : "false"); +// fprintf(stderr, "RC %s %d\n", ss->name, s->nr); +// fprintf(stderr, "RC %s %d\n", ss->name, blksize); +// #endif +// s->itotal = (unsigned) (blksize >> 1); /* amount readable */ +// /* store whether this was the last block or not */ +// s->nr = blksize & 1; +// s->bytes += s->itotal; +// s->blks++; +// } +// } +// /* if we got an empty block with the end-of-sequence marker +// * set (low-order bit) we must only return an empty read once, +// * so we must squash the flag that we still have to return an +// * empty read */ +// if (todo > 0 && cnt == 0) +// s->nr = 0; +// return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0); +} + +// static void +// bs_update_timeout(stream *ss) +// { +// bs *s; + +// if ((s = ss->stream_data.p) != NULL && s->s) { +// s->s->timeout = ss->timeout; +// s->s->timeout_func = ss->timeout_func; +// if (s->s->update_timeout) +// (*s->s->update_timeout)(s->s); +// } +// } + +// static int +// bs_isalive(stream *ss) +// { +// struct bs *s; + +// if ((s = ss->stream_data.p) != NULL && s->s) { +// if (s->s->isalive) +// return (*s->s->isalive)(s->s); +// return 1; +// } +// return 0; +// } + +// static void +// bs_close(stream *ss) +// { +// bs *s; + +// s = (bs *) ss->stream_data.p; +// assert(s); +// if (s == NULL) +// return; +// assert(s->s); +// if (s->s) +// s->s->close(s->s); +// } + +// static void +// bs_destroy(stream *ss) +// { +// bs *s; + +// s = (bs *) ss->stream_data.p; +// assert(s); +// if (s) { +// assert(s->s); +// if (s->s) +// s->s->destroy(s->s); +// free(s); +// } +// destroy(ss); +// } + +// static void +// bs_clrerr(stream *s) +// { +// if (s->stream_data.p) +// mnstr_clearerr(((bs *) s->stream_data.p)->s); +// } + + +stream * +byte_stream(stream *s, size_t bufsize) +{ + stream *ns; + bytestream *b; + + if (s == NULL) + return NULL; +#ifdef STREAM_DEBUG _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list