Changeset: 5b90b8e6494a for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5b90b8e6494a Modified Files: clients/iotapi/src/Settings/mapiconnection.py clients/iotapi/src/Streams/streams.py clients/iotapi/src/Streams/streamscontext.py clients/iotapi/src/WebSockets/jsonschemas.py clients/iotapi/src/WebSockets/websockets.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Streams/streamscreator.py Branch: iot Log Message:
Finished stream reading diffs (truncated from 661 to 300 lines): diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py --- a/clients/iotapi/src/Settings/mapiconnection.py +++ b/clients/iotapi/src/Settings/mapiconnection.py @@ -47,4 +47,3 @@ def fetch_streams(): except BaseException as ex: print >> sys.stdout, ex add_log(50, ex) - sys.exit(1) diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -28,8 +28,8 @@ class StreamBasketsHandler(FileSystemEve def on_created(self, event): # whenever a basket directory is created, notify to subscribed clients if isinstance(event, 'DirCreatedEvent'): basket_string = os.path.basename(os.path.normpath(event.src_path)) - self._stream.append_basket(basket_string) - notify_clients(self._stream.get_schema_name(), self._stream.get_stream_name()) + count = self._stream.append_basket(basket_string) + notify_clients(self._stream.get_schema_name(), self._stream.get_stream_name(), count) def on_deleted(self, event): if isinstance(event, 'DirDeletedEvent'): @@ -49,6 +49,7 @@ class IOTStream(object): for name in os.listdir(self._base_path): self.append_basket(name) self._lock = RWLock() + self._observer = Observer() self._observer.schedule(StreamBasketsHandler(stream=self), self._base_path, recursive=False) self._observer.start() @@ -66,6 +67,8 @@ class IOTStream(object): self._lock.acquire_write() self._baskets[int(path)] = count self._lock.release() + return count + return 0 def delete_basket(self, path): if represents_int(path): @@ -126,7 +129,6 @@ class IOTStream(object): offset = 0 current_basket_number += 1 - # TODO check if this is viable, it could be 1000 tuples!!!! - keys = results.keys() + keys = results.keys() # TODO check if this is viable for many tuples!! tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k in keys))] return {'total': read_tuples, 'tuples': tuples} diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py --- a/clients/iotapi/src/Streams/streamscontext.py +++ b/clients/iotapi/src/Streams/streamscontext.py @@ -5,7 +5,7 @@ from WebSockets.websockets import desubs class IOTStreams(object): - """Stream's context""" + """Streams context""" @classmethod def get_context_entry_name(cls, schema_name, stream_name): diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py b/clients/iotapi/src/WebSockets/jsonschemas.py --- a/clients/iotapi/src/WebSockets/jsonschemas.py +++ b/clients/iotapi/src/WebSockets/jsonschemas.py @@ -1,6 +1,11 @@ -PUBSUB_STREAMS_SCHEMA = { +SUBSCRIBE_OPTS = ["sub", "subscribe"] +UNSUBSCRIBE_OPTS = ["unsub", "unsubscribe"] +CONCAT_SUB_OPTS = SUBSCRIBE_OPTS + UNSUBSCRIBE_OPTS +READ_OPTS = ["read"] + +CLIENTS_INPUTS_SCHEMA = { "title": "JSON schema publish/subscribe streams", - "description": "Validate data inserted", + "description": "Validate clients inputs", "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", @@ -8,7 +13,7 @@ PUBSUB_STREAMS_SCHEMA = { "properties": { "schema": {"type": "string"}, "stream": {"type": "string"}, - "action": {"type": "string", "enum": ["sub", "subscribe", "desub", "desubscribe"]}, + "action": {"type": "string", "enum": CONCAT_SUB_OPTS}, }, "required": ["schema", "stream", "action"], "additionalProperties": False @@ -22,7 +27,7 @@ PUBSUB_STREAMS_SCHEMA = { "properties": { "schema": {"type": "string"}, "stream": {"type": "string"}, - "action": {"type": "string", "enum": ["read"]}, + "action": {"type": "string", "enum": READ_OPTS}, "basket": {"type": "integer", "minimum": 1, "default": 1}, "limit": {"type": "integer", "minimum": 0, "default": 0}, "offset": {"type": "integer", "minimum": 0, "default": 0} diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py --- a/clients/iotapi/src/WebSockets/websockets.py +++ b/clients/iotapi/src/WebSockets/websockets.py @@ -7,24 +7,23 @@ from Streams.streamscontext import Strea from Utilities.readwritelock import RWLock from jsonschema import Draft4Validator, FormatChecker -from jsonschemas import PUBSUB_STREAMS_SCHEMA +from jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS, UNSUBSCRIBE_OPTS, READ_OPTS -Client_Messages_Validator = Draft4Validator(PUBSUB_STREAMS_SCHEMA, format_checker=FormatChecker()) +Client_Messages_Validator = Draft4Validator(CLIENTS_INPUTS_SCHEMA, format_checker=FormatChecker()) WebSocketServer = None WebClients = [] # TODO this probably won't scale for many WebClientsLock = RWLock() -def notify_stream_inserts_to_clients(schema_name, stream_name): +def notify_stream_inserts_to_clients(schema_name, stream_name, count): concatenated_name = IOTStreams.get_context_entry_name(schema_name, stream_name) - json_message = json.dumps({'notification': {'schema': schema_name, 'stream': stream_name}}) WebClientsLock.acquire_read() for client in WebClients: - client.send_notification_message(concatenated_name, json_message) + client.send_notification_message(concatenated_name, schema_name, stream_name, count) WebClientsLock.release() -def desubscribe_removed_streams(concatenated_names): +def unsubscribe_removed_streams(concatenated_names): WebClientsLock.acquire_read() for name in concatenated_names: for client in WebClients: @@ -35,85 +34,93 @@ def desubscribe_removed_streams(concaten class IOTAPI(WebSocket): - def __init__(self): - super(IOTAPI, self).__init__() + """Client WebSocket""" + + def __init__(self, server, sock, address): + super(IOTAPI, self).__init__(server, sock, address) self._subscriptions = {} # dictionary of schema + '.' + stream -> IOTStream self._locker = RWLock() - def handleMessage(self): + def sendMessage(self, message): # overriden + super(IOTAPI, self).sendMessage(json.dumps(message)) # send JSON Strings to clients + + def handleConnected(self): # overriden + WebClientsLock.acquire_write() + WebClients.append(self) + WebClientsLock.release() + add_log(20, 'Client connected: ' + self.address[0]) + + def handleClose(self): # overriden + WebClientsLock.acquire_write() + WebClients.remove(self) + WebClientsLock.release() + add_log(20, 'Client disconnected: ' + self.address[0]) + + def handleMessage(self): # overriden if self.opcode != 0x1: # TEXT frame self.sendMessage({"error": "Only TEXT frames allowed!"}) try: input_schema = json.loads(self.data) Client_Messages_Validator.validate(input_schema) + concatenated_name = IOTStreams.get_context_entry_name(input_schema['schema'], input_schema['stream']) - if input_schema['action'] in ("sub", "subscribe"): - concatenated_name = IOTStreams.get_context_entry_name(input_schema['schema'], input_schema['stream']) - self.subscribe(self, concatenated_name) - elif input_schema['action'] in ("desub", "desubscribe"): - concatenated_name = IOTStreams.get_context_entry_name(input_schema['schema'], input_schema['stream']) - self.desubscribe(self, concatenated_name) + if input_schema['action'] in SUBSCRIBE_OPTS: + self.subscribe(concatenated_name) + elif input_schema['action'] in UNSUBSCRIBE_OPTS: + self.unsubscribe(concatenated_name) + elif input_schema['action'] in READ_OPTS: + self.read_stream_batch(concatenated_name, int(input_schema['basket']), int(input_schema['limit']), + int(input_schema['offset'])) except BaseException as ex: + self.sendMessage({"error": ex}) add_log(50, ex) - self.sendMessage(json.dumps({"error": ex})) - - def handleConnected(self): - WebClientsLock.acquire_write() - WebClients.append(self) - WebClientsLock.release() - add_log(20, 'Client connected: ' + self.address[0]) - - def handleClose(self): - WebClientsLock.acquire_write() - WebClients.remove(self) - WebClientsLock.release() - add_log(20, 'Client disconnected: ' + self.address[0]) def subscribe(self, concatenated_name): - try: - stream = Streams_context.get_existing_stream(concatenated_name) - except: - raise + stream = Streams_context.get_existing_stream(concatenated_name) self._locker.acquire_write() self._subscriptions[concatenated_name] = stream self._locker.release() - self.sendMessage(json.dumps({"subscribed": "Subscribed to " + concatenated_name})) + self.sendMessage({"subscribed": "Subscribed to " + concatenated_name}) add_log(20, ''.join(['Client ', self.address[0], 'subscribed stream ', concatenated_name])) - def desubscribe(self, concatenated_name): + def unsubscribe(self, concatenated_name): self._locker.acquire_write() if concatenated_name not in self._subscriptions: self._locker.release() - self.sendMessage(json.dumps({"error": "Stream " + concatenated_name + " not present in subscriptions!"})) + self.sendMessage({"error": "Stream " + concatenated_name + " not present in subscriptions!"}) else: del self._subscriptions[concatenated_name] self._locker.release() - self.sendMessage(json.dumps({"desubscribed": "Desubscribed to " + concatenated_name})) - add_log(20, ''.join(['Client ', self.address[0], 'desubscribed stream ', concatenated_name])) + self.sendMessage({"unsubscribed": "Unsubscribed to " + concatenated_name}) + add_log(20, ''.join(['Client ', self.address[0], ' unsubscribed stream ', concatenated_name])) def remove_subscribed_stream(self, concatenated_name): self._locker.acquire_write() if concatenated_name in self._subscriptions: del self._subscriptions[concatenated_name] self._locker.release() - self.sendMessage(json.dumps({"removed": "Stream removed from context: " + concatenated_name})) + self.sendMessage({"removed": "Stream removed from context: " + concatenated_name}) - def send_notification_message(self, concatenated_name, json_message): + def send_notification_message(self, concatenated_name, schema_name, stream_name, count): self._locker.acquire_read() if concatenated_name in self._subscriptions: self._locker.release() - self.sendMessage(json_message) + self.sendMessage({'notification': {'schema': schema_name, 'stream': stream_name, 'tuples': count}}) add_log(20, ''.join(['Stream notification sent to client ', self.address[0]])) else: self._locker.release() + def read_stream_batch(self, concatenated_name, basket_number, limit, offset): + stream = Streams_context.get_existing_stream(concatenated_name) + self.sendMessage(stream.read_tuples(basket_number, limit, offset)) + def init_websockets(host, port): global WebSocketServer try: WebSocketServer = SimpleWebSocketServer(host, port, IOTAPI) WebSocketServer.serveforever() - except (Exception, OSError) as ex: + except (BaseException, OSError) as ex: print >> sys.stdout, ex add_log(50, ex) sys.exit(1) diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -1,7 +1,7 @@ +import getpass import sys + import pymonetdb -import getpass - from Settings.iotlogger import add_log Connection = None @@ -37,12 +37,11 @@ def mapi_create_stream(schema, stream, c except: pass - try: # attempt to create te stream table + try: # attempt to create the stream table Connection.execute("SET SCHEMA " + schema + ";") Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", columns, ");"])) except BaseException as ex: add_log(40, ex) - pass def mapi_flush_baskets(schema, stream, baskets): @@ -55,4 +54,3 @@ def mapi_flush_baskets(schema, stream, b Connection.execute(''.join(["CALL iot.basket('", schema, "','", stream, "','", baskets, "');"])) except BaseException as ex: add_log(40, ex) - pass diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -2,21 +2,17 @@ import copy import datetime import itertools import math +import re import struct +from abc import ABCMeta, abstractmethod _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list