Changeset: ee944c8e9c97 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ee944c8e9c97 Removed Files: clients/iotclient/src/Streams/flushing.py Modified Files: clients/iotapi/src/Settings/mapiconnection.py clients/iotapi/src/Streams/streampolling.py clients/iotapi/src/Streams/streams.py clients/iotapi/src/Streams/streamscontext.py clients/iotapi/src/WebSockets/jsonschemas.py clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscreator.py clients/iotclient/src/Utilities/filecreator.py Branch: iot Log Message:
Cleaned code diffs (truncated from 578 to 300 lines): diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py --- a/clients/iotapi/src/Settings/mapiconnection.py +++ b/clients/iotapi/src/Settings/mapiconnection.py @@ -12,9 +12,6 @@ def init_monetdb_connection(hostname, po user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':') - if user_password == '': - user_password = 'monetdb' - try: # the autocommit is set to true so each statement will be independent Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password, database=database, autocommit=True) @@ -35,15 +32,12 @@ def close_monetdb_connection(): def fetch_streams(): try: # TODO paginate results? cursor = Connection.cursor() - sql_string = """ - SELECT storage."schema", storage."table", storage."column", storage."type", storage."location", - storage."typewidth" + sql_string = """SELECT storage."schema", storage."table", storage."column", storage."type", storage."typewidth" FROM (SELECT "schema", "table", "column", "type" FROM sys.storage) AS storage INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON (storage."table"=tables."name") - INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON (storage."schema"=schemas."name"); - """.replace('\n', ' ') + INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON (storage."schema"=schemas."name");"""\ + .replace('\n', ' ') cursor.execute(sql_string) return cursor.fetchall() except BaseException as ex: - print >> sys.stdout, ex add_log(50, ex) diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -2,8 +2,9 @@ from itertools import groupby from Settings.mapiconnection import fetch_streams from Utilities.customthreading import PeriodicalThread - -from streamscontext import Streams_context, DataCellStream +from datatypes import * +from streams import IOTStream +from streamscontext import Streams_context SWITCHER = [{'types': ['clob', 'char', 'varchar', 'url'], 'class': 'TextType'}, {'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class': 'SmallIntegerType'}, @@ -23,7 +24,7 @@ def init_stream_polling_thread(interval) thread.start() -# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, elem[4] is location, elem[5] is typewidth +# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, elem[4] is typewidth def stream_polling(): current_streams = Streams_context.get_existing_streams() retained_streams = [] @@ -35,11 +36,13 @@ def stream_polling(): columns = {} for elem in group: - reflection_class = globals()[elem[3]] # import everything from datatypes!!! - kwargs = {'name': elem[2], 'type': elem[3], 'location': elem[4], 'typewidth': elem[5]} - new_column = reflection_class(kwargs) - columns[elem[2]] = new_column - new_streams[key] = DataCellStream(key, columns) + for entry in SWITCHER: # allocate the proper type wrapper + if elem[3] in entry['types']: + reflection_class = globals()[entry['class']] # import everything from datatypes!!! + new_column = reflection_class({'name': elem[2], 'type': elem[3], 'typewidth': elem[4]}) + columns[elem[2]] = new_column + new_streams[key] = IOTStream(key, columns) + break else: retained_streams.append(key) diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -1,6 +1,7 @@ import os import struct +from datatypes import LITTLE_ENDIAN_ALIGNMENT from Settings.filesystem import get_baskets_base_location from Utilities.readwritelock import RWLock from WebSockets.websockets import notify_clients @@ -63,7 +64,7 @@ class IOTStream(object): def append_basket(self, path): if represents_int(path): with open(os.path.join(self._base_path, path)) as f: - count = struct.unpack('i', f.read(4))[0] + count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + '1i', f.read(4))[0] self._lock.acquire_write() self._baskets[int(path)] = count self._lock.release() diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py --- a/clients/iotapi/src/Streams/streamscontext.py +++ b/clients/iotapi/src/Streams/streamscontext.py @@ -1,7 +1,5 @@ -import collections - from Utilities.readwritelock import RWLock -from WebSockets.websockets import desubscribe_removed_streams +from WebSockets.websockets import unsubscribe_removed_streams class IOTStreams(object): @@ -12,7 +10,7 @@ class IOTStreams(object): return schema_name + '.' + stream_name def __init__(self): - self._context = collections.OrderedDict() # dictionary of schema_name + '.' + stream_name -> DataCellStream + self._context = {} # dictionary of schema_name + '.' + stream_name -> DataCellStream self._locker = RWLock() def get_existing_streams(self): @@ -28,7 +26,7 @@ class IOTStreams(object): del self._context[k] self._context.update(new_streams) self._locker.release() - desubscribe_removed_streams(removed_streams) + unsubscribe_removed_streams(removed_streams) def get_existing_stream(self, concatenated_name): self._locker.acquire_read() diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py b/clients/iotapi/src/WebSockets/jsonschemas.py --- a/clients/iotapi/src/WebSockets/jsonschemas.py +++ b/clients/iotapi/src/WebSockets/jsonschemas.py @@ -19,17 +19,11 @@ CLIENTS_INPUTS_SCHEMA = { "additionalProperties": False }, { "properties": { - "action": {"type": "string", "enum": ["info"]} - }, - "required": ["action"], - "additionalProperties": False - }, { - "properties": { "schema": {"type": "string"}, "stream": {"type": "string"}, "action": {"type": "string", "enum": READ_OPTS}, "basket": {"type": "integer", "minimum": 1, "default": 1}, - "limit": {"type": "integer", "minimum": 0, "default": 0}, + "limit": {"type": "integer", "minimum": 0, "default": 100}, "offset": {"type": "integer", "minimum": 0, "default": 0} }, "required": ["schema", "stream", "action"], diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py --- a/clients/iotclient/src/Settings/filesystem.py +++ b/clients/iotclient/src/Settings/filesystem.py @@ -32,10 +32,10 @@ def init_file_system(host_identifier=Non os.makedirs(Baskets_Location) if new_configfile_location is not None: - Config_File_Location = create_file_if_not_exists(new_configfile_location, hidden=False, init_text='[]') + Config_File_Location = create_file_if_not_exists(new_configfile_location, init_text='[]') else: Config_File_Location = create_file_if_not_exists( - os.path.join(Filesystem_Location, CONFIG_FILE_DEFAULT_NAME), hidden=False, init_text='[]') + os.path.join(Filesystem_Location, CONFIG_FILE_DEFAULT_NAME), init_text='[]') Host_Identifier = host_identifier except (Exception, OSError) as ex: diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -12,9 +12,6 @@ def init_monetdb_connection(hostname, po user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':') - if user_password == '': - user_password = 'monetdb' - try: # the autocommit is set to true so each statement will be independent Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password, database=database, autocommit=True) diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -541,7 +541,7 @@ class DecimalType(NumberBaseType): def check_value_precision(self, value, text): number_digits = int(math.ceil(math.log10(abs(value)))) if number_digits > self._precision: - raise Exception('Too many digits on %s value: %s > %s!' % (text, number_digits, self._precision)) + raise Exception('Too many digits on %s: %s > %s!' % (text, number_digits, self._precision)) def add_json_schema_entry(self, schema): super(DecimalType, self).add_json_schema_entry(schema) diff --git a/clients/iotclient/src/Streams/flushing.py b/clients/iotclient/src/Streams/flushing.py deleted file mode 100644 --- a/clients/iotclient/src/Streams/flushing.py +++ /dev/null @@ -1,52 +0,0 @@ -from Utilities.customthreading import PeriodicalThread -from abc import ABCMeta, abstractmethod - - -class StreamFlushingMethod(object): - """Base class for flushing""" - - __metaclass__ = ABCMeta - - def __init__(self): - pass - - @abstractmethod - def get_dictionary_info(self): - pass - - -class TimeBasedFlushing(StreamFlushingMethod): - """Time based flushing""" - - def __init__(self, interval, time_unit): - super(TimeBasedFlushing, self).__init__() - self._interval = interval - self._time_unit = time_unit - self._local_thread = None - - def init_local_thread(self, stream): - if self._time_unit == "s": - interval = self._interval - elif self._time_unit == "m": - interval = self._interval * 60 - else: - interval = self._interval * 3600 - self._local_thread = PeriodicalThread(interval, stream.time_based_flush) - self._local_thread.start() - - def stop_local_thread(self): - self._local_thread.stop() - - def get_dictionary_info(self): - return {'base': 'time', 'unit': self._time_unit, 'interval': self._interval} - - -class TupleBasedFlushing(StreamFlushingMethod): - """Tuple based flushing""" - - def __init__(self, limit): - super(TupleBasedFlushing, self).__init__() - self.limit = limit - - def get_dictionary_info(self): - return {'base': 'tuple', 'number': self.limit} diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py --- a/clients/iotclient/src/Streams/streams.py +++ b/clients/iotclient/src/Streams/streams.py @@ -1,14 +1,24 @@ import os +import struct +from abc import ABCMeta, abstractmethod from collections import defaultdict, OrderedDict from Settings.filesystem import get_baskets_base_location, get_host_identifier from Settings.iotlogger import add_log from Settings.mapiconnection import mapi_create_stream, mapi_flush_baskets -from Utilities.filecreator import create_file_if_not_exists, get_hidden_file_name +from Utilities.filecreator import create_file_if_not_exists from Utilities.readwritelock import RWLock -from datatypes import TimestampType, TextType, DataValidationException -from flushing import TimeBasedFlushing, TupleBasedFlushing +from Utilities.customthreading import PeriodicalThread +from datatypes import TimestampType, TextType, DataValidationException, LITTLE_ENDIAN_ALIGNMENT + +IMPLICIT_TIMESTAMP_COLUMN_NAME = 'implicit_timestamp' +Timestamps_Handler = TimestampType(name=IMPLICIT_TIMESTAMP_COLUMN_NAME, type="timestamp") # timestamp +Extra_columns_SQL = [Timestamps_Handler.create_stream_sql()] # array for SQL creation + +HOST_IDENTIFIER_COLUMN_NAME = 'host_identifier' +Hostname_Bin_Value = None +BASKETS_COUNT_FILE = 'count' def represents_int(s): @@ -18,13 +28,6 @@ def represents_int(s): except ValueError: return False -IMPLICIT_TIMESTAMP_COLUMN_NAME = 'implicit_timestamp' -Timestamps_Handler = TimestampType(name=IMPLICIT_TIMESTAMP_COLUMN_NAME, type="timestamp") # timestamp -Extra_columns_SQL = [Timestamps_Handler.create_stream_sql()] # array for SQL creation - -HOST_IDENTIFIER_COLUMN_NAME = 'host_identifier' -Hostname_Bin_Value = None - def init_streams_hosts(): global Hostname_Bin_Value @@ -43,14 +46,14 @@ class StreamException(Exception): self.message = message # dictionary of column -> list of error messages -class IOTStream(object): - """Representation of the stream for validation""" +class BaseIOTStream(object): + """Representation of a stream for validation""" + __metaclass__ = ABCMeta _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list