Changeset: 721c8b3d945f for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=721c8b3d945f Modified Files: clients/mapiclient/mclient.c clients/mapilib/mapi.c clients/mapilib/mapi.h common/stream/stream.c common/stream/stream.h monetdb5/modules/mal/mal_mapi.c Branch: protocol Log Message:
mclient support for new block stream, compression working diffs (truncated from 615 to 300 lines): diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -2932,6 +2932,10 @@ usage(const char *prog, int xit) fprintf(stderr, " -E charset | --encoding=charset specify encoding (character set) of the terminal\n"); #endif fprintf(stderr, " -f kind | --format=kind specify output format {csv,tab,raw,sql,xml}\n"); + + fprintf(stderr, " -P version | --protocol=version specify protocol version {prot9,prot10,prot10compressed}\n"); + fprintf(stderr, " -B size | --blocksize=size specify protocol block size (>= %d)\n", BLOCK); + fprintf(stderr, " -H | --history load/save cmdline history (default off)\n"); fprintf(stderr, " -i | --interactive[=tm] interpret `\\' commands on stdin, use time formatting {ms,s,m}\n"); fprintf(stderr, " -l language | --language=lang {sql,mal}\n"); @@ -2969,6 +2973,8 @@ main(int argc, char **argv) char *command = NULL; char *dbname = NULL; char *output = NULL; /* output format as string */ + char *protocol = NULL; + size_t blocksize = 0; FILE *fp = NULL; int trace = 0; int dump = 0; @@ -2992,6 +2998,9 @@ main(int argc, char **argv) {"encoding", 1, 0, 'E'}, #endif {"format", 1, 0, 'f'}, + {"protocol", 1, 0, 'P'}, + {"blocksize", 1, 0, 'B'}, + {"help", 0, 0, '?'}, {"history", 0, 0, 'H'}, {"host", 1, 0, 'h'}, @@ -3125,6 +3134,16 @@ main(int argc, char **argv) free(output); output = strdup(optarg); /* output format */ break; + case 'P': + assert(optarg); + if (protocol != NULL) + free(protocol); + protocol = strdup(optarg); + break; + case 'B': + assert(optarg); + blocksize = (size_t) atol(optarg); + break; case 'i': interactive = 1; showtiming = 1; @@ -3271,6 +3290,30 @@ main(int argc, char **argv) if (passwd) free(passwd); passwd = NULL; + + if (blocksize > 0) { + if (blocksize < BLOCK) { + fprintf(stderr, "invalid block size (needs to be bigger than %d)\n", BLOCK); + } else { + mapi_set_blocksize(mid, blocksize); + } + } + + if (protocol) { + if (strcasecmp(protocol, "prot9") == 0) { + mapi_set_protocol(mid, prot9); + } + else if (strcasecmp(protocol, "prot10") == 0) { + mapi_set_protocol(mid, prot10); + } + else if (strcasecmp(protocol, "prot10compressed") == 0) { + mapi_set_protocol(mid, prot10compressed); + } + else { + fprintf(stderr, "invalid protocol name '%s'\n", protocol); + } + } + if (mid && mapi_error(mid) == MOK) mapi_reconnect(mid); /* actually, initial connect */ diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -881,6 +881,7 @@ struct BlockCache { int eos; /* end of sequence */ }; + /* A connection to a server is represented by a struct MapiStruct. An application can have any number of connections to any number of servers. Connections are completely independent of each other. @@ -897,6 +898,8 @@ struct MapiStruct { char *uri; int languageId; char *motd; /* welcome message from server */ + protocol_version protocol; + size_t blocksize; int trace; /* Trace Mapi interaction */ int auto_commit; @@ -1884,6 +1887,9 @@ mapi_new(void) mid->username = NULL; mid->password = NULL; + mid->protocol = protauto; + mid->blocksize = 128*BLOCK; // 1 MB + mid->cachelimit = 100; mid->redircnt = 0; mid->redirmax = 10; @@ -2193,11 +2199,6 @@ mapi_destroy(Mapi mid) return MOK; } -typedef enum { - prot9 = 1, - prot10 = 2, - prot10compressed = 3, -} protocol_version; /* (Re-)establish a connection with the server. */ MapiMsg @@ -2214,8 +2215,6 @@ mapi_reconnect(Mapi mid) char *protover; char *rest; protocol_version prot_version = prot9; - // FIXME: make this configurable - size_t block_size = 1024000; if (mid->connected) close_connection(mid); @@ -2631,17 +2630,37 @@ mapi_reconnect(Mapi mid) } #ifdef HAVE_LIBSNAPPY - if (strstr(hashes, "PROT10COMPRESSED")) { + if (strstr(hashes, "PROT10COMPR")) { // both server and client support compressed protocol 10; use compressed version - prot_version = prot10compressed; + if (mid->protocol == protauto) { + prot_version = prot10compressed; + } else { + prot_version = mid->protocol; + } } else #endif if (strstr(hashes, "PROT10")) { // both server and client support protocol 10; use protocol 10 - prot_version = prot10; + if (mid->protocol == protauto) { + prot_version = prot10; + } else { + if (mid->protocol == prot10compressed) { + mapi_setError(mid, "Either client or server do not support protocol compression", "mapi_reconnect", MERROR); + close_connection(mid); + return mid->error; + } else { + prot_version = mid->protocol; + } + } } else { // connecting to old server; use protocol 9 - prot_version = prot9; + if (mid->protocol == prot9 || mid->protocol == protauto) { + prot_version = prot9; + } else { + mapi_setError(mid, "Either client or server do not support protocol compression", "mapi_reconnect", MERROR); + close_connection(mid); + return mid->error; + } } /* in rest now should be the byte order of the server */ @@ -2742,8 +2761,8 @@ mapi_reconnect(Mapi mid) #endif mid->username, hash, mid->language, mid->database == NULL ? "" : mid->database, - prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED", - block_size); + prot_version == prot10 ? "PROT10" : "PROT10COMPR", + mid->blocksize); } else { retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:\n", #ifdef WORDS_BIGENDIAN @@ -2785,29 +2804,23 @@ mapi_reconnect(Mapi mid) if (prot_version == prot10 || prot_version == prot10compressed) { - printf("Using protocol version %s.\n", prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED"); + printf("Using protocol version %s.\n", prot_version == prot10 ? "PROT10" : "PROT10COMPR"); assert(isa_block_stream(mid->to)); assert(isa_block_stream(mid->from)); if (prot_version == prot10compressed) { -#ifdef HAVE_LIBSNAPPY2 - mid->to = compressed_stream(bs_to->s, COMPRESSION_SNAPPY); - mid->from = compressed_stream(bs_from->s, COMPRESSION_SNAPPY); +#ifdef HAVE_LIBSNAPPY + mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, COMPRESSION_SNAPPY); + mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, COMPRESSION_SNAPPY); #else assert(0); #endif } else { - // FIXME: figure out proper stream sizes - mid->to = block_stream2(bs_stream(mid->to), block_size); - mid->from = block_stream2(bs_stream(mid->from), block_size); + mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, COMPRESSION_NONE); + mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, COMPRESSION_NONE); } - // FIXME: this leaks -// bs_to->s = NULL; -// bs_from->s = NULL; -// close_stream((stream*) bs_to); -// close_stream((stream*) bs_from); - + // FIXME: this leaks a block stream header } /* consume the welcome message from the server */ @@ -5527,3 +5540,15 @@ mapi_get_active(Mapi mid) return mid->active; } +void mapi_set_protocol(Mapi mid, protocol_version prot) { + mid->protocol = prot; +} + +void mapi_set_blocksize(Mapi mid, size_t blocksize) { + if (blocksize >= BLOCK) { + mid->blocksize = blocksize; + } +} + + + diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h --- a/clients/mapilib/mapi.h +++ b/clients/mapilib/mapi.h @@ -129,6 +129,13 @@ typedef struct { /* used by MAPI_DATETI unsigned int fraction; /* in 1000 millionths of a second (10e-9) */ } MapiDateTime; +typedef enum { + protauto = 0, + prot9 = 1, + prot10 = 2, + prot10compressed = 3, +} protocol_version; + /* connection-oriented functions */ mapi_export Mapi mapi_mapi(const char *host, int port, const char *username, const char *password, const char *lang, const char *dbname); mapi_export Mapi mapi_mapiuri(const char *url, const char *user, const char *pass, const char *lang); @@ -232,6 +239,10 @@ mapi_export int mapi_get_tableid(MapiHdl mapi_export char *mapi_quote(const char *msg, int size); mapi_export char *mapi_unquote(char *msg); mapi_export MapiHdl mapi_get_active(Mapi mid); + +mapi_export void mapi_set_protocol(Mapi mid, protocol_version prot); +mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize); + #ifdef _MSC_VER mapi_export const char *wsaerror(int); #endif diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -93,6 +93,9 @@ #ifdef HAVE_LIBLZMA #include <lzma.h> #endif +#ifdef HAVE_LIBSNAPPY +#include <snappy-c.h> // C forever +#endif #ifdef HAVE_ICONV #ifdef HAVE_ICONV_H @@ -3974,13 +3977,15 @@ typedef struct bs2 { size_t nr; /* how far we got in buf */ size_t itotal; /* amount available in current read block */ size_t bufsiz; + compression_method comp; + char *compbuf; + size_t compbufsiz; char buf[0]; /* the buffered data (minus the size of * size-short */ - } bs2; static bs2 * -bs2_create(stream *s, size_t bufsiz) +bs2_create(stream *s, size_t bufsiz, compression_method comp) { /* should be a binary stream */ bs2 *ns; @@ -3991,6 +3996,21 @@ bs2_create(stream *s, size_t bufsiz) ns->nr = 0; ns->itotal = 0; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list