Changeset: 86b955aa08b7 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=86b955aa08b7 Modified Files: clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streampolling.py clients/iotapi/src/Streams/streams.py clients/iotclient/documentation/iot_server_arguments.rst clients/iotclient/documentation/restful_resources.rst clients/iotclient/documentation/streams_data_types.rst clients/iotclient/requirements.txt clients/iotclient/src/Flask/restresources.py clients/iotclient/src/Streams/streams.py Branch: iot Log Message:
First version of web api working. Updated some documentation. Fixed small errors. diffs (230 lines): diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ b/clients/iotapi/src/Streams/datatypes.py @@ -119,12 +119,17 @@ class UUIDType(StreamDataType): iterator = iter(array) for _ in xrange(limit): - next_uuid = ''.join(map(lambda x: "%02x" % x, [next(iterator) for _ in xrange(16)])) - next_uuid = ''.join([next_uuid[:8], '-', next_uuid[8:12], '-', next_uuid[12:16], '-', next_uuid[16:20], - '-', next_uuid[20:]]) - if next_uuid == NIL_UUID: - next_uuid = None - results.append(next_uuid) + next_uuid = [] + for i in xrange(20): + if i in (4, 7, 10, 13): + next_uuid.append("-") + else: + next_uuid.append("%02x" % next(iterator)) + + built_uuid = ''.join(next_uuid) + if built_uuid == NIL_UUID: + built_uuid = None + results.append(built_uuid) return results @@ -178,7 +183,7 @@ class HugeIntegerType(StreamDataType): results = [] iterator = iter(array) # has to iterate two values at once, so use iterator for value in iterator: - next_huge = value + (next(iterator) << 64) + next_huge = next(iterator) + (value << 64) if next_huge == self._nullable_constant: results.append(None) else: @@ -229,7 +234,7 @@ class DecimalType(StreamDataType): results = [] iterator = iter(array) # has to iterate two values at once, so use iterator for value in iterator: - next_huge_decimal = value + (next(iterator) << 64) + next_huge_decimal = next(iterator) + (value << 64) if next_huge_decimal == self._nullable_constant: results.append(None) else: @@ -302,8 +307,8 @@ class TimestampType(StreamDataType): # if value == INT32_MIN and second_value == 0: results.append(None) else: # dates in python start on year 1, so we must subtract one year - read_date = date.fromordinal(value) - relativedelta(years=1) - div1, milliseconds = divmod(second_value, 1000) + read_date = date.fromordinal(second_value) - 
relativedelta(years=1) + div1, milliseconds = divmod(value, 1000) div2, second = divmod(div1, 60) hour, minute = divmod(div2, 60) results.append(datetime.combine(read_date, time(hour=hour, minute=minute, second=second, diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -40,9 +40,9 @@ def stream_polling(): if elem[3] in entry['types']: reflection_class = globals()[entry['class']] # import everything from datatypes!!! new_column = reflection_class(**{'name': elem[2], 'type': elem[3], 'typewidth': elem[4]}) - columns[elem[2]] = new_column - new_streams[key] = IOTStream(elem[0], elem[1], **columns) - break + columns[elem[2]] = new_column # add new column to the dictionary + break + new_streams[key] = IOTStream(schema_name=elem[0], stream_name=elem[1], columns=columns) else: retained_streams.append(key) diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -5,7 +5,7 @@ from datatypes import LITTLE_ENDIAN_ALIG from Settings.filesystem import get_baskets_base_location from Utilities.readwritelock import RWLock from WebSockets.websockets import notify_stream_inserts_to_clients -from watchdog.events import FileSystemEventHandler +from watchdog.events import FileSystemEventHandler, DirCreatedEvent, DirDeletedEvent from watchdog.observers import Observer BASKETS_COUNT_FILE = 'count' @@ -27,13 +27,13 @@ class StreamBasketsHandler(FileSystemEve self._stream = stream def on_created(self, event): # whenever a basket directory is created, notify to subscribed clients - if isinstance(event, 'DirCreatedEvent'): + if isinstance(event, DirCreatedEvent): basket_string = os.path.basename(os.path.normpath(event.src_path)) count = self._stream.append_basket(basket_string) 
notify_stream_inserts_to_clients(self._stream.get_schema_name(), self._stream.get_stream_name(), count) def on_deleted(self, event): - if isinstance(event, 'DirDeletedEvent'): + if isinstance(event, DirDeletedEvent): basket_string = os.path.basename(os.path.normpath(event.src_path)) self._stream.delete_basket(basket_string) @@ -46,11 +46,10 @@ class IOTStream(object): self._stream_name = stream_name # name of the stream self._columns = columns # dictionary of name -> data_types self._base_path = os.path.join(get_baskets_base_location(), schema_name, stream_name) + self._lock = RWLock() self._baskets = {} # dictionary of basket_number -> total_tuples for name in os.listdir(self._base_path): self.append_basket(name) - self._lock = RWLock() - self._observer = Observer() self._observer.schedule(StreamBasketsHandler(stream=self), self._base_path, recursive=False) self._observer.start() @@ -63,8 +62,8 @@ class IOTStream(object): def append_basket(self, path): if represents_int(path): - with open(os.path.join(self._base_path, path)) as f: - count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + '1i', f.read(4))[0] + with open(os.path.join(self._base_path, path, BASKETS_COUNT_FILE)) as f: + count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + 'i', f.read(4))[0] self._lock.acquire_write() self._baskets[int(path)] = count self._lock.release() @@ -124,7 +123,7 @@ class IOTStream(object): for key, column in self._columns.iteritems(): next_file_name = os.path.join(next_path, key) - results[key].append(column.read_next_tuples(next_file_name, offset, next_read_size)) + results[key] += column.read_next_tuples(next_file_name, offset, next_read_size) read_tuples += next_read_size offset = 0 diff --git a/clients/iotclient/documentation/iot_server_arguments.rst b/clients/iotclient/documentation/iot_server_arguments.rst --- a/clients/iotclient/documentation/iot_server_arguments.rst +++ b/clients/iotclient/documentation/iot_server_arguments.rst @@ -32,10 +32,6 @@ Set the filesystem directory where 
the b Location of logfile. On the logfile is reported when streams are created or removed, when tuples are inserted and when the baskets are flushed. By default in UNIX systems is :code:`/var/log/iot/iot.log`, while on Windows is the :code:`iot.log` on the directory where the :code:`main.py` script was called. -**-c - -config=** - -Location of the JSON file where the information of existing streams on the server will be stored. By default is the :code:`config.json` file on filesystem directory. - Host Identifier --------------- diff --git a/clients/iotclient/documentation/restful_resources.rst b/clients/iotclient/documentation/restful_resources.rst --- a/clients/iotclient/documentation/restful_resources.rst +++ b/clients/iotclient/documentation/restful_resources.rst @@ -53,7 +53,7 @@ Returns a JSON file with details about a **POST** -Creates a stream using a pre-defined JSON schema. The JSON must include the stream's schema, the stream's name, the flushing method which can be either time or tuple based and the stream's columns. For tuple based flushing, the number of tuples to flush must be provided using the :code:`number` field. In time based flushing, the :code:`interval` field tells the time units between flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours respectively. For columns `see data types for details <streams_data_types.html#data_types>`__. +Creates a stream using a pre-defined JSON schema. The JSON must include the stream's schema, the stream's name, the stream's columns and the flushing method. The flushing can be either time based, tuple based or automatic (:code:`auto`). For tuple based flushing, the number of tuples to flush must be provided using the :code:`number` field. In time based flushing, the :code:`interval` field tells the time units between flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours respectively. 
In automatic flushing, the baskets are flushed whenever a new batch is inserted. For columns `see data types for details <streams_data_types.html#data_types>`__. Below is the JSON used to create the stream in streams_: diff --git a/clients/iotclient/documentation/streams_data_types.rst b/clients/iotclient/documentation/streams_data_types.rst --- a/clients/iotclient/documentation/streams_data_types.rst +++ b/clients/iotclient/documentation/streams_data_types.rst @@ -43,7 +43,7 @@ An *Uniform Resource Locator* as a speci Inet ---- -An *IPv4* address. The insertion as a JSON string is validated against the regular expression :code:`^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$` with further semantic validation. +An *IPv4* address. The insertion as a JSON string is validated against the regular expression :code:`^(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}(?:\/[0-2]\d|\/3[0-2])?$`. InetSix ------- diff --git a/clients/iotclient/requirements.txt b/clients/iotclient/requirements.txt --- a/clients/iotclient/requirements.txt +++ b/clients/iotclient/requirements.txt @@ -3,7 +3,7 @@ jsonschema==2.5.1 pymonetdb==0.1.1 python-dateutil==2.5.3 pytz==2016.4 -rfc3987==1.3.5 +rfc3987==1.3.6 Sphinx==1.4.1 sphinx-rtd-theme==0.1.9 strict-rfc3339==0.7 diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py --- a/clients/iotclient/src/Flask/restresources.py +++ b/clients/iotclient/src/Flask/restresources.py @@ -84,8 +84,6 @@ class StreamsHandling(Resource): add_log(50, ex) return ex, 400 else: - add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.', schema_to_validate['stream'], - ' was created']) return 'The stream was created with success!', 201 def delete(self): diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py --- a/clients/iotclient/src/Streams/streams.py +++ b/clients/iotclient/src/Streams/streams.py @@ -84,9 +84,17 @@ class BaseIOTStream(object): if 
Use_Host_Identifier: create_file_if_not_exists(os.path.join(self._current_base_path, HOST_IDENTIFIER_COLUMN_NAME)) + # WARNING DELETE this code afterwards is for debugging purposes for now + basket_counter_file_pointer = open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b") + basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT + "i",0)) + basket_counter_file_pointer.flush() + basket_counter_file_pointer.close() + # END WARNING + if created: # when the stream is reloaded from the config file, the create SQL statement is not sent sql_array = [column.create_stream_sql() for column in self._columns.values()] mapi_create_stream(self._schema_name, self._stream_name, ', '.join(sql_array + Extra_columns_SQL)) + add_log(20, ''.join(['The stream ', self._schema_name, '.', self._stream_name, ' was created'])) def get_schema_name(self): return self._schema_name @@ -136,6 +144,13 @@ class BaseIOTStream(object): self._current_base_path = os.path.join(self._base_path, str(self._baskets_counter)) os.makedirs(self._current_base_path) + # WARNING DELETE this code afterwards is for debugging purposes for now + basket_counter_file_pointer = open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b") + basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT + "i", 0)) + basket_counter_file_pointer.flush() + basket_counter_file_pointer.close() + # END WARNING + for key in self._columns.keys(): create_file_if_not_exists(os.path.join(self._current_base_path, key)) create_file_if_not_exists(os.path.join(self._current_base_path, IMPLICIT_TIMESTAMP_COLUMN_NAME)) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list