Changeset: 6c4407fe078d for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6c4407fe078d Modified Files: clients/iotapi/src/Settings/filesystem.py clients/iotapi/src/Settings/mapiconnection.py clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streampolling.py clients/iotapi/src/Streams/streams.py clients/iotapi/src/Streams/streamscontext.py clients/iotapi/src/Utilities/customthreading.py clients/iotapi/src/Utilities/readwritelock.py clients/iotapi/src/WebSockets/websockets.py clients/iotapi/src/main.py clients/iotclient/src/Flask/app.py clients/iotclient/src/Flask/restresources.py clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Settings/iotlogger.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streampolling.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Streams/streamscreator.py clients/iotclient/src/Utilities/customthreading.py clients/iotclient/src/Utilities/readwritelock.py clients/iotclient/src/main.py clients/iotclient/tests/main.py Branch: iot Log Message:
Fixed imports, created unique mapi connections for each stream, fixed small bugs diffs (truncated from 1302 to 300 lines): diff --git a/clients/iotapi/src/Settings/filesystem.py b/clients/iotapi/src/Settings/filesystem.py --- a/clients/iotapi/src/Settings/filesystem.py +++ b/clients/iotapi/src/Settings/filesystem.py @@ -1,7 +1,7 @@ import os import sys -from iotlogger import add_log +from .iotlogger import add_log Baskets_Location = None diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py --- a/clients/iotapi/src/Settings/mapiconnection.py +++ b/clients/iotapi/src/Settings/mapiconnection.py @@ -1,7 +1,7 @@ import sys -import pymonetdb -from Settings.iotlogger import add_log +from pymonetdb import connect +from ..Settings.iotlogger import add_log Connection = None @@ -10,9 +10,8 @@ def init_monetdb_connection(hostname, po global Connection try: # the autocommit is set to true so each statement will be independent - Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password, - database=database, autocommit=True) - Connection.execute("SET SCHEMA iot;") + Connection = connect(hostname=hostname, port=port, username=user_name, password=user_password, + database=database, autocommit=True) log_message = 'User %s connected successfully to database %s' % (user_name, database) print log_message add_log(20, log_message) @@ -26,16 +25,26 @@ def close_monetdb_connection(): Connection.close() +def check_hugeint_type(): + Connection.execute("START TRANSACTION") + cursor = Connection.cursor() + cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'") + result = cursor.fetchall()[0][0] + Connection.commit() + return result + + def fetch_streams(): - try: # TODO paginate results? + try: cursor = Connection.cursor() sql_string = """SELECT schemas."name" as schema, tables."name" as table, columns."name" as column, columns."type", columns."type_digits", columns."type_scale", columns."default", columns."null" FROM (SELECT "id", "name", "schema_id" FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" - FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT "table_id", "name", + FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT "table_id", "name", "type", "type_digits", "type_scale", "default", "null" FROM sys.columns) AS columns ON - (columns."table_id"=tables."id");""".replace('\n', ' ') # important STREAM TABLES TYPE is 4 + (columns."table_id"=tables."id")""".replace('\n', ' ') cursor.execute(sql_string) return cursor.fetchall() except BaseException as ex: add_log(50, ex) + raise diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ b/clients/iotapi/src/Streams/datatypes.py @@ -1,7 +1,7 @@ import struct + from abc import ABCMeta, abstractmethod from datetime import date, time, datetime - from dateutil.relativedelta import relativedelta LITTLE_ENDIAN_ALIGNMENT = '<' # for now is little-endian for Intel CPU's diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -1,14 +1,13 @@ from itertools import groupby +from .datatypes import * +from .streams import IOTStream +from .streamscontext import Streams_Context from Settings.mapiconnection import fetch_streams from Utilities.customthreading import PeriodicalThread -from datatypes import * -from streams import IOTStream -from streamscontext import Streams_Context -SWITCHER = [{'types': ['clob', 'url'], 'class': 'TextType'}, +Switcher = [{'types': ['clob', 'url'], 'class': 'TextType'}, {'types': ['char', 'varchar'], 'class': 'LimitedTextType'}, {'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class': 'SmallIntegerType'}, - {'types': ['hugeint'], 'class': 'HugeIntegerType'}, {'types': ['real', 'double'], 'class': 'FloatType'}, {'types': ['decimal'], 'class': 'DecimalType'}, {'types': ['boolean'], 'class': 'BooleanType'}, @@ -19,6 +18,10 @@ SWITCHER = [{'types': ['clob', 'url'], ' {'types': ['uuid'], 'class': 'UUIDType'}] +def polling_add_hugeint_type(): + Switcher.append({'types': ['hugeint'], 'class': 'HugeIntegerType'}) + + def init_stream_polling_thread(interval): thread = PeriodicalThread(interval=interval, worker_func=stream_polling) thread.start() @@ -30,15 +33,15 @@ def stream_polling(): array = fetch_streams() # TODO check whenever stream's columns are updated retained_streams = [] new_streams = {} - current_streams = Streams_Context.get_existing_streams() + current_streams = get_streams_context().get_existing_streams() if array is not None: - for key, group in groupby(array, lambda x: Streams_Context.get_context_entry_name(x[0], x[1])): + for key, group in groupby(array, lambda x: get_streams_context().get_context_entry_name(x[0], x[1])): if key not in current_streams: columns = {} for elem in group: - for entry in SWITCHER: # allocate the proper type wrapper + for entry in Switcher: # allocate the proper type wrapper if elem[3] in entry['types']: reflection_class = globals()[entry['class']] # import everything from datatypes!!! new_column = reflection_class(*elem[2:]) diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -1,13 +1,13 @@ import os -import struct -from datatypes import LITTLE_ENDIAN_ALIGNMENT +from collections import OrderedDict +from struct import unpack +from watchdog.events import FileSystemEventHandler, DirCreatedEvent, DirDeletedEvent +from watchdog.observers import Observer +from .datatypes import LITTLE_ENDIAN_ALIGNMENT from Settings.filesystem import get_baskets_base_location from Utilities.readwritelock import RWLock from WebSockets.websockets import notify_stream_inserts_to_clients -from watchdog.events import FileSystemEventHandler, DirCreatedEvent, DirDeletedEvent -from watchdog.observers import Observer -from collections import OrderedDict BASKETS_COUNT_FILE = 'count' @@ -40,7 +40,7 @@ class StreamBasketsHandler(FileSystemEve self._stream.delete_basket(basket_string) -class IOTStream(object): +class IOTStream: """Representation of a stream""" def __init__(self, schema_name, stream_name, columns): @@ -78,7 +78,7 @@ class IOTStream(object): def append_basket(self, path): if represents_int(path): with open(os.path.join(self._base_path, path, BASKETS_COUNT_FILE)) as f: - count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + 'i', f.read(4))[0] + count = unpack(LITTLE_ENDIAN_ALIGNMENT + 'i', f.read(4))[0] self._baskets_lock.acquire_write() self._baskets[int(path)] = count self._baskets_lock.release() diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py --- a/clients/iotapi/src/Streams/streamscontext.py +++ b/clients/iotapi/src/Streams/streamscontext.py @@ -2,7 +2,7 @@ from Utilities.readwritelock import RWLo from WebSockets.websockets import unsubscribe_removed_streams -class IOTStreams(object): +class IOTStreams: """Streams context""" @classmethod diff --git a/clients/iotapi/src/Utilities/customthreading.py b/clients/iotapi/src/Utilities/customthreading.py --- a/clients/iotapi/src/Utilities/customthreading.py +++ b/clients/iotapi/src/Utilities/customthreading.py @@ -1,4 +1,4 @@ -import time +from time import sleep from threading import Thread, Event @@ -28,4 +28,4 @@ class PeriodicalThread(StoppableThread): def run(self): while not self.stop_event.is_set(): self._worker_func() - time.sleep(self._interval) + sleep(self._interval) diff --git a/clients/iotapi/src/Utilities/readwritelock.py b/clients/iotapi/src/Utilities/readwritelock.py --- a/clients/iotapi/src/Utilities/readwritelock.py +++ b/clients/iotapi/src/Utilities/readwritelock.py @@ -1,11 +1,9 @@ import threading -# Adapted from https://majid.info/blog/a-reader-writer-lock-for-python/ - - -class RWLock: - """A simple reader-writer lock Several readers can hold the lock simultaneously, XOR one writer. Write locks have priority over reads to prevent write starvation.""" +class RWLock: # Adapted from https://majid.info/blog/a-reader-writer-lock-for-python/ + """A simple reader-writer lock Several readers can hold the lock simultaneously, XOR one writer. + Write locks have priority over reads to prevent write starvation.""" def __init__(self): self.rwlock = 0 @@ -33,7 +31,7 @@ class RWLock: self.monitor.release() def promote(self): - """Promote an already-acquired read lock to a write lock WARNING: it is very easy to deadlock with this method""" + """Promote an already-acquired read lock to a write lock WARNING:it is very easy to deadlock with this method""" self.monitor.acquire() self.rwlock -= 1 while self.rwlock != 0: diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py --- a/clients/iotapi/src/WebSockets/websockets.py +++ b/clients/iotapi/src/WebSockets/websockets.py @@ -1,12 +1,11 @@ -import json import sys +from json import loads, dumps +from jsonschema import Draft4Validator, FormatChecker +from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket +from .jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS, UNSUBSCRIBE_OPTS, READ_OPTS, INFO_OPTS from Settings.iotlogger import add_log -from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket from Utilities.readwritelock import RWLock -from jsonschema import Draft4Validator, FormatChecker - -from jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS, UNSUBSCRIBE_OPTS, READ_OPTS, INFO_OPTS Client_Messages_Validator = Draft4Validator(CLIENTS_INPUTS_SCHEMA, format_checker=FormatChecker()) WebSocketServer = None @@ -23,7 +22,7 @@ def unsubscribe_removed_streams(concaten for name in concatenated_names: add_log(20, ''.join(['Stream ', name, ' removed'])) -from Streams.streamscontext import Streams_Context, IOTStreams # avoid circular dependency +from ..Streams.streamscontext import Streams_Context, IOTStreams # avoid circular dependency def notify_stream_inserts_to_clients(schema_name, stream_name, basket_number, count): @@ -43,7 +42,7 @@ class IOTAPI(WebSocket): def sendJSONMessage(self, response, message): # IMPORTANT always use this method to send messages to clients!!!!! message['response'] = response - super(IOTAPI, self).sendMessage(json.dumps(message)) # send JSON Strings to clients + super(IOTAPI, self).sendMessage(dumps(message)) # send JSON Strings to clients def handleConnected(self): # overriden WebClientsLock.acquire_write() @@ -61,7 +60,7 @@ class IOTAPI(WebSocket): if self.opcode != 0x1: # TEXT frame self.sendJSONMessage(response="error", message={"message": "Only TEXT frames allowed!"}) try: - input_schema = json.loads(self.data) + input_schema = loads(self.data) Client_Messages_Validator.validate(input_schema) if input_schema['request'] in SUBSCRIBE_OPTS: diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py --- a/clients/iotapi/src/main.py +++ b/clients/iotapi/src/main.py @@ -9,7 +9,8 @@ from multiprocessing import Process from threading import Thread from Settings.filesystem import init_file_system, DEFAULT_FILESYSTEM from Settings.iotlogger import init_logging, add_log, DEFAULT_LOGGING -from Settings.mapiconnection import init_monetdb_connection +from Settings.mapiconnection import init_monetdb_connection, check_hugeint_type +from Streams.streampolling import polling_add_hugeint_type from Streams.streampolling import init_stream_polling_thread from WebSockets.websockets import init_websockets @@ -27,6 +28,10 @@ def start_process(polling_interval, file init_file_system(filesystem_location) # init filesystem # init mapi connection init_monetdb_connection(connection_hostname, con_port, con_user, con_password, con_database) + + if check_hugeint_type(): + polling_add_hugeint_type() + init_stream_polling_thread(polling_interval) # start polling thread1 = Thread(target=init_websockets, args=(sockets_host, sockets_port)) diff --git a/clients/iotclient/src/Flask/app.py b/clients/iotclient/src/Flask/app.py --- a/clients/iotclient/src/Flask/app.py +++ b/clients/iotclient/src/Flask/app.py @@ -1,6 +1,6 @@ _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list