Changeset: c44d0bb79a54 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c44d0bb79a54 Modified Files: clients/iotapi/requirements.txt clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streampolling.py clients/iotclient/requirements.txt clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streampolling.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Streams/streamscreator.py Branch: iot Log Message:
Added stream polling diffs (truncated from 705 to 300 lines): diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt --- a/clients/iotapi/requirements.txt +++ b/clients/iotapi/requirements.txt @@ -4,7 +4,7 @@ jsonschema==2.5.1 pymonetdb==0.1.1 python-dateutil==2.5.3 pytz==2016.4 -Sphinx==1.4.1 +Sphinx==1.4.3 sphinx-rtd-theme==0.1.9 tzlocal==1.2.2 watchdog==0.8.3 diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ b/clients/iotapi/src/Streams/datatypes.py @@ -328,14 +328,6 @@ class TimeType(StreamDataType): # Store .isoformat()) return results - def to_json_representation(self): - json_value = super(TimeType, self).to_json_representation() - if self._data_type == 'timez': - json_value['with_timezone'] = True - else: - json_value['with_timezone'] = False - return json_value - class TimestampType(StreamDataType): # it's represented with the two integers from time and date """Covers: TIMESTAMP""" @@ -363,11 +355,3 @@ class TimestampType(StreamDataType): # results.append(datetime.combine(read_date, time(hour=hour, minute=minute, second=second, microsecond=milliseconds * 1000)).isoformat()) return results - - def to_json_representation(self): - json_value = super(TimestampType, self).to_json_representation() - if self._data_type == 'timestamptz': - json_value['with_timezone'] = True - else: - json_value['with_timezone'] = False - return json_value diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -1,5 +1,4 @@ from itertools import groupby - from Settings.mapiconnection import fetch_streams from Utilities.customthreading import PeriodicalThread from datatypes import * diff --git a/clients/iotclient/requirements.txt b/clients/iotclient/requirements.txt --- a/clients/iotclient/requirements.txt +++ b/clients/iotclient/requirements.txt @@ -1,13 +1,13 @@ -python-dateutil==2.5.3 fake-factory==0.5.7 Flask-RESTful==0.3.5 IPy==0.83 iso8601==0.1.11 jsonschema==2.5.1 pymonetdb==0.1.1 +python-dateutil==2.5.3 pytz==2016.4 rfc3987==1.3.6 -Sphinx==1.4.1 +Sphinx==1.4.3 sphinx-rtd-theme==0.1.9 strict-rfc3339==0.7 tzlocal==1.2.2 diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -49,22 +49,21 @@ def mapi_get_webserver_streams(): try: Connection.execute("START TRANSACTION") cursor = Connection.cursor() - sql_string = """SELECT tables."id", tables."name", schemas."name" AS schema, tables."name" AS table, - extras."has_hostname", extras."flushing", extras."unit", extras."interval" FROM (SELECT "id", "name", - "schema_id" FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) - AS schemas ON (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "has_hostname", "flushing", - "unit", "interval" FROM iot.webserverstreams) AS extras ON (tables."id"=extras."table_id")"""\ - .replace('\n', ' ') + sql_string = """SELECT tables."id", schemas."name" AS schema, tables."name" AS table, extras."has_hostname", + extras."base", extras."interval", extras."unit" FROM (SELECT "id", "name", "schema_id" FROM sys.tables + WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas ON + (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "has_hostname", "base", "interval", "unit" + FROM iot.webserverstreams) AS extras ON (tables."id"=extras."table_id")""".replace('\n', ' ') cursor.execute(sql_string) tables = cursor.fetchall() cursor = Connection.cursor() - sql_string = """SELECT columns."table_id", columns."name" AS column, columns."type", columns."type_digits", - columns."type_scale", columns."default", columns."null", extras."special", extras."validation1", - extras."validation2" FROM (SELECT "id", "table_id", "name", "type", "type_digits", "type_scale", - "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT "id" FROM sys.tables WHERE type=4) - AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT "column_id", "special", "validation1", - "validation2" FROM iot.webservercolumns) AS extras ON (columns."id"=extras."column_id")"""\ + sql_string = """SELECT columns."column_id", columns."table_id", columns."name" AS column, columns."type", + columns."type_digits", columns."type_scale", columns."default", columns."null", extras."special", + extras."validation1", extras."validation2" FROM (SELECT "id", "table_id", "name", "type", "type_digits", + "type_scale", "default", "null" FROM sys.columns) AS columns INNER JOIN (SELECT "id" FROM sys.tables + WHERE type=4) AS tables ON (tables."id"=columns."table_id") LEFT JOIN (SELECT "column_id", "special", + "validation1", "validation2" FROM iot.webservercolumns) AS extras ON (columns."id"=extras."column_id")"""\ .replace('\n', ' ') cursor.execute(sql_string) columns = cursor.fetchall() diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -30,7 +30,7 @@ DOUBLE_NAN = struct.unpack('d', '\xff\xf ENUM_TYPE_SEPARATOR = '\r' -class StreamDataType(object): +class StreamDataType: """MonetDB's data types for validation base class""" __metaclass__ = ABCMeta @@ -38,7 +38,7 @@ class StreamDataType(object): self._column_name = kwargs['name'] # name of the column self._data_type = kwargs['type'] # SQL name of the type self._is_nullable = kwargs.get('nullable', True) # boolean - if 'default' in kwargs: + if 'default' in kwargs and kwargs['default'] is not None: self._default_value = self.set_default_value(kwargs['default']) else: self._default_value = None @@ -189,7 +189,7 @@ class RegexType(TextType): def to_json_representation(self): json_value = super(RegexType, self).to_json_representation() - json_value['regex'] = self._regex.pattern + json_value['regex'] = self._regex_text return json_value def process_sql_parameters(self, array): @@ -418,15 +418,17 @@ class NumberBaseType(StreamDataType): return json_value def get_extra_sql_statement(self): - res_str = ",NULL," + res_str = [",NULL,"] if self._minimum is not None: - res_str += str(self._minimum) + "," + res_str.append(str(self._minimum)) + res_str.append(",") else: - res_str += "NULL," + res_str.append("NULL,") if self._maximum is not None: - res_str += str(self._maximum) + res_str.append(str(self._maximum)) else: - res_str += "NULL" + res_str.append("NULL") + return ''.join(res_str) class SmallIntegerType(NumberBaseType): @@ -434,10 +436,10 @@ class SmallIntegerType(NumberBaseType): def __init__(self, **kwargs): super(SmallIntegerType, self).__init__(**kwargs) - self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'wrd': 'q', 'bigint': 'q'} \ + self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'bigint': 'q'}\ .get(kwargs['type']) self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 'int': INT32_MIN, 'integer': INT32_MIN, - 'wrd': INT64_MIN, 'bigint': INT64_MIN}.get(kwargs['type']) + 'bigint': INT64_MIN}.get(kwargs['type']) def add_json_schema_entry(self, schema): super(SmallIntegerType, self).add_json_schema_entry(schema) @@ -480,8 +482,9 @@ class FloatType(NumberBaseType): def __init__(self, **kwargs): super(FloatType, self).__init__(**kwargs) this_type = kwargs['type'] - self._pack_sym = {'real': 'f', 'float': 'd', 'double': 'd'}.get(this_type) - self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 'double': DOUBLE_NAN}.get(this_type) + self._pack_sym = {'real': 'f', 'float': 'd', 'double': 'd', 'double precision': 'd'}.get(this_type) + self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 'double': DOUBLE_NAN, + 'double precision': DOUBLE_NAN}.get(this_type) def add_json_schema_entry(self, schema): super(FloatType, self).add_json_schema_entry(schema) @@ -630,15 +633,17 @@ class BaseDateTimeType(StreamDataType): return json_value def get_extra_sql_statement(self): - res_str = ",NULL," + res_str = [",NULL,"] if self._minimum is not None: - res_str += str(self._minimum_text) + "," + res_str.append(self._minimum_text) + res_str.append(",") else: - res_str += "NULL," + res_str.append("NULL,") if self._maximum is not None: - res_str += str(self._maximum_text) + res_str.append(self._maximum_text) else: - res_str += "NULL" + res_str.append("NULL") + return ''.join(res_str) class DateType(BaseDateTimeType): # Stored as an uint with the number of days since day 1 of month 1 (Jan) from year 0 @@ -704,9 +709,6 @@ class TimeWithTimeZoneType(TimeWithoutTi parsed = parsed.replace(tzinfo=None) - delta return parsed - def process_sql_parameters(self, array): - array[2] = 'time with time zone' - class TimestampWithoutTimeZoneType(BaseDateTimeType): # it's represented with the two integers from time and date """Covers: TIMESTAMP""" @@ -750,6 +752,3 @@ class TimestampWithTimeZoneType(Timestam delta = -delta parsed = parsed.replace(tzinfo=None) - delta return parsed - - def process_sql_parameters(self, array): - array[2] = 'timestamp with time zone' diff --git a/clients/iotclient/src/Streams/jsonschemas.py b/clients/iotclient/src/Streams/jsonschemas.py --- a/clients/iotclient/src/Streams/jsonschemas.py +++ b/clients/iotclient/src/Streams/jsonschemas.py @@ -7,25 +7,42 @@ TIMED_FLUSH_IDENTIFIER = "time" TUPLE_FLUSH_IDENTIFIER = "tuple" AUTO_FLUSH_IDENTIFIER = "auto" -UNBOUNDED_TEXT_TYPES = ["text", "string", "clob", "character large object"] -BOUNDED_TEXT_TYPES = ["char", "character", "varchar", "character varying"] -UUID_TYPE = ["uuid"] -MAC_TYPE = ["mac"] -URL_TYPE = ["url"] -INET_TYPE = ["inet"] -INET6_TYPE = ["inet6"] -REGEX_TYPE = ["regex"] -ENUM_TYPE = ["enum"] -BOOLEAN_TYPE = ["bool", "boolean"] -SMALL_INTEGERS_TYPES = ["tinyint", "smallint", "int", "integer", "wrd", "bigint"] -HUGE_INTEGER_TYPE = ["hugeint"] -FLOATING_POINT_PRECISION_TYPES = ["real", "float", "double", "double precision"] -DECIMAL_TYPES = ["dec", "decimal", "numeric"] -DATE_TYPE = ["date"] -TIME_WITHOUT_TIMEZONE_TYPE = ["time"] -TIME_WITH_TIMEZONE_TYPE = ["time with time zone", "timetz"] -TIMESTAMP_WITHOUT_TIMEZONE_TYPE = ["timestamp"] -TIMESTAMP_WITH_TIMEZONE_TYPE = ["timestamp with time zone", "timestamptz"] +UNBOUNDED_TEXT_TYPE = "clob" +UNBOUNDED_TEXT_INPUTS = [UNBOUNDED_TEXT_TYPE, "text", "string", "character large object"] +BOUNDED_TEXT_TYPES = ["char", "varchar"] +BOUNDED_TEXT_INPUTS = BOUNDED_TEXT_TYPES + ["character", "character varying"] + +UUID_TYPE = "uuid" +MAC_TYPE = "mac" +URL_TYPE = "url" +INET_TYPE = "inet" +INET6_TYPE = "inet6" +REGEX_TYPE = "regex" +ENUM_TYPE = "enum" + +BOOLEAN_TYPE = "boolean" +BOOLEAN_INPUTS = [BOOLEAN_TYPE, "bool"] + +SMALL_INTEGERS_TYPES = ["tinyint", "smallint", "int", "integer", "bigint"] +HUGE_INTEGER_TYPE = "hugeint" + +FLOATING_POINT_PRECISION_TYPES = ["real", "double"] +FLOATING_POINT_PRECISION_INPUTS = FLOATING_POINT_PRECISION_TYPES + ["float", "double precision"] + +DECIMAL_TYPE = "decimal" +DECIMAL_INPUTS = [DECIMAL_TYPE, "dec", "numeric"] + +DATE_TYPE = "date" + +TIME_WITHOUT_TIMEZONE_TYPE = "time" +TIME_WITH_TIMEZONE_TYPE_INTERNAL = "timetz" +TIME_WITH_TIMEZONE_TYPE_EXTERNAL = "time with time zone" +TIME_INPUTS = [TIME_WITHOUT_TIMEZONE_TYPE, TIME_WITH_TIMEZONE_TYPE_EXTERNAL] + +TIMESTAMP_WITHOUT_TIMEZONE_TYPE = "timestamp" +TIMESTAMP_WITH_TIMEZONE_TYPE_INTERNAL = "timestamptz" +TIMESTAMP_WITH_TIMEZONE_TYPE_EXTERNAL = "timestamp with time zone" +TIMESTAMP_INPUTS = [TIMESTAMP_WITHOUT_TIMEZONE_TYPE, TIMESTAMP_WITH_TIMEZONE_TYPE_EXTERNAL] CREATE_STREAMS_SCHEMA = None @@ -34,7 +51,7 @@ def init_create_streams_schema(add_hugei global CREATE_STREAMS_SCHEMA if add_hugeint: - integers = SMALL_INTEGERS_TYPES + HUGE_INTEGER_TYPE + integers = SMALL_INTEGERS_TYPES.append(HUGE_INTEGER_TYPE) else: integers = SMALL_INTEGERS_TYPES @@ -81,7 +98,7 @@ def init_create_streams_schema(add_hugei _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list