Changeset: dae88d156163 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=dae88d156163
Modified Files:
        clients/iotclient/documentation/conf.py
        clients/iotclient/src/Flask/app.py
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/main.py
Branch: iot
Log Message:

Made important corrections


diffs (truncated from 302 to 300 lines):

diff --git a/clients/iotclient/documentation/conf.py 
b/clients/iotclient/documentation/conf.py
--- a/clients/iotclient/documentation/conf.py
+++ b/clients/iotclient/documentation/conf.py
@@ -12,8 +12,8 @@
 # All configuration values have a default; values that are commented out
 # serve to show the default.
 
-import sys
-import os
+# import sys
+# import os
 
 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
@@ -110,7 +110,7 @@ pygments_style = 'sphinx'
 # If true, keep warnings as "system message" paragraphs in the built documents.
 #keep_warnings = False
 
-# If true, `todo` and `todoList` produce output, else they produce nothing.
+# If true, `ttodo` and `todoList` produce output, else they produce nothing.
 todo_include_todos = True
 
 
diff --git a/clients/iotclient/src/Flask/app.py 
b/clients/iotclient/src/Flask/app.py
--- a/clients/iotclient/src/Flask/app.py
+++ b/clients/iotclient/src/Flask/app.py
@@ -1,6 +1,5 @@
 from flask import Flask
 from flask_restful import Api
-
 from restresources import StreamInput, StreamsInfo, StreamsHandling
 
 
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -8,38 +8,37 @@ from flask_restful import Resource
 from jsonschema import Draft4Validator, FormatChecker
 from tzlocal import get_localzone
 from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA
-from Streams.streamscontext import IOTStreamsException, IOTStreams
+from Streams.streamscontext import IOTStreams
 from Settings.iotlogger import add_log
 
 Streams_Context = None
 Create_Streams_Validator = None
 Delete_Streams_Validator = None
-Local_Timezone = None
+Local_Timezone = get_localzone()  # for the correction of dates we must add 
the system's timezone
 
 
 def init_rest_resources():
     global Streams_Context, Create_Streams_Validator, 
Delete_Streams_Validator, Local_Timezone
 
-    Local_Timezone = get_localzone()  # for the correction of dates we must 
add the system's timezone
     Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
     Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
     try:
         Streams_Context = IOTStreams()
     except BaseException as ex:
         print >> sys.stdout, ex
-        add_log(50, ex.message)
+        add_log(50, ex)
         sys.exit(1)
 
 
-class StreamInput(Resource):  # TODO these operations are not atomic!!!
+class StreamInput(Resource):
     """RESTful API for stream's input"""
 
     def get(self, schema_name, stream_name):  # check a single stream data
         try:  # check if stream exists, if not return 404
             stream = Streams_Context.get_existing_stream(schema_name, 
stream_name)
         except BaseException as ex:
-            add_log(50, ex.message)
-            return ex.message, 404
+            add_log(50, ex)
+            return ex, 404
         return stream.get_data_dictionary(include_number_tuples=True), 200
 
     def post(self, schema_name, stream_name):  # add data to a stream
@@ -48,14 +47,14 @@ class StreamInput(Resource):  # TODO the
         try:  # check if stream exists, if not return 404
             stream = Streams_Context.get_existing_stream(schema_name, 
stream_name)
         except BaseException as ex:
-            add_log(50, ex.message)
-            return ex.message, 404
+            add_log(50, ex)
+            return ex, 404
 
         try:  # validate and insert data, if not return 400
             stream.validate_and_insert(json.loads(request.data), current_stamp)
         except BaseException as ex:
-            add_log(50, ex.message)
-            return ex.message, 400
+            add_log(50, ex)
+            return ex, 400
         return 'The insertions were made with success!', 201
 
 
@@ -78,8 +77,8 @@ class StreamsHandling(Resource):
             Create_Streams_Validator.validate(schema_to_validate)
             Streams_Context.add_new_stream(schema_to_validate)
         except BaseException as ex:
-            add_log(50, ex.message)
-            return ex.message, 400
+            add_log(50, ex)
+            return ex, 400
         else:
             return 'The stream was created with success!', 201
 
@@ -88,12 +87,12 @@ class StreamsHandling(Resource):
             schema_to_validate = json.loads(request.data)
             Delete_Streams_Validator.validate(schema_to_validate)
         except BaseException as ex:
-            add_log(50, ex.message)
-            return ex.message, 400
+            add_log(50, ex)
+            return ex, 400
 
         try:  # check if stream exists, if not return 404
             Streams_Context.delete_existing_stream(schema_to_validate)
         except BaseException as ex:
-            add_log(50, ex.message)
-            return ex.message, 404
+            add_log(50, ex)
+            return ex, 404
         return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/filesystem.py 
b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -39,7 +39,7 @@ def init_file_system(host_identifier=Non
         Host_Identifier = host_identifier
     except (Exception, OSError) as ex:
         print >> sys.stdout, ex
-        add_log(50, ex.message)
+        add_log(50, ex)
         sys.exit(1)
 
 
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -22,8 +22,8 @@ def init_monetdb_connection(hostname, po
         print >> sys.stdout, log_message
         add_log(20, log_message)
     except BaseException as ex:
-        print >> sys.stdout, ex.message
-        add_log(50, ex.message)
+        print >> sys.stdout, ex
+        add_log(50, ex)
         sys.exit(1)
 
 
@@ -38,15 +38,21 @@ def mapi_create_stream(schema, stream, c
         pass
 
     try:  # attempt to create te stream table
-        Connection.execute(''.join(["CREATE STREAM TABLE ", schema, ".", 
stream, " (", columns, ");"]))
+        Connection.execute("SET SCHEMA " + schema + ";")
+        Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", 
columns, ");"]))
     except BaseException as ex:
