Changeset: 82ab5c86a247 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=82ab5c86a247
Modified Files:
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/WebSockets/websockets.py
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/main.py
Branch: iot
Log Message:

Several bugfixes


diffs (truncated from 301 to 300 lines):

diff --git a/clients/iotapi/src/Settings/mapiconnection.py 
b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -33,7 +33,7 @@ def fetch_streams():
     try:  # TODO paginate results?
         cursor = Connection.cursor()
         sql_string = """SELECT storage."schema", storage."table", 
storage."column", storage."type", storage."typewidth"
-          FROM (SELECT "schema", "table", "column", "type" FROM sys.storage) 
AS storage
+          FROM (SELECT "schema", "table", "column", "type", "typewidth" FROM 
sys.storage) AS storage
           INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON 
(storage."table"=tables."name")
           INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON 
(storage."schema"=schemas."name");"""\
             .replace('\n', ' ')
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -4,7 +4,7 @@ import struct
 from datatypes import LITTLE_ENDIAN_ALIGNMENT
 from Settings.filesystem import get_baskets_base_location
 from Utilities.readwritelock import RWLock
-from WebSockets.websockets import notify_clients
+from WebSockets.websockets import notify_stream_inserts_to_clients
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
 
@@ -30,7 +30,7 @@ class StreamBasketsHandler(FileSystemEve
         if isinstance(event, 'DirCreatedEvent'):
             basket_string = os.path.basename(os.path.normpath(event.src_path))
             count = self._stream.append_basket(basket_string)
-            notify_clients(self._stream.get_schema_name(), 
self._stream.get_stream_name(), count)
+            notify_stream_inserts_to_clients(self._stream.get_schema_name(), 
self._stream.get_stream_name(), count)
 
     def on_deleted(self, event):
         if isinstance(event, 'DirDeletedEvent'):
diff --git a/clients/iotapi/src/WebSockets/websockets.py 
b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -3,7 +3,6 @@ import sys
 
 from Settings.iotlogger import add_log
 from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
-from Streams.streamscontext import Streams_context, IOTStreams
 from Utilities.readwritelock import RWLock
 from jsonschema import Draft4Validator, FormatChecker
 
@@ -15,14 +14,6 @@ WebClients = []  # TODO this probably wo
 WebClientsLock = RWLock()
 
 
-def notify_stream_inserts_to_clients(schema_name, stream_name, count):
-    concatenated_name = IOTStreams.get_context_entry_name(schema_name, 
stream_name)
-    WebClientsLock.acquire_read()
-    for client in WebClients:
-        client.send_notification_message(concatenated_name, schema_name, 
stream_name, count)
-    WebClientsLock.release()
-
-
 def unsubscribe_removed_streams(concatenated_names):
     WebClientsLock.acquire_read()
     for name in concatenated_names:
@@ -32,6 +23,16 @@ def unsubscribe_removed_streams(concaten
     for name in concatenated_names:
         add_log(20, ''.join(['Stream ', name, ' removed']))
 
+from Streams.streamscontext import Streams_context, IOTStreams  # avoid 
circular dependency
+
+
+def notify_stream_inserts_to_clients(schema_name, stream_name, count):
+    concatenated_name = IOTStreams.get_context_entry_name(schema_name, 
stream_name)
+    WebClientsLock.acquire_read()
+    for client in WebClients:
+        client.send_notification_message(concatenated_name, schema_name, 
stream_name, count)
+    WebClientsLock.release()
+
 
 class IOTAPI(WebSocket):
     """Client WebSocket"""
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -8,7 +8,7 @@ from flask_restful import Resource
 from jsonschema import Draft4Validator, FormatChecker
 from tzlocal import get_localzone
 from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA
-from Streams.streamscontext import IOTStreams
+from Streams.streamscontext import IOTStreamsContext
 from Settings.iotlogger import add_log
 
 Streams_Context = None
@@ -23,7 +23,7 @@ def init_rest_resources():
     Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
     Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
     try:
-        Streams_Context = IOTStreams()
+        Streams_Context = IOTStreamsContext()
     except BaseException as ex:
         print >> sys.stdout, ex
         add_log(50, ex)
@@ -80,6 +80,8 @@ class StreamsHandling(Resource):
             add_log(50, ex)
             return ex, 400
         else:
+            add_log(20, ''.join['The stream ', schema_to_validate['schema'], 
'.', schema_to_validate['stream'],
+                                ' was created'])
             return 'The stream was created with success!', 201
 
     def delete(self):
@@ -95,4 +97,6 @@ class StreamsHandling(Resource):
         except BaseException as ex:
             add_log(50, ex)
             return ex, 404
+        add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.', 
schema_to_validate['stream'],
+                            ' was deleted'])
         return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -1,4 +1,3 @@
