Hi,

Based on the example at https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example, I am trying to ingest JSON data in Kafka, but I have not been able to get it working from the examples.

event-generator.py

    def produce():
        request = {}
        request['id'] = "abc-123"
        request['field1'] = "field1-1"
        request['field2'] = "field2-2"
        request['field3'] = "field3-3"
        if len(sys.argv) == 2:
            delay_seconds = int(sys.argv[1])
        else:
            delay_seconds = 1
        producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
        for request in random_requests_dict():
            producer.send(topic='test-topic', value=json.dumps(request).encode('utf-8'))
            producer.flush()
            time.sleep(delay_seconds)

Below is the proto definition of the JSON data (I don't always know all the fields, but I know the id field definitely exists).

message.proto

    message MyRow {
        string id = 1;
        google.protobuf.Struct message = 2;
    }

Below is the function that receives the data.

tokenizer.py (same as greeter.py, but saving the state of the id instead of counting)

    @app.route('/statefun', methods=['POST'])
    def handle():
        my_row = MyRow()
        data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
        response_data = handler(request.data)
        response = make_response(response_data)
        response.headers.set('Content-Type', 'application/octet-stream')
        return response

However, I get the error message below.
I am a newbie with proto and appreciate any help.

11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/app/tokenizer.py", line 101, in handle
    response_data = handler(data)
  File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
    request.ParseFromString(request_bytes)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
    return self.MergeFromString(serialized)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
    serialized = memoryview(serialized)
TypeError: memoryview: a bytes-like object is required, not 'int'
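
For what it is worth, the last frames of the traceback seem to point at one specific thing: the traceback shows `handler(data)`, and `data` is whatever `ParseFromString` returned, which the `not 'int'` in the error suggests is an integer rather than the parsed message. Below is a minimal stdlib-only sketch of that situation. `FakeRow` and `reproduce` are hypothetical stand-ins invented for illustration, not part of protobuf or statefun, and the return-a-byte-count behaviour is an assumption inferred from the traceback.

```python
class FakeRow:
    """Hypothetical stand-in for a generated protobuf message (not a real class)."""

    def __init__(self):
        self.payload = b""

    def ParseFromString(self, serialized):
        # Assumed behaviour, inferred from the traceback: the message is
        # filled in place and an int (a byte count) is returned.
        view = memoryview(serialized)  # raises TypeError for non-bytes input
        self.payload = bytes(view)
        return len(self.payload)


def reproduce(raw):
    """Parse raw bytes, then repeat the mistake of re-parsing the return value."""
    row = FakeRow()
    data = row.ParseFromString(raw)  # data is an int, row holds the content
    try:
        FakeRow().ParseFromString(data)  # analogous to handler(data)
    except TypeError as exc:
        return row, data, str(exc)
    return row, data, None


row, data, error = reproduce(b'{"id": "abc-123"}')
print(type(data).__name__, error)
```

If that reading is right, the parsed content lives in `my_row` rather than in `data`, and `handler` would need the raw `request.data` bytes, as in the upstream greeter example. That is only a guess from the traceback, not a confirmed fix.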