-        add_log(40, ex.message)
+        add_log(40, ex)
         pass
 
 
 def mapi_flush_baskets(schema, stream, baskets):
     try:
-        Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"", 
stream, "\",\"", baskets, "\");"]))
+        Connection.execute("SET SCHEMA iot;")
+    except:
+        pass
+
+    try:
+        Connection.execute(''.join(["CALL iot.basket('", schema, "','", 
stream, "','", baskets, "');"]))
     except BaseException as ex:
-        add_log(40, ex.message)
+        add_log(40, ex)
         pass
diff --git a/clients/iotclient/src/Streams/streams.py 
b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -38,8 +38,8 @@ def init_streams_hosts():
 class StreamException(Exception):
     """Exception fired when the validation of a stream insert fails"""
 
-    def __init__(self, messages):
-        self.message = messages  # dictionary of column -> list of error 
messages
+    def __init__(self, message):
+        self.message = message  # dictionary of column -> list of error 
messages
 
 
 class DataCellStream(object):
@@ -134,7 +134,7 @@ class DataCellStream(object):
             if self._tuples_in_per_basket > 0:
                 self.flush_baskets(last)
         except BaseException as ex:
-            add_log(50, ex.message)
+            add_log(50, ex)
         self._monitor.release()
 
     def validate_and_insert(self, new_data, timestamp):
@@ -170,7 +170,7 @@ class DataCellStream(object):
             try:
                 transposed_data[key] = data_type.process_values(values)  # 
convert into binary
             except DataValidationException as ex:
-                batch_errors[key] = ex.message
+                batch_errors[key] = ex
 
         if batch_errors:
             raise StreamException(message=batch_errors)
@@ -213,8 +213,9 @@ class DataCellStream(object):
             self._tuples_in_per_basket += total_tuples
             if is_flushing_tuple_based and self._tuples_in_per_basket >= 
self._flush_method.limit:
                 self.flush_baskets(last=False)
-
+        except BaseException as ex:
+            self._monitor.release()
+            add_log(50, ex)
+        else:
+            self._monitor.release()
             add_log(20, 'Inserted %d tuples to stream %s.%s' % (total_tuples, 
self._schema_name, self._stream_name))
-        except BaseException as ex:
-            add_log(50, ex.message)
-        self._monitor.release()
diff --git a/clients/iotclient/src/Streams/streamscreator.py 
b/clients/iotclient/src/Streams/streamscreator.py
--- a/clients/iotclient/src/Streams/streamscreator.py
+++ b/clients/iotclient/src/Streams/streamscreator.py
@@ -53,7 +53,7 @@ def validate_schema_and_create_stream(sc
                         new_column.set_nullable(column['nullable'])
                     validated_columns[next_name] = new_column
                 except Exception as ex:
-                    errors[next_name] = ex.message
+                    errors[next_name] = ex
                 break
 
     if errors:
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -1,8 +1,10 @@
 import getopt
 import signal
 import sys
+import time
 
 from multiprocessing import Process
+from threading import Thread
 from uuid import getnode as get_mac
 from Flask.app import start_flask_iot_app, start_flask_admin_app
 from Flask.restresources import init_rest_resources
@@ -12,17 +14,25 @@ from Settings.mapiconnection import init
 from Streams.streamscontext import init_streams_context
 from Streams.streams import init_streams_hosts
 
-subprocess1 = None
-subprocess2 = None
+subprocess = None
 
 
 def signal_handler(signal, frame):
-    subprocess1.terminate()
-    subprocess2.terminate()
+    subprocess.terminate()
+
+
+def start_process(admin_host, admin_port, app_host, app_port):
+    thread1 = Thread(target=start_flask_admin_app, args=(admin_host, 
admin_port))
+    thread2 = Thread(target=start_flask_iot_app, args=(app_host, app_port))
+    thread1.start()
+    time.sleep(1)  # problem while handling Flask's loggers, so it is used 
this sleep
+    thread2.start()
+    thread1.join()
+    thread2.join()
 
 
 def main(argv):
-    global subprocess1, subprocess2
+    global subprocess
 
     try:
         opts, args = getopt.getopt(argv[1:], 'f:l:c:ui:in:ih:ip:ah:ap:h:p:d:u',
@@ -92,15 +102,11 @@ def main(argv):
     init_streams_context()  # init streams context
     init_rest_resources()  # init validators for RESTful requests
 
-    subprocess1 = Process(target=start_flask_admin_app, args=(admin_host, 
admin_port))
-    subprocess2 = Process(target=start_flask_iot_app, args=(app_host, 
app_port))
-    subprocess1.start()
-    subprocess2.start()
+    subprocess = Process(target=start_process, args=(admin_host, admin_port, 
app_host, app_port))
+    subprocess.start()
     add_log(20, 'Started IOT Stream Server')
     signal.signal(signal.SIGINT, signal_handler)
-
-    subprocess1.join()
-    subprocess2.join()
+    subprocess.join()
     add_log(20, 'Stopped IOT Stream Server')
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to