-import getpass
 import sys
 
 import pymonetdb
@@ -7,13 +6,11 @@ from Settings.iotlogger import add_log
 Connection = None
 
 
-def init_monetdb_connection(hostname, port, user_name, database):
+def init_monetdb_connection(hostname, port, user_name, connection_password, 
database):
     global Connection
 
-    user_password = getpass.getpass(prompt='Insert password for user ' + 
user_name + ':')
-
     try:  # the autocommit is set to true so each statement will be independent
-        Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=user_password,
+        Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=connection_password,
                                        database=database, autocommit=True)
         log_message = 'User %s connected successfully to database %s' % 
(user_name, database)
         print >> sys.stdout, log_message
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -4,11 +4,9 @@ import itertools
 import math
 import re
 import struct
+
+from dateutil import parser
 from abc import ABCMeta, abstractmethod
-
-import dateutil
-from dateutil import parser
-
 from jsonschemas import UUID_REGEX, MAC_ADDRESS_REGEX, TIME_REGEX, IPV4_REGEX
 
 # The null constants might change from system to system due to different CPU's 
limits
@@ -704,7 +702,7 @@ class TimestampType(BaseDateTimeType):  
         schema[self._column_name]['format'] = 'date-time'
 
     def parse_entry(self, entry):
-        parsed_timestamp = dateutil.parser.parse(entry)
+        parsed_timestamp = parser.parse(entry)
         if not self._has_timezone:
             parsed_timestamp = parsed_timestamp.replace(tzinfo=None)
         return parsed_timestamp
diff --git a/clients/iotclient/src/Streams/streams.py 
b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -120,8 +120,8 @@ class BaseIOTStream(object):
 
     def flush_baskets(self, last=False):  # the monitor has to be acquired in 
write mode before running this method!!!
         # write the tuple count in the basket
-        basket_counter_file_pointer = 
open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+")
-        basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT 
+ "1i", self._tuples_in_per_basket))
+        basket_counter_file_pointer = 
open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b")
+        basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT 
+ "i", self._tuples_in_per_basket))
         basket_counter_file_pointer.flush()
         basket_counter_file_pointer.close()
         mapi_flush_baskets(self._schema_name, self._stream_name, 
self._current_base_path)
@@ -227,16 +227,19 @@ class TupleBasedStream(BaseIOTStream):
 
     def validate_and_insert(self, new_data, timestamp):
         super(TupleBasedStream, self).validate_and_insert(new_data, timestamp)
+        flag = False
         self._monitor.acquire_write()
         try:
             if self._tuples_in_per_basket >= self._limit:
                 self.flush_baskets(last=False)
+                flag = True
         except BaseException as ex:
             self._monitor.release()
             add_log(50, ex)
         else:
             self._monitor.release()
-            add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name, 
self._stream_name))
+            if flag:
+                add_log(20, 'Flushed stream %s.%s baskets' % 
(self._schema_name, self._stream_name))
 
 
 class TimeBasedStream(BaseIOTStream):
@@ -258,16 +261,19 @@ class TimeBasedStream(BaseIOTStream):
         return {'base': 'time', 'unit': self._time_unit, 'interval': 
self._interval}
 
     def time_based_flush(self):
+        flag = False
         self._monitor.acquire_write()
         try:
