Changeset: 82ab5c86a247 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=82ab5c86a247 Modified Files: clients/iotapi/src/Settings/mapiconnection.py clients/iotapi/src/Streams/streams.py clients/iotapi/src/WebSockets/websockets.py clients/iotclient/src/Flask/restresources.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/main.py Branch: iot Log Message:
Several bugfixes diffs (truncated from 301 to 300 lines): diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py --- a/clients/iotapi/src/Settings/mapiconnection.py +++ b/clients/iotapi/src/Settings/mapiconnection.py @@ -33,7 +33,7 @@ def fetch_streams(): try: # TODO paginate results? cursor = Connection.cursor() sql_string = """SELECT storage."schema", storage."table", storage."column", storage."type", storage."typewidth" - FROM (SELECT "schema", "table", "column", "type" FROM sys.storage) AS storage + FROM (SELECT "schema", "table", "column", "type", "typewidth" FROM sys.storage) AS storage INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON (storage."table"=tables."name") INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON (storage."schema"=schemas."name");"""\ .replace('\n', ' ') diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -4,7 +4,7 @@ import struct from datatypes import LITTLE_ENDIAN_ALIGNMENT from Settings.filesystem import get_baskets_base_location from Utilities.readwritelock import RWLock -from WebSockets.websockets import notify_clients +from WebSockets.websockets import notify_stream_inserts_to_clients from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -30,7 +30,7 @@ class StreamBasketsHandler(FileSystemEve if isinstance(event, 'DirCreatedEvent'): basket_string = os.path.basename(os.path.normpath(event.src_path)) count = self._stream.append_basket(basket_string) - notify_clients(self._stream.get_schema_name(), self._stream.get_stream_name(), count) + notify_stream_inserts_to_clients(self._stream.get_schema_name(), self._stream.get_stream_name(), count) def on_deleted(self, event): if isinstance(event, 'DirDeletedEvent'): diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py --- a/clients/iotapi/src/WebSockets/websockets.py +++ b/clients/iotapi/src/WebSockets/websockets.py @@ -3,7 +3,6 @@ import sys from Settings.iotlogger import add_log from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket -from Streams.streamscontext import Streams_context, IOTStreams from Utilities.readwritelock import RWLock from jsonschema import Draft4Validator, FormatChecker @@ -15,14 +14,6 @@ WebClients = [] # TODO this probably wo WebClientsLock = RWLock() -def notify_stream_inserts_to_clients(schema_name, stream_name, count): - concatenated_name = IOTStreams.get_context_entry_name(schema_name, stream_name) - WebClientsLock.acquire_read() - for client in WebClients: - client.send_notification_message(concatenated_name, schema_name, stream_name, count) - WebClientsLock.release() - - def unsubscribe_removed_streams(concatenated_names): WebClientsLock.acquire_read() for name in concatenated_names: @@ -32,6 +23,16 @@ def unsubscribe_removed_streams(concaten for name in concatenated_names: add_log(20, ''.join(['Stream ', name, ' removed'])) +from Streams.streamscontext import Streams_context, IOTStreams # avoid circular dependency + + +def notify_stream_inserts_to_clients(schema_name, stream_name, count): + concatenated_name = IOTStreams.get_context_entry_name(schema_name, stream_name) + WebClientsLock.acquire_read() + for client in WebClients: + client.send_notification_message(concatenated_name, schema_name, stream_name, count) + WebClientsLock.release() + class IOTAPI(WebSocket): """Client WebSocket""" diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py --- a/clients/iotclient/src/Flask/restresources.py +++ b/clients/iotclient/src/Flask/restresources.py @@ -8,7 +8,7 @@ from flask_restful import Resource from jsonschema import Draft4Validator, FormatChecker from tzlocal import get_localzone from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA -from Streams.streamscontext import IOTStreams +from Streams.streamscontext import IOTStreamsContext from Settings.iotlogger import add_log Streams_Context = None @@ -23,7 +23,7 @@ def init_rest_resources(): Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker()) Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker()) try: - Streams_Context = IOTStreams() + Streams_Context = IOTStreamsContext() except BaseException as ex: print >> sys.stdout, ex add_log(50, ex) @@ -80,6 +80,8 @@ class StreamsHandling(Resource): add_log(50, ex) return ex, 400 else: + add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], + ' was created']) return 'The stream was created with success!', 201 def delete(self): @@ -95,4 +97,6 @@ class StreamsHandling(Resource): except BaseException as ex: add_log(50, ex) return ex, 404 + add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], + ' was deleted']) return 'The stream was deleted with success!', 204 diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -1,4 +1,3 @@ -import getpass import sys import pymonetdb @@ -7,13 +6,11 @@ from Settings.iotlogger import add_log Connection = None -def init_monetdb_connection(hostname, port, user_name, database): +def init_monetdb_connection(hostname, port, user_name, connection_password, database): global Connection - user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':') - try: # the autocommit is set to true so each statement will be independent - Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password, + Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=connection_password, database=database, autocommit=True) log_message = 'User %s connected successfully to database %s' % (user_name, database) print >> sys.stdout, log_message diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -4,11 +4,9 @@ import itertools import math import re import struct + +from dateutil import parser from abc import ABCMeta, abstractmethod - -import dateutil -from dateutil import parser - from jsonschemas import UUID_REGEX, MAC_ADDRESS_REGEX, TIME_REGEX, IPV4_REGEX # The null constants might change from system to system due to different CPU's limits @@ -704,7 +702,7 @@ class TimestampType(BaseDateTimeType): schema[self._column_name]['format'] = 'date-time' def parse_entry(self, entry): - parsed_timestamp = dateutil.parser.parse(entry) + parsed_timestamp = parser.parse(entry) if not self._has_timezone: parsed_timestamp = parsed_timestamp.replace(tzinfo=None) return parsed_timestamp diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py --- a/clients/iotclient/src/Streams/streams.py +++ b/clients/iotclient/src/Streams/streams.py @@ -120,8 +120,8 @@ class BaseIOTStream(object): def flush_baskets(self, last=False): # the monitor has to be acquired in write mode before running this method!!! # write the tuple count in the basket - basket_counter_file_pointer = open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+") - basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT + "1i", self._tuples_in_per_basket)) + basket_counter_file_pointer = open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b") + basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT + "i", self._tuples_in_per_basket)) basket_counter_file_pointer.flush() basket_counter_file_pointer.close() mapi_flush_baskets(self._schema_name, self._stream_name, self._current_base_path) @@ -227,16 +227,19 @@ class TupleBasedStream(BaseIOTStream): def validate_and_insert(self, new_data, timestamp): super(TupleBasedStream, self).validate_and_insert(new_data, timestamp) + flag = False self._monitor.acquire_write() try: if self._tuples_in_per_basket >= self._limit: self.flush_baskets(last=False) + flag = True except BaseException as ex: self._monitor.release() add_log(50, ex) else: self._monitor.release() - add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name, self._stream_name)) + if flag: + add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name, self._stream_name)) class TimeBasedStream(BaseIOTStream): @@ -258,16 +261,19 @@ class TimeBasedStream(BaseIOTStream): return {'base': 'time', 'unit': self._time_unit, 'interval': self._interval} def time_based_flush(self): + flag = False self._monitor.acquire_write() try: - if self._tuples_in_per_basket > 0: + if self._tuples_in_per_basket > 0: # flush only when there are tuples in the baskets self.flush_baskets(last=False) + flag = True except BaseException as ex: self._monitor.release() add_log(50, ex) else: self._monitor.release() - add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name, self._stream_name)) + if flag: + add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name, self._stream_name)) def start_stream(self): self._local_thread.start() # start the time based flush on another thread diff --git a/clients/iotclient/src/Streams/streamscontext.py b/clients/iotclient/src/Streams/streamscontext.py --- a/clients/iotclient/src/Streams/streamscontext.py +++ b/clients/iotclient/src/Streams/streamscontext.py @@ -1,10 +1,11 @@ import json +import collections from Settings.filesystem import get_configfile_location from Utilities.readwritelock import RWLock - +from jsonschema import Draft4Validator, FormatChecker from jsonschemas import CONFIG_FILE_SCHEMA -from streamscreator import * +from streamscreator import validate_schema_and_create_stream Config_File_Location = None Config_File_Validator = None diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py --- a/clients/iotclient/src/main.py +++ b/clients/iotclient/src/main.py @@ -1,4 +1,5 @@ import getopt +import getpass import signal import sys import time @@ -21,14 +22,27 @@ def signal_handler(signal, frame): subprocess.terminate() -def start_process(admin_host, admin_port, app_host, app_port): +def start_process(admin_host, admin_port, app_host, app_port, host_identifier, new_configfile_location, + connection_hostname, connection_port, connection_user, connection_password, connection_database): + # WARNING The initiation order must be this!!! + init_logging() # init logging context + init_file_system(host_identifier, new_configfile_location) # init filesystem + init_streams_hosts() # init hostname column for streams + # init mapi connection + init_monetdb_connection(connection_hostname, connection_port, connection_user, connection_password, + connection_database) + init_streams_context() # init streams context + init_rest_resources() # init validators for RESTful requests + thread1 = Thread(target=start_flask_admin_app, args=(admin_host, admin_port)) thread2 = Thread(target=start_flask_iot_app, args=(app_host, app_port)) thread1.start() time.sleep(1) # problem while handling Flask's loggers, so it is used this sleep thread2.start() + add_log(20, 'Started IOT Stream Server') thread1.join() thread2.join() + add_log(20, 'Stopped IOT Stream Server') def main(argv): @@ -93,21 +107,13 @@ def main(argv): if not use_host_identifier: # in case of the user sets the host_identifier but not the use_host_identifier flag host_identifier = None - # WARNING The initiation order must be this!!! - init_logging() # init logging context - init_file_system(host_identifier, new_configfile_location) # init filesystem - init_streams_hosts() # init hostname column for streams - # init mapi connection - init_monetdb_connection(connection_hostname, connection_port, connection_user, connection_database) - init_streams_context() # init streams context - init_rest_resources() # init validators for RESTful requests - - subprocess = Process(target=start_process, args=(admin_host, admin_port, app_host, app_port)) + connection_password = getpass.getpass(prompt='Insert password for user ' + connection_user + ':') + subprocess = Process(target=start_process, args=(admin_host, admin_port, app_host, app_port, host_identifier, + new_configfile_location, connection_hostname, connection_port, + connection_user, connection_password, connection_database)) subprocess.start() - add_log(20, 'Started IOT Stream Server') signal.signal(signal.SIGINT, signal_handler) subprocess.join() - add_log(20, 'Stopped IOT Stream Server') if __name__ == "__main__": _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list