Changeset: 0dbef0f0ddf6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0dbef0f0ddf6
Modified Files:
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/Utilities/customthreading.py
        sql/backends/monet5/iot/50_iot.sql
Branch: iot
Log Message:

Several bugfixes while reading from the database


diffs (truncated from 1062 to 300 lines):

diff --git a/clients/iotapi/src/Streams/datatypes.py 
b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -1,6 +1,7 @@
 import struct
 
 from abc import ABCMeta, abstractmethod
+from collections import OrderedDict
 from datetime import date, time, datetime
 from dateutil.relativedelta import relativedelta
 
@@ -57,10 +58,8 @@ class StreamDataType(object):
         return results
 
     def to_json_representation(self):  # get a json representation of the data 
type while checking the stream's info
-        dic = {'name': self._column_name, 'type': self._data_type, 'nullable': 
self._is_nullable}
-        if self._default_value is not None:
-            dic['default'] = self._default_value
-        return dic
+        return OrderedDict((('name', self._column_name), ('type', 
self._data_type),
+                            ('default', self._default_value), ('nullable', 
self._is_nullable)))
 
 
 class TextType(StreamDataType):
@@ -329,7 +328,7 @@ class TimeType(StreamDataType):  # Store
         return results
 
 
-class TimestampType(StreamDataType):  # it's represented with the two integers 
from time and date
+class TimestampType(StreamDataType):  # It is represented with the two 
integers from time and date
     """Covers: TIMESTAMP"""
 
     def __init__(self, *args):
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -65,8 +65,8 @@ class IOTStream:
         return self._stream_name
 
     def get_data_dictionary(self):
-        dic = OrderedDict({'schema': self._schema_name, 'stream': 
self._stream_name,
-                           'columns': [value.to_json_representation() for 
value in self._columns.values()]})
+        dic = OrderedDict((('schema', self._schema_name), ('stream', 
self._stream_name),
+                           ('columns', [value.to_json_representation() for 
value in self._columns.values()])))
         self._baskets_lock.acquire_read()
         count = len(self._baskets)
         listing = [{'number': k, 'count': v} for k, v in self._baskets.items()]
@@ -146,4 +146,5 @@ class IOTStream:
 
         keys = results.keys()  # TODO check if this is viable for many tuples!!
         tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k 
in keys))]
-        return {'schema': self._schema_name, 'stream': self._stream_name, 
'count': read_tuples, 'tuples': tuples}
+        return OrderedDict((('schema', self._schema_name), ('stream', 
self._stream_name), ('count', read_tuples),
+                            ('tuples', tuples)))
diff --git a/clients/iotapi/src/Streams/streamscontext.py 
b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -1,3 +1,4 @@
+from collections import OrderedDict
 from Utilities.readwritelock import RWLock
 from WebSockets.websockets import unsubscribe_removed_streams
 
@@ -39,8 +40,8 @@ class IOTStreams:
 
     def get_streams_data(self):
         self._locker.acquire_read()
-        res = {'streams_count': len(self._context),
-               'streams_listing': [value.get_data_dictionary() for value in 
self._context.values()]}
+        res = OrderedDict((('streams_count', len(self._context)),
+                           ('streams_listing', [value.get_data_dictionary() 
for value in self._context.values()])))
         self._locker.release()
         return res
 
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -2,7 +2,7 @@ from datetime import datetime
 from flask import request
 from flask_restful import Resource
 from json import loads
-from jsonschema import Draft4Validator, FormatChecker
+from jsonschema import Draft4Validator, FormatChecker, ValidationError
 from pytz import utc
 from tzlocal import get_localzone
 from Settings.iotlogger import add_log
@@ -30,7 +30,7 @@ class StreamInput(Resource):
             stream = get_streams_context().get_existing_stream(schema_name, 
stream_name)
         except BaseException as ex:
             add_log(50, ex)
-            return ex, 404
+            return ex.message, 404
         return stream.get_data_dictionary(), 200
 
     def post(self, schema_name, stream_name):  # add data to a stream
@@ -40,13 +40,13 @@ class StreamInput(Resource):
             stream = get_streams_context().get_existing_stream(schema_name, 
stream_name)
         except BaseException as ex:
             add_log(50, ex)
-            return ex, 404
+            return ex.message, 404
 
         try:  # validate and insert data, if not return 400
             stream.validate_and_insert(loads(request.data), current_stamp)
-        except BaseException as ex:
+        except (ValidationError, BaseException) as ex:
             add_log(50, ex)
