Changeset: 5210c24ef644 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5210c24ef644
Modified Files:
	clients/mapiclient/mclient.c
	clients/mapilib/Makefile.ag
	clients/mapilib/mapi.c
	common/stream/mhapi.proto
	common/stream/stream.c
	common/stream/stream.h
	monetdb5/modules/mal/mal_mapi.c
	sql/backends/monet5/Makefile.ag
	sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:
protobuf result sets first part diffs (truncated from 362 to 300 lines): diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -2957,7 +2957,7 @@ usage(const char *prog, int xit) fprintf(stderr, " -C version | --compression=type specify compression method {snappy,lz4}\n"); fprintf(stderr, " -P version | --protocol=version specify protocol version {prot9,prot10,prot10compressed}\n"); fprintf(stderr, " -B size | --blocksize=size specify protocol block size (>= %d)\n", BLOCK); - fprintf(stderr, " -c colcomp | --colcomp=type specify column compression type {none,pfor}"); + fprintf(stderr, " -c colcomp | --colcomp=type specify column compression type {none,pfor,protobuf}"); fprintf(stderr, " -H | --history load/save cmdline history (default off)\n"); fprintf(stderr, " -i | --interactive[=tm] interpret `\\' commands on stdin, use time formatting {ms,s,m}\n"); diff --git a/clients/mapilib/Makefile.ag b/clients/mapilib/Makefile.ag --- a/clients/mapilib/Makefile.ag +++ b/clients/mapilib/Makefile.ag @@ -13,8 +13,9 @@ lib_mapi = { VERSION = $(MAPI_VERSION) SOURCES = mapi.c mapi.rc LIBS = $(SOCKET_LIBS) ../../common/stream/libstream \ + ../../common/stream/libstream_protobuf \ ../../common/options/libmoptions \ - ../../common/utils/libmcrypt $(openssl_LIBS) $(pfor_LIBS) + ../../common/utils/libmcrypt $(openssl_LIBS) $(pfor_LIBS) $(protobuf_LIBS) } headers_mapi = { diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -913,6 +913,7 @@ struct MapiStruct { compression_method comp; column_compression colcomp; size_t blocksize; + void* protobuf_res; int trace; /* Trace Mapi interaction */ int auto_commit; @@ -952,6 +953,8 @@ struct MapiResultSet { struct MapiRowBuf cache; int commentonly; /* only comments seen so far */ mapi_int64 rows_read; + mapi_int64 cur_row; + }; struct MapiStatement { @@ -1905,6 +1908,7 @@ 
mapi_new(void) mid->protocol = protauto; mid->colcomp = COLUMN_COMPRESSION_AUTO; mid->blocksize = 128 * BLOCK; // 1 MB + mid->protobuf_res = NULL; mid->cachelimit = 100; mid->redircnt = 0; @@ -2796,11 +2800,7 @@ mapi_reconnect(Mapi mid) mid->database == NULL ? "" : mid->database, prot_version == prot10 ? "PROT10" : "PROT10COMPR", comp == COMPRESSION_SNAPPY ? "SNAPPY" : (comp == COMPRESSION_LZ4 ? "LZ4" : ""), -#ifdef HAVE_PFOR - mid->colcomp == COLUMN_COMPRESSION_PFOR ? ",HAVEPFOR" : "", -#else - "", -#endif + mid->colcomp == COLUMN_COMPRESSION_PFOR ? ",HAVEPFOR" : (mid->colcomp == COLUMN_COMPRESSION_PROTOBUF ? ",PROTOBUF" : ""), mid->blocksize); } else { retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:\n", @@ -4236,7 +4236,7 @@ read_into_cache(MapiHdl hdl, int lookahe result->querytype = Q_TABLE; result->tuple_count = 0; result->rows_read = 0; - + result->cur_row = 0; for (i = 0; i < nr_cols; i++) { lng col_info_length; @@ -5533,6 +5533,10 @@ mapi_split_line(MapiHdl hdl) return n; } +#ifdef HAVE_LIBPROTOBUF +#include <mhapi.pb-c.h> +#endif + int mapi_fetch_row(MapiHdl hdl) { @@ -5558,6 +5562,8 @@ mapi_fetch_row(MapiHdl hdl) if (result->rows_read >= result->tuple_count) { // if our cache is empty, we read data from the socket lng nrows = 0; + result->cur_row = 1; + // first we write a prompt to the server indicating that we want another block of the result set #ifdef CONTINUATION_MESSAGE @@ -5569,8 +5575,39 @@ mapi_fetch_row(MapiHdl hdl) } #endif + + if (hdl->mid->colcomp == COLUMN_COMPRESSION_PROTOBUF) { + buffer buf = bs2_buffer(hdl->mid->from); +#ifndef HAVE_LIBPROTOBUF + // TODO: complain +#else + Mhapi__QueryResult *res = mhapi__query_result__unpack(NULL, buf.pos, (const uint8_t *) buf.buf); + assert(res->row_count <= result->row_count); + assert(res->n_columns == (size_t) result->fieldcnt); + + for (i = 0; i < (size_t) result->fieldcnt; i++) { + Mhapi__QueryResult__Column *c = res->columns[i]; + if (c->n_double_values > 0) { + result->fields[i].buffer_ptr = 
(char*) c->double_values; + } else if (c->n_int32_values > 0) { + result->fields[i].buffer_ptr = (char*) c->int32_values; + } else if (c->n_int64_values > 0) { + result->fields[i].buffer_ptr = (char*) c->int64_values; + }else if (c->n_string_values > 0) { + result->fields[i].buffer_ptr = (char*) c->string_values[0]; + } + } + result->tuple_count += res->row_count; + result->rows_read++; + if(hdl->mid->protobuf_res) { + free(hdl->mid->protobuf_res); + } + hdl->mid->protobuf_res = (void*) res; + return result->fieldcnt; +#endif + } + bs2_resetbuf(hdl->mid->from); // kinda a bit evil - assert(bs2_buffer(hdl->mid->from).pos == 0); // this actually triggers the read of the entire block // after this point we operate on the buffer @@ -5622,12 +5659,18 @@ mapi_fetch_row(MapiHdl hdl) for (i = 0; i < (size_t) result->fieldcnt; i++) { if (result->fields[i].columnlength < 0) { // variable-length column - result->fields[i].buffer_ptr += strlen(result->fields[i].buffer_ptr) + 1; + + if (hdl->mid->protobuf_res) { + result->fields[i].buffer_ptr = ((Mhapi__QueryResult*) hdl->mid->protobuf_res)->columns[i]->string_values[result->cur_row]; + } else { + result->fields[i].buffer_ptr += strlen(result->fields[i].buffer_ptr) + 1; + } } else { result->fields[i].buffer_ptr += result->fields[i].columnlength; } } } + result->cur_row++; result->rows_read++; return result->fieldcnt; } @@ -6038,6 +6081,8 @@ mapi_set_column_compression(Mapi mid, co } else if (strcasecmp(colcomp, "none") == 0) { mid->colcomp = COLUMN_COMPRESSION_NONE; + } else if (strcasecmp(colcomp, "protobuf") == 0) { + mid->colcomp = COLUMN_COMPRESSION_PROTOBUF; } else { mapi_setError(mid, "invalid column compression type", "mapi_set_compression", MERROR); return -1; diff --git a/common/stream/mhapi.proto b/common/stream/mhapi.proto --- a/common/stream/mhapi.proto +++ b/common/stream/mhapi.proto @@ -1,33 +1,34 @@ package mhapi; message QueryResult { - required int32 result_id = 1; +// required int32 result_id = 1; required 
int64 row_count = 2; - required int64 col_count = 3; +// required int64 col_count = 3; message Column { - required string table_name = 1; - required string column_name = 2; - required string type_name = 3; +// required string table_name = 1; +// required string column_name = 2; +// required string type_name = 3; repeated string string_values = 4; repeated int64 int64_values = 5 [packed=true]; - repeated int32 int32_value = 6 [packed=true]; + repeated int32 int32_values = 6 [packed=true]; repeated double double_values = 7 [packed=true]; } message ColumnUnpacked { - required string table_name = 1; - required string column_name = 2; - required string type_name = 3; +// required string table_name = 1; +// required string column_name = 2; +// required string type_name = 3; repeated string string_values = 4; - repeated int64 int64_values = 5; - repeated int32 int32_value = 6; - repeated double double_values = 7; + repeated int64 int64_values = 5 [packed=false]; + repeated int32 int32_values = 6 [packed=false]; + repeated double double_values = 7 [packed=false]; } repeated Column columns = 4; repeated ColumnUnpacked columns_unpacked = 5; } + diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -4461,6 +4461,12 @@ bs2_buffer(stream *ss) { return b; } +void bs2_setpos(stream *ss, size_t pos) { + bs2 *s = (bs2 *) ss->stream_data.p; + assert(pos < s->bufsiz); + s->nr = pos; +} + column_compression bs2_colcomp(stream *ss) { bs2 *s = (bs2 *) ss->stream_data.p; diff --git a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -261,6 +261,8 @@ stream_export void* bs2_getbuf(stream *s stream_export void bs2_resetbuf(stream *ss); stream_export buffer bs2_buffer(stream *s); column_compression bs2_colcomp(stream *ss); +stream_export void bs2_setpos(stream *ss, size_t pos); + /* read block of data including the end of block marker */ stream_export ssize_t 
mnstr_read_block(stream *s, void *buf, size_t elmsize, size_t cnt); diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c --- a/monetdb5/modules/mal/mal_mapi.c +++ b/monetdb5/modules/mal/mal_mapi.c @@ -190,6 +190,9 @@ doChallenge(void *data) if (strstr(buf, "PFOR")) { colcomp = COLUMN_COMPRESSION_PFOR; } + if (strstr(buf, "PROTOBUF")) { + colcomp = COLUMN_COMPRESSION_PROTOBUF; + } // FIXME: this leaks a block stream header if (buflen < BLOCK) { diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag --- a/sql/backends/monet5/Makefile.ag +++ b/sql/backends/monet5/Makefile.ag @@ -53,8 +53,10 @@ lib__sql = { ../../../monetdb5/tools/libmonetdb5 \ ../../../gdk/libbat \ ../../../common/stream/libstream \ + ../../../common/stream/libstream_protobuf \ ../../../common/utils/libmcrypt \ $(PTHREAD_LIBS) \ + $(protobuf_LIBS) \ $(openssl_LIBS) $(MATH_LIBS) $(pfor_LIBS) } diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c --- a/sql/backends/monet5/sql_result.c +++ b/sql/backends/monet5/sql_result.c @@ -1867,6 +1867,10 @@ static int write_str_term(stream* s, str return mnstr_writeStr(s, val) && mnstr_writeBte(s, 0); } +#ifdef HAVE_LIBPROTOBUF +#include <mhapi.pb-c.h> +#endif + static int mvc_export_resultset_prot10(res_table* t, stream* s, stream *c, size_t bsize) { BAT *order; lng count; @@ -2085,6 +2089,76 @@ static int mvc_export_resultset_prot10(r assert(bs2_buffer(s).pos == 0); + if (colcomp == COLUMN_COMPRESSION_PROTOBUF) { +#ifndef HAVE_LIBPROTOBUF + fprintf(stderr, "Can't use protobuf stuff.\n"); + goto cleanup; +#else + Mhapi__QueryResult msg; + mhapi__query_result__init(&msg); + msg.row_count = (int64_t)(row - srow); + msg.n_columns = t->nr_cols; + msg.columns = malloc(sizeof(Mhapi__QueryResult__Column)*t->nr_cols); + if (!msg.columns) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list