Changeset: a91cdecbf145 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a91cdecbf145 Modified Files: clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streams.py clients/iotclient/src/Streams/streamscreator.py Branch: iot Log Message:
Finished data reconstruction diffs (truncated from 334 to 300 lines): diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ b/clients/iotapi/src/Streams/datatypes.py @@ -1,12 +1,13 @@ import struct +from abc import ABCMeta, abstractmethod from datetime import date, time, datetime -from abc import ABCMeta, abstractmethod from dateutil.relativedelta import relativedelta -ALIGNMENT = '<' # for now is little-endian for Intel CPU's +LITTLE_ENDIAN_ALIGNMENT = '<' # for now is little-endian for Intel CPU's NIL_STRING = "\200\n" # added newline for performance +NIL_UUID = "00000000-0000-0000-0000-000000000000" INT8_MIN = 0x80 INT16_MIN = 0x8000 @@ -20,24 +21,40 @@ DOUBLE_NAN = struct.unpack('d', '\xff\xf class StreamDataType(object): - """MonetDB's data types for validation base class""" + """MonetDB's data types for reading base class""" __metaclass__ = ABCMeta def __init__(self, **kwargs): self._column_name = kwargs['name'] # name of the column self._data_type = kwargs['type'] # SQL name of the type - # self._location = kwargs['location'] + '.tail' # Location of the file def is_file_mode_binary(self): return True @abstractmethod - def read_next_batch(self, file_pointer, count): - return [] + def skip_tuples(self, file_pointer, offset): + pass + + @abstractmethod + def read_next_batch(self, file_pointer, limit): + pass + + def read_next_tuples(self, file_name, offset, read_size): + open_string = 'r' + if not self.is_file_mode_binary(): + open_string += 'u' + file_pointer = open(file_name, open_string) + + if offset > 0: + self.skip_tuples(file_pointer, offset) + + results = self.read_next_batch(file_pointer, read_size) + file_pointer.close() + return results class TextType(StreamDataType): - """Covers: CHAR, VARCHAR, CLOB""" + """Covers: CHAR, VARCHAR, CLOB and URL""" def __init__(self, **kwargs): super(TextType, self).__init__(**kwargs) @@ -46,9 +63,69 @@ class TextType(StreamDataType): def is_file_mode_binary(self): return False - def read_next_batch(self, file_pointer, count): - array = file_pointer.readlines() - return map(lambda x: None if x == self._nullable_constant else x[:-1], array) + def skip_tuples(self, file_pointer, offset): + for _ in xrange(offset): + next(file_pointer) + + def read_next_batch(self, file_pointer, limit): + array = [] + for _ in xrange(limit): + next_line = next(file_pointer) + if next_line == self._nullable_constant: + array.append(None) + else: + array.append(next_line[:-1]) # remove newline + return array + + +class INetType(StreamDataType): + """Covers: Inet""" + + def __init__(self, **kwargs): + super(INetType, self).__init__(**kwargs) + + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset << 3) + + def read_next_batch(self, file_pointer, limit): + results = [] + read_size = limit << 3 + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', file_pointer.read(read_size)) + iterator = iter(array) + + for _ in xrange(limit): + next_ipv4 = [next(iterator) for _ in xrange(8)] + if next_ipv4[7] == 1: # check nil value + results.append(None) + else: + parsed_ip = '.'.join([str(next_ipv4[0]), str(next_ipv4[1]), str(next_ipv4[2]), str(next_ipv4[3])]) + results.append(parsed_ip + '/' + str(next_ipv4[4])) + return results + + +class UUIDType(StreamDataType): + """Covers: UUID""" + + def __init__(self, **kwargs): + super(UUIDType, self).__init__(**kwargs) + + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset << 4) + + def read_next_batch(self, file_pointer, limit): + results = [] + read_size = limit << 4 + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', file_pointer.read(read_size)) + iterator = iter(array) + + for _ in xrange(limit): + next_uuid = ''.join(map(lambda x: "%02x" % x, [next(iterator) for _ in xrange(16)])) + next_uuid = ''.join([next_uuid[:8], '-', next_uuid[8:12], '-', next_uuid[12:16], '-', next_uuid[16:20], + '-', next_uuid[20:]]) + if next_uuid == NIL_UUID: + next_uuid = None + results.append(next_uuid) + return results class BooleanType(StreamDataType): @@ -58,8 +135,11 @@ class BooleanType(StreamDataType): super(BooleanType, self).__init__(**kwargs) self._nullable_constant = INT8_MIN - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count) + 'b', file_pointer.read(count)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b', file_pointer.read(limit)) return map(lambda x: None if x == self._nullable_constant else bool(x), array) @@ -74,8 +154,12 @@ class SmallIntegerType(StreamDataType): self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 'int': INT32_MIN, 'integer': INT32_MIN, 'bigint': INT64_MIN}.get(self._data_type) - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset * self._size) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + self._pack_sym, + file_pointer.read(limit * self._size)) return map(lambda x: None if x == self._nullable_constant else int(x), array) @@ -86,8 +170,11 @@ class HugeIntegerType(StreamDataType): super(HugeIntegerType, self).__init__(**kwargs) self._nullable_constant = INT128_MIN - def read_next_batch(self, file_pointer, count): # [entry & INT64_MAX, (entry >> 64) & INT64_MAX] - array = struct.unpack(ALIGNMENT + str(count << 1) + 'Q', file_pointer.read(count << 3)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset << 4) + + def read_next_batch(self, file_pointer, limit): # [entry & INT64_MAX, (entry >> 64) & INT64_MAX] + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q', file_pointer.read(limit << 4)) results = [] iterator = iter(array) # has to iterate two values at once, so use iterator for value in iterator: @@ -108,8 +195,12 @@ class FloatType(StreamDataType): self._size = struct.calcsize(self._pack_sym) self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 'double': DOUBLE_NAN}.get(self._data_type) - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset * self._size) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + self._pack_sym, + file_pointer.read(limit * self._size)) return map(lambda x: None if x == self._nullable_constant else float(x), array) @@ -126,20 +217,24 @@ class DecimalType(StreamDataType): if self._pack_sym == 'Q': self._size <<= 1 # has to read two values at once - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset * self._size) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + self._pack_sym, + file_pointer.read(limit * self._size)) if self._pack_sym != 'Q': return map(lambda x: None if x == self._nullable_constant else float(x), array) - - results = [] - iterator = iter(array) # has to iterate two values at once, so use iterator - for value in iterator: - next_huge_decimal = value + (next(iterator) << 64) - if next_huge_decimal == self._nullable_constant: - results.append(None) - else: - results.append(next_huge_decimal) - return results + else: + results = [] + iterator = iter(array) # has to iterate two values at once, so use iterator + for value in iterator: + next_huge_decimal = value + (next(iterator) << 64) + if next_huge_decimal == self._nullable_constant: + results.append(None) + else: + results.append(next_huge_decimal) + return results class DateType(StreamDataType): # Stored as an uint with the number of days since day 1 of month 1 (Jan) from year 0 @@ -149,8 +244,11 @@ class DateType(StreamDataType): # Store super(DateType, self).__init__(**kwargs) self._nullable_constant = INT32_MIN - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count) + 'I', file_pointer.read(count << 2)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset << 2) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'I', file_pointer.read(limit << 2)) results = [] for value in array: if value == self._nullable_constant: @@ -167,8 +265,11 @@ class TimeType(StreamDataType): # Store super(TimeType, self).__init__(**kwargs) self._nullable_constant = INT32_MIN - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count) + 'I', file_pointer.read(count << 2)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset << 2) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'I', file_pointer.read(limit << 2)) results = [] for value in array: if value == self._nullable_constant: @@ -188,8 +289,11 @@ class TimestampType(StreamDataType): # def __init__(self, **kwargs): super(TimestampType, self).__init__(**kwargs) - def read_next_batch(self, file_pointer, count): - array = struct.unpack(ALIGNMENT + str(count << 1) + 'I', file_pointer.read(count << 3)) + def skip_tuples(self, file_pointer, offset): + file_pointer.seek(offset << 3) + + def read_next_batch(self, file_pointer, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'I', file_pointer.read(limit << 3)) results = [] iterator = iter(array) # has to iterate two values at once, so use iterator diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -1,6 +1,6 @@ +import os import struct -import os from Settings.filesystem import get_baskets_base_location from Utilities.readwritelock import RWLock from WebSockets.websockets import notify_clients @@ -120,11 +120,7 @@ class IOTStream(object): for key, column in self._columns.iteritems(): next_file_name = os.path.join(next_path, key) - open_string = 'r' - if not column.is_file_mode_binary(): - open_string += 'u' - file_pointer = open(next_file_name, open_string) - results[key].append(column.read_next_batch(file_pointer, offset, next_read_size)) + results[key].append(column.read_next_tuples(next_file_name, offset, next_read_size)) read_tuples += next_read_size offset = 0 diff --git a/clients/iotclient/src/Streams/streamscreator.py b/clients/iotclient/src/Streams/streamscreator.py _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list