Changeset: 5059095a5538 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5059095a5538
Modified Files:
        common/stream/stream.c
        common/stream/stream.h
Branch: protocol
Log Message:

Add bytestream (unfinished).


diffs (truncated from 383 to 300 lines):

diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -3970,6 +3970,335 @@ isa_block_stream(stream *s)
 }
 
 /* ------------------------------------------------------------------ */
+/* Byte stream */
+
+
+/* Allocate a bytestream that wraps stream s and stages data in a
+ * buffer of bufsize bytes.
+ *
+ * bufsize must exceed BYTESTREAM_OVERHEAD (checked by assert only).
+ * Returns the new bytestream, or NULL when either allocation fails. */
+bytestream *
+bytestream_create(stream *s, size_t bufsize)
+{
+       bytestream *bstr;
+
+       /* the wrapped stream should be a binary stream */
+       assert(bufsize > BYTESTREAM_OVERHEAD);
+
+       bstr = malloc(sizeof(*bstr));
+       if (bstr == NULL)
+               return NULL;
+
+       bstr->buf = malloc(bufsize);
+       if (bstr->buf == NULL) {
+               /* do not leak the descriptor when the buffer fails */
+               free(bstr);
+               return NULL;
+       }
+       bstr->bufpos = 0;
+       bstr->bufsize = bufsize;
+       bstr->s = s;
+       return bstr;
+}
+
+/* Hand everything buffered in the bytestream behind ss to the wrapped
+ * stream and reset the buffer.
+ *
+ * If the buffer still has BYTESTREAM_OVERHEAD bytes of room, a zero
+ * length field is appended first so the reader knows that the stream
+ * has ended.
+ *
+ * Returns 0 on success (or when ss is not a write stream), and -1 if
+ * ss carries no bytestream or if the wrapped stream did not accept the
+ * whole buffer; in the latter case ss->errnr is set to
+ * MNSTR_WRITE_ERROR and the buffered data is kept. */
+static int
+bytestream_flush(stream *ss)
+{
+       bytestream *s;
+       ssize_t written;
+
+       s = (bytestream *) ss->stream_data.p;
+       if (s == NULL)
+               return -1;
+       assert(ss->access == ST_WRITE);
+       /* a writer may fill the buffer completely before flushing, so
+        * bufpos == bufsize is a valid state here */
+       assert(s->bufpos <= s->bufsize);
+       if (ss->access != ST_WRITE)
+               return 0;
+
+       if (s->bufpos + BYTESTREAM_OVERHEAD <= s->bufsize) {
+               // there is room in the buffer: end it with a 0 so the
+               // reader knows that the stream has ended
+               lng data = 0;
+               memcpy(s->buf + s->bufpos, &data, sizeof(lng));
+               s->bufpos += sizeof(lng);
+       }
+       // FIXME: for a compressed stream we can compress the buffer
+       // here before writing it
+       written = s->s->write(s->s, s->buf, 1, s->bufpos);
+       /* the write callback reports errors as -1, which a plain
+        * "!result" test would mistake for success */
+       if (written < 0 || (size_t) written != s->bufpos) {
+               ss->errnr = MNSTR_WRITE_ERROR;
+               return -1;
+       }
+       s->bufpos = 0;
+       return 0;
+}
+
+/* Append one package to the buffer of the bytestream behind ss: a lng
+ * length field followed by cnt * elmsize bytes taken from buf.
+ *
+ * The buffer is flushed first when the package does not fit in the
+ * remaining space, and flushed afterwards when no further package
+ * could fit.  A request of zero bytes writes nothing, because a zero
+ * length field is what marks the end of the stream.
+ *
+ * Returns cnt on success.  Returns -1 when ss carries no bytestream,
+ * when the package can never fit in the buffer (ss->errnr is set to
+ * MNSTR_WRITE_ERROR), or when flushing fails. */
+static ssize_t
+bytestream_write(stream *ss, const void *buf, size_t elmsize, size_t cnt)
+{
+       bytestream *s;
+       size_t todo;
+       size_t room;
+       lng len;
+
+       s = (bytestream *) ss->stream_data.p;
+       if (s == NULL)
+               return -1;
+       assert(ss->access == ST_WRITE);
+       assert(s->bufpos < s->bufsize);
+
+       /* payload capacity of an empty buffer; comparing cnt against
+        * room / elmsize instead of multiplying first keeps
+        * cnt * elmsize from wrapping around */
+       room = s->bufsize > BYTESTREAM_OVERHEAD ?
+               s->bufsize - BYTESTREAM_OVERHEAD : 0;
+       if (elmsize > 0 && cnt > room / elmsize) {
+               fprintf(stderr, "Content too big for buffer!\n");
+               ss->errnr = MNSTR_WRITE_ERROR;
+               return -1; // content does not fit into buffer
+       }
+       todo = cnt * elmsize;
+       if (todo == 0)
+               return (ssize_t) cnt;
+
+       if (todo + BYTESTREAM_OVERHEAD > s->bufsize - s->bufpos) {
+               // content does not fit into the buffer currently, but
+               // it will once the buffer has been flushed
+               if (bytestream_flush(ss) < 0)
+                       return -1;
+       }
+
+       // write the length of the package; a lng is used so that the
+       // field has the same width as the end marker written on flush
+       // NOTE(review): assumes BYTESTREAM_OVERHEAD >= sizeof(lng)
+       len = (lng) todo;
+       memcpy(s->buf + s->bufpos, &len, sizeof(len));
+       s->bufpos += sizeof(len);
+       // write the actual data into the buffer
+       memcpy(s->buf + s->bufpos, buf, todo);
+       s->bufpos += todo;
+
+       if (s->bufpos + BYTESTREAM_OVERHEAD >= s->bufsize) {
+               // we cannot fit any more packages in here
+               if (bytestream_flush(ss) < 0)
+                       return -1;
+       }
+       return (ssize_t) cnt;
+}
+
+
+
+/* Read from the bytestream behind ss into buf.
+ *
+ * FIXME: reading is not implemented yet.  The intended shape is to
+ * read a lng length field from the wrapped stream (mnstr_readLng) and
+ * then read that many bytes from it into buf; presumably a length of
+ * zero means that the stream has ended.
+ *
+ * Until that exists every call fails instead of returning an
+ * indeterminate value: the result is -1, and ss->errnr is set to
+ * MNSTR_READ_ERROR when ss carries a bytestream. */
+static ssize_t
+bytestream_read(stream *ss, void *buf, size_t elmsize, size_t cnt)
+{
+       bytestream *s;
+
+       /* not used until reading is implemented */
+       (void) buf;
+       (void) elmsize;
+       (void) cnt;
+
+       s = (bytestream *) ss->stream_data.p;
+       if (s == NULL)
+               return -1;
+
+       ss->errnr = MNSTR_READ_ERROR;
+       return -1;
+}
+
+// static void
+// bs_update_timeout(stream *ss)
+// {
+//     bs *s;
+
+//     if ((s = ss->stream_data.p) != NULL && s->s) {
+//             s->s->timeout = ss->timeout;
+//             s->s->timeout_func = ss->timeout_func;
+//             if (s->s->update_timeout)
+//                     (*s->s->update_timeout)(s->s);
+//     }
+// }
+
+// static int
+// bs_isalive(stream *ss)
+// {
+//     struct bs *s;
+
+//     if ((s = ss->stream_data.p) != NULL && s->s) {
+//             if (s->s->isalive)
+//                     return (*s->s->isalive)(s->s);
+//             return 1;
+//     }
+//     return 0;
+// }
+
+// static void
+// bs_close(stream *ss)
+// {
+//     bs *s;
+
+//     s = (bs *) ss->stream_data.p;
+//     assert(s);
+//     if (s == NULL)
+//             return;
+//     assert(s->s);
+//     if (s->s)
+//             s->s->close(s->s);
+// }
+
+// static void
+// bs_destroy(stream *ss)
+// {
+//     bs *s;
+
+//     s = (bs *) ss->stream_data.p;
+//     assert(s);
+//     if (s) {
+//             assert(s->s);
+//             if (s->s)
+//                     s->s->destroy(s->s);
+//             free(s);
+//     }
+//     destroy(ss);
+// }
+
+// static void
+// bs_clrerr(stream *s)
+// {
+//     if (s->stream_data.p)
+//             mnstr_clearerr(((bs *) s->stream_data.p)->s);
+// }
+
+
+stream *
+byte_stream(stream *s, size_t bufsize)
+{
+       stream *ns;
+       bytestream *b;
+
+       if (s == NULL)
+               return NULL;
+#ifdef STREAM_DEBUG
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to