Changeset: 728e42b28b38 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=728e42b28b38
Modified Files:
	common/stream/bs.c
	common/stream/stream.h
	common/stream/stream_internal.h
Branch: copybinary
Log Message:
Add prompting_block_stream diffs (108 lines): diff --git a/common/stream/bs.c b/common/stream/bs.c --- a/common/stream/bs.c +++ b/common/stream/bs.c @@ -44,6 +44,8 @@ joerijoeri(char *func, const char *buf, * indicated by an empty block (i.e. just a count of 0). */ +static ssize_t bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt); + static bs * bs_create(void) { @@ -187,7 +189,10 @@ bs_flush(stream *ss, mnstr_flush_level f } /* Read buffered data and return the number of items read. At the - * flush boundary we will return 0 to indicate the end of a block. + * flush boundary we will return 0 to indicate the end of a block, + * unless prompt and pstream are set. In that case, only return 0 + * after the prompt has been written to pstream and another read + * attempt immediately returns a block boundary. * * Structure field usage: * s - the underlying stream; @@ -199,6 +204,29 @@ bs_flush(stream *ss, mnstr_flush_level f ssize_t bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt) { + ssize_t ret = bs_read_internal(ss, buf, elmsize, cnt); + if (ret != 0) + return ret; + + bs *b = (bs*) ss-> stream_data.p; + if (b->prompt == NULL || b->pstream == NULL) + return 0; + + // before returning the 0 we send the prompt and make another attempt. + if (mnstr_write(b->pstream, b->prompt, strlen(b->prompt), 1) != 1) + return -1; + if (mnstr_flush(b->pstream, MNSTR_FLUSH_DATA) < 0) + return -1; + + // if it succeeds, return that to the client. + // if it's still a block boundary, return that to the client. + // if there's an error, return that to the client. + return bs_read_internal(ss, buf, elmsize, cnt); +} + +static ssize_t +bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt) +{ bs *s; size_t todo = cnt * elmsize; size_t n; @@ -415,3 +443,30 @@ block_stream(stream *s) return ns; } +/* Like block_stream(), but enables prompting. 
+ * This means that on encountering a block boundary, a prompt is sent + * on the prompt_stream and the read is retried. If this does not + * yield another block boundary, the first block boundary is ignored. + * This is used as part of the MAPI protocol. + * + * When the stream is destroyed, prompt is not freed and prompt_stream is + * not destroyed or closed. + */ +stream * +prompting_block_stream(stream *s, const char *prompt, stream *prompt_stream) +{ + if (!s->readonly) { + mnstr_set_open_error(s->name, 0, "prompting_block_stream not implemented for write streams"); + return NULL; + } + + stream *b = block_stream(s); + if (b == NULL) + return NULL; + + bs *bs = b->stream_data.p; + bs->prompt = prompt; + bs->pstream = prompt_stream; + + return b; +} diff --git a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -227,6 +227,7 @@ stream_export buffer *mnstr_get_buffer(s stream_export stream *block_stream(stream *s); // mapi.c, mal_mapi.c, client.c, merovingian stream_export bool isa_block_stream(const stream *s); // mapi.c, mal_client.c, remote.c, sql_scenario.c/sqlReader, sql_scan.c stream_export stream *bs_stream(stream *s); // unused +stream_export stream *prompting_block_stream(stream *s, const char *prompt, stream *prompt_stream); stream_export void joeri_role(const char *role); stream_export void joeri_log(const char *fmt, ...) 
diff --git a/common/stream/stream_internal.h b/common/stream/stream_internal.h --- a/common/stream/stream_internal.h +++ b/common/stream/stream_internal.h @@ -234,6 +234,8 @@ struct bs { unsigned itotal; /* amount available in current read block */ size_t blks; /* read/writen blocks (possibly partial) */ size_t bytes; /* read/writen bytes */ + const char *prompt; /* on eof, first try to send this then try again */ + stream *pstream; /* stream to send prompts on */ char buf[BLOCK]; /* the buffered data (minus the size of * size-short */ }; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list