Changeset: 5e7861106555 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5e7861106555 Modified Files: clients/iotapi/src/Streams/streampolling.py clients/iotclient/documentation/restful_resources.rst clients/iotclient/src/Flask/restresources.py clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streampolling.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Streams/streamscreator.py clients/iotclient/src/main.py sql/backends/monet5/iot/50_iot.sql Branch: iot Log Message:
Added check for hugeint type, arranged time types, starting generating SQL statements for new streams diffs (truncated from 1734 to 300 lines): diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -14,7 +14,7 @@ SWITCHER = [{'types': ['clob', 'url'], ' {'types': ['decimal'], 'class': 'DecimalType'}, {'types': ['boolean'], 'class': 'BooleanType'}, {'types': ['date'], 'class': 'DateType'}, - {'types': ['time', 'timez'], 'class': 'TimeType'}, + {'types': ['time', 'timetz'], 'class': 'TimeType'}, {'types': ['timestamp', 'timestamptz'], 'class': 'TimestampType'}, {'types': ['inet'], 'class': 'INetType'}, {'types': ['uuid'], 'class': 'UUIDType'}] diff --git a/clients/iotclient/documentation/restful_resources.rst b/clients/iotclient/documentation/restful_resources.rst --- a/clients/iotclient/documentation/restful_resources.rst +++ b/clients/iotclient/documentation/restful_resources.rst @@ -28,17 +28,16 @@ Returns a JSON file with details about a { "tuples_inserted_per_basket": 1, "columns": [ - { - "type": "real", - "name": "temperature", - "nullable": false - }, - { - "type": "text", - "name": "sensorid", - "nullable": false - } - ], + { + "type": "real", + "name": "temperature", + "nullable": false + }, + { + "type": "text", + "name": "sensorid", + "nullable": false + }], "flushing": { "base": "tuple", "number": 50 @@ -53,7 +52,7 @@ Returns a JSON file with details about a **POST** -Creates a stream using a pre-defined JSON schema. The JSON must include the stream's schema, the stream's name, the stream's columns and the flushing method. The flushing can be either time based, tuple based or automatic (:code:`auto`). For tuple based flushing, the number of tuples to flush must be provided using the :code:`number` field. In time based flushing, the :code:`interval` field tells the time units between flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours respectively. In automatic flushing, the baskets are flushed whenver a new batch is inserted. For columns `see data types for details <streams_data_types.html#data_types>`__. +Creates a stream using a pre-defined JSON schema. The JSON must include the stream's schema, the stream's name, the stream's columns and the flushing method. The flushing can be either time based, tuple based or automatic (:code:`auto`). For tuple based flushing, the number of tuples to flush must be provided using the :code:`interval` field. In time based flushing, the :code:`interval` field tells the time units between flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours respectively. In automatic flushing, the baskets are flushed whenever a new batch is inserted. For columns `see data types for details <streams_data_types.html#data_types>`__. Bellow is the JSON used to create the stream in streams_: @@ -64,8 +63,8 @@ Bellow is the JSON used to create the st "stream": "temperature", "flushing": { "base": "tuple", - "number": 50 - }, + "interval": 50 + }, "columns": [ { "type": "real", diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py --- a/clients/iotclient/src/Flask/restresources.py +++ b/clients/iotclient/src/Flask/restresources.py @@ -1,31 +1,23 @@ import datetime import json import pytz -import sys from flask import request from flask_restful import Resource from jsonschema import Draft4Validator, FormatChecker from tzlocal import get_localzone from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA -from Streams.streamscontext import IOTStreamsContext +from Streams.streamscontext import Streams_Context from Settings.iotlogger import add_log -Streams_Context = IOTStreamsContext() -Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker()) +Create_Streams_Validator = None Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker()) Local_Timezone = get_localzone() # for the correction of dates we must add the system's timezone def init_rest_resources(): - global Streams_Context - - try: - Streams_Context = IOTStreamsContext() - except BaseException as ex: - print ex - add_log(50, ex) - sys.exit(1) + global Create_Streams_Validator # because of Hugeint + Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker()) class StreamInput(Resource): @@ -83,8 +75,9 @@ class StreamsHandling(Resource): except BaseException as ex: add_log(50, ex) return ex, 400 - else: - return 'The stream was created with success!', 201 + add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], + ' was created'])) + return 'The stream was created with success!', 201 def delete(self): try: @@ -99,6 +92,6 @@ class StreamsHandling(Resource): except BaseException as ex: add_log(50, ex) return ex, 404 - add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], - ' was deleted']) + add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], + ' was deleted'])) return 'The stream was deleted with success!', 204 diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py --- a/clients/iotclient/src/Settings/filesystem.py +++ b/clients/iotclient/src/Settings/filesystem.py @@ -2,7 +2,6 @@ import os import sys from iotlogger import add_log -from Utilities.filecreator import create_file_if_not_exists Baskets_Location = None diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -2,6 +2,10 @@ import sys import pymonetdb from Settings.iotlogger import add_log +from src.Streams.streamscontext import IOTStreams +from src.Streams.jsonschemas import init_create_streams_schema +from src.Streams.streamcreator import creator_add_hugeint_type +from src.Streams.streampolling import polling_add_hugeint_type Connection = None @@ -15,6 +19,13 @@ def init_monetdb_connection(hostname, po log_message = 'User %s connected successfully to database %s' % (user_name, database) print log_message add_log(20, log_message) + + if check_hugeint_type()[0] > 0: + polling_add_hugeint_type() + creator_add_hugeint_type() + init_create_streams_schema(add_hugeint=True) + else: + init_create_streams_schema(add_hugeint=False) except BaseException as ex: print ex add_log(50, ex) @@ -25,14 +36,23 @@ def close_monetdb_connection(): Connection.close() +def check_hugeint_type(): + Connection.execute("START TRANSACTION") + cursor = Connection.cursor() + cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'") + result = cursor.fetchall() + Connection.commit() + return result + + def mapi_get_webserver_streams(): try: - Connection.execute("BEGIN TRANSACTION") + Connection.execute("START TRANSACTION") cursor = Connection.cursor() sql_string = """SELECT tables."id", tables."name", schemas."name" as schema, tables."name" as table, flushing."flushing", flushing."unit", flushing."interval" FROM (SELECT "id", "name", "schema_id" - FROM sys.tables) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas - ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT "table_id", "flushing", "unit", "interval" + FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas ON + (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "flushing", "unit", "interval" FROM iot.webserverflushing) AS flushing ON (tables."id"=flushing."table_id")""".replace('\n', ' ') cursor.execute(sql_string) tables = cursor.fetchall() @@ -40,9 +60,11 @@ def mapi_get_webserver_streams(): cursor = Connection.cursor() sql_string = """SELECT columns."table_id", columns."name" as column, columns."type", columns."type_digits", columns."type_scale", columns."default", columns."null", extras."special", extras."validation1", - extras."validation2" FROM (SELECT "id", "table_id", "name", "type", "type_digits", "type_scale", "default", - "null" FROM sys.columns) AS columns INNER JOIN (SELECT "column_id", "special", "validation1", "validation2" - FROM iot.webservervalidation) AS extras ON (columns."id"=extras."column_id")""".replace('\n', ' ') + extras."validation2" FROM (SELECT "id", "table_id", "name", "type", "type_digits", "type_scale", + "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT "id" FROM sys.tables WHERE type=4) + AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT "column_id", "special", "validation1", + "validation2" FROM iot.webservervalidation) AS extras ON (columns."id"=extras."column_id")"""\ + .replace('\n', ' ') cursor.execute(sql_string) columns = cursor.fetchall() @@ -53,37 +75,52 @@ def mapi_get_webserver_streams(): return [], [] -def mapi_create_stream(schema, stream, columns): +def mapi_create_stream(stream): + schema = stream.get_schema_name() + table = stream.get_stream_name() + try: - Connection.execute("BEGIN TRANSACTION") + Connection.execute("START TRANSACTION") try: # create schema if not exists, ignore the error if already exists - Connection.execute("CREATE SCHEMA " + schema + ";") + Connection.execute("CREATE SCHEMA " + schema) except: pass - Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", columns, ")"])) # TODO concat!! + Connection.execute(''.join(["CREATE STREAM TABLE ", IOTStreams.get_context_entry_name(schema, table), " (", + stream.get_sql_create_statement(), ")"])) + + cursor = Connection.cursor() + cursor.execute("SELECT id from sys.schemas where name='" + schema + "'") # get the created table schema_id + schema_id = cursor.fetchall()[0] + cursor.execute(''.join(["SELECT id from sys.tables where schema_id=", str(schema_id), " AND name='", stream, + "'"])) # get the created table id + table_id = int(cursor.fetchall()[0]) + cursor.execute(''.join(["INSERT INTO iot.webserverflushing VALUES (", str(table_id), + stream.get_flushing_sql_statement(), ")"])) + + Connection.commit() # TODO insert on the tables - return Connection.commit() + stream.set_table_id(table_id) except BaseException as ex: add_log(50, ex) - return 0 + return None def mapi_delete_stream(schema, stream, stream_id, columns_ids): try: - Connection.execute("BEGIN TRANSACTION") - Connection.execute("DROP TABLE " + stream) # TODO concat!! + Connection.execute("START TRANSACTION") + Connection.execute("DROP TABLE " + IOTStreams.get_context_entry_name(schema, stream)) Connection.execute("DELETE FROM iot.webserverflushing WHERE table_id=" + stream_id) Connection.execute("DELETE FROM iot.webservervalidation WHERE column_id IN (" + ','.join(columns_ids) + ")") return Connection.commit() except BaseException as ex: add_log(50, ex) - return 0 + return None def mapi_flush_baskets(schema, stream, baskets): try: - Connection.execute("BEGIN TRANSACTION") - Connection.execute(''.join(["CALL iot.basket('", schema, "','", stream, "','", baskets, "');"])) + Connection.execute("START TRANSACTION") + Connection.execute(''.join(["CALL iot.basket('", schema, "','", stream, "','", baskets, "')"])) return Connection.commit() except BaseException as ex: add_log(40, ex) diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -2,6 +2,7 @@ import copy import datetime import iso8601 import itertools +import json import math import re import struct @@ -20,17 +21,13 @@ INT8_MIN = -128 INT16_MIN = -32768 INT32_MIN = -2147483648 INT64_MIN = -9223372036854775808 -INT64_MAX = +9223372036854775807 +INT64_MAX = 9223372036854775807 INT128_MIN = -340282366920938463463374607431768211456 FLOAT_NAN = struct.unpack('f', '\xff\xff\x7f\xff')[0] DOUBLE_NAN = struct.unpack('d', '\xff\xff\xff\xff\xff\xff\xef\xff')[0] - -class DataValidationException(Exception): - def __init__(self, errors): - super(DataValidationException, self).__init__() - self.message = errors # dictionary of row_number -> error +ENUM_TYPE_SEPARATOR = '\r' class StreamDataType(object): @@ -40,11 +37,12 @@ class StreamDataType(object): _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list