Changeset: d5d332081c12 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d5d332081c12 Added Files: common/protobuf/Makefile.ag common/protobuf/messages.c common/protobuf/messages.h common/protobuf/mhapi.proto Modified Files: monetdb5/modules/mal/mal_mapi.c Branch: protocol Log Message:
proto stuff

diffs (truncated from 313 to 300 lines):

diff --git a/common/protobuf/Makefile.ag b/common/protobuf/Makefile.ag
new file mode 100644
--- /dev/null
+++ b/common/protobuf/Makefile.ag
@@ -0,0 +1,25 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V.
+
+## Process this file with automake to produce Makefile.in
+
+MTSAFE
+
+INCLUDES = ../stream \
+	$(snappy_CFLAGS) \
+	$(protobuf_CFLAGS)
+
+lib_protobuf = {
+	SOURCES = messages.c messages.h mhapi.pb-c.c mhapi.pb-c.h
+	LIBS = $(snappy_LIBS) \
+		$(protobuf_LIBS)
+}
+
+headers_common = {
+	DIR = includedir/monetdb
+	HEADERS = h
+	SOURCES = messages.h mhapi.pb-c.h
+}
diff --git a/common/protobuf/messages.c b/common/protobuf/messages.c
new file mode 100644
--- /dev/null
+++ b/common/protobuf/messages.c
@@ -0,0 +1,219 @@
+#include "messages.h"
+#ifdef HAVE_LIBSNAPPY
+#include <snappy-c.h> // C forever
+#endif
+
+#include <stdarg.h> /* va_alist.. */
+
+/*
+ * Read exactly len bytes from s into buf, looping over short reads.
+ * The data is requested as elements of size 1 so that the count returned
+ * by mnstr_read (assumed to count elements - confirm) is a byte count.
+ * Returns 1 on success, 0 if the stream ends or fails first.
+ */
+static int read_fully(stream *s, char *buf, lng len) {
+	lng done = 0;
+
+	while (done < len) {
+		ssize_t n = mnstr_read(s, buf + done, 1, (size_t) (len - done));
+		if (n <= 0) {
+			return 0;
+		}
+		done += n;
+	}
+	return 1;
+}
+
+/*
+ * Read one message from s: a lng length prefix followed by that many
+ * bytes of packed (and, depending on comp, compressed) message.
+ * Returns the unpacked message, or NULL on any failure.
+ */
+Mhapi__Message* message_read(stream *s, compression_method comp) {
+	lng len;
+
+	/* NOTE(review): assumes mnstr_readLng returns 1 on success - confirm.
+	 * Without this check a failed read would pass an uninitialized len on. */
+	if (mnstr_readLng(s, &len) != 1) {
+		return NULL;
+	}
+	return message_read_length(s, comp, len);
+}
+
+/*
+ * Read len bytes from s, decompress them as indicated by comp and unpack
+ * the result. The intermediate buffer is released on every path.
+ * Returns the result of protobuf_c_message_unpack, or NULL when len < 1,
+ * when allocation, reading or decompression fails, or when snappy is
+ * requested but was not compiled in.
+ */
+Mhapi__Message* message_read_length(stream *s, compression_method comp, lng len) {
+	Mhapi__Message* ret = NULL;
+	char* read_buf;
+
+	if (len < 1) {
+		return NULL;
+	}
+	read_buf = malloc(len);
+	if (!read_buf) {
+		return NULL;
+	}
+	if (!read_fully(s, read_buf, len)) {
+		goto cleanup;
+	}
+	switch(comp) {
+	case COMPRESSION_NONE:
+		break;
+	case COMPRESSION_SNAPPY:
+#ifdef HAVE_LIBSNAPPY
+	{
+		size_t uncompressed_length;
+		char *uncompressed_buf;
+		if (snappy_uncompressed_length(read_buf, len, &uncompressed_length) != SNAPPY_OK) {
+			goto cleanup;
+		}
+		uncompressed_buf = malloc(uncompressed_length);
+		if (!uncompressed_buf) {
+			goto cleanup;
+		}
+		if (snappy_uncompress(read_buf, len, uncompressed_buf, &uncompressed_length) != SNAPPY_OK) {
+			free(uncompressed_buf);
+			goto cleanup;
+		}
+		/* continue with the decompressed data; cleanup frees whichever buffer is current */
+		free(read_buf);
+		read_buf = uncompressed_buf;
+		len = uncompressed_length;
+		break;
+	}
+#else
+		goto cleanup;
+#endif
+	}
+
+	ret = (Mhapi__Message*) protobuf_c_message_unpack(&mhapi__message__descriptor, NULL, len, (uint8_t*) read_buf);
+cleanup:
+	free(read_buf);
+	return ret;
+}
+
+/*
+ * Pack msg, compress it as indicated by comp and write it to s as a lng
+ * length prefix followed by the payload. The stream is not flushed.
+ * Returns the number of payload bytes written, or -1 on any failure
+ * (including snappy being requested but not compiled in).
+ */
+ssize_t message_write(stream *s, compression_method comp, Mhapi__Message *msg) {
+	ssize_t ret = -1;
+	lng len = protobuf_c_message_get_packed_size((ProtobufCMessage*) msg);
+	char* write_buf = malloc(len);
+	if (!write_buf) {
+		return -1;
+	}
+	if (protobuf_c_message_pack((ProtobufCMessage*) msg, (uint8_t*) write_buf) != (size_t) len) {
+		goto cleanup;
+	}
+	switch(comp) {
+	case COMPRESSION_NONE:
+		break;
+	case COMPRESSION_SNAPPY:
+#ifdef HAVE_LIBSNAPPY
+	{
+		size_t compressed_length = snappy_max_compressed_length(len);
+		char *compressed_buf = malloc(compressed_length);
+		if (!compressed_buf) {
+			goto cleanup;
+		}
+		if (snappy_compress(write_buf, len, compressed_buf, &compressed_length) != SNAPPY_OK) {
+			free(compressed_buf);
+			goto cleanup;
+		}
+		free(write_buf);
+		write_buf = compressed_buf;
+		len = compressed_length;
+		break;
+	}
+#else
+		goto cleanup;
+#endif
+	}
+
+	/* NOTE(review): mnstr_write is assumed to report elements written, hence len elements of size 1 - confirm */
+	if (mnstr_writeLng(s, len) && mnstr_write(s, write_buf, 1, len) == len) {
+		ret = len;
+	}
+cleanup:
+	free(write_buf);
+	return ret;
+}
+
+/*
+ * Deliver text to the peer as an error (is_error != 0) or a warning.
+ * prot9 gets a text line starting with '!' (error) or '#' (warning),
+ * followed by a flush; every other protocol gets a Message, which is
+ * snappy-compressed when proto is prot10compressed.
+ */
+static void send_text(stream *s, protocol_version proto, int is_error, char *text) {
+	if (proto == prot9) {
+		mnstr_printf(s, "%c%s\n", is_error ? '!' : '#', text);
+		mnstr_flush(s);
+	} else {
+		Mhapi__Message msg;
+		Mhapi__Error err;
+		Mhapi__Warning warn;
+
+		mhapi__message__init(&msg);
+		if (is_error) {
+			mhapi__error__init(&err);
+			err.message = text;
+			msg.message_case = MHAPI__MESSAGE__MESSAGE_ERROR;
+			msg.error = &err;
+		} else {
+			mhapi__warning__init(&warn);
+			warn.message = text;
+			msg.message_case = MHAPI__MESSAGE__MESSAGE_WARNING;
+			msg.warning = &warn;
+		}
+		/* best effort: a failed send cannot be reported to the peer */
+		(void) message_write(s, proto == prot10compressed ? COMPRESSION_SNAPPY : COMPRESSION_NONE, &msg);
+	}
+}
+
+/*
+ * Format a printf-style message into a BUFSIZ buffer (longer text is
+ * truncated by vsnprintf) and send it to s as an error.
+ * Does nothing when s is NULL.
+ */
+void message_send_error(stream *s, protocol_version proto, const char *format, ...) {
+	char buf[BUFSIZ];
+	va_list ap;
+
+	if (s == NULL) {
+		return;
+	}
+	va_start(ap, format);
+	if (vsnprintf(buf, sizeof(buf), format, ap) < 0) {
+		buf[0] = '\0'; /* formatting failed: send an empty text rather than garbage */
+	}
+	va_end(ap);
+	send_text(s, proto, 1, buf);
+}
+
+/*
+ * Same as message_send_error, but the text is sent as a warning.
+ */
+void message_send_warning(stream *s, protocol_version proto, const char *format, ...) {
+	char buf[BUFSIZ];
+	va_list ap;
+
+	if (s == NULL) {
+		return;
+	}
+	va_start(ap, format);
+	if (vsnprintf(buf, sizeof(buf), format, ap) < 0) {
+		buf[0] = '\0'; /* formatting failed: send an empty text rather than garbage */
+	}
+	va_end(ap);
+	send_text(s, proto, 0, buf);
+}
+
diff --git a/common/protobuf/messages.h b/common/protobuf/messages.h
new file mode 100644
--- /dev/null
+++ b/common/protobuf/messages.h
@@ -0,0 +1,14 @@
+#ifndef MESSAGES_H
+#define MESSAGES_H
+
+#include "mhapi.pb-c.h"
+#include "stream.h"
+
+Mhapi__Message* message_read(stream *s, compression_method comp);
+Mhapi__Message* message_read_length(stream *s, compression_method comp, lng length);
+
+ssize_t message_write(stream *s, compression_method comp, Mhapi__Message *msg);
+void message_send_error(stream *s, protocol_version proto, const char *format, ...);
+void message_send_warning(stream *s, protocol_version proto, const char *format, ...);
+
+#endif /* MESSAGES_H */
diff --git a/common/protobuf/mhapi.proto b/common/protobuf/mhapi.proto
new file mode 100644
--- /dev/null
+++ b/common/protobuf/mhapi.proto
@@ -0,0 +1,71 @@
+package mhapi;
+
+// this is very preliminary
+
+
+message AuthResponse {
+	enum Version {
+		PROTO10 = 1;
+	}
+	optional Version protocol_version = 1;
+	optional string username = 2;
+	optional string password_hashed = 3;
+	optional string dbname = 4;
+	enum Scenario {
+		MAL = 0;
+		SQL = 1;
+	}
+	optional Scenario scenario = 5;
+	enum Compression {
+		NONE = 0;
+		SNAPPY = 1;
+	}
+	optional Compression compression_method = 6;
+}
+
+message Error {
+	optional string message = 1;
+}
+
+message Warning {
+	optional string message = 1;
+}
+
+message Query {
+	optional string message = 1;
+}
+
+
+message QueryResult {
+	optional int64 rows = 1;
+	message Column {
+		optional string name = 1;
+		optional string sqltype = 2;
+
+		enum Type {
+			STRING = 1;
+			INTEGER = 2;
+			FLOAT = 3;
+			// more types added here...
+			// maybe these should be consistent with something else
+		}
+
+		optional Type type = 3;
+		repeated string stringValues = 10;
+		repeated int64 intValues = 11 [packed=true];
+		repeated double floatValues = 12 [packed=true];
+	}
+
+	repeated Column columns = 3;
+}
+
+
+message Message {
+	oneof message {
+		AuthResponse authresp = 2;
+		Query query = 3;
+		Error error = 5;
+		Warning warning = 6;
+		QueryResult queryresult = 7;
+	}
+}
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list