Changeset: 0dbef0f0ddf6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0dbef0f0ddf6
Modified Files:
	clients/iotapi/src/Streams/datatypes.py
	clients/iotapi/src/Streams/streams.py
	clients/iotapi/src/Streams/streamscontext.py
	clients/iotclient/src/Flask/restresources.py
	clients/iotclient/src/Settings/mapiconnection.py
	clients/iotclient/src/Streams/datatypes.py
	clients/iotclient/src/Streams/jsonschemas.py
	clients/iotclient/src/Streams/streampolling.py
	clients/iotclient/src/Streams/streams.py
	clients/iotclient/src/Streams/streamscontext.py
	clients/iotclient/src/Streams/streamscreator.py
	clients/iotclient/src/Utilities/customthreading.py
	sql/backends/monet5/iot/50_iot.sql
Branch: iot
Log Message:
Several bugfixes while reading from the database diffs (truncated from 1062 to 300 lines): diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ b/clients/iotapi/src/Streams/datatypes.py @@ -1,6 +1,7 @@ import struct from abc import ABCMeta, abstractmethod +from collections import OrderedDict from datetime import date, time, datetime from dateutil.relativedelta import relativedelta @@ -57,10 +58,8 @@ class StreamDataType(object): return results def to_json_representation(self): # get a json representation of the data type while checking the stream's info - dic = {'name': self._column_name, 'type': self._data_type, 'nullable': self._is_nullable} - if self._default_value is not None: - dic['default'] = self._default_value - return dic + return OrderedDict((('name', self._column_name), ('type', self._data_type), + ('default', self._default_value), ('nullable', self._is_nullable))) class TextType(StreamDataType): @@ -329,7 +328,7 @@ class TimeType(StreamDataType): # Store return results -class TimestampType(StreamDataType): # it's represented with the two integers from time and date +class TimestampType(StreamDataType): # It is represented with the two integers from time and date """Covers: TIMESTAMP""" def __init__(self, *args): diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -65,8 +65,8 @@ class IOTStream: return self._stream_name def get_data_dictionary(self): - dic = OrderedDict({'schema': self._schema_name, 'stream': self._stream_name, - 'columns': [value.to_json_representation() for value in self._columns.values()]}) + dic = OrderedDict((('schema', self._schema_name), ('stream', self._stream_name), + ('columns', [value.to_json_representation() for value in self._columns.values()]))) self._baskets_lock.acquire_read() count = len(self._baskets) 
listing = [{'number': k, 'count': v} for k, v in self._baskets.items()] @@ -146,4 +146,5 @@ class IOTStream: keys = results.keys() # TODO check if this is viable for many tuples!! tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k in keys))] - return {'schema': self._schema_name, 'stream': self._stream_name, 'count': read_tuples, 'tuples': tuples} + return OrderedDict((('schema', self._schema_name), ('stream', self._stream_name), ('count', read_tuples), + ('tuples', tuples))) diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py --- a/clients/iotapi/src/Streams/streamscontext.py +++ b/clients/iotapi/src/Streams/streamscontext.py @@ -1,3 +1,4 @@ +from collections import OrderedDict from Utilities.readwritelock import RWLock from WebSockets.websockets import unsubscribe_removed_streams @@ -39,8 +40,8 @@ class IOTStreams: def get_streams_data(self): self._locker.acquire_read() - res = {'streams_count': len(self._context), - 'streams_listing': [value.get_data_dictionary() for value in self._context.values()]} + res = OrderedDict((('streams_count', len(self._context)), + ('streams_listing', [value.get_data_dictionary() for value in self._context.values()]))) self._locker.release() return res diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py --- a/clients/iotclient/src/Flask/restresources.py +++ b/clients/iotclient/src/Flask/restresources.py @@ -2,7 +2,7 @@ from datetime import datetime from flask import request from flask_restful import Resource from json import loads -from jsonschema import Draft4Validator, FormatChecker +from jsonschema import Draft4Validator, FormatChecker, ValidationError from pytz import utc from tzlocal import get_localzone from Settings.iotlogger import add_log @@ -30,7 +30,7 @@ class StreamInput(Resource): stream = get_streams_context().get_existing_stream(schema_name, stream_name) except BaseException as ex: add_log(50, 
ex) - return ex, 404 + return ex.message, 404 return stream.get_data_dictionary(), 200 def post(self, schema_name, stream_name): # add data to a stream @@ -40,13 +40,13 @@ class StreamInput(Resource): stream = get_streams_context().get_existing_stream(schema_name, stream_name) except BaseException as ex: add_log(50, ex) - return ex, 404 + return ex.message, 404 try: # validate and insert data, if not return 400 stream.validate_and_insert(loads(request.data), current_stamp) - except BaseException as ex: + except (ValidationError, BaseException) as ex: add_log(50, ex) - return ex, 400 + return ex.message, 400 return 'The insertions were made with success!', 201 @@ -71,9 +71,9 @@ class StreamsHandling(Resource): schema_to_validate = loads(request.data) Create_Streams_Validator.validate(schema_to_validate) get_streams_context().add_new_stream(schema_to_validate) - except BaseException as ex: + except (ValidationError, BaseException) as ex: add_log(50, ex) - return ex, 400 + return ex.message, 400 add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], ' was created'])) return 'The stream was created with success!', 201 @@ -84,13 +84,13 @@ class StreamsHandling(Resource): Delete_Streams_Validator.validate(schema_to_validate) except BaseException as ex: add_log(50, ex) - return ex, 400 + return ex.message, 400 try: # check if stream exists, if not return 404 get_streams_context().delete_existing_stream(schema_to_validate) except BaseException as ex: add_log(50, ex) - return ex, 404 + return ex.message, 404 add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], ' was deleted'])) return 'The stream was deleted with success!', 204 diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -26,7 +26,7 @@ def 
mapi_get_webserver_streams(connectio extras."interval", extras."unit" FROM (SELECT "id", "name", "schema_id" FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "base", "interval", "unit" FROM iot.webserverstreams) AS extras - ON (tables."id"=extras."table_id")""".replace('\n', ' ') + ON (tables."id"=extras."table_id") ORDER BY tables."id" """.replace('\n', ' ') cursor.execute(sql_string) tables = cursor.fetchall() @@ -34,10 +34,10 @@ def mapi_get_webserver_streams(connectio sql_string = """SELECT columns."id", columns."table_id", columns."name" AS column, columns."type", columns."type_digits", columns."type_scale", columns."default", columns."null", extras."special", extras."validation1", extras."validation2" FROM (SELECT "id", "table_id", "name", "type", "type_digits", - "type_scale", "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT "id" FROM sys.tables - WHERE type=4) AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT "column_id", "special", - "validation1", "validation2" FROM iot.webservercolumns) AS extras ON (columns."id"=extras."column_id")"""\ - .replace('\n', ' ') + "type_scale", "default", "null", "number" FROM sys.columns) AS columns INNER JOIN (SELECT "id" + FROM sys.tables WHERE type=4) AS tables ON (tables."id"=columns."table_id") LEFT JOIN + (SELECT "column_id", "special", "validation1", "validation2" FROM iot.webservercolumns) AS extras + ON (columns."id"=extras."column_id") ORDER BY columns."table_id", columns."number" """.replace('\n', ' ') cursor.execute(sql_string) columns = cursor.fetchall() @@ -45,11 +45,13 @@ def mapi_get_webserver_streams(connectio return tables, columns except BaseException as ex: add_log(50, ex) + connection.rollback() raise def mapi_create_stream(connection, concatenated_name, stream): schema = stream.get_schema_name() + table = stream.get_stream_name() flush_statement = 
stream.get_webserverstreams_sql_statement() columns_dictionary = stream.get_columns_extra_sql_statements() # dictionary of column_name -> partial SQL statement @@ -58,13 +60,13 @@ def mapi_create_stream(connection, conca connection.execute("CREATE SCHEMA " + schema) connection.commit() except: - pass + connection.rollback() connection.execute(''.join(["CREATE STREAM TABLE ", concatenated_name, " (", stream.get_sql_create_statement(), ")"])) cursor = connection.cursor() cursor.execute("SELECT id FROM sys.schemas WHERE \"name\"='" + schema + "'") schema_id = str(cursor.fetchall()[0][0]) - cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=", schema_id, " AND \"name\"='", stream, + cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=", schema_id, " AND \"name\"='", table, "'"])) # get the created table id table_id = str(cursor.fetchall()[0][0]) cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (", table_id, flush_statement, ")"])) @@ -72,18 +74,19 @@ def mapi_create_stream(connection, conca columns = cursor.fetchall() inserts = [] - colums_ids = ','.join(map(lambda x: str(x[0]), columns)) + columns_ids = ','.join(map(lambda x: str(x[0]), columns)) for key, value in columns_dictionary.iteritems(): for entry in columns: # the imp_timestamp and host identifier are also fetched!! 
if entry[1] == key: # check for column's name - inserts.append(''.join(['(', entry[0], value, ')'])) # append the sql statement + inserts.append(''.join(['(', str(entry[0]), value, ')'])) # append the sql statement break cursor.execute("INSERT INTO iot.webservercolumns VALUES " + ','.join(inserts)) connection.commit() - stream.set_delete_ids(table_id, colums_ids) + stream.set_delete_ids(table_id, columns_ids) except BaseException as ex: add_log(50, ex) + connection.rollback() raise @@ -95,6 +98,7 @@ def mapi_delete_stream(connection, conca connection.commit() except BaseException as ex: add_log(50, ex) + connection.rollback() raise @@ -104,3 +108,4 @@ def mapi_flush_baskets(connection, schem connection.commit() except BaseException as ex: add_log(40, ex) + connection.rollback() diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -1,6 +1,7 @@ import struct from abc import ABCMeta, abstractmethod +from collections import OrderedDict from copy import deepcopy from datetime import datetime, timedelta from dateutil import parser @@ -39,7 +40,7 @@ class StreamDataType: self._data_type = kwargs['type'] # SQL name of the type self._is_nullable = kwargs.get('nullable', True) # boolean if 'default' in kwargs and kwargs['default'] is not None: - self._default_value = self.set_default_value(kwargs['default']) + self._default_value = self.process_default_value(kwargs['default']) else: self._default_value = None @@ -50,8 +51,8 @@ class StreamDataType: def get_nullable_constant(self): # get the nullable constant if the column is nullable return None - def set_default_value(self, default_value): # set the default value representation in the data type - self._default_value = default_value + def process_default_value(self, default_value): # process the default value representation in the data type + return default_value def 
get_default_value(self): # get the default value representation in the data type return self._default_value @@ -88,21 +89,19 @@ class StreamDataType: return self.pack_parsed_values(extracted_values, counter, parameters) def to_json_representation(self): # get a json representation of the data type while checking the stream's info - json_data = {'name': self._column_name, 'type': self._data_type, 'nullable': self._is_nullable} - if self._default_value is not None: - json_data['default'] = self._default_value - return json_data + return OrderedDict((('name', self._column_name), ('type', self._data_type), + ('default', self._default_value), ('nullable', self._is_nullable))) def process_sql_parameters(self, array): # get other possible parameters such as a limit, minimum and maximum pass def create_stream_sql(self): # get column creation statement on SQL array = [self._column_name, " ", self._data_type] - self.process_sql_parameters(array) # add extra parameters to the SQL statement if self._default_value is not None: array.extend([" DEFAULT '", str(self._default_value), "'"]) if not self._is_nullable: array.append(" NOT NULL") + self.process_sql_parameters(array) # add extra parameters to the SQL statement return ''.join(array) def get_extra_sql_statement(self): # data to iot.webservervalidation @@ -174,14 +173,14 @@ class RegexType(TextType): """Covers: Regex""" def __init__(self, **kwargs): - super(RegexType, self).__init__(**kwargs) self._regex = compile(kwargs['regex']) self._regex_text = kwargs['regex'] + super(RegexType, self).__init__(**kwargs) - def set_default_value(self, default_value): + def process_default_value(self, default_value): if self._regex.match(default_value) is None: _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list