Changeset: 2d1c8b51a218 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2d1c8b51a218
Added Files:
        clients/iotclient/README
        clients/iotclient/requirements.txt
        clients/iotclient/src/Flask/__init__.py
        clients/iotclient/src/Flask/app.py
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/__init__.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/iotlogger.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/__init__.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/flushing.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/semanticvalidation.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Utilities/__init__.py
        clients/iotclient/src/Utilities/filecreator.py
        clients/iotclient/src/Utilities/numberutilities.py
        clients/iotclient/src/Utilities/readwritelock.py
        clients/iotclient/src/__init__.py
        clients/iotclient/src/main.py
Branch: iot
Log Message:

First version of IOT webserver added


diffs (truncated from 1667 to 300 lines):

diff --git a/clients/iotclient/README b/clients/iotclient/README
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/README
@@ -0,0 +1,30 @@
+MonetDB IOT RESTful web server, written in Python 2 using the Flask-RESTful 
framework, for use with the new DataCell engine.
+
+The program is composed of two web servers: one for administration purposes 
(the creation and deletion of streams), and another for stream input 
using JSON requests.
+
+
+REST resources available on the administration web server, running by default on 
port 8001 (should be kept private from outside networks):
+
+/streams - GET - Returns JSON file with details about all the streams 
currently supported on the webserver.
+
+/context - POST - Creates a stream using the JSON Schema defined in 
src/Streams/jsonschemas.py in the CREATE_STREAMS_SCHEMA variable.
+
+/context - DELETE - Deletes a stream using the JSON Schema defined in 
src/Streams/jsonschemas.py in the DELETE_STREAMS_SCHEMA variable.
+
+
+REST resources available on the IOT web server, running by default on port 8000:
+
+/streams - GET - Returns JSON file with details about all the streams 
currently supported on the webserver.
+
+/stream/<string:schema_name>/<string:stream_name> - GET - Returns JSON file 
with details of the stream_name on schema_name
+
+/stream/<string:schema_name>/<string:stream_name> - POST - Inserts data into the 
stream using a JSON schema generated when the stream was created. The input must 
be a JSON array, even for a single tuple.
+
+
+Currently supports most of the MonetDB datatypes. An implicit Timestamp column is 
added with each tuple's arrival time. The flushing of the baskets can be either 
Tuple or Time based, as specified when the stream is created.
+
+More details coming soon.
+
+
+Maintainer: Pedro Ferreira at CWI, email: p.e.ferre...@cwi.nl
+
diff --git a/clients/iotclient/requirements.txt 
b/clients/iotclient/requirements.txt
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/requirements.txt
@@ -0,0 +1,8 @@
+Flask-RESTful==0.3.5
+jsonschema==2.5.1
+rfc3987==1.3.5
+strict-rfc3339==0.6
+python-dateutil==2.5.2
+pytz==2016.3
+pymonetdb==1.0
+tzlocal==1.2.2
diff --git a/clients/iotclient/src/Flask/__init__.py 
b/clients/iotclient/src/Flask/__init__.py
new file mode 100644
diff --git a/clients/iotclient/src/Flask/app.py 
b/clients/iotclient/src/Flask/app.py
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/src/Flask/app.py
@@ -0,0 +1,24 @@
+from flask import Flask
+from flask_restful import Api
+
+from restresources import StreamInput, StreamsInfo, StreamsHandling
+
+
+def start_flask_iot_app(host, port):
+    iot_app = Flask(__name__)
+    iot_app.config['BUNDLE_ERRORS'] = True
+    iot_api = Api(iot_app)
+
+    iot_api.add_resource(StreamsInfo, '/streams')
+    iot_api.add_resource(StreamInput, 
'/stream/<string:schema_name>/<string:stream_name>')
+    iot_app.run(host=host, port=port, threaded=True)
+
+
+def start_flask_admin_app(host, port):
+    admin_app = Flask(__name__)
+    admin_app.config['BUNDLE_ERRORS'] = True
+    admin_api = Api(admin_app)
+
+    admin_api.add_resource(StreamsInfo, '/streams')
+    admin_api.add_resource(StreamsHandling, '/context')
+    admin_app.run(host=host, port=port, threaded=True)
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -0,0 +1,78 @@
+import datetime
+import json
+import pytz
+
+from flask import request
+from flask_restful import Resource
+from jsonschema import Draft4Validator, FormatChecker
+from tzlocal import get_localzone
+from src.Streams.jsonschemas import CREATE_STREAMS_SCHEMA, 
DELETE_STREAMS_SCHEMA
+from src.Streams.streamscontext import IOTStreamsException, IOTStreams
+
+Stream_context = IOTStreams()
+local_tz = get_localzone()  # for the correction of dates we must add the 
timezone
+
+
+class StreamInput(Resource):
+    """RESTful API for stream's input"""
+
+    def get(self, schema_name, stream_name):  # check a single stream data
+        try:  # check if stream exists, if not return 404
+            stream = Stream_context.get_existing_stream(schema_name, 
stream_name)
+        except IOTStreamsException as ex:
+            return ex.message, 404
+        return stream.get_data_dictionary(), 200
+
+    def post(self, schema_name, stream_name):  # add data to a stream
+        current_stamp = 
datetime.datetime.now(pytz.utc).astimezone(local_tz).isoformat()
+
+        try:  # check if stream exists, if not return 404
+            stream = Stream_context.get_existing_stream(schema_name, 
stream_name)
+        except IOTStreamsException as ex:
+            return ex.message, 404
+
+        try:  # validate and insert data, if not return 400
+            stream.validate_and_insert(json.loads(request.data), current_stamp)
+        except Exception as ex:
+            return ex.message, 400
+        return '', 201  # all ok, return 201
+
+
+class StreamsInfo(Resource):
+    """Collect all streams information"""
+
+    def get(self):  # get all streams data
+        return Stream_context.get_streams_data(), 200
+
+
+class StreamsHandling(Resource):
+    """Admin class for creating/deleting streams"""
+
+    CREATE_STREAMS_VALIDATOR = Draft4Validator(CREATE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
+    DELETE_STREAMS_VALIDATOR = Draft4Validator(DELETE_STREAMS_SCHEMA, 
format_checker=FormatChecker())
+
+    def __init__(self):
+        super(StreamsHandling, self).__init__()
+
+    def post(self):
+        try:
+            schema_to_validate = json.loads(request.data)
+            
StreamsHandling.CREATE_STREAMS_VALIDATOR.validate(schema_to_validate)
+            Stream_context.add_new_stream(schema_to_validate)
+        except Exception as ex:
+            return ex.message, 400
+        else:
+            return '', 201
+
+    def delete(self):
+        try:
+            schema_to_validate = json.loads(request.data)
+            
StreamsHandling.DELETE_STREAMS_VALIDATOR.validate(schema_to_validate)
+        except Exception as ex:
+            return ex.message, 400
+
+        try:  # check if stream exists, if not return 404
+            Stream_context.delete_existing_stream(schema_to_validate)
+        except IOTStreamsException as ex:
+            return ex.message, 404
+        return '', 204
diff --git a/clients/iotclient/src/Settings/__init__.py 
b/clients/iotclient/src/Settings/__init__.py
new file mode 100644
diff --git a/clients/iotclient/src/Settings/filesystem.py 
b/clients/iotclient/src/Settings/filesystem.py
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -0,0 +1,27 @@
+import os
+import sys
+
+baskets_base_location = None
+
+if sys.platform in ("linux", "linux2", "darwin"):
+    filesystem_location = '/etc/iotcollector'
+elif sys.platform == "win32":
+    filesystem_location = os.path.dirname(os.path.realpath(__file__))
+
+
+def set_filesystem_location(new_location):
+    global filesystem_location
+
+    if os.path.isdir(new_location):
+        filesystem_location = new_location
+    else:
+        print >> sys.stderr, "The provided filesystem doesn't exist!"
+        sys.exit(1)
+
+
+def init_file_system():
+    global baskets_base_location
+
+    baskets_base_location = os.path.join(filesystem_location, "baskets")
+    if not os.path.exists(baskets_base_location):
+        os.makedirs(baskets_base_location)
diff --git a/clients/iotclient/src/Settings/iotlogger.py 
b/clients/iotclient/src/Settings/iotlogger.py
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/src/Settings/iotlogger.py
@@ -0,0 +1,34 @@
+import logging
+import os
+import sys
+
+if sys.platform in ("linux", "linux2", "darwin"):
+    logging_location = '/var/log/iotdaemon/iot.log'
+elif sys.platform == "win32":
+    logging_location = os.path.relpath("iot.log")
+
+logging_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+logger = logging.getLogger("IOTLog")
+
+
+def set_logging_location(new_location):
+    global logging_location
+    logging_location = new_location
+
+
+def init_logging():
+    global logger
+    try:
+        logger = logging.getLogger("IOTLog")
+        logger.setLevel(logging.DEBUG)
+        formatter = logging.Formatter(logging_format)
+
+        logging_path = os.path.dirname(logging_location)
+        if not os.path.exists(logging_path):
+            os.makedirs(logging_path)
+        log_handler = logging.FileHandler(logging_location, mode='a+')
+        log_handler.setFormatter(formatter)
+        logger.addHandler(log_handler)
+    except Exception as ex:
+        print >> sys.stderr, ex.message
+        sys.exit(1)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -0,0 +1,33 @@
+import sys
+import pymonetdb
+
+Connection = None
+
+
+def init_monetdb_connection(hostname, port, user_name, database):
+    global Connection
+
+    user_password = 'monetdb'  # raw_input("Enter Password: ")
+
+    try:
+        Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=user_password,
+                                       database=database)
+    except Exception as ex:
+        print >> sys.stderr, ex.message
+        sys.exit(1)
+
+
+def close_monetdb_connection():
+    Connection.close()
+
+
+def mapi_create_stream(schema, stream, columns):
+    try:  # create schema if not exists, ignore the error if already exists
+        Connection.execute("CREATE SCHEMA " + schema + ";")
+    except:
+        pass
+    Connection.execute(''.join(["CREATE STREAM TABLE ", schema, ".", stream, " 
(", columns, ");"]))
+
+
+def mapi_flush_baskets(schema, stream, baskets):
+    Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"", stream, 
"\",\"", baskets, "\");"]))
diff --git a/clients/iotclient/src/Streams/__init__.py 
b/clients/iotclient/src/Streams/__init__.py
new file mode 100644
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
new file mode 100644
--- /dev/null
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -0,0 +1,577 @@
+import datetime
+import dateutil
+import itertools
+import struct
+import copy
+
+from abc import ABCMeta, abstractmethod
+from dateutil import parser
+from src.Streams.jsonschemas import UUID_REG
+
+# TODO later check the byte order 
https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
+# TODO Also check the consequences of alignment on packing HUGEINTs!
+# TODO The null constants might change from system to system due to different 
CPU's
+ALIGNMENT = '<'  # for now is little-endian for Intel CPU's
+
+NIL_STRING = "\200"
+
+INT8_MIN = 0x80
+INT16_MIN = 0x8000
+INT32_MIN = 0x80000000
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to