-            return ex, 400
+            return ex.message, 400
         return 'The insertions were made with success!', 201
 
 
@@ -71,9 +71,9 @@ class StreamsHandling(Resource):
             schema_to_validate = loads(request.data)
             Create_Streams_Validator.validate(schema_to_validate)
             get_streams_context().add_new_stream(schema_to_validate)
-        except BaseException as ex:
+        except (ValidationError, BaseException) as ex:
             add_log(50, ex)
-            return ex, 400
+            return ex.message, 400
         add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', 
schema_to_validate['stream'],
                              ' was created']))
         return 'The stream was created with success!', 201
@@ -84,13 +84,13 @@ class StreamsHandling(Resource):
             Delete_Streams_Validator.validate(schema_to_validate)
         except BaseException as ex:
             add_log(50, ex)
-            return ex, 400
+            return ex.message, 400
 
         try:  # check if stream exists, if not return 404
             get_streams_context().delete_existing_stream(schema_to_validate)
         except BaseException as ex:
             add_log(50, ex)
-            return ex, 404
+            return ex.message, 404
         add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', 
schema_to_validate['stream'],
                             ' was deleted']))
         return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -26,7 +26,7 @@ def mapi_get_webserver_streams(connectio
             extras."interval", extras."unit" FROM (SELECT "id", "name", 
"schema_id" FROM sys.tables WHERE type=4)
             AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS 
schemas ON (tables."schema_id"=schemas."id")
             LEFT JOIN (SELECT "table_id", "base", "interval", "unit" FROM 
iot.webserverstreams) AS extras
-            ON (tables."id"=extras."table_id")""".replace('\n', ' ')
+            ON (tables."id"=extras."table_id") ORDER BY tables."id" 
""".replace('\n', ' ')
         cursor.execute(sql_string)
         tables = cursor.fetchall()
 
@@ -34,10 +34,10 @@ def mapi_get_webserver_streams(connectio
         sql_string = """SELECT columns."id", columns."table_id", 
columns."name" AS column, columns."type",
             columns."type_digits", columns."type_scale", columns."default", 
columns."null", extras."special",
             extras."validation1", extras."validation2" FROM (SELECT "id", 
"table_id", "name", "type", "type_digits",
-            "type_scale", "default", "null" FROM sys.columns) AS columns INNER 
JOIN (SELECT "id" FROM sys.tables
-            WHERE type=4) AS tables ON (tables."id"=columns."table_id") LEFT 
JOIN (SELECT "column_id", "special",
-            "validation1", "validation2" FROM iot.webservercolumns) AS extras 
ON (columns."id"=extras."column_id")"""\
-            .replace('\n', ' ')
+            "type_scale", "default", "null", "number" FROM sys.columns) AS 
columns INNER JOIN (SELECT "id"
+            FROM sys.tables WHERE type=4) AS tables ON 
(tables."id"=columns."table_id") LEFT JOIN
+            (SELECT "column_id", "special", "validation1", "validation2" FROM 
iot.webservercolumns) AS extras
+            ON (columns."id"=extras."column_id") ORDER BY columns."table_id", 
columns."number" """.replace('\n', ' ')
         cursor.execute(sql_string)
         columns = cursor.fetchall()
 
@@ -45,11 +45,13 @@ def mapi_get_webserver_streams(connectio
         return tables, columns
     except BaseException as ex:
         add_log(50, ex)
+        connection.rollback()
         raise
 
 
 def mapi_create_stream(connection, concatenated_name, stream):
     schema = stream.get_schema_name()
+    table = stream.get_stream_name()
     flush_statement = stream.get_webserverstreams_sql_statement()
     columns_dictionary = stream.get_columns_extra_sql_statements()  # 
dictionary of column_name -> partial SQL statement
 
@@ -58,13 +60,13 @@ def mapi_create_stream(connection, conca
             connection.execute("CREATE SCHEMA " + schema)
             connection.commit()
         except:
-            pass
+            connection.rollback()
         connection.execute(''.join(["CREATE STREAM TABLE ", concatenated_name, 
" (", stream.get_sql_create_statement(),
                                     ")"]))
         cursor = connection.cursor()
         cursor.execute("SELECT id FROM sys.schemas WHERE \"name\"='" + schema 
+ "'")
         schema_id = str(cursor.fetchall()[0][0])
-        cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=", 
schema_id, " AND \"name\"='", stream,
+        cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=", 
schema_id, " AND \"name\"='", table,
                                 "'"]))  # get the created table id
         table_id = str(cursor.fetchall()[0][0])
         cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (", 
table_id, flush_statement, ")"]))
@@ -72,18 +74,19 @@ def mapi_create_stream(connection, conca
         columns = cursor.fetchall()
 
         inserts = []
-        colums_ids = ','.join(map(lambda x: str(x[0]), columns))
+        columns_ids = ','.join(map(lambda x: str(x[0]), columns))
         for key, value in columns_dictionary.iteritems():
             for entry in columns:  # the imp_timestamp and host identifier are 
also fetched!!
                 if entry[1] == key:  # check for column's name
-                    inserts.append(''.join(['(', entry[0], value, ')']))  # 
append the sql statement
+                    inserts.append(''.join(['(', str(entry[0]), value, ')']))  
# append the sql statement
                     break
 
         cursor.execute("INSERT INTO iot.webservercolumns VALUES " + 
','.join(inserts))
         connection.commit()
-        stream.set_delete_ids(table_id, colums_ids)
+        stream.set_delete_ids(table_id, columns_ids)
     except BaseException as ex:
         add_log(50, ex)
+        connection.rollback()
         raise
 
 
@@ -95,6 +98,7 @@ def mapi_delete_stream(connection, conca
         connection.commit()
     except BaseException as ex:
         add_log(50, ex)
+        connection.rollback()
         raise
 
 
@@ -104,3 +108,4 @@ def mapi_flush_baskets(connection, schem
         connection.commit()
     except BaseException as ex:
         add_log(40, ex)
+        connection.rollback()
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -1,6 +1,7 @@
 import struct
 
 from abc import ABCMeta, abstractmethod
+from collections import OrderedDict
 from copy import deepcopy
 from datetime import datetime, timedelta
 from dateutil import parser
@@ -39,7 +40,7 @@ class StreamDataType:
         self._data_type = kwargs['type']  # SQL name of the type
         self._is_nullable = kwargs.get('nullable', True)  # boolean
         if 'default' in kwargs and kwargs['default'] is not None:
-            self._default_value = self.set_default_value(kwargs['default'])
+            self._default_value = self.process_default_value(kwargs['default'])
         else:
             self._default_value = None
 
@@ -50,8 +51,8 @@ class StreamDataType:
     def get_nullable_constant(self):  # get the nullable constant if the 
column is nullable
         return None
 
-    def set_default_value(self, default_value):  # set the default value 
representation in the data type
-        self._default_value = default_value
+    def process_default_value(self, default_value):  # process the default 
value representation in the data type
+        return default_value
 
     def get_default_value(self):  # get the default value representation in 
the data type
         return self._default_value
@@ -88,21 +89,19 @@ class StreamDataType:
         return self.pack_parsed_values(extracted_values, counter, parameters)
 
     def to_json_representation(self):  # get a json representation of the data 
type while checking the stream's info
-        json_data = {'name': self._column_name, 'type': self._data_type, 
'nullable': self._is_nullable}
-        if self._default_value is not None:
-            json_data['default'] = self._default_value
-        return json_data
+        return OrderedDict((('name', self._column_name), ('type', 
self._data_type),
+                            ('default', self._default_value), ('nullable', 
self._is_nullable)))
 
     def process_sql_parameters(self, array):  # get other possible parameters 
such as a limit, minimum and maximum
         pass
 
     def create_stream_sql(self):  # get column creation statement on SQL
         array = [self._column_name, " ", self._data_type]
-        self.process_sql_parameters(array)  # add extra parameters to the SQL 
statement
         if self._default_value is not None:
             array.extend([" DEFAULT '", str(self._default_value), "'"])
         if not self._is_nullable:
             array.append(" NOT NULL")
+        self.process_sql_parameters(array)  # add extra parameters to the SQL 
statement
         return ''.join(array)
 
     def get_extra_sql_statement(self):  # data to iot.webservervalidation
@@ -174,14 +173,14 @@ class RegexType(TextType):
     """Covers: Regex"""
 
     def __init__(self, **kwargs):
-        super(RegexType, self).__init__(**kwargs)
         self._regex = compile(kwargs['regex'])
         self._regex_text = kwargs['regex']
+        super(RegexType, self).__init__(**kwargs)
 
-    def set_default_value(self, default_value):
+    def process_default_value(self, default_value):
         if self._regex.match(default_value) is None:
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to