-            if self._tuples_in_per_basket > 0:
+            if self._tuples_in_per_basket > 0:  # flush only when there are 
tuples in the baskets
                 self.flush_baskets(last=False)
+                flag = True
         except BaseException as ex:
             self._monitor.release()
             add_log(50, ex)
         else:
             self._monitor.release()
-            add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name, 
self._stream_name))
+            if flag:
+                add_log(20, 'Flushed stream %s.%s baskets' % 
(self._schema_name, self._stream_name))
 
     def start_stream(self):
         self._local_thread.start()  # start the time based flush on another 
thread
diff --git a/clients/iotclient/src/Streams/streamscontext.py 
b/clients/iotclient/src/Streams/streamscontext.py
--- a/clients/iotclient/src/Streams/streamscontext.py
+++ b/clients/iotclient/src/Streams/streamscontext.py
@@ -1,10 +1,11 @@
 import json
+import collections
 
 from Settings.filesystem import get_configfile_location
 from Utilities.readwritelock import RWLock
-
+from jsonschema import Draft4Validator, FormatChecker
 from jsonschemas import CONFIG_FILE_SCHEMA
-from streamscreator import *
+from streamscreator import validate_schema_and_create_stream
 
 Config_File_Location = None
 Config_File_Validator = None
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -1,4 +1,5 @@
 import getopt
+import getpass
 import signal
 import sys
 import time
@@ -21,14 +22,27 @@ def signal_handler(signal, frame):
     subprocess.terminate()
 
 
-def start_process(admin_host, admin_port, app_host, app_port):
+def start_process(admin_host, admin_port, app_host, app_port, host_identifier, 
new_configfile_location,
+                  connection_hostname, connection_port, connection_user, 
connection_password, connection_database):
+    # WARNING The initiation order must be this!!!
+    init_logging()  # init logging context
+    init_file_system(host_identifier, new_configfile_location)  # init 
filesystem
+    init_streams_hosts()  # init hostname column for streams
+    # init mapi connection
+    init_monetdb_connection(connection_hostname, connection_port, 
connection_user, connection_password,
+                            connection_database)
+    init_streams_context()  # init streams context
+    init_rest_resources()  # init validators for RESTful requests
+
     thread1 = Thread(target=start_flask_admin_app, args=(admin_host, 
admin_port))
     thread2 = Thread(target=start_flask_iot_app, args=(app_host, app_port))
     thread1.start()
     time.sleep(1)  # problem while handling Flask's loggers, so it is used 
this sleep
     thread2.start()
+    add_log(20, 'Started IOT Stream Server')
     thread1.join()
     thread2.join()
+    add_log(20, 'Stopped IOT Stream Server')
 
 
 def main(argv):
@@ -93,21 +107,13 @@ def main(argv):
     if not use_host_identifier:  # in case of the user sets the 
host_identifier but not the use_host_identifier flag
         host_identifier = None
 
-    # WARNING The initiation order must be this!!!
-    init_logging()  # init logging context
-    init_file_system(host_identifier, new_configfile_location)  # init 
filesystem
-    init_streams_hosts()  # init hostname column for streams
-    # init mapi connection
-    init_monetdb_connection(connection_hostname, connection_port, 
connection_user, connection_database)
-    init_streams_context()  # init streams context
-    init_rest_resources()  # init validators for RESTful requests
-
-    subprocess = Process(target=start_process, args=(admin_host, admin_port, 
app_host, app_port))
+    connection_password = getpass.getpass(prompt='Insert password for user ' + 
connection_user + ':')
+    subprocess = Process(target=start_process, args=(admin_host, admin_port, 
app_host, app_port, host_identifier,
+                                                     new_configfile_location, 
connection_hostname, connection_port,
+                                                     connection_user, 
connection_password, connection_database))
     subprocess.start()
-    add_log(20, 'Started IOT Stream Server')
     signal.signal(signal.SIGINT, signal_handler)
     subprocess.join()
-    add_log(20, 'Stopped IOT Stream Server')
 
 if __name__ == "__main__":
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to