Changeset: 4b3ab86b53d0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4b3ab86b53d0
Modified Files:
        clients/iotapi/src/main.py
        clients/iotclient/documentation/iot_server_arguments.rst
        clients/iotclient/documentation/restful_resources.rst
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/main.py
        clients/iotclient/tests/datatypesinsertstests.py
Branch: iot
Log Message:

Finished stream creation through webserver with MonetDB, now only the stream 
polling is remaining


diffs (truncated from 954 to 300 lines):

diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -65,7 +65,7 @@ def main():
                         help='Baskets location directory (default: %s)' % 
DEFAULT_FILESYSTEM, metavar='DIRECTORY')
     parser.add_argument('-l', '--log', type=check_path, nargs='?', 
default=DEFAULT_LOGGING,
                         help='Logging file location (default: %s)' % 
DEFAULT_LOGGING, metavar='FILE_PATH')
-    parser.add_argument('-po', '--polling', type=check_positive_int, 
nargs='?', default=60,
+    parser.add_argument('-po', '--polling', type=check_positive_int, 
nargs='?', default=60, metavar='POLLING',
                         help='Polling interval in seconds to the database for 
streams updates (default: 60)')
     parser.add_argument('-sh', '--shost', type=check_ipv4_address, nargs='?', 
default='0.0.0.0',
                         help='Web API server host (default: 0.0.0.0)', 
metavar='HOST')
diff --git a/clients/iotclient/documentation/iot_server_arguments.rst 
b/clients/iotclient/documentation/iot_server_arguments.rst
--- a/clients/iotclient/documentation/iot_server_arguments.rst
+++ b/clients/iotclient/documentation/iot_server_arguments.rst
@@ -32,19 +32,19 @@ Set the filesystem directory where the b
 
 Location of logfile. On the logfile is reported when streams are created or 
removed, when tuples are inserted and when the baskets are flushed. By default 
in UNIX systems is :code:`/var/log/iot/iotserver.log`, while on Windows is the 
:code:`iotserver.log` on the directory where the :code:`main.py` script was 
called.
 
-Host Identifier
----------------
+Web Server Behavior
+-------------------
 
-If the *identifier* parameter is provided, an extra column on streams will be 
added with a custom name of the host for later identification.
-
-**-i  - -identifier**
-
-Use a host identifier for every new stream.
+If a stream is created with a hostname, an extra column on streams will be 
added with a custom name of the host for later identification.
 
 **-n  - -name**
 
 Host identifier name. By default is the host's MAC address.
 
+**-po  - -polling**
+
+Set the polling interval in seconds to MonetDB database for streams updates. 
By default is :code:`60` seconds.
+
 Web Server Listening
 --------------------
 
@@ -69,7 +69,7 @@ Listening port of the administration ser
 Database Connection
 -------------------
 
-Credentials for the MAPI connection to MonetDB database.
+Credentials for the MAPI connection to the MonetDB database.
 
 .. note:: The user's password will be prompted during the initialization of 
the server.
 
diff --git a/clients/iotclient/documentation/restful_resources.rst 
b/clients/iotclient/documentation/restful_resources.rst
--- a/clients/iotclient/documentation/restful_resources.rst
+++ b/clients/iotclient/documentation/restful_resources.rst
@@ -20,13 +20,14 @@ The administration server provides resou
 
 **GET**
 
-Returns a JSON file with details about all the streams currently created on 
the webserver. For each stream besides its schema and name, it provides the 
currently number of tuples inserted on the baskets per column, description of 
columns (`See data types <streams_data_types.html#data_types>`__), the flushing 
method (`See streams creation for details 
<streams_creation.html#creating_streams>`__). An example is shown bellow:
+Returns a JSON file with details about all the streams currently created on 
the webserver. For each stream besides its schema, name and if has the hostname 
column, it provides the currently number of tuples inserted on the baskets per 
column, description of columns (`See data types 
<streams_data_types.html#data_types>`__), the flushing method (`See streams 
creation for details <streams_creation.html#creating_streams>`__). An example 
is shown bellow:
 
 .. code-block:: json
 
        [
          {
            "tuples_inserted_per_basket": 1,
+           "hostname": false,
            "columns": [
         {
                    "type": "real",
@@ -52,7 +53,7 @@ Returns a JSON file with details about a
 
 **POST**
 
-Creates a stream using a pre-defined JSON schema. The JSON must include the 
stream's schema, the stream's name, the stream's columns and the flushing 
method. The flushing can be either time based, tuple based or automatic 
(:code:`auto`). For tuple based flushing, the number of tuples to flush must be 
provided using the :code:`interval` field. In time based flushing, the 
:code:`interval` field tells the time units between flushes and the 
:code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours 
respectively. In automatic flushing, the baskets are flushed whenever a new 
batch is inserted. For columns `see data types for details 
<streams_data_types.html#data_types>`__.
+Creates a stream using a pre-defined JSON schema. The JSON must include the 
stream's schema, the stream's name, the stream's columns and the flushing 
method. The flushing can be either time based, tuple based or automatic 
(:code:`auto`). For tuple based flushing, the number of tuples to flush must be 
provided using the :code:`interval` field. In time based flushing, the 
:code:`interval` field tells the time units between flushes and the 
:code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours 
respectively. In automatic flushing, the baskets are flushed whenever a new 
batch is inserted. For columns `see data types for details 
<streams_data_types.html#data_types>`__. Additionaly a hostname parameter can 
be provided, and if it's true then an additional column will be created with 
the iot web server host name.
 
 Bellow is the JSON used to create the stream in streams_:
 
@@ -61,6 +62,7 @@ Bellow is the JSON used to create the st
        {
          "schema": "measures",
          "stream": "temperature",
+         "hostname": false,
          "flushing": {
            "base": "tuple",
            "interval": 50
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -32,7 +32,7 @@ class StreamInput(Resource):
         except BaseException as ex:
             add_log(50, ex)
             return ex, 404
-        return stream.get_data_dictionary(include_number_tuples=True), 200
+        return stream.get_data_dictionary(), 200
 
     def post(self, schema_name, stream_name):  # add data to a stream
         current_stamp = 
datetime.datetime.now(pytz.utc).astimezone(Local_Timezone).isoformat()
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -2,10 +2,10 @@ import sys
 import pymonetdb
 
 from Settings.iotlogger import add_log
-from src.Streams.streamscontext import IOTStreams
-from src.Streams.jsonschemas import init_create_streams_schema
-from src.Streams.streamcreator import creator_add_hugeint_type
-from src.Streams.streampolling import polling_add_hugeint_type
+from Streams.streamscontext import IOTStreams
+from Streams.jsonschemas import init_create_streams_schema
+from Streams.streamcreator import creator_add_hugeint_type
+from Streams.streampolling import polling_add_hugeint_type
 
 Connection = None
 
@@ -20,7 +20,7 @@ def init_monetdb_connection(hostname, po
         print log_message
         add_log(20, log_message)
 
-        if check_hugeint_type()[0] > 0:
+        if check_hugeint_type() > 0:
             polling_add_hugeint_type()
             creator_add_hugeint_type()
             init_create_streams_schema(add_hugeint=True)
@@ -40,7 +40,7 @@ def check_hugeint_type():
     Connection.execute("START TRANSACTION")
     cursor = Connection.cursor()
     cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'")
-    result = cursor.fetchall()
+    result = cursor.fetchall()[0]
     Connection.commit()
     return result
 
@@ -49,21 +49,22 @@ def mapi_get_webserver_streams():
     try:
         Connection.execute("START TRANSACTION")
         cursor = Connection.cursor()
-        sql_string = """SELECT tables."id", tables."name", schemas."name" as 
schema, tables."name" as table,
-            flushing."flushing", flushing."unit", flushing."interval" FROM 
(SELECT "id", "name", "schema_id"
-            FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", 
"name" FROM sys.schemas) AS schemas ON
-            (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", 
"flushing", "unit", "interval"
-            FROM iot.webserverflushing) AS flushing ON 
(tables."id"=flushing."table_id")""".replace('\n', ' ')
+        sql_string = """SELECT tables."id", tables."name", schemas."name" AS 
schema, tables."name" AS table,
+            extras."has_hostname", extras."flushing", extras."unit", 
extras."interval" FROM (SELECT "id", "name",
+            "schema_id" FROM sys.tables WHERE type=4) AS tables INNER JOIN 
(SELECT "id", "name" FROM sys.schemas)
+            AS schemas ON (tables."schema_id"=schemas."id") LEFT JOIN (SELECT 
"table_id", "has_hostname", "flushing",
+            "unit", "interval" FROM iot.webserverstreams) AS extras ON 
(tables."id"=extras."table_id")"""\
+            .replace('\n', ' ')
         cursor.execute(sql_string)
         tables = cursor.fetchall()
 
         cursor = Connection.cursor()
-        sql_string = """SELECT columns."table_id", columns."name" as column, 
columns."type", columns."type_digits",
+        sql_string = """SELECT columns."table_id", columns."name" AS column, 
columns."type", columns."type_digits",
             columns."type_scale", columns."default", columns."null", 
extras."special", extras."validation1",
             extras."validation2" FROM (SELECT "id", "table_id", "name", 
"type", "type_digits", "type_scale",
             "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT 
"id" FROM sys.tables WHERE type=4)
             AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT 
"column_id", "special", "validation1",
-            "validation2" FROM iot.webservervalidation) AS extras ON 
(columns."id"=extras."column_id")"""\
+            "validation2" FROM iot.webservercolumns) AS extras ON 
(columns."id"=extras."column_id")"""\
             .replace('\n', ' ')
         cursor.execute(sql_string)
         columns = cursor.fetchall()
@@ -78,50 +79,59 @@ def mapi_get_webserver_streams():
 def mapi_create_stream(stream):
     schema = stream.get_schema_name()
     table = stream.get_stream_name()
+    flush_statement = stream.get_webserverstreams_sql_statement()
+    columns_dictionary = stream.get_columns_extra_sql_statements()  # 
dictionary of column_name -> partial SQL statement
 
     try:
+        try:  # create schema if not exists, ignore the error if already exists
+            Connection.execute("START TRANSACTION")
+            Connection.execute("CREATE SCHEMA " + schema)
+            Connection.commit()
+        except:
+            Connection.commit()
         Connection.execute("START TRANSACTION")
-        try:  # create schema if not exists, ignore the error if already exists
-            Connection.execute("CREATE SCHEMA " + schema)
-        except:
-            pass
         Connection.execute(''.join(["CREATE STREAM TABLE ", 
IOTStreams.get_context_entry_name(schema, table), " (",
                                     stream.get_sql_create_statement(), ")"]))
+        cursor = Connection.cursor()
+        cursor.execute("SELECT id FROM sys.schemas WHERE \"name\"='" + schema 
+ "'")
+        schema_id = str(cursor.fetchall()[0][0])
+        cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=", 
schema_id, " AND \"name\"='", stream,
+                                "'"]))  # get the created table id
+        table_id = str(cursor.fetchall()[0][0])
+        cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (", 
table_id, flush_statement, ")"]))
+        cursor.execute('SELECT id, "name" FROM sys.columns WHERE table_id=' + 
table_id)
+        columns = cursor.fetchall()
 
-        cursor = Connection.cursor()
-        cursor.execute("SELECT id from sys.schemas where name='" + schema + 
"'")  # get the created table schema_id
-        schema_id = cursor.fetchall()[0]
-        cursor.execute(''.join(["SELECT id from sys.tables where schema_id=", 
str(schema_id), " AND name='", stream,
-                                "'"]))  # get the created table id
-        table_id = int(cursor.fetchall()[0])
-        cursor.execute(''.join(["INSERT INTO iot.webserverflushing VALUES (", 
str(table_id),
-                                stream.get_flushing_sql_statement(), ")"]))
+        inserts = []
+        colums_ids = ','.join(map(lambda x: str(x[0]), columns))
+        for key, value in columns_dictionary.iteritems():
+            for entry in columns:  # the imp_timestamp and host identifier are 
also fetched!!
+                if entry[1] == key:  # check for column's name
+                    inserts.append(''.join(['(', entry[0], value, ')']))  # 
append the sql statement
+                    break
 
+        cursor.execute("INSERT INTO iot.webservercolumns VALUES " + 
','.join(inserts))
         Connection.commit()
-        # TODO insert on the tables
-        stream.set_table_id(table_id)
+        stream.set_delete_ids(table_id, colums_ids)
     except BaseException as ex:
         add_log(50, ex)
-        return None
 
 
 def mapi_delete_stream(schema, stream, stream_id, columns_ids):
     try:
         Connection.execute("START TRANSACTION")
         Connection.execute("DROP TABLE " + 
IOTStreams.get_context_entry_name(schema, stream))
-        Connection.execute("DELETE FROM iot.webserverflushing WHERE table_id=" 
+ stream_id)
-        Connection.execute("DELETE FROM iot.webservervalidation WHERE 
column_id IN (" + ','.join(columns_ids) + ")")
-        return Connection.commit()
+        Connection.execute("DELETE FROM iot.webserverstreams WHERE table_id=" 
+ stream_id)
+        Connection.execute("DELETE FROM iot.webservercolumns WHERE column_id 
IN (" + columns_ids + ")")
+        Connection.commit()
     except BaseException as ex:
         add_log(50, ex)
-        return None
 
 
 def mapi_flush_baskets(schema, stream, baskets):
     try:
         Connection.execute("START TRANSACTION")
         Connection.execute(''.join(["CALL iot.basket('", schema, "','", 
stream, "','", baskets, "')"]))
-        return Connection.commit()
+        Connection.commit()
     except BaseException as ex:
         add_log(40, ex)
-        return 0
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -42,7 +42,6 @@ class StreamDataType(object):
             self._default_value = self.set_default_value(kwargs['default'])
         else:
             self._default_value = None
-        self._column_id = kwargs.get('id', None)
 
     def is_nullable(self):  # check if the column is nullable or not
         return self._is_nullable
@@ -57,12 +56,6 @@ class StreamDataType(object):
     def get_default_value(self):  # get the default value representation in 
the data type
         return self._default_value
 
-    def set_column_id(self, column_id):  # set column id for 
iot.webservervalidation table
-        self._column_id = column_id
-
-    def get_column_id(self):
-        return self._column_id
-
     def add_json_schema_entry(self, schema):  # add the entry for the stream's 
corresponding json schema
         dic = {}  # must be done after setting the default value!!!
         if self._default_value is not None:
diff --git a/clients/iotclient/src/Streams/jsonschemas.py 
b/clients/iotclient/src/Streams/jsonschemas.py
--- a/clients/iotclient/src/Streams/jsonschemas.py
+++ b/clients/iotclient/src/Streams/jsonschemas.py
@@ -17,7 +17,7 @@ INET6_TYPE = ["inet6"]
 REGEX_TYPE = ["regex"]
 ENUM_TYPE = ["enum"]
 BOOLEAN_TYPE = ["bool", "boolean"]
-SMALL_INTEGERS = ["tinyint", "smallint", "int", "integer", "wrd", "bigint"]
+SMALL_INTEGERS_TYPES = ["tinyint", "smallint", "int", "integer", "wrd", 
"bigint"]
 HUGE_INTEGER_TYPE = ["hugeint"]
 FLOATING_POINT_PRECISION_TYPES = ["real", "float", "double", "double 
precision"]
 DECIMAL_TYPES = ["dec", "decimal", "numeric"]
@@ -34,9 +34,9 @@ def init_create_streams_schema(add_hugei
     global CREATE_STREAMS_SCHEMA
 
     if add_hugeint:
-        integers = SMALL_INTEGERS + HUGE_INTEGER_TYPE
+        integers = SMALL_INTEGERS_TYPES + HUGE_INTEGER_TYPE
     else:
-        integers = SMALL_INTEGERS
+        integers = SMALL_INTEGERS_TYPES
 
     CREATE_STREAMS_SCHEMA = {
         "title": "JSON schema to create a stream",
@@ -46,6 +46,7 @@ def init_create_streams_schema(add_hugei
         "properties": {
             "schema": {"type": "string"},
             "stream": {"type": "string"},
+            "hostname": {"type": "boolean", "default": False},
             "flushing": {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to