Changeset: 5e7861106555 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5e7861106555
Modified Files:
        clients/iotapi/src/Streams/streampolling.py
        clients/iotclient/documentation/restful_resources.rst
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/main.py
        sql/backends/monet5/iot/50_iot.sql
Branch: iot
Log Message:

Added check for hugeint type, arranged time types, starting generating SQL 
statements for new streams


diffs (truncated from 1734 to 300 lines):

diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -14,7 +14,7 @@ SWITCHER = [{'types': ['clob', 'url'], '
             {'types': ['decimal'], 'class': 'DecimalType'},
             {'types': ['boolean'], 'class': 'BooleanType'},
             {'types': ['date'], 'class': 'DateType'},
-            {'types': ['time', 'timez'], 'class': 'TimeType'},
+            {'types': ['time', 'timetz'], 'class': 'TimeType'},
             {'types': ['timestamp', 'timestamptz'], 'class': 'TimestampType'},
             {'types': ['inet'], 'class': 'INetType'},
             {'types': ['uuid'], 'class': 'UUIDType'}]
diff --git a/clients/iotclient/documentation/restful_resources.rst 
b/clients/iotclient/documentation/restful_resources.rst
--- a/clients/iotclient/documentation/restful_resources.rst
+++ b/clients/iotclient/documentation/restful_resources.rst
@@ -28,17 +28,16 @@ Returns a JSON file with details about a
          {
            "tuples_inserted_per_basket": 1,
            "columns": [
-             {
-               "type": "real",
-               "name": "temperature",
-               "nullable": false
-             },
-             {
-               "type": "text",
-               "name": "sensorid",
-               "nullable": false
-             }
-           ],
+        {
+                   "type": "real",
+                   "name": "temperature",
+                   "nullable": false
+        },
+        {
+                   "type": "text",
+                   "name": "sensorid",
+                   "nullable": false
+        }],
            "flushing": {
              "base": "tuple",
              "number": 50
@@ -53,7 +52,7 @@ Returns a JSON file with details about a
 
 **POST**
 
-Creates a stream using a pre-defined JSON schema. The JSON must include the 
stream's schema, the stream's name, the stream's columns and the flushing 
method. The flushing can be either time based, tuple based or automatic 
(:code:`auto`). For tuple based flushing, the number of tuples to flush must be 
provided using the :code:`number` field. In time based flushing, the 
:code:`interval` field tells the time units between flushes and the 
:code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours 
respectively. In automatic flushing, the baskets are flushed whenver a new 
batch is inserted. For columns `see data types for details 
<streams_data_types.html#data_types>`__.
+Creates a stream using a pre-defined JSON schema. The JSON must include the 
stream's schema, the stream's name, the stream's columns and the flushing 
method. The flushing can be either time based, tuple based or automatic 
(:code:`auto`). For tuple based flushing, the number of tuples to flush must be 
provided using the :code:`interval` field. In time based flushing, the 
:code:`interval` field tells the time units between flushes and the 
:code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours 
respectively. In automatic flushing, the baskets are flushed whenever a new 
batch is inserted. For columns `see data types for details 
<streams_data_types.html#data_types>`__.
 
 Bellow is the JSON used to create the stream in streams_:
 
@@ -64,8 +63,8 @@ Bellow is the JSON used to create the st
          "stream": "temperature",
          "flushing": {
            "base": "tuple",
-           "number": 50
-           },
+           "interval": 50
+      },
          "columns": [
            {
              "type": "real",
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -1,31 +1,23 @@
 import datetime
 import json
 import pytz
-import sys
 
 from flask import request
 from flask_restful import Resource
 from jsonschema import Draft4Validator, FormatChecker
 from tzlocal import get_localzone
 from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA
-from Streams.streamscontext import IOTStreamsContext
+from Streams.streamscontext import Streams_Context
 from Settings.iotlogger import add_log
 
-Streams_Context = IOTStreamsContext()
-Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
+Create_Streams_Validator = None
 Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
 Local_Timezone = get_localzone()  # for the correction of dates we must add 
the system's timezone
 
 
 def init_rest_resources():
-    global Streams_Context
-
-    try:
-        Streams_Context = IOTStreamsContext()
-    except BaseException as ex:
-        print ex
-        add_log(50, ex)
-        sys.exit(1)
+    global Create_Streams_Validator  # because of Hugeint
+    Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
 
 
 class StreamInput(Resource):
@@ -83,8 +75,9 @@ class StreamsHandling(Resource):
         except BaseException as ex:
             add_log(50, ex)
             return ex, 400
-        else:
-            return 'The stream was created with success!', 201
+        add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', 
schema_to_validate['stream'],
+                             ' was created']))
+        return 'The stream was created with success!', 201
 
     def delete(self):
         try:
