Changeset: 5b90b8e6494a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5b90b8e6494a
Modified Files:
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotapi/src/WebSockets/jsonschemas.py
        clients/iotapi/src/WebSockets/websockets.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:

Finished stream reading


diffs (truncated from 661 to 300 lines):

diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -47,4 +47,3 @@ def fetch_streams():
     except BaseException as ex:
         print >> sys.stdout, ex
         add_log(50, ex)
-        sys.exit(1)
diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -28,8 +28,8 @@ class StreamBasketsHandler(FileSystemEve
     def on_created(self, event):  # whenever a basket directory is created, notify to subscribed clients
         if isinstance(event, 'DirCreatedEvent'):
             basket_string = os.path.basename(os.path.normpath(event.src_path))
-            self._stream.append_basket(basket_string)
-            notify_clients(self._stream.get_schema_name(), 
self._stream.get_stream_name())
+            count = self._stream.append_basket(basket_string)
+            notify_clients(self._stream.get_schema_name(), 
self._stream.get_stream_name(), count)
 
     def on_deleted(self, event):
         if isinstance(event, 'DirDeletedEvent'):
@@ -49,6 +49,7 @@ class IOTStream(object):
         for name in os.listdir(self._base_path):
             self.append_basket(name)
         self._lock = RWLock()
+
         self._observer = Observer()
         self._observer.schedule(StreamBasketsHandler(stream=self), 
self._base_path, recursive=False)
         self._observer.start()
@@ -66,6 +67,8 @@ class IOTStream(object):
                 self._lock.acquire_write()
                 self._baskets[int(path)] = count
                 self._lock.release()
+                return count
+        return 0
 
     def delete_basket(self, path):
         if represents_int(path):
@@ -126,7 +129,6 @@ class IOTStream(object):
                 offset = 0
                 current_basket_number += 1
 
-        # TODO check if this is viable, it could be 1000 tuples!!!!
-        keys = results.keys()
+        keys = results.keys()  # TODO check if this is viable for many tuples!!
         tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k 
in keys))]
         return {'total': read_tuples, 'tuples': tuples}
diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -5,7 +5,7 @@ from WebSockets.websockets import desubs
 
 
 class IOTStreams(object):
-    """Stream's context"""
+    """Streams context"""
 
     @classmethod
     def get_context_entry_name(cls, schema_name, stream_name):
diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py b/clients/iotapi/src/WebSockets/jsonschemas.py
--- a/clients/iotapi/src/WebSockets/jsonschemas.py
+++ b/clients/iotapi/src/WebSockets/jsonschemas.py
@@ -1,6 +1,11 @@
-PUBSUB_STREAMS_SCHEMA = {
+SUBSCRIBE_OPTS = ["sub", "subscribe"]
+UNSUBSCRIBE_OPTS = ["unsub", "unsubscribe"]
+CONCAT_SUB_OPTS = SUBSCRIBE_OPTS + UNSUBSCRIBE_OPTS
+READ_OPTS = ["read"]
+
+CLIENTS_INPUTS_SCHEMA = {
     "title": "JSON schema publish/subscribe streams",
-    "description": "Validate data inserted",
+    "description": "Validate clients inputs",
     "$schema": "http://json-schema.org/draft-04/schema#",
     "type": "object",
 
@@ -8,7 +13,7 @@ PUBSUB_STREAMS_SCHEMA = {
         "properties": {
             "schema": {"type": "string"},
             "stream": {"type": "string"},
-            "action": {"type": "string", "enum": ["sub", "subscribe", "desub", 
"desubscribe"]},
+            "action": {"type": "string", "enum": CONCAT_SUB_OPTS},
         },
         "required": ["schema", "stream", "action"],
         "additionalProperties": False
@@ -22,7 +27,7 @@ PUBSUB_STREAMS_SCHEMA = {
         "properties": {
             "schema": {"type": "string"},
             "stream": {"type": "string"},
-            "action": {"type": "string", "enum": ["read"]},
+            "action": {"type": "string", "enum": READ_OPTS},
             "basket": {"type": "integer", "minimum": 1, "default": 1},
             "limit": {"type": "integer", "minimum": 0, "default": 0},
             "offset": {"type": "integer", "minimum": 0, "default": 0}
diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -7,24 +7,23 @@ from Streams.streamscontext import Strea
 from Utilities.readwritelock import RWLock
 from jsonschema import Draft4Validator, FormatChecker
 
-from jsonschemas import PUBSUB_STREAMS_SCHEMA
+from jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS, 
UNSUBSCRIBE_OPTS, READ_OPTS
 
-Client_Messages_Validator = Draft4Validator(PUBSUB_STREAMS_SCHEMA, 
format_checker=FormatChecker())
+Client_Messages_Validator = Draft4Validator(CLIENTS_INPUTS_SCHEMA, 
format_checker=FormatChecker())
 WebSocketServer = None
 WebClients = []  # TODO this probably won't scale for many
 WebClientsLock = RWLock()
 
 
-def notify_stream_inserts_to_clients(schema_name, stream_name):
+def notify_stream_inserts_to_clients(schema_name, stream_name, count):
     concatenated_name = IOTStreams.get_context_entry_name(schema_name, 
stream_name)
-    json_message = json.dumps({'notification': {'schema': schema_name, 
'stream': stream_name}})
     WebClientsLock.acquire_read()
     for client in WebClients:
-        client.send_notification_message(concatenated_name, json_message)
+        client.send_notification_message(concatenated_name, schema_name, 
stream_name, count)
     WebClientsLock.release()
 
 
-def desubscribe_removed_streams(concatenated_names):
+def unsubscribe_removed_streams(concatenated_names):
     WebClientsLock.acquire_read()
     for name in concatenated_names:
         for client in WebClients:
@@ -35,85 +34,93 @@ def desubscribe_removed_streams(concaten
 
 
 class IOTAPI(WebSocket):
-    def __init__(self):
-        super(IOTAPI, self).__init__()
+    """Client WebSocket"""
+
+    def __init__(self, server, sock, address):
+        super(IOTAPI, self).__init__(server, sock, address)
         self._subscriptions = {}  # dictionary of schema + '.' + stream -> IOTStream
         self._locker = RWLock()
 
-    def handleMessage(self):
+    def sendMessage(self, message):  # overriden
+        super(IOTAPI, self).sendMessage(json.dumps(message))  # send JSON Strings to clients
+
+    def handleConnected(self):  # overriden
+        WebClientsLock.acquire_write()
+        WebClients.append(self)
+        WebClientsLock.release()
+        add_log(20, 'Client connected: ' + self.address[0])
+
+    def handleClose(self):  # overriden
+        WebClientsLock.acquire_write()
+        WebClients.remove(self)
+        WebClientsLock.release()
+        add_log(20, 'Client disconnected: ' + self.address[0])
+
+    def handleMessage(self):  # overriden
         if self.opcode != 0x1:  # TEXT frame
             self.sendMessage({"error": "Only TEXT frames allowed!"})
         try:
             input_schema = json.loads(self.data)
             Client_Messages_Validator.validate(input_schema)
+            concatenated_name = IOTStreams.get_context_entry_name(input_schema['schema'], input_schema['stream'])
 
-            if input_schema['action'] in ("sub", "subscribe"):
-                concatenated_name = 
IOTStreams.get_context_entry_name(input_schema['schema'], 
input_schema['stream'])
-                self.subscribe(self, concatenated_name)
-            elif input_schema['action'] in ("desub", "desubscribe"):
-                concatenated_name = 
IOTStreams.get_context_entry_name(input_schema['schema'], 
input_schema['stream'])
-                self.desubscribe(self, concatenated_name)
+            if input_schema['action'] in SUBSCRIBE_OPTS:
+                self.subscribe(concatenated_name)
+            elif input_schema['action'] in UNSUBSCRIBE_OPTS:
+                self.unsubscribe(concatenated_name)
+            elif input_schema['action'] in READ_OPTS:
+                self.read_stream_batch(concatenated_name, 
int(input_schema['basket']), int(input_schema['limit']),
+                                       int(input_schema['offset']))
         except BaseException as ex:
+            self.sendMessage({"error": ex})
             add_log(50, ex)
-            self.sendMessage(json.dumps({"error": ex}))
-
-    def handleConnected(self):
-        WebClientsLock.acquire_write()
-        WebClients.append(self)
-        WebClientsLock.release()
-        add_log(20, 'Client connected: ' + self.address[0])
-
-    def handleClose(self):
-        WebClientsLock.acquire_write()
-        WebClients.remove(self)
-        WebClientsLock.release()
-        add_log(20, 'Client disconnected: ' + self.address[0])
 
     def subscribe(self, concatenated_name):
-        try:
-            stream = Streams_context.get_existing_stream(concatenated_name)
-        except:
-            raise
+        stream = Streams_context.get_existing_stream(concatenated_name)
         self._locker.acquire_write()
         self._subscriptions[concatenated_name] = stream
         self._locker.release()
-        self.sendMessage(json.dumps({"subscribed": "Subscribed to " + 
concatenated_name}))
+        self.sendMessage({"subscribed": "Subscribed to " + concatenated_name})
         add_log(20, ''.join(['Client ', self.address[0], 'subscribed stream ', 
concatenated_name]))
 
-    def desubscribe(self, concatenated_name):
+    def unsubscribe(self, concatenated_name):
         self._locker.acquire_write()
         if concatenated_name not in self._subscriptions:
             self._locker.release()
-            self.sendMessage(json.dumps({"error": "Stream " + 
concatenated_name + " not present in subscriptions!"}))
+            self.sendMessage({"error": "Stream " + concatenated_name + " not 
present in subscriptions!"})
         else:
             del self._subscriptions[concatenated_name]
             self._locker.release()
-            self.sendMessage(json.dumps({"desubscribed": "Desubscribed to " + 
concatenated_name}))
-            add_log(20, ''.join(['Client ', self.address[0], 'desubscribed 
stream ', concatenated_name]))
+            self.sendMessage({"unsubscribed": "Unsubscribed to " + 
concatenated_name})
+            add_log(20, ''.join(['Client ', self.address[0], ' unsubscribed 
stream ', concatenated_name]))
 
     def remove_subscribed_stream(self, concatenated_name):
         self._locker.acquire_write()
         if concatenated_name in self._subscriptions:
             del self._subscriptions[concatenated_name]
         self._locker.release()
-        self.sendMessage(json.dumps({"removed": "Stream removed from context: 
" + concatenated_name}))
+        self.sendMessage({"removed": "Stream removed from context: " + 
concatenated_name})
 
-    def send_notification_message(self, concatenated_name, json_message):
+    def send_notification_message(self, concatenated_name, schema_name, 
stream_name, count):
         self._locker.acquire_read()
         if concatenated_name in self._subscriptions:
             self._locker.release()
-            self.sendMessage(json_message)
+            self.sendMessage({'notification': {'schema': schema_name, 
'stream': stream_name, 'tuples': count}})
             add_log(20, ''.join(['Stream notification sent to client ', 
self.address[0]]))
         else:
             self._locker.release()
 
+    def read_stream_batch(self, concatenated_name, basket_number, limit, 
offset):
+        stream = Streams_context.get_existing_stream(concatenated_name)
+        self.sendMessage(stream.read_tuples(basket_number, limit, offset))
+
 
 def init_websockets(host, port):
     global WebSocketServer
     try:
         WebSocketServer = SimpleWebSocketServer(host, port, IOTAPI)
         WebSocketServer.serveforever()
-    except (Exception, OSError) as ex:
+    except (BaseException, OSError) as ex:
         print >> sys.stdout, ex
         add_log(50, ex)
         sys.exit(1)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -1,7 +1,7 @@
+import getpass
 import sys
+
 import pymonetdb
-import getpass
-
 from Settings.iotlogger import add_log
 
 Connection = None
@@ -37,12 +37,11 @@ def mapi_create_stream(schema, stream, c
     except:
         pass
 
-    try:  # attempt to create te stream table
+    try:  # attempt to create the stream table
         Connection.execute("SET SCHEMA " + schema + ";")
         Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", 
columns, ");"]))
     except BaseException as ex:
         add_log(40, ex)
-        pass
 
 
 def mapi_flush_baskets(schema, stream, baskets):
@@ -55,4 +54,3 @@ def mapi_flush_baskets(schema, stream, b
         Connection.execute(''.join(["CALL iot.basket('", schema, "','", 
stream, "','", baskets, "');"]))
     except BaseException as ex:
         add_log(40, ex)
-        pass
diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -2,21 +2,17 @@ import copy
 import datetime
 import itertools
 import math
+import re
 import struct
+from abc import ABCMeta, abstractmethod
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to