Changeset: 4b3ab86b53d0 for MonetDB

URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4b3ab86b53d0

Modified Files:
        clients/iotapi/src/main.py
        clients/iotclient/documentation/iot_server_arguments.rst
        clients/iotclient/documentation/restful_resources.rst
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/main.py
        clients/iotclient/tests/datatypesinsertstests.py

Branch: iot

Log Message:
Finished stream creation through webserver with MonetDB, now only the stream polling is remaining

diffs (truncated from 954 to 300 lines):

diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -65,7 +65,7 @@ def main():
                         help='Baskets location directory (default: %s)' % DEFAULT_FILESYSTEM, metavar='DIRECTORY')
     parser.add_argument('-l', '--log', type=check_path, nargs='?', default=DEFAULT_LOGGING,
                         help='Logging file location (default: %s)' % DEFAULT_LOGGING, metavar='FILE_PATH')
-    parser.add_argument('-po', '--polling', type=check_positive_int, nargs='?', default=60,
+    parser.add_argument('-po', '--polling', type=check_positive_int, nargs='?', default=60, metavar='POLLING',
                         help='Polling interval in seconds to the database for streams updates (default: 60)')
     parser.add_argument('-sh', '--shost', type=check_ipv4_address, nargs='?', default='0.0.0.0',
                         help='Web API server host (default: 0.0.0.0)', metavar='HOST')
diff --git a/clients/iotclient/documentation/iot_server_arguments.rst b/clients/iotclient/documentation/iot_server_arguments.rst
--- a/clients/iotclient/documentation/iot_server_arguments.rst
+++ b/clients/iotclient/documentation/iot_server_arguments.rst
@@ -32,19 +32,19 @@ Set the filesystem directory where the b
 
 Location of logfile. On the logfile is reported when streams are created or removed, when tuples are inserted and when the baskets are flushed. By default in UNIX systems is :code:`/var/log/iot/iotserver.log`, while on Windows is the :code:`iotserver.log` on the directory where the :code:`main.py` script was called.
 
-Host Identifier
----------------
+Web Server Behavior
+-------------------
 
-If the *identifier* parameter is provided, an extra column on streams will be added with a custom name of the host for later identification.
-
-**-i - -identifier**
-
-Use a host identifier for every new stream.
+If a stream is created with a hostname, an extra column on streams will be added with a custom name of the host for later identification.
 
 **-n - -name**
 
 Host identifier name. By default is the host's MAC address.
 
+**-po - -polling**
+
+Set the polling interval in seconds to MonetDB database for streams updates. By default is :code:`60` seconds.
+
 Web Server Listening
 --------------------
 
@@ -69,7 +69,7 @@ Listening port of the administration ser
 Database Connection
 -------------------
 
-Credentials for the MAPI connection to MonetDB database.
+Credentials for the MAPI connection to the MonetDB database.
 
 .. note:: The user's password will be prompted during the initialization of the server.
 
diff --git a/clients/iotclient/documentation/restful_resources.rst b/clients/iotclient/documentation/restful_resources.rst
--- a/clients/iotclient/documentation/restful_resources.rst
+++ b/clients/iotclient/documentation/restful_resources.rst
@@ -20,13 +20,14 @@ The administration server provides resou
 
 **GET**
 
-Returns a JSON file with details about all the streams currently created on the webserver. For each stream besides its schema and name, it provides the currently number of tuples inserted on the baskets per column, description of columns (`See data types <streams_data_types.html#data_types>`__), the flushing method (`See streams creation for details <streams_creation.html#creating_streams>`__). An example is shown bellow:
+Returns a JSON file with details about all the streams currently created on the webserver. For each stream besides its schema, name and if has the hostname column, it provides the currently number of tuples inserted on the baskets per column, description of columns (`See data types <streams_data_types.html#data_types>`__), the flushing method (`See streams creation for details <streams_creation.html#creating_streams>`__). An example is shown bellow:
 
 .. code-block:: json
 
     [
      {
        "tuples_inserted_per_basket": 1,
+       "hostname": false,
        "columns": [
          {
            "type": "real",
@@ -52,7 +53,7 @@ Returns a JSON file with details about a
 
 **POST**
 
-Creates a stream using a pre-defined JSON schema. The JSON must include the stream's schema, the stream's name, the stream's columns and the flushing method. The flushing can be either time based, tuple based or automatic (:code:`auto`). For tuple based flushing, the number of tuples to flush must be provided using the :code:`interval` field. In time based flushing, the :code:`interval` field tells the time units between flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours respectively. In automatic flushing, the baskets are flushed whenever a new batch is inserted. For columns `see data types for details <streams_data_types.html#data_types>`__.
+Creates a stream using a pre-defined JSON schema. The JSON must include the stream's schema, the stream's name, the stream's columns and the flushing method. The flushing can be either time based, tuple based or automatic (:code:`auto`). For tuple based flushing, the number of tuples to flush must be provided using the :code:`interval` field. In time based flushing, the :code:`interval` field tells the time units between flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours respectively. In automatic flushing, the baskets are flushed whenever a new batch is inserted. For columns `see data types for details <streams_data_types.html#data_types>`__. Additionaly a hostname parameter can be provided, and if it's true then an additional column will be created with the iot web server host name.
 
 Bellow is the JSON used to create the stream in streams_:
 
@@ -61,6 +62,7 @@ Bellow is the JSON used to create the st
     {
      "schema": "measures",
      "stream": "temperature",
+     "hostname": false,
      "flushing": {
        "base": "tuple",
        "interval": 50
diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -32,7 +32,7 @@ class StreamInput(Resource):
         except BaseException as ex:
             add_log(50, ex)
             return ex, 404
-        return stream.get_data_dictionary(include_number_tuples=True), 200
+        return stream.get_data_dictionary(), 200
 
     def post(self, schema_name, stream_name):  # add data to a stream
         current_stamp = datetime.datetime.now(pytz.utc).astimezone(Local_Timezone).isoformat()
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -2,10 +2,10 @@ import sys
 
 import pymonetdb
 from Settings.iotlogger import add_log
-from src.Streams.streamscontext import IOTStreams
-from src.Streams.jsonschemas import init_create_streams_schema
-from src.Streams.streamcreator import creator_add_hugeint_type
-from src.Streams.streampolling import polling_add_hugeint_type
+from Streams.streamscontext import IOTStreams
+from Streams.jsonschemas import init_create_streams_schema
+from Streams.streamcreator import creator_add_hugeint_type
+from Streams.streampolling import polling_add_hugeint_type
 
 
 Connection = None
@@ -20,7 +20,7 @@ def init_monetdb_connection(hostname, po
         print log_message
         add_log(20, log_message)
 
-        if check_hugeint_type()[0] > 0:
+        if check_hugeint_type() > 0:
             polling_add_hugeint_type()
             creator_add_hugeint_type()
             init_create_streams_schema(add_hugeint=True)
@@ -40,7 +40,7 @@ def check_hugeint_type():
     Connection.execute("START TRANSACTION")
     cursor = Connection.cursor()
     cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'")
-    result = cursor.fetchall()
+    result = cursor.fetchall()[0]
     Connection.commit()
     return result
 
@@ -49,21 +49,22 @@ def mapi_get_webserver_streams():
    try:
        Connection.execute("START TRANSACTION")
        cursor = Connection.cursor()
-        sql_string = """SELECT tables."id", tables."name", schemas."name" as schema, tables."name" as table,
-            flushing."flushing", flushing."unit", flushing."interval" FROM (SELECT "id", "name", "schema_id"
-            FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas ON
-            (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "flushing", "unit", "interval"
-            FROM iot.webserverflushing) AS flushing ON (tables."id"=flushing."table_id")""".replace('\n', ' ')
+        sql_string = """SELECT tables."id", tables."name", schemas."name" AS schema, tables."name" AS table,
+            extras."has_hostname", extras."flushing", extras."unit", extras."interval" FROM (SELECT "id", "name",
+            "schema_id" FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas)
+            AS schemas ON (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "has_hostname", "flushing",
+            "unit", "interval" FROM iot.webserverstreams) AS extras ON (tables."id"=extras."table_id")"""\
+            .replace('\n', ' ')
        cursor.execute(sql_string)
        tables = cursor.fetchall()
 
        cursor = Connection.cursor()
-        sql_string = """SELECT columns."table_id", columns."name" as column, columns."type", columns."type_digits",
+        sql_string = """SELECT columns."table_id", columns."name" AS column, columns."type", columns."type_digits",
            columns."type_scale", columns."default", columns."null", extras."special", extras."validation1",
            extras."validation2" FROM (SELECT "id", "table_id", "name", "type", "type_digits", "type_scale",
            "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT "id" FROM sys.tables WHERE type=4)
            AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT "column_id", "special", "validation1",
-            "validation2" FROM iot.webservervalidation) AS extras ON (columns."id"=extras."column_id")"""\
+            "validation2" FROM iot.webservercolumns) AS extras ON (columns."id"=extras."column_id")"""\
            .replace('\n', ' ')
        cursor.execute(sql_string)
        columns = cursor.fetchall()
@@ -78,50 +79,59 @@ def mapi_get_webserver_streams():
 def mapi_create_stream(stream):
     schema = stream.get_schema_name()
     table = stream.get_stream_name()
+    flush_statement = stream.get_webserverstreams_sql_statement()
+    columns_dictionary = stream.get_columns_extra_sql_statements()  # dictionary of column_name -> partial SQL statement
 
     try:
+        try:  # create schema if not exists, ignore the error if already exists
+            Connection.execute("START TRANSACTION")
+            Connection.execute("CREATE SCHEMA " + schema)
+            Connection.commit()
+        except:
+            Connection.commit()
         Connection.execute("START TRANSACTION")
-        try:  # create schema if not exists, ignore the error if already exists
-            Connection.execute("CREATE SCHEMA " + schema)
-        except:
-            pass
         Connection.execute(''.join(["CREATE STREAM TABLE ", IOTStreams.get_context_entry_name(schema, table),
                                     " (", stream.get_sql_create_statement(), ")"]))
+        cursor = Connection.cursor()
+        cursor.execute("SELECT id FROM sys.schemas WHERE \"name\"='" + schema + "'")
+        schema_id = str(cursor.fetchall()[0][0])
+        cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=", schema_id, " AND \"name\"='", stream,
+                                "'"]))  # get the created table id
+        table_id = str(cursor.fetchall()[0][0])
+        cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (", table_id, flush_statement, ")"]))
+        cursor.execute('SELECT id, "name" FROM sys.columns WHERE table_id=' + table_id)
+        columns = cursor.fetchall()
 
-        cursor = Connection.cursor()
-        cursor.execute("SELECT id from sys.schemas where name='" + schema + "'")  # get the created table schema_id
-        schema_id = cursor.fetchall()[0]
-        cursor.execute(''.join(["SELECT id from sys.tables where schema_id=", str(schema_id), " AND name='", stream,
-                                "'"]))  # get the created table id
-        table_id = int(cursor.fetchall()[0])
-        cursor.execute(''.join(["INSERT INTO iot.webserverflushing VALUES (", str(table_id),
-                                stream.get_flushing_sql_statement(), ")"]))
+        inserts = []
+        colums_ids = ','.join(map(lambda x: str(x[0]), columns))
+        for key, value in columns_dictionary.iteritems():
+            for entry in columns:  # the imp_timestamp and host identifier are also fetched!!
+                if entry[1] == key:  # check for column's name
+                    inserts.append(''.join(['(', entry[0], value, ')']))  # append the sql statement
+                    break
+        cursor.execute("INSERT INTO iot.webservercolumns VALUES " + ','.join(inserts))
         Connection.commit()
-        # TODO insert on the tables
-        stream.set_table_id(table_id)
+        stream.set_delete_ids(table_id, colums_ids)
     except BaseException as ex:
         add_log(50, ex)
-        return None
 
 
 def mapi_delete_stream(schema, stream, stream_id, columns_ids):
     try:
         Connection.execute("START TRANSACTION")
         Connection.execute("DROP TABLE " + IOTStreams.get_context_entry_name(schema, stream))
-        Connection.execute("DELETE FROM iot.webserverflushing WHERE table_id=" + stream_id)
-        Connection.execute("DELETE FROM iot.webservervalidation WHERE column_id IN (" + ','.join(columns_ids) + ")")
-        return Connection.commit()
+        Connection.execute("DELETE FROM iot.webserverstreams WHERE table_id=" + stream_id)
+        Connection.execute("DELETE FROM iot.webservercolumns WHERE column_id IN (" + columns_ids + ")")
+        Connection.commit()
     except BaseException as ex:
         add_log(50, ex)
-        return None
 
 
 def mapi_flush_baskets(schema, stream, baskets):
     try:
         Connection.execute("START TRANSACTION")
         Connection.execute(''.join(["CALL iot.basket('", schema, "','", stream, "','", baskets, "')"]))
-        return Connection.commit()
+        Connection.commit()
     except BaseException as ex:
         add_log(40, ex)
-        return 0
 
diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -42,7 +42,6 @@ class StreamDataType(object):
             self._default_value = self.set_default_value(kwargs['default'])
         else:
             self._default_value = None
-        self._column_id = kwargs.get('id', None)
 
     def is_nullable(self):  # check if the column is nullable or not
         return self._is_nullable
@@ -57,12 +56,6 @@ class StreamDataType(object):
     def get_default_value(self):  # get the default value representation in the data type
         return self._default_value
 
-    def set_column_id(self, column_id):  # set column id for iot.webservervalidation table
-        self._column_id = column_id
-
-    def get_column_id(self):
-        return self._column_id
-
     def add_json_schema_entry(self, schema):  # add the entry for the stream's corresponding json schema
         dic = {}  # must be done after setting the default value!!!
         if self._default_value is not None:
diff --git a/clients/iotclient/src/Streams/jsonschemas.py b/clients/iotclient/src/Streams/jsonschemas.py
--- a/clients/iotclient/src/Streams/jsonschemas.py
+++ b/clients/iotclient/src/Streams/jsonschemas.py
@@ -17,7 +17,7 @@ INET6_TYPE = ["inet6"]
 REGEX_TYPE = ["regex"]
 ENUM_TYPE = ["enum"]
 BOOLEAN_TYPE = ["bool", "boolean"]
-SMALL_INTEGERS = ["tinyint", "smallint", "int", "integer", "wrd", "bigint"]
+SMALL_INTEGERS_TYPES = ["tinyint", "smallint", "int", "integer", "wrd", "bigint"]
 HUGE_INTEGER_TYPE = ["hugeint"]
 FLOATING_POINT_PRECISION_TYPES = ["real", "float", "double", "double precision"]
 DECIMAL_TYPES = ["dec", "decimal", "numeric"]
@@ -34,9 +34,9 @@ def init_create_streams_schema(add_hugei
     global CREATE_STREAMS_SCHEMA
 
     if add_hugeint:
-        integers = SMALL_INTEGERS + HUGE_INTEGER_TYPE
+        integers = SMALL_INTEGERS_TYPES + HUGE_INTEGER_TYPE
     else:
-        integers = SMALL_INTEGERS
+        integers = SMALL_INTEGERS_TYPES
 
     CREATE_STREAMS_SCHEMA = {
         "title": "JSON schema to create a stream",
@@ -46,6 +46,7 @@ def init_create_streams_schema(add_hugei
         "properties": {
             "schema": {"type": "string"},
             "stream": {"type": "string"},
+            "hostname": {"type": "boolean", "default": False},
             "flushing": {
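For readers following the changeset, below is a minimal client-side sketch of the stream-creation call documented in the restful_resources.rst hunk above, using the third-party requests library. The schema, stream, hostname and flushing fields mirror the documented JSON example; the administration host, port, resource path and the illustrative column definition are assumptions, since the truncated diff does not show them.

# Hypothetical usage sketch (not part of the changeset): create a stream through
# the iotclient administration web server using the JSON layout documented above.
# The host, port, resource path and the "columns" entry are assumed values.
import requests

stream_definition = {
    "schema": "measures",
    "stream": "temperature",
    "hostname": False,       # when true, the server adds a host-identifier column
    "flushing": {
        "base": "tuple",     # tuple-based flushing: flush after `interval` tuples
        "interval": 50
    },
    "columns": [             # illustrative column; see the data types documentation
        {"name": "temperature_value", "type": "real"}
    ]
}

# Assumed administration endpoint; adjust host, port and path to the deployment.
response = requests.post("http://127.0.0.1:8002/streams", json=stream_definition)
print(response.status_code)

On the MonetDB side, the new code path in mapiconnection.py above then issues the CREATE STREAM TABLE statement and records the flushing mode and per-column metadata in iot.webserverstreams and iot.webservercolumns, so that mapi_delete_stream can later clean up both tables using the stored table and column ids.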