@@ -99,6 +92,6 @@ class StreamsHandling(Resource):
         except BaseException as ex:
             add_log(50, ex)
             return ex, 404
-        add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.', 
schema_to_validate['stream'],
-                            ' was deleted'])
+        add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', 
schema_to_validate['stream'],
+                            ' was deleted']))
         return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/filesystem.py 
b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -2,7 +2,6 @@ import os
 import sys
 
 from iotlogger import add_log
-from Utilities.filecreator import create_file_if_not_exists
 
 Baskets_Location = None
 
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -2,6 +2,10 @@ import sys
 import pymonetdb
 
 from Settings.iotlogger import add_log
+from src.Streams.streamscontext import IOTStreams
+from src.Streams.jsonschemas import init_create_streams_schema
+from src.Streams.streamcreator import creator_add_hugeint_type
+from src.Streams.streampolling import polling_add_hugeint_type
 
 Connection = None
 
@@ -15,6 +19,13 @@ def init_monetdb_connection(hostname, po
         log_message = 'User %s connected successfully to database %s' % 
(user_name, database)
         print log_message
         add_log(20, log_message)
+
+        if check_hugeint_type()[0] > 0:
+            polling_add_hugeint_type()
+            creator_add_hugeint_type()
+            init_create_streams_schema(add_hugeint=True)
+        else:
+            init_create_streams_schema(add_hugeint=False)
     except BaseException as ex:
         print ex
         add_log(50, ex)
@@ -25,14 +36,23 @@ def close_monetdb_connection():
     Connection.close()
 
 
+def check_hugeint_type():
+    Connection.execute("START TRANSACTION")
+    cursor = Connection.cursor()
+    cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'")
+    result = cursor.fetchall()
+    Connection.commit()
+    return result
+
+
 def mapi_get_webserver_streams():
     try:
-        Connection.execute("BEGIN TRANSACTION")
+        Connection.execute("START TRANSACTION")
         cursor = Connection.cursor()
         sql_string = """SELECT tables."id", tables."name", schemas."name" as 
schema, tables."name" as table,
             flushing."flushing", flushing."unit", flushing."interval" FROM 
(SELECT "id", "name", "schema_id"
-            FROM sys.tables) AS tables INNER JOIN (SELECT "id", "name" FROM 
sys.schemas) AS schemas
-            ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT 
"table_id", "flushing", "unit", "interval"
+            FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", 
"name" FROM sys.schemas) AS schemas ON
+            (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", 
"flushing", "unit", "interval"
             FROM iot.webserverflushing) AS flushing ON 
(tables."id"=flushing."table_id")""".replace('\n', ' ')
         cursor.execute(sql_string)
         tables = cursor.fetchall()
@@ -40,9 +60,11 @@ def mapi_get_webserver_streams():
         cursor = Connection.cursor()
         sql_string = """SELECT columns."table_id", columns."name" as column, 
columns."type", columns."type_digits",
             columns."type_scale", columns."default", columns."null", 
extras."special", extras."validation1",
-            extras."validation2" FROM (SELECT "id", "table_id", "name", 
"type", "type_digits", "type_scale", "default",
-            "null" FROM sys.columns) AS columns INNER JOIN (SELECT 
"column_id", "special", "validation1", "validation2"
-            FROM iot.webservervalidation) AS extras ON 
(columns."id"=extras."column_id")""".replace('\n', ' ')
+            extras."validation2" FROM (SELECT "id", "table_id", "name", 
"type", "type_digits", "type_scale",
+            "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT 
"id" FROM sys.tables WHERE type=4)
+            AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT 
"column_id", "special", "validation1",
+            "validation2" FROM iot.webservervalidation) AS extras ON 
(columns."id"=extras."column_id")"""\
+            .replace('\n', ' ')
         cursor.execute(sql_string)
         columns = cursor.fetchall()
 
@@ -53,37 +75,52 @@ def mapi_get_webserver_streams():
         return [], []
 
 
-def mapi_create_stream(schema, stream, columns):
+def mapi_create_stream(stream):
+    schema = stream.get_schema_name()
+    table = stream.get_stream_name()
+
     try:
-        Connection.execute("BEGIN TRANSACTION")
+        Connection.execute("START TRANSACTION")
         try:  # create schema if not exists, ignore the error if already exists
-            Connection.execute("CREATE SCHEMA " + schema + ";")
+            Connection.execute("CREATE SCHEMA " + schema)
         except:
             pass
-        Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", 
columns, ")"]))  # TODO concat!!
+        Connection.execute(''.join(["CREATE STREAM TABLE ", 
IOTStreams.get_context_entry_name(schema, table), " (",
+                                    stream.get_sql_create_statement(), ")"]))
+
+        cursor = Connection.cursor()
+        cursor.execute("SELECT id from sys.schemas where name='" + schema + 
"'")  # get the created table schema_id
+        schema_id = cursor.fetchall()[0]
+        cursor.execute(''.join(["SELECT id from sys.tables where schema_id=", 
str(schema_id), " AND name='", stream,
+                                "'"]))  # get the created table id
+        table_id = int(cursor.fetchall()[0])
+        cursor.execute(''.join(["INSERT INTO iot.webserverflushing VALUES (", 
str(table_id),
+                                stream.get_flushing_sql_statement(), ")"]))
+
+        Connection.commit()
         # TODO insert on the tables
-        return Connection.commit()
+        stream.set_table_id(table_id)
     except BaseException as ex:
         add_log(50, ex)
-        return 0
+        return None
 
 
 def mapi_delete_stream(schema, stream, stream_id, columns_ids):
     try:
-        Connection.execute("BEGIN TRANSACTION")
-        Connection.execute("DROP TABLE " + stream)  # TODO concat!!
+        Connection.execute("START TRANSACTION")
+        Connection.execute("DROP TABLE " + 
IOTStreams.get_context_entry_name(schema, stream))
         Connection.execute("DELETE FROM iot.webserverflushing WHERE table_id=" 
+ stream_id)
         Connection.execute("DELETE FROM iot.webservervalidation WHERE 
column_id IN (" + ','.join(columns_ids) + ")")
         return Connection.commit()
     except BaseException as ex:
         add_log(50, ex)
-        return 0
+        return None
 
 
 def mapi_flush_baskets(schema, stream, baskets):
     try:
-        Connection.execute("BEGIN TRANSACTION")
-        Connection.execute(''.join(["CALL iot.basket('", schema, "','", 
stream, "','", baskets, "');"]))
+        Connection.execute("START TRANSACTION")
+        Connection.execute(''.join(["CALL iot.basket('", schema, "','", 
stream, "','", baskets, "')"]))
         return Connection.commit()
     except BaseException as ex:
         add_log(40, ex)
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -2,6 +2,7 @@ import copy
 import datetime
 import iso8601
 import itertools
+import json
 import math
 import re
 import struct
@@ -20,17 +21,13 @@ INT8_MIN = -128
 INT16_MIN = -32768
 INT32_MIN = -2147483648
 INT64_MIN = -9223372036854775808
-INT64_MAX = +9223372036854775807
+INT64_MAX = 9223372036854775807
 INT128_MIN = -340282366920938463463374607431768211456
 
 FLOAT_NAN = struct.unpack('f', '\xff\xff\x7f\xff')[0]
 DOUBLE_NAN = struct.unpack('d', '\xff\xff\xff\xff\xff\xff\xef\xff')[0]
 
-
-class DataValidationException(Exception):
-    def __init__(self, errors):
-        super(DataValidationException, self).__init__()
-        self.message = errors  # dictionary of row_number -> error
+ENUM_TYPE_SEPARATOR = '\r'
 
 
 class StreamDataType(object):
@@ -40,11 +37,12 @@ class StreamDataType(object):
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to