Changeset: faf73bcc6423 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=faf73bcc6423
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        clients/mapilib/mapi.h
        common/stream/stream.c
        common/stream/stream.h
        common/utils/mcrypt.c
        monetdb5/modules/mal/mal_mapi.c
        sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:

Add extra --colcomp flag to mserver to specify column-type compression used (if 
any).


diffs (truncated from 356 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2957,6 +2957,7 @@ usage(const char *prog, int xit)
        fprintf(stderr, " -C version  | --compression=type specify compression 
method {snappy,lz4}\n");
        fprintf(stderr, " -P version  | --protocol=version specify protocol 
version {prot9,prot10,prot10compressed}\n");
        fprintf(stderr, " -B size     | --blocksize=size   specify protocol 
block size (>= %d)\n", BLOCK);
+       fprintf(stderr, " -c colcomp  | --colcomp=type     specify column 
compression type {none,pfor}");
 
        fprintf(stderr, " -H          | --history          load/save cmdline 
history (default off)\n");
        fprintf(stderr, " -i          | --interactive[=tm] interpret `\\' 
commands on stdin, use time formatting {ms,s,m}\n");
@@ -2997,6 +2998,7 @@ main(int argc, char **argv)
        char *output = NULL;    /* output format as string */
        char *protocol = NULL;
        char *compression = NULL;
+       char *colcomp = NULL;
        size_t blocksize = 0;
        FILE *fp = NULL;
        int trace = 0;
@@ -3024,6 +3026,7 @@ main(int argc, char **argv)
                {"protocol", 1, 0, 'P'},
                {"blocksize", 1, 0, 'B'},
                {"compression", 1, 0, 'C'},
+               {"colcomp", 1, 0, 'c'},
                {"help", 0, 0, '?'},
                {"history", 0, 0, 'H'},
                {"host", 1, 0, 'h'},
@@ -3169,6 +3172,12 @@ main(int argc, char **argv)
                                free(compression);
                        compression = strdup(optarg);
                        break;
+               case 'c':
+                       assert(optarg);
+                       if (colcomp != NULL)
+                               free(colcomp);
+                       colcomp = strdup(optarg);
+                       break;
                case 'B':
                        assert(optarg);
                        blocksize = (size_t) atol(optarg);
@@ -3331,11 +3340,20 @@ main(int argc, char **argv)
        if (protocol) {
                if (mapi_set_protocol(mid, protocol) != 0) {
                        fprintf(stderr, "%s\n", mapi_error_str(mid));
+                       exit(1);
                }
        }
        if (compression) {
                if (mapi_set_compression(mid, compression) != 0) {
                        fprintf(stderr, "%s\n", mapi_error_str(mid));
+                       exit(1);
+               }
+       }
+
+       if (colcomp) {
+               if (mapi_set_column_compression(mid, colcomp) != 0) {
+                       fprintf(stderr, "%s\n", mapi_error_str(mid));
+                       exit(1);
                }
        }
 
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -911,6 +911,7 @@ struct MapiStruct {
        char *motd;             /* welcome message from server */
        protocol_version protocol;
        compression_method comp;
+       column_compression colcomp;
        size_t blocksize;
 
        int trace;              /* Trace Mapi interaction */
@@ -1902,6 +1903,7 @@ mapi_new(void)
 
        mid->comp = COMPRESSION_SNAPPY;
        mid->protocol = protauto;
+       mid->colcomp = COLUMN_COMPRESSION_AUTO;
        mid->blocksize = 128 * BLOCK; // 1 MB
 
        mid->cachelimit = 100;
@@ -2641,6 +2643,23 @@ mapi_reconnect(Mapi mid)
                        *hash = '\0';
                        rest = hash + 1;
                }
+#ifdef HAVE_PFOR
+               if (strstr(hashes, "PFOR")) {
+                       if (mid->colcomp == COLUMN_COMPRESSION_AUTO) {
+                               mid->colcomp = COLUMN_COMPRESSION_PFOR;
+                       }
+               } else if (mid->colcomp == COLUMN_COMPRESSION_PFOR) {
+                       mapi_setError(mid, "Client wants PFOR but server does 
not support it",
+                                       "mapi_reconnect", MERROR);
+                       close_connection(mid);
+                       return mid->error;
+               }
+#else
+               if (mid->colcomp == COLUMN_COMPRESSION_PFOR) {
+                       fprintf(stderr, "Client does not support PFOR 
compression.\n");
+                       exit(1);
+               }
+#endif
 #ifdef HAVE_LIBSNAPPY
                if (strstr(hashes, "PROT10COMPR")) {
                        // both server and client support compressed protocol 
10; use compressed version 
@@ -2767,7 +2786,7 @@ mapi_reconnect(Mapi mid)
                        if (prot_version == prot10 || prot_version == 
prot10compressed) {
                                // if we are using protocol 10, we have to send 
either PROT10/PROT10COMPRESSED to the server
                                // so the server knows which protocol to use
-                               retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:%s:%s:%zu:\n",
+                               retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:%s:%s%s:%zu:\n",
        #ifdef WORDS_BIGENDIAN
                                     "BIG",
        #else
@@ -2777,6 +2796,11 @@ mapi_reconnect(Mapi mid)
                                     mid->database == NULL ? "" : mid->database,
                                     prot_version == prot10 ? "PROT10" : 
"PROT10COMPR",
                                     comp == COMPRESSION_SNAPPY ? "SNAPPY" : 
(comp == COMPRESSION_LZ4 ? "LZ4" : ""),
+#ifdef HAVE_PFOR
+                                    mid->colcomp == COLUMN_COMPRESSION_PFOR ? 
",HAVEPFOR" : "",
+#else
+                                    "",
+#endif
                                    mid->blocksize);
                        } else {
                                retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:\n",
@@ -2824,14 +2848,14 @@ mapi_reconnect(Mapi mid)
 
                if (prot_version == prot10compressed) {
 #ifdef HAVE_LIBSNAPPY
-                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, comp);
-                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, comp);
+                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, comp, mid->colcomp);
+                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, comp, mid->colcomp);
 #else
                        assert(0);
 #endif
                } else {
-                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, COMPRESSION_NONE);
-                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, COMPRESSION_NONE);
+                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, COMPRESSION_NONE, mid->colcomp);
+                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, COMPRESSION_NONE, mid->colcomp);
                }
 
                // FIXME: this leaks a block stream header
@@ -5593,7 +5617,7 @@ mapi_fetch_row(MapiHdl hdl)
 #endif
                                } else {
 #ifdef HAVE_PFOR
-                                       if 
(strcasecmp(result->fields[i].columntype, "int") == 0) {
+                                       if (hdl->mid->colcomp == 
COLUMN_COMPRESSION_PFOR && strcasecmp(result->fields[i].columntype, "int") == 
0) {
                                                lng b = *((lng*) buf);
                                                buf += sizeof(lng);
                                                lng length = *((lng*)(buf));
@@ -6009,7 +6033,8 @@ MapiMsg mapi_set_protocol(Mapi mid, cons
 }
 
 
-MapiMsg mapi_set_compression(Mapi mid, const char* compression) {
+MapiMsg 
+mapi_set_compression(Mapi mid, const char* compression) {
        if (strcasecmp(compression, "snappy") == 0) {
                mid->comp = COMPRESSION_SNAPPY;
        }
@@ -6023,11 +6048,26 @@ MapiMsg mapi_set_compression(Mapi mid, c
        return 0;
 }
 
-void mapi_set_blocksize(Mapi mid, size_t blocksize) {
+void 
+mapi_set_blocksize(Mapi mid, size_t blocksize) {
        if (blocksize >= BLOCK) {
                mid->blocksize = blocksize;
        }
 }
 
-
-
+MapiMsg 
+mapi_set_column_compression(Mapi mid, const char* colcomp) {
+       if (strcasecmp(colcomp, "pfor") == 0) {
+               mid->colcomp = COLUMN_COMPRESSION_PFOR;
+       }
+       else if (strcasecmp(colcomp, "none") == 0) {
+               mid->colcomp = COLUMN_COMPRESSION_NONE;
+       } else {
+               mapi_setError(mid, "invalid column compression type", 
"mapi_set_compression", MERROR);
+               return -1;
+       }
+
+       return 0;
+}
+
+
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -235,6 +235,7 @@ mapi_export MapiHdl mapi_get_active(Mapi
 
 mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot);
 mapi_export MapiMsg mapi_set_compression(Mapi mid, const char* compression);
+mapi_export MapiMsg mapi_set_column_compression(Mapi mid, const char* colcomp);
 mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize);
 
 #ifdef _MSC_VER
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -3982,6 +3982,7 @@ typedef struct bs2 {
        size_t bufsiz;
        size_t readpos;
        compression_method comp;
+       column_compression colcomp;
        char *compbuf;
        size_t compbufsiz;
        char buf[1];    /* the buffered data */
@@ -4460,6 +4461,12 @@ bs2_buffer(stream *ss) {
        return b;
 }
 
+column_compression
+bs2_colcomp(stream *ss) {
+       bs2 *s = (bs2 *) ss->stream_data.p;
+       return s->colcomp;
+}
+
 int
 isa_block_stream(stream *s)
 {
@@ -4474,7 +4481,7 @@ isa_fixed_block_stream(stream *s) {
 }
 
 stream *
-block_stream2(stream *s, size_t bufsiz, compression_method comp)
+block_stream2(stream *s, size_t bufsiz, compression_method comp, 
column_compression colcomp)
 {
        stream *ns;
        bs2 *b;
@@ -4490,6 +4497,7 @@ block_stream2(stream *s, size_t bufsiz, 
                destroy(ns);
                return NULL;
        }
+       b->colcomp = colcomp;
        /* blocksizes have a fixed little endian byteorder */
 #ifdef WORDS_BIGENDIAN
        s->byteorder = 3412;    /* simply != 1234 */
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -249,11 +249,17 @@ typedef enum {
        COMPRESSION_UNKNOWN = 255
 } compression_method;
 
-stream_export stream *block_stream2(stream *s, size_t bufsiz, 
compression_method comp);
+typedef enum {
+       COLUMN_COMPRESSION_AUTO = 255,
+       COLUMN_COMPRESSION_NONE = 0,
+       COLUMN_COMPRESSION_PFOR = 1
+} column_compression;
+
+stream_export stream *block_stream2(stream *s, size_t bufsiz, 
compression_method comp, column_compression colcomp);
 stream_export void* bs2_getbuf(stream *ss);
 stream_export void bs2_resetbuf(stream *ss);
 stream_export buffer bs2_buffer(stream *s);
-
+column_compression bs2_colcomp(stream *ss);
 
 /* read block of data including the end of block marker */
 stream_export ssize_t mnstr_read_block(stream *s, void *buf, size_t elmsize, 
size_t cnt);
diff --git a/common/utils/mcrypt.c b/common/utils/mcrypt.c
--- a/common/utils/mcrypt.c
+++ b/common/utils/mcrypt.c
@@ -34,13 +34,16 @@ mcrypt_getHashAlgorithms(void)
         * Better/stronger/faster algorithms can be added in the future upon
         * desire.
         */
+       return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10"
 #ifdef HAVE_LIBSNAPPY
-       // the server supports protocol 10 + compression
-       return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10,PROT10COMPR");
-#else
-       // the server only supports protocol 10
-       return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10");
+// the server supports protocol 10 + compression
+               ",PROT10COMPR"
 #endif
+#ifdef HAVE_PFOR
+// the server supports PFOR
+               ",PFOR"
+#endif
+               );
 }
 
 /**
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -121,6 +121,7 @@ doChallenge(void *data)
        ssize_t len = 0;
        protocol_version protocol = prot9;
        size_t buflen = BLOCK;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to