Changeset: faf73bcc6423 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=faf73bcc6423
Modified Files:
    clients/mapiclient/mclient.c
    clients/mapilib/mapi.c
    clients/mapilib/mapi.h
    common/stream/stream.c
    common/stream/stream.h
    common/utils/mcrypt.c
    monetdb5/modules/mal/mal_mapi.c
    sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:
Add extra --colcomp flag to mserver to specify column-type compression used (if any). diffs (truncated from 356 to 300 lines): diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -2957,6 +2957,7 @@ usage(const char *prog, int xit) fprintf(stderr, " -C version | --compression=type specify compression method {snappy,lz4}\n"); fprintf(stderr, " -P version | --protocol=version specify protocol version {prot9,prot10,prot10compressed}\n"); fprintf(stderr, " -B size | --blocksize=size specify protocol block size (>= %d)\n", BLOCK); + fprintf(stderr, " -c colcomp | --colcomp=type specify column compression type {none,pfor}"); fprintf(stderr, " -H | --history load/save cmdline history (default off)\n"); fprintf(stderr, " -i | --interactive[=tm] interpret `\\' commands on stdin, use time formatting {ms,s,m}\n"); @@ -2997,6 +2998,7 @@ main(int argc, char **argv) char *output = NULL; /* output format as string */ char *protocol = NULL; char *compression = NULL; + char *colcomp = NULL; size_t blocksize = 0; FILE *fp = NULL; int trace = 0; @@ -3024,6 +3026,7 @@ main(int argc, char **argv) {"protocol", 1, 0, 'P'}, {"blocksize", 1, 0, 'B'}, {"compression", 1, 0, 'C'}, + {"colcomp", 1, 0, 'c'}, {"help", 0, 0, '?'}, {"history", 0, 0, 'H'}, {"host", 1, 0, 'h'}, @@ -3169,6 +3172,12 @@ main(int argc, char **argv) free(compression); compression = strdup(optarg); break; + case 'c': + assert(optarg); + if (colcomp != NULL) + free(colcomp); + colcomp = strdup(optarg); + break; case 'B': assert(optarg); blocksize = (size_t) atol(optarg); @@ -3331,11 +3340,20 @@ main(int argc, char **argv) if (protocol) { if (mapi_set_protocol(mid, protocol) != 0) { fprintf(stderr, "%s\n", mapi_error_str(mid)); + exit(1); } } if (compression) { if (mapi_set_compression(mid, compression) != 0) { fprintf(stderr, "%s\n", mapi_error_str(mid)); + exit(1); + } + } + + if (colcomp) { + if (mapi_set_column_compression(mid, 
colcomp) != 0) { + fprintf(stderr, "%s\n", mapi_error_str(mid)); + exit(1); } } diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -911,6 +911,7 @@ struct MapiStruct { char *motd; /* welcome message from server */ protocol_version protocol; compression_method comp; + column_compression colcomp; size_t blocksize; int trace; /* Trace Mapi interaction */ @@ -1902,6 +1903,7 @@ mapi_new(void) mid->comp = COMPRESSION_SNAPPY; mid->protocol = protauto; + mid->colcomp = COLUMN_COMPRESSION_AUTO; mid->blocksize = 128 * BLOCK; // 1 MB mid->cachelimit = 100; @@ -2641,6 +2643,23 @@ mapi_reconnect(Mapi mid) *hash = '\0'; rest = hash + 1; } +#ifdef HAVE_PFOR + if (strstr(hashes, "PFOR")) { + if (mid->colcomp == COLUMN_COMPRESSION_AUTO) { + mid->colcomp = COLUMN_COMPRESSION_PFOR; + } + } else if (mid->colcomp == COLUMN_COMPRESSION_PFOR) { + mapi_setError(mid, "Client wants PFOR but server does not support it", + "mapi_reconnect", MERROR); + close_connection(mid); + return mid->error; + } +#else + if (mid->colcomp == COLUMN_COMPRESSION_PFOR) { + fprintf(stderr, "Client does not support PFOR compression.\n"); + exit(1); + } +#endif #ifdef HAVE_LIBSNAPPY if (strstr(hashes, "PROT10COMPR")) { // both server and client support compressed protocol 10; use compressed version @@ -2767,7 +2786,7 @@ mapi_reconnect(Mapi mid) if (prot_version == prot10 || prot_version == prot10compressed) { // if we are using protocol 10, we have to send either PROT10/PROT10COMPRESSED to the server // so the server knows which protocol to use - retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:%s:%s:%zu:\n", + retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:%s:%s%s:%zu:\n", #ifdef WORDS_BIGENDIAN "BIG", #else @@ -2777,6 +2796,11 @@ mapi_reconnect(Mapi mid) mid->database == NULL ? "" : mid->database, prot_version == prot10 ? "PROT10" : "PROT10COMPR", comp == COMPRESSION_SNAPPY ? "SNAPPY" : (comp == COMPRESSION_LZ4 ? 
"LZ4" : ""), +#ifdef HAVE_PFOR + mid->colcomp == COLUMN_COMPRESSION_PFOR ? ",HAVEPFOR" : "", +#else + "", +#endif mid->blocksize); } else { retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:\n", @@ -2824,14 +2848,14 @@ mapi_reconnect(Mapi mid) if (prot_version == prot10compressed) { #ifdef HAVE_LIBSNAPPY - mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, comp); - mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, comp); + mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, comp, mid->colcomp); + mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, comp, mid->colcomp); #else assert(0); #endif } else { - mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, COMPRESSION_NONE); - mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, COMPRESSION_NONE); + mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, COMPRESSION_NONE, mid->colcomp); + mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, COMPRESSION_NONE, mid->colcomp); } // FIXME: this leaks a block stream header @@ -5593,7 +5617,7 @@ mapi_fetch_row(MapiHdl hdl) #endif } else { #ifdef HAVE_PFOR - if (strcasecmp(result->fields[i].columntype, "int") == 0) { + if (hdl->mid->colcomp == COLUMN_COMPRESSION_PFOR && strcasecmp(result->fields[i].columntype, "int") == 0) { lng b = *((lng*) buf); buf += sizeof(lng); lng length = *((lng*)(buf)); @@ -6009,7 +6033,8 @@ MapiMsg mapi_set_protocol(Mapi mid, cons } -MapiMsg mapi_set_compression(Mapi mid, const char* compression) { +MapiMsg +mapi_set_compression(Mapi mid, const char* compression) { if (strcasecmp(compression, "snappy") == 0) { mid->comp = COMPRESSION_SNAPPY; } @@ -6023,11 +6048,26 @@ MapiMsg mapi_set_compression(Mapi mid, c return 0; } -void mapi_set_blocksize(Mapi mid, size_t blocksize) { +void +mapi_set_blocksize(Mapi mid, size_t blocksize) { if (blocksize >= BLOCK) { mid->blocksize = blocksize; } } - - +MapiMsg +mapi_set_column_compression(Mapi mid, const char* colcomp) { + if 
(strcasecmp(colcomp, "pfor") == 0) { + mid->colcomp = COLUMN_COMPRESSION_PFOR; + } + else if (strcasecmp(colcomp, "none") == 0) { + mid->colcomp = COLUMN_COMPRESSION_NONE; + } else { + mapi_setError(mid, "invalid column compression type", "mapi_set_compression", MERROR); + return -1; + } + + return 0; +} + + diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h --- a/clients/mapilib/mapi.h +++ b/clients/mapilib/mapi.h @@ -235,6 +235,7 @@ mapi_export MapiHdl mapi_get_active(Mapi mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot); mapi_export MapiMsg mapi_set_compression(Mapi mid, const char* compression); +mapi_export MapiMsg mapi_set_column_compression(Mapi mid, const char* colcomp); mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize); #ifdef _MSC_VER diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -3982,6 +3982,7 @@ typedef struct bs2 { size_t bufsiz; size_t readpos; compression_method comp; + column_compression colcomp; char *compbuf; size_t compbufsiz; char buf[1]; /* the buffered data */ @@ -4460,6 +4461,12 @@ bs2_buffer(stream *ss) { return b; } +column_compression +bs2_colcomp(stream *ss) { + bs2 *s = (bs2 *) ss->stream_data.p; + return s->colcomp; +} + int isa_block_stream(stream *s) { @@ -4474,7 +4481,7 @@ isa_fixed_block_stream(stream *s) { } stream * -block_stream2(stream *s, size_t bufsiz, compression_method comp) +block_stream2(stream *s, size_t bufsiz, compression_method comp, column_compression colcomp) { stream *ns; bs2 *b; @@ -4490,6 +4497,7 @@ block_stream2(stream *s, size_t bufsiz, destroy(ns); return NULL; } + b->colcomp = colcomp; /* blocksizes have a fixed little endian byteorder */ #ifdef WORDS_BIGENDIAN s->byteorder = 3412; /* simply != 1234 */ diff --git a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -249,11 +249,17 @@ typedef enum { COMPRESSION_UNKNOWN = 255 } 
compression_method; -stream_export stream *block_stream2(stream *s, size_t bufsiz, compression_method comp); +typedef enum { + COLUMN_COMPRESSION_AUTO = 255, + COLUMN_COMPRESSION_NONE = 0, + COLUMN_COMPRESSION_PFOR = 1 +} column_compression; + +stream_export stream *block_stream2(stream *s, size_t bufsiz, compression_method comp, column_compression colcomp); stream_export void* bs2_getbuf(stream *ss); stream_export void bs2_resetbuf(stream *ss); stream_export buffer bs2_buffer(stream *s); - +column_compression bs2_colcomp(stream *ss); /* read block of data including the end of block marker */ stream_export ssize_t mnstr_read_block(stream *s, void *buf, size_t elmsize, size_t cnt); diff --git a/common/utils/mcrypt.c b/common/utils/mcrypt.c --- a/common/utils/mcrypt.c +++ b/common/utils/mcrypt.c @@ -34,13 +34,16 @@ mcrypt_getHashAlgorithms(void) * Better/stronger/faster algorithms can be added in the future upon * desire. */ + return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10" #ifdef HAVE_LIBSNAPPY - // the server supports protocol 10 + compression - return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10,PROT10COMPR"); -#else - // the server only supports protocol 10 - return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10"); +// the server supports protocol 10 + compression + ",PROT10COMPR" #endif +#ifdef HAVE_PFOR +// the server supports PFOR + ",PFOR" +#endif + ); } /** diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c --- a/monetdb5/modules/mal/mal_mapi.c +++ b/monetdb5/modules/mal/mal_mapi.c @@ -121,6 +121,7 @@ doChallenge(void *data) ssize_t len = 0; protocol_version protocol = prot9; size_t buflen = BLOCK; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list