Changeset: dae88d156163 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=dae88d156163
Modified Files:
    clients/iotclient/documentation/conf.py
    clients/iotclient/src/Flask/app.py
    clients/iotclient/src/Flask/restresources.py
    clients/iotclient/src/Settings/filesystem.py
    clients/iotclient/src/Settings/mapiconnection.py
    clients/iotclient/src/Streams/streams.py
    clients/iotclient/src/Streams/streamscreator.py
    clients/iotclient/src/main.py
Branch: iot
Log Message:
Made important corrections diffs (truncated from 302 to 300 lines): diff --git a/clients/iotclient/documentation/conf.py b/clients/iotclient/documentation/conf.py --- a/clients/iotclient/documentation/conf.py +++ b/clients/iotclient/documentation/conf.py @@ -12,8 +12,8 @@ # All configuration values have a default; values that are commented out # serve to show the default. -import sys -import os +# import sys +# import os # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the @@ -110,7 +110,7 @@ pygments_style = 'sphinx' # If true, keep warnings as "system message" paragraphs in the built documents. #keep_warnings = False -# If true, `todo` and `todoList` produce output, else they produce nothing. +# If true, `ttodo` and `todoList` produce output, else they produce nothing. todo_include_todos = True diff --git a/clients/iotclient/src/Flask/app.py b/clients/iotclient/src/Flask/app.py --- a/clients/iotclient/src/Flask/app.py +++ b/clients/iotclient/src/Flask/app.py @@ -1,6 +1,5 @@ from flask import Flask from flask_restful import Api - from restresources import StreamInput, StreamsInfo, StreamsHandling diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py --- a/clients/iotclient/src/Flask/restresources.py +++ b/clients/iotclient/src/Flask/restresources.py @@ -8,38 +8,37 @@ from flask_restful import Resource from jsonschema import Draft4Validator, FormatChecker from tzlocal import get_localzone from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA -from Streams.streamscontext import IOTStreamsException, IOTStreams +from Streams.streamscontext import IOTStreams from Settings.iotlogger import add_log Streams_Context = None Create_Streams_Validator = None Delete_Streams_Validator = None -Local_Timezone = None +Local_Timezone = get_localzone() # for the correction of dates we must add the system's 
timezone def init_rest_resources(): global Streams_Context, Create_Streams_Validator, Delete_Streams_Validator, Local_Timezone - Local_Timezone = get_localzone() # for the correction of dates we must add the system's timezone Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker()) Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker()) try: Streams_Context = IOTStreams() except BaseException as ex: print >> sys.stdout, ex - add_log(50, ex.message) + add_log(50, ex) sys.exit(1) -class StreamInput(Resource): # TODO these operations are not atomic!!! +class StreamInput(Resource): """RESTful API for stream's input""" def get(self, schema_name, stream_name): # check a single stream data try: # check if stream exists, if not return 404 stream = Streams_Context.get_existing_stream(schema_name, stream_name) except BaseException as ex: - add_log(50, ex.message) - return ex.message, 404 + add_log(50, ex) + return ex, 404 return stream.get_data_dictionary(include_number_tuples=True), 200 def post(self, schema_name, stream_name): # add data to a stream @@ -48,14 +47,14 @@ class StreamInput(Resource): # TODO the try: # check if stream exists, if not return 404 stream = Streams_Context.get_existing_stream(schema_name, stream_name) except BaseException as ex: - add_log(50, ex.message) - return ex.message, 404 + add_log(50, ex) + return ex, 404 try: # validate and insert data, if not return 400 stream.validate_and_insert(json.loads(request.data), current_stamp) except BaseException as ex: - add_log(50, ex.message) - return ex.message, 400 + add_log(50, ex) + return ex, 400 return 'The insertions were made with success!', 201 @@ -78,8 +77,8 @@ class StreamsHandling(Resource): Create_Streams_Validator.validate(schema_to_validate) Streams_Context.add_new_stream(schema_to_validate) except BaseException as ex: - add_log(50, ex.message) - return ex.message, 400 + add_log(50, ex) + return ex, 400 else: 
return 'The stream was created with success!', 201 @@ -88,12 +87,12 @@ class StreamsHandling(Resource): schema_to_validate = json.loads(request.data) Delete_Streams_Validator.validate(schema_to_validate) except BaseException as ex: - add_log(50, ex.message) - return ex.message, 400 + add_log(50, ex) + return ex, 400 try: # check if stream exists, if not return 404 Streams_Context.delete_existing_stream(schema_to_validate) except BaseException as ex: - add_log(50, ex.message) - return ex.message, 404 + add_log(50, ex) + return ex, 404 return 'The stream was deleted with success!', 204 diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py --- a/clients/iotclient/src/Settings/filesystem.py +++ b/clients/iotclient/src/Settings/filesystem.py @@ -39,7 +39,7 @@ def init_file_system(host_identifier=Non Host_Identifier = host_identifier except (Exception, OSError) as ex: print >> sys.stdout, ex - add_log(50, ex.message) + add_log(50, ex) sys.exit(1) diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -22,8 +22,8 @@ def init_monetdb_connection(hostname, po print >> sys.stdout, log_message add_log(20, log_message) except BaseException as ex: - print >> sys.stdout, ex.message - add_log(50, ex.message) + print >> sys.stdout, ex + add_log(50, ex) sys.exit(1) @@ -38,15 +38,21 @@ def mapi_create_stream(schema, stream, c pass try: # attempt to create te stream table - Connection.execute(''.join(["CREATE STREAM TABLE ", schema, ".", stream, " (", columns, ");"])) + Connection.execute("SET SCHEMA " + schema + ";") + Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", columns, ");"])) except BaseException as ex: - add_log(40, ex.message) + add_log(40, ex) pass def mapi_flush_baskets(schema, stream, baskets): try: - Connection.execute(''.join(["CALL 
iot.push(\"", schema, "\",\"", stream, "\",\"", baskets, "\");"])) + Connection.execute("SET SCHEMA iot;") + except: + pass + + try: + Connection.execute(''.join(["CALL iot.basket('", schema, "','", stream, "','", baskets, "');"])) except BaseException as ex: - add_log(40, ex.message) + add_log(40, ex) pass diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py --- a/clients/iotclient/src/Streams/streams.py +++ b/clients/iotclient/src/Streams/streams.py @@ -38,8 +38,8 @@ def init_streams_hosts(): class StreamException(Exception): """Exception fired when the validation of a stream insert fails""" - def __init__(self, messages): - self.message = messages # dictionary of column -> list of error messages + def __init__(self, message): + self.message = message # dictionary of column -> list of error messages class DataCellStream(object): @@ -134,7 +134,7 @@ class DataCellStream(object): if self._tuples_in_per_basket > 0: self.flush_baskets(last) except BaseException as ex: - add_log(50, ex.message) + add_log(50, ex) self._monitor.release() def validate_and_insert(self, new_data, timestamp): @@ -170,7 +170,7 @@ class DataCellStream(object): try: transposed_data[key] = data_type.process_values(values) # convert into binary except DataValidationException as ex: - batch_errors[key] = ex.message + batch_errors[key] = ex if batch_errors: raise StreamException(message=batch_errors) @@ -213,8 +213,9 @@ class DataCellStream(object): self._tuples_in_per_basket += total_tuples if is_flushing_tuple_based and self._tuples_in_per_basket >= self._flush_method.limit: self.flush_baskets(last=False) - + except BaseException as ex: + self._monitor.release() + add_log(50, ex) + else: + self._monitor.release() add_log(20, 'Inserted %d tuples to stream %s.%s' % (total_tuples, self._schema_name, self._stream_name)) - except BaseException as ex: - add_log(50, ex.message) - self._monitor.release() diff --git 
a/clients/iotclient/src/Streams/streamscreator.py b/clients/iotclient/src/Streams/streamscreator.py --- a/clients/iotclient/src/Streams/streamscreator.py +++ b/clients/iotclient/src/Streams/streamscreator.py @@ -53,7 +53,7 @@ def validate_schema_and_create_stream(sc new_column.set_nullable(column['nullable']) validated_columns[next_name] = new_column except Exception as ex: - errors[next_name] = ex.message + errors[next_name] = ex break if errors: diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py --- a/clients/iotclient/src/main.py +++ b/clients/iotclient/src/main.py @@ -1,8 +1,10 @@ import getopt import signal import sys +import time from multiprocessing import Process +from threading import Thread from uuid import getnode as get_mac from Flask.app import start_flask_iot_app, start_flask_admin_app from Flask.restresources import init_rest_resources @@ -12,17 +14,25 @@ from Settings.mapiconnection import init from Streams.streamscontext import init_streams_context from Streams.streams import init_streams_hosts -subprocess1 = None -subprocess2 = None +subprocess = None def signal_handler(signal, frame): - subprocess1.terminate() - subprocess2.terminate() + subprocess.terminate() + + +def start_process(admin_host, admin_port, app_host, app_port): + thread1 = Thread(target=start_flask_admin_app, args=(admin_host, admin_port)) + thread2 = Thread(target=start_flask_iot_app, args=(app_host, app_port)) + thread1.start() + time.sleep(1) # problem while handling Flask's loggers, so it is used this sleep + thread2.start() + thread1.join() + thread2.join() def main(argv): - global subprocess1, subprocess2 + global subprocess try: opts, args = getopt.getopt(argv[1:], 'f:l:c:ui:in:ih:ip:ah:ap:h:p:d:u', @@ -92,15 +102,11 @@ def main(argv): init_streams_context() # init streams context init_rest_resources() # init validators for RESTful requests - subprocess1 = Process(target=start_flask_admin_app, args=(admin_host, admin_port)) - subprocess2 = 
Process(target=start_flask_iot_app, args=(app_host, app_port)) - subprocess1.start() - subprocess2.start() + subprocess = Process(target=start_process, args=(admin_host, admin_port, app_host, app_port)) + subprocess.start() add_log(20, 'Started IOT Stream Server') signal.signal(signal.SIGINT, signal_handler) - - subprocess1.join() - subprocess2.join() + subprocess.join() add_log(20, 'Stopped IOT Stream Server') _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list