Changeset: 635c867ab029 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=635c867ab029 Added Files: clients/iotapi/tests/websocketclient.py Modified Files: clients/iotapi/requirements.txt clients/iotapi/src/WebSockets/jsonschemas.py clients/iotapi/src/WebSockets/websockets.py clients/iotapi/tests/frontendtests.py clients/iotapi/tests/main.py clients/iotclient/requirements.txt Branch: iot Log Message:
Fixed front end test diffs (truncated from 449 to 300 lines): diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt --- a/clients/iotapi/requirements.txt +++ b/clients/iotapi/requirements.txt @@ -1,12 +1,12 @@ git+https://github.com/dpallot/simple-websocket-server.git -IPy==0.83 -jsonschema==2.5.1 -python-dateutil==2.5.3 -python-monetdb==11.24.0 -pytz==2016.4 -requests==2.10.0 -Sphinx==1.4.4 -sphinx-rtd-theme==0.1.9 -tzlocal==1.2.2 -watchdog==0.8.3 -websocket-client==0.37.0 +IPy>=0.83 +jsonschema>=2.5.1 +python-dateutil>=2.5.3 +python-monetdb>=11.19.3.2 +pytz>=2016.4 +requests>=2.10.0 +Sphinx>=1.4.4 +sphinx-rtd-theme>=0.1.9 +tornado>=3.2.2 +tzlocal>=1.2.2 +watchdog>=0.8.3 diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py b/clients/iotapi/src/WebSockets/jsonschemas.py --- a/clients/iotapi/src/WebSockets/jsonschemas.py +++ b/clients/iotapi/src/WebSockets/jsonschemas.py @@ -1,37 +1,35 @@ SUBSCRIBE_OPTS = ["sub", "subscribe"] UNSUBSCRIBE_OPTS = ["unsub", "unsubscribe"] INFO_OPTS = ["info"] -CONCAT_SUB_OPTS = SUBSCRIBE_OPTS + UNSUBSCRIBE_OPTS READ_OPTS = ["read"] CLIENTS_INPUTS_SCHEMA = { - "title": "JSON schema publish/subscribe streams", + "title": "JSON schema fro web api", "description": "Validate clients inputs", "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", - "anyOf": [{ "properties": { + "request": {"type": "string", "enum": SUBSCRIBE_OPTS + UNSUBSCRIBE_OPTS + INFO_OPTS}, "schema": {"type": "string"}, - "stream": {"type": "string"}, - "request": {"type": "string", "enum": CONCAT_SUB_OPTS + INFO_OPTS}, + "stream": {"type": "string"} }, - "required": ["schema", "stream", "request"], + "required": ["request", "schema", "stream"], "additionalProperties": False }, { "properties": { + "request": {"type": "string", "enum": READ_OPTS}, "schema": {"type": "string"}, "stream": {"type": "string"}, - "request": {"type": "string", "enum": READ_OPTS}, - "basket": {"type": "integer", "minimum": 1, "default": 1}, + "basket": {"type": "integer", "minimum": 1}, "limit": {"type": "integer", "minimum": 0, "default": 100}, "offset": {"type": "integer", "minimum": 0, "default": 0} }, - "required": ["schema", "stream", "request"], + "required": ["request", "schema", "stream", "basket"], "additionalProperties": False }, { "properties": { - "request": {"type": "string", "enum": INFO_OPTS} # for all streams + "request": {"type": "string", "enum": INFO_OPTS} }, "required": ["request"], "additionalProperties": False diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py --- a/clients/iotapi/src/WebSockets/websockets.py +++ b/clients/iotapi/src/WebSockets/websockets.py @@ -68,10 +68,10 @@ class IOTAPI(WebSocket): elif input_schema['request'] in UNSUBSCRIBE_OPTS: self.unsubscribe(input_schema['schema'], input_schema['stream']) elif input_schema['request'] in READ_OPTS: - basket = input_schema.get('basket', 1) limit = input_schema.get('limit', 100) offset = input_schema.get('offset', 0) - self.read_stream_batch(input_schema['schema'], input_schema['stream'], basket, limit, offset) + self.read_stream_batch(input_schema['schema'], input_schema['stream'], + input_schema['basket'], limit, offset) elif input_schema['request'] in INFO_OPTS: if len(input_schema) == 1: # get all streams information self.get_streams_data() @@ -88,7 +88,7 @@ class IOTAPI(WebSocket): self._subscriptions[concatenated_name] = stream self._subscriptions_locker.release() self.sendJSONMessage((('response', 'subscribed'), ('schema', schema_name), ('stream', stream_name))) - add_log(20, ''.join(['Client ', self.address[0], 'subscribed to stream ', concatenated_name])) + add_log(20, ''.join(['Client ', self.address[0], ' subscribed to stream ', concatenated_name])) def unsubscribe(self, schema_name, stream_name): concatenated_name = IOTStreams.get_context_entry_name(schema_name, stream_name) diff --git a/clients/iotapi/tests/frontendtests.py b/clients/iotapi/tests/frontendtests.py --- a/clients/iotapi/tests/frontendtests.py +++ b/clients/iotapi/tests/frontendtests.py @@ -1,100 +1,107 @@ import json import os import requests -import websocket from distutils.dir_util import copy_tree from threading import Thread from time import sleep +from tornado import ioloop from unittest import TestCase +from websocketclient import WebSocketClient __all__ = ['NullablesTest'] WEB_SOCKETS_THREADS_TIMEOUT = 15 -class BaseFrontEndTest(TestCase): +class BaseFrontEndTest(TestCase, WebSocketClient): - def __init__(self, **kwargs): - super(BaseFrontEndTest, self).__init__() - self._web_server_baskets_location = os.path.join(kwargs['iot_client_path'], 'baskets') - self._web_api_baskets_location = os.path.join(kwargs['iot_api_path'], 'baskets') - self.schema = "tests" - - def export_inserts(self, schema, stream, basket): - input_dir = os.path.join(self._web_server_baskets_location, schema, stream, basket) - output_dir = os.path.join(self._web_api_baskets_location, schema, stream, basket) - copy_tree(input_dir, output_dir) - - -class TestWebSocket(websocket.WebSocketApp): - - def __init__(self, test, url, header=[], on_open=None, on_message=None, on_error=None, on_close=None, on_ping=None, - on_pong=None, on_cont_message=None, keep_running=True, get_mask_key=None, cookie=None, - subprotocols=None, on_data=None): - super(TestWebSocket, self).__init__(url, header, on_open, on_message, on_error, on_close, on_ping, - on_pong, on_cont_message, keep_running, get_mask_key, cookie, - subprotocols, on_data) - self.test = test + def __init__(self, iot_client_path, iot_api_path, stream_name): + TestCase.__init__(self) + WebSocketClient.__init__(self) + self._web_server_baskets_location = os.path.join(iot_client_path, 'baskets') + self._web_api_baskets_location = os.path.join(iot_api_path, 'baskets') + self.schema_name = "tests" + self.stream_name = stream_name self.ws_state = 1 - -def on_open(ws): - ws.send(json.dumps({"request": "sub", "schema": ws.test.schema, "stream": ws.test.stream})) - - -def on_message(ws, message): - resp = json.loads(message) - if resp['response'] == 'error': - ws.test.fail(msg=resp['message']) - elif ws.ws_state == 1: - ws.test.assertDictEqual({"response": "subscribed", "schema": ws.test.schema, "stream": ws.test.stream}, resp) - ws.ws_state = 2 - elif ws.ws_state == 2: - ws.test.assertDictEqual({"response": "notification", "schema": ws.test.schema, "stream": ws.test.stream, - "basket": 1, "count": 3}, resp) - ws.send(json.dumps({"request": "read", "schema": ws.test.schema, "stream": ws.test.stream, - "basket": 1, "offset": 0, "limit": 3})) - ws.ws_state = 3 - elif ws.ws_state == 3: - ws.test.assertIn('implicit_timestamp', resp['tuples'][0], msg='Timestamp not in stream') - del resp['tuples'][0]['implicit_timestamp'] - del resp['tuples'][1]['implicit_timestamp'] - del resp['tuples'][2]['implicit_timestamp'] - res_dic = {'vala': None, "valb": None, "valc": None, "vald": None, "vale": None, "valf": None, "valg": None, - "valh": None, "vali": None, "valj": None, "valk": None, "vall": None, "valm": None, "valn": None, - "valo": None, "valp": None, "valq": None, "valr": None, "vals": None, "valt": None, "valu": None, - "valv": None, "valw": None, "valx": None, "valy": None, "valz": None} - tuples_response = {"response": "read", "schema": ws.test.schema, "stream": ws.test.stream, - "count": 3, "tuples": [res_dic, res_dic, res_dic]} - ws.test.assertDictEqual(tuples_response, resp) - ws.close() - elif ws.ws_state == 4: - ws.test.assertDictEqual({"response": "removed", "schema": ws.test.schema, "stream": ws.test.stream}, resp) - ws.close() - - -def on_error(ws, error): - ws.close() - ws.test.fail(msg=error) - - -def web_socket(test): - ws = TestWebSocket(test=test, url="ws://127.0.0.1:8002/", on_message=on_message, on_open=on_open, on_error=on_error) - test.ws = ws - ws.run_forever() + def export_inserts(self, basket): + input_dir = os.path.join(self._web_server_baskets_location, self.schema_name, self.stream_name, basket) + output_dir = os.path.join(self._web_api_baskets_location, self.schema_name, self.stream_name, basket) + copy_tree(input_dir, output_dir) class NullablesTest(BaseFrontEndTest): - def __init__(self, **kwargs): - super(NullablesTest, self).__init__(**kwargs) - self.stream = "nulls" - self.ws = None + def __init__(self, iot_client_path, iot_api_path): + super(NullablesTest, self).__init__(iot_client_path, iot_api_path, stream_name="nulls") + self._error = "" + + def web_socket_cycle(self): + self.connect("ws://127.0.0.1:8002/") + try: + ioloop.IOLoop.instance().start() + except: + pass + + def _on_connection_success(self): + self.send(''.join(['{"request": "sub", "schema": "', self.schema_name, '", "stream": "', self.stream_name, + '"}'])) + + def _on_connection_error(self, exception): + self.set_error(exception) + + def set_error(self, msg): + self._error = msg + self.close() + ioloop.IOLoop.instance().stop() + + def _on_message(self, message): + resp = json.loads(message) + if resp['response'] == 'error': + self.set_error("Received error message!") + elif self.ws_state == 1: + if resp != {"response": "subscribed", "schema": self.schema_name, "stream": self.stream_name}: + self.set_error("Wrong subscribed response!") + else: + self.ws_state = 2 + elif self.ws_state == 2: + correct_dic = {"response": "notification", "schema": self.schema_name, "stream": self.stream_name, + "basket": 1, "count": 3} + if resp != correct_dic: + self.set_error("Wrong notification response!") + else: + self.send(''.join(['{"request": "read", "schema": "', self.schema_name, '","stream": "', + self.stream_name, '", "basket": 1, "offset": 0, "limit": 3}'])) + self.ws_state = 3 + elif self.ws_state == 3: + if 'implicit_timestamp' not in resp['tuples'][0]: + self.set_error('Timestamp not in result stream') + else: + del resp['tuples'][0]['implicit_timestamp'] + del resp['tuples'][1]['implicit_timestamp'] + del resp['tuples'][2]['implicit_timestamp'] + res = {"vala": None, "valb": None, "valc": None, "vald": None, "vale": None, "valf": None, "valg": None, + "valh": None, "vali": None, "valj": None, "valk": None, "vall": None, "valm": None, "valn": None, + "valo": None, "valp": None, "valq": None, "valr": None, "vals": None, "valt": None, "valu": None, + "valv": None, "valw": None, "valx": None, "valy": None, "valz": None} + tuples_response = {"response": "read", "schema": self.schema_name, "stream": self.stream_name, + "count": 3, "tuples": [res, res, res]} + if resp != tuples_response: + self.set_error("Wrong notification response!") + else: + self.ws_state = 4 + elif self.ws_state == 4: + if resp != {"response": "removed", "schema": self.schema_name, "stream": self.stream_name}: + self.set_error("Wrong removed response!") + else: + self.close() + ioloop.IOLoop.instance().stop() def runTest(self): - json_str = {"schema": self.schema, "stream": self.stream, "has_hostname": False, "flushing": {"base": "auto"}, - "columns": [{"name": "vala", "type": "string", "nullable": True}, + json_str = {"schema": self.schema_name, "stream": self.stream_name, "has_hostname": False, + "flushing": {"base": "auto"}, "columns": [ + {"name": "vala", "type": "string", "nullable": True}, {"name": "valb", "type": "uuid", "nullable": True}, {"name": "valc", "type": "mac", "nullable": True}, {"name": "vald", "type": "url", "nullable": True}, @@ -125,24 +132,28 @@ class NullablesTest(BaseFrontEndTest): self.assertEqual(resp.status_code, 201, msg=resp.text) - sleep(2) # we need to sleep to check that the next poll happens + sleep(4) # we need to sleep to check that the next poll happens - thread = Thread(target=web_socket, args=(self, )) + thread = Thread(target=self.web_socket_cycle) thread.start() - resp = requests.post("http://127.0.0.1:8000/stream/%s/%s" % (self.schema, self.stream), json=[{}, {}, {}]) + resp = requests.post("http://127.0.0.1:8000/stream/%s/%s" % (self.schema_name, self.stream_name), + json=[{}, {}, {}]) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list