Changeset: 728e42b28b38 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=728e42b28b38
Modified Files:
        common/stream/bs.c
        common/stream/stream.h
        common/stream/stream_internal.h
Branch: copybinary
Log Message:

Add prompting_block_stream


diffs (108 lines):

diff --git a/common/stream/bs.c b/common/stream/bs.c
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -44,6 +44,8 @@ joerijoeri(char *func, const char *buf, 
  * indicated by an empty block (i.e. just a count of 0).
  */
 
+static ssize_t bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt);
+
 static bs *
 bs_create(void)
 {
@@ -187,7 +189,10 @@ bs_flush(stream *ss, mnstr_flush_level f
 }
 
 /* Read buffered data and return the number of items read.  At the
- * flush boundary we will return 0 to indicate the end of a block.
+ * flush boundary we will return 0 to indicate the end of a block,
+ * unless prompt and pstream are set. In that case, only return 0
+ * after the prompt has been written to pstream and another read
+ * attempt immediately returns a block boundary.
  *
  * Structure field usage:
  * s - the underlying stream;
@@ -199,6 +204,29 @@ bs_flush(stream *ss, mnstr_flush_level f
 ssize_t
 bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
 {
+       ssize_t ret = bs_read_internal(ss, buf, elmsize, cnt);
+       if (ret != 0)
+               return ret;
+
+       bs *b = (bs*) ss-> stream_data.p;
+       if (b->prompt == NULL || b->pstream == NULL)
+               return 0;
+
+       // before returning the 0 we send the prompt and make another attempt.
+       if (mnstr_write(b->pstream, b->prompt, strlen(b->prompt), 1) != 1)
+               return -1;
+       if (mnstr_flush(b->pstream, MNSTR_FLUSH_DATA) < 0)
+               return -1;
+
+       // if it succeeds, return that to the client.
+       // if it's still a block boundary, return that to the client.
+       // if there's an error, return that to the client.
+       return bs_read_internal(ss, buf, elmsize, cnt);
+}
+
+static ssize_t
+bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
+{
        bs *s;
        size_t todo = cnt * elmsize;
        size_t n;
@@ -415,3 +443,30 @@ block_stream(stream *s)
        return ns;
 }
 
+/* Like block_stream(), but enables prompting.
+ * This means that on encountering a block boundary, a prompt is sent
+ * on the prompt_stream and the read is retried. If this does not
+ * yield another block boundary, the first block boundary is ignored.
+ * This is used as part of the MAPI protocol.
+ *
+ * When the stream is destroyed, prompt is not freed and prompt_stream is
+ * not destroyed or closed.
+ */
+stream *
+prompting_block_stream(stream *s, const char *prompt, stream *prompt_stream)
+{
+       if (!s->readonly) {
+               mnstr_set_open_error(s->name, 0, "prompting_block_stream not implemented for write streams");
+               return NULL;
+       }
+
+       stream *b = block_stream(s);
+       if (b == NULL)
+               return NULL;
+
+       bs *bs = b->stream_data.p;
+       bs->prompt = prompt;
+       bs->pstream = prompt_stream;
+
+       return b;
+}
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -227,6 +227,7 @@ stream_export buffer *mnstr_get_buffer(s
 stream_export stream *block_stream(stream *s); // mapi.c, mal_mapi.c, client.c, merovingian
 stream_export bool isa_block_stream(const stream *s); // mapi.c, mal_client.c, remote.c, sql_scenario.c/sqlReader, sql_scan.c
 stream_export stream *bs_stream(stream *s); // unused
+stream_export stream *prompting_block_stream(stream *s, const char *prompt, stream *prompt_stream);
 
 stream_export void joeri_role(const char *role);
 stream_export void joeri_log(const char *fmt, ...)
diff --git a/common/stream/stream_internal.h b/common/stream/stream_internal.h
--- a/common/stream/stream_internal.h
+++ b/common/stream/stream_internal.h
@@ -234,6 +234,8 @@ struct bs {
        unsigned itotal;        /* amount available in current read block */
        size_t blks;            /* read/writen blocks (possibly partial) */
        size_t bytes;           /* read/writen bytes */
+       const char *prompt;     /* on eof, first try to send this then try again */
+       stream *pstream;        /* stream to send prompts on */
        char buf[BLOCK];        /* the buffered data (minus the size of
                                 * size-short */
 };
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to