Changeset: 2dd269ec052a for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2dd269ec052a Modified Files: clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Streams/streamscreator.py Branch: iot Log Message:
merger diffs (truncated from 551 to 300 lines): diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -104,7 +104,6 @@ class StreamDataType(object): def create_stream_sql(self): # get column creation statement on SQL array = [self._column_name, " ", self._data_type] self.process_sql_parameters(array) # add extra parameters to the SQL statement - if self._default_value is not None: array.extend([" DEFAULT '", str(self._default_value), "'"]) if not self._is_nullable: @@ -237,11 +236,11 @@ class RegexType(TextType): def add_json_schema_entry(self, schema): super(RegexType, self).add_json_schema_entry(schema) - schema[self._column_name]['pattern'] = self._regex + schema[self._column_name]['pattern'] = self._regex.pattern def to_json_representation(self): json_value = super(RegexType, self).to_json_representation() - json_value['regex'] = self._regex + json_value['regex'] = self._regex.pattern return json_value def process_sql_parameters(self, array): @@ -253,7 +252,7 @@ class LimitedTextType(TextType): def __init__(self, **kwargs): super(LimitedTextType, self).__init__(**kwargs) - self._limit = int(kwargs['limit']) + self._limit = kwargs['limit'] def add_json_schema_entry(self, schema): super(LimitedTextType, self).add_json_schema_entry(schema) @@ -263,7 +262,8 @@ class LimitedTextType(TextType): str_value = str(default_value) parsed_len = len(str_value) if parsed_len > self._limit: - raise Exception('The default string is higher than the limit: %d > %d' % (parsed_len, self._limit)) + raise Exception('The default string\'s length is longer than the limit: %d > %d!' + % (parsed_len, self._limit)) self._default_value = str_value def to_json_representation(self): @@ -340,9 +340,9 @@ class NumberBaseType(StreamDataType): def __init__(self, **kwargs): super(NumberBaseType, self).__init__(**kwargs) if 'minimum' in kwargs: - self._minimum = self.process_next_value(kwargs['minimum'], 0, {}, {}) + self._minimum = kwargs['minimum'] if 'maximum' in kwargs: - self._maximum = self.process_next_value(kwargs['maximum'], 0, {}, {}) + self._maximum = kwargs['maximum'] if hasattr(self, '_minimum') and hasattr(self, '_maximum') and self._minimum > self._maximum: raise Exception('The minimum value is higher than the maximum!') @@ -353,21 +353,16 @@ class NumberBaseType(StreamDataType): if hasattr(self, '_maximum'): schema[self._column_name]['maximum'] = self._maximum - @abstractmethod - def process_default_value(self, value): - return value - def set_default_value(self, default_value): - parsed_val = self.process_default_value(default_value) - if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and parsed_val < self._minimum: - raise Exception('The default value is less than the minimum: %d < %d' % (parsed_val, self._minimum)) - elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and parsed_val > self._maximum: - raise Exception('The default value is higher than the maximum: %d > %d' % (parsed_val, self._maximum)) - elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed_val < self._minimum: - raise Exception('The default value is out of range: %d < %d' % (parsed_val, self._minimum)) - elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed_val > self._maximum: - raise Exception('The default value is out of range: %d > %d' % (parsed_val, self._maximum)) - self._default_value = parsed_val + if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and default_value < self._minimum: + raise Exception('The default value is less than the minimum: %s < %s!' % (default_value, self._minimum)) + elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and default_value > self._maximum: + raise Exception('The default value is higher than the maximum: %s > %s!' % (default_value, self._maximum)) + elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and default_value < self._minimum: + raise Exception('The default value is out of range: %s < %s!' % (default_value, self._minimum)) + elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and default_value > self._maximum: + raise Exception('The default value is out of range: %s > %s!' % (default_value, self._maximum)) + self._default_value = default_value def to_json_representation(self): json_value = super(NumberBaseType, self).to_json_representation() @@ -379,15 +374,15 @@ class NumberBaseType(StreamDataType): def create_stream_sql(self): string = super(NumberBaseType, self).create_stream_sql() - array = [] + if hasattr(self, '_minimum') and not hasattr(self, '_maximum'): - array.extend([" CHECK (", self._column_name, " > ", str(self._minimum), ")"]) + return string + ''.join([" CHECK (", self._column_name, " > ", str(self._minimum), ")"]) elif hasattr(self, '_maximum') and not hasattr(self, '_minimum'): - array.extend([" CHECK (", self._column_name, " < ", str(self._maximum), ")"]) + return string + ''.join([" CHECK (", self._column_name, " < ", str(self._maximum), ")"]) elif hasattr(self, '_maximum') and hasattr(self, '_minimum'): - array.extend([" CHECK (", self._column_name, " BETWEEN ", str(self._minimum), - " AND ", str(self._maximum), ")"]) - return string.join(array) + return string + ''.join([" CHECK (", self._column_name, " BETWEEN ", str(self._minimum), + " AND ", str(self._maximum), ")"]) + return string class SmallIntegerType(NumberBaseType): @@ -395,10 +390,10 @@ class SmallIntegerType(NumberBaseType): def __init__(self, **kwargs): super(SmallIntegerType, self).__init__(**kwargs) - this_type = kwargs['type'] - self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'bigint': 'q'}.get(this_type) + self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'bigint': 'q'} \ + .get(kwargs['type']) self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 'int': INT32_MIN, 'integer': INT32_MIN, - 'bigint': INT64_MIN}.get(this_type) + 'bigint': INT64_MIN}.get(kwargs['type']) def add_json_schema_entry(self, schema): super(SmallIntegerType, self).add_json_schema_entry(schema) @@ -407,14 +402,11 @@ class SmallIntegerType(NumberBaseType): def get_nullable_constant(self): return self._nullable_constant - def process_default_value(self, value): - return int(value) - def process_next_value(self, entry, counter, parameters, errors): return int(entry) def pack_parsed_values(self, extracted_values, counter, parameters): - return struct.pack(ALIGNMENT + str(counter) + self._pack_sym, extracted_values) + return struct.pack(ALIGNMENT + str(counter) + self._pack_sym, *extracted_values) class HugeIntegerType(NumberBaseType): @@ -430,9 +422,6 @@ class HugeIntegerType(NumberBaseType): def get_nullable_constant(self): return INT128_MIN - def process_default_value(self, value): - return int(value) - def process_next_value(self, entry, counter, parameters, errors): return [entry & INT64_MAX, (entry >> 64) & INT64_MAX] @@ -457,9 +446,6 @@ class FloatType(NumberBaseType): def get_nullable_constant(self): return self._nullable_constant - def process_default_value(self, value): - return float(value) - def process_next_value(self, entry, counter, parameters, errors): return float(entry) @@ -473,17 +459,24 @@ class DecimalType(NumberBaseType): def __init__(self, **kwargs): super(DecimalType, self).__init__(**kwargs) if 'precision' in kwargs: - self._precision = int(kwargs['precision']) + self._precision = kwargs['precision'] else: self._precision = 18 if 'scale' in kwargs: - self._scale = int(kwargs['scale']) + self._scale = kwargs['scale'] else: self._scale = 0 if self._scale > self._precision: raise Exception('The scale must be between 0 and the precision!') + if self._default_value is not None: + self.check_value_precision(self._default_value, 'default') + if hasattr(self, '_minimum'): + self.check_value_precision(self._minimum, 'minimum') + if hasattr(self, '_maximum'): + self.check_value_precision(self._maximum, 'maximum') + if self._precision <= 2: # calculate the number of bytes to use according to the precision self._pack_sym = 'b' elif 2 < self._precision <= 4: @@ -498,6 +491,11 @@ class DecimalType(NumberBaseType): self._nullable_constant = {'b': INT8_MIN, 'h': INT16_MIN, 'i': INT32_MIN, 'q': INT64_MIN, 'Q': INT128_MIN} \ .get(self._pack_sym) + def check_value_precision(self, value, text): + number_digits = int(math.ceil(math.log10(abs(value)))) + if number_digits > self._precision: + raise Exception('Too many digits on %s value: %s > %s!' % (text, number_digits, self._precision)) + def add_json_schema_entry(self, schema): super(DecimalType, self).add_json_schema_entry(schema) schema[self._column_name]['type'] = 'number' @@ -505,16 +503,8 @@ class DecimalType(NumberBaseType): def get_nullable_constant(self): return self._nullable_constant - def process_default_value(self, value): - number_digits = int(math.ceil(math.log10(abs(value)))) - if number_digits > self._precision: - raise Exception('Too many digits on default value: %s > %s' % (number_digits, self._precision)) - return int(value) - def process_next_value(self, entry, counter, parameters, errors): - number_digits = int(math.ceil(math.log10(abs(entry)))) - if number_digits > self._precision: - errors[counter] = 'Too many digits: %s > %s' % (number_digits, self._precision) + self.check_value_precision(entry, 'entry') parsed_value = int(entry) if self._pack_sym != 'Q': return parsed_value @@ -524,7 +514,7 @@ class DecimalType(NumberBaseType): def pack_parsed_values(self, extracted_values, counter, parameters): if self._pack_sym == 'Q': extracted_values = list(itertools.chain(*extracted_values)) - counter <<= 1 + counter <<= 1 # duplicate the counter for packing return struct.pack(ALIGNMENT + str(counter) + self._pack_sym, *extracted_values) def to_json_representation(self): @@ -550,60 +540,56 @@ class BaseDateTimeType(StreamDataType): self._maximum = self.parse_entry(kwargs['maximum']) if hasattr(self, '_minimum') and hasattr(self, '_maximum') and self._minimum > self._maximum: raise Exception('The minimum value is higher than the maximum!') - self._default_value_text = None # needed later for the SQL creation statement + + def get_nullable_constant(self): + return "0" + + @abstractmethod + def parse_entry(self, entry): + pass def set_default_value(self, default_value): - parsed_val = self.parse_entry(default_value) # Process the default value as others + parsed_val = self.parse_entry(default_value) # Process the default value as the others if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and parsed_val < self._minimum: - raise Exception('The default value is less than the minimum: %s < %s' % (default_value, self._minimum_text)) + raise Exception('The default value is less than the minimum: %s < %s!' + % (default_value, self._minimum_text)) elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and parsed_val > self._maximum: - raise Exception('The default value is higher than the maximum: %s > %s' + raise Exception('The default value is higher than the maximum: %s > %s!' % (default_value, self._maximum_text)) elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed_val < self._minimum: - raise Exception('The default value is out of range: %s < %s' % (default_value, self._minimum_text)) + raise Exception('The default value is out of range: %s < %s!' % (default_value, self._minimum_text)) elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed_val > self._maximum: - raise Exception('The default value is out of range: %s > %s' % (default_value, self._maximum_text)) - self._default_value = parsed_val - self._default_value_text = default_value + raise Exception('The default value is out of range: %s > %s!' % (default_value, self._maximum_text)) + self._default_value = default_value + + @abstractmethod + def pack_next_value(self, parsed, counter, parameters, errors): + pass + + def process_next_value(self, entry, counter, parameters, errors): + if entry == self.get_nullable_constant(): # have to do this trick due to Python's datetime limitations + return self.pack_next_value(None, counter, parameters, errors) + parsed = self.parse_entry(entry) + if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and parsed < self._minimum: + errors[counter] = 'The value is higher than the minimum: %s < %s!' % (self._minimum_text, parsed) + elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and parsed > self._maximum: + errors[counter] = 'The value is higher than the maximum: %s > %s!' % (parsed, self._maximum_text) + elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed < self._minimum: + errors[counter] = 'The value is out of range: %s < %s!' % (self._minimum_text, parsed) + elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed > self._maximum: + errors[counter] = 'The value is out of range: %s > %s!' % (parsed, self._maximum_text) + return self.pack_next_value(parsed, counter, parameters, errors) def to_json_representation(self): json_value = super(BaseDateTimeType, self).to_json_representation() - json_value['type'] = 'string' if hasattr(self, '_minimum'): json_value['minimum'] = self._minimum_text if hasattr(self, '_maximum'): json_value['maximum'] = self._maximum_text return json_value - def create_stream_sql(self): - array = [self._column_name, " ", self._data_type] - if self._default_value is not None: - array.extend([" DEFAULT '", str(self._default_value_text), "'"]) - if not self._is_nullable: - array.append(" NOT NULL") - return ''.join(array) - @abstractmethod - def parse_entry(self, entry): - return 0 - - @abstractmethod - def pack_next_value(self, parsed, counter, parameters, errors): _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list