Hi,

The values must be valid, serialized Protobuf messages [1], whereas your attached code snippet sends UTF-8 encoded JSON strings. Take a look at this example, in which the generator produces Protobuf messages [2][3].
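To make the difference concrete, here is a minimal sketch (not taken from the statefun examples) that puts the JSON bytes the quoted generator sends next to Protobuf wire-format bytes for the same record. It assumes only the `protobuf` Python package. Because the generated `MyRow` class is not available here, it encodes just the free-form fields, using the `google.protobuf.Struct` well-known type that your `MyRow.message` field uses. `encode_free_form_fields` is a hypothetical helper name.

```python
import json

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct


def encode_free_form_fields(request: dict) -> bytes:
    """Serialize every field except "id" as a google.protobuf.Struct.

    Assumption: in a real producer this Struct is the `message` field of the
    MyRow class generated from message.proto, and the serialized MyRow (not
    this Struct on its own) is what goes into the Kafka value.
    """
    free_form = {key: value for key, value in request.items() if key != "id"}
    # ParseDict maps a JSON-like dict onto the Struct well-known type.
    struct = json_format.ParseDict(free_form, Struct())
    # SerializeToString produces Protobuf wire-format bytes.
    return struct.SerializeToString()


request = {
    "id": "abc-123",
    "field1": "field1-1",
    "field2": "field2-2",
    "field3": "field3-3",
}

# What the quoted generator sends: UTF-8 encoded JSON text.
json_value = json.dumps(request).encode("utf-8")

# Protobuf wire-format bytes for the free-form part of the same record.
proto_value = encode_free_form_fields(request)
```

With the class that protoc generates from your message.proto (its module name depends on how you compile it), the producer would build `row = MyRow(id=request["id"])`, fill `row.message.update(...)` with the remaining fields, and send `value=row.SerializeToString()`. The .proto file also needs `import "google/protobuf/struct.proto";` for the `Struct` field to compile.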
[1] https://developers.google.com/protocol-buffers/docs/pythontutorial
[2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
[3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25

On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com> wrote:

> Hi, Based on the example from
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> I am trying to ingest json data in kafka, but unable to achieve based on
> the examples.
>
> event-generator.py
>
> def produce():
>     request = {}
>     request['id'] = "abc-123"
>     request['field1'] = "field1-1"
>     request['field2'] = "field2-2"
>     request['field3'] = "field3-3"
>     if len(sys.argv) == 2:
>         delay_seconds = int(sys.argv[1])
>     else:
>         delay_seconds = 1
>     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
>     for request in random_requests_dict():
>         producer.send(topic='test-topic',
>                       value=json.dumps(request).encode('utf-8'))
>         producer.flush()
>         time.sleep(delay_seconds)
>
> Below is the proto definition of the json data ( i dont always know all
> the fields, but i know id fields definitely exists)
> message.proto
>
> message MyRow {
>     string id = 1;
>     google.protobuf.Struct message = 2;
> }
>
> Below is greeter that received the data
> tokenizer.py ( same like greeter.py saving state of id instead of counting )
>
> @app.route('/statefun', methods=['POST'])
> def handle():
>     my_row = MyRow()
>     data = my_row.ParseFromString(request.data)  // Is this the right way to do it?
>     response_data = handler(request.data)
>     response = make_response(response_data)
>     response.headers.set('Content-Type', 'application/octet-stream')
>     return response
>
> but, below is the error message.
> I am a newbie with proto and appreciate any help
>
> 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
>     raise value
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/app/tokenizer.py", line 101, in handle
>     response_data = handler(data)
>   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
>     request.ParseFromString(request_bytes)
>   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
>     return self.MergeFromString(serialized)
>   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
>     serialized = memoryview(serialized)
> TypeError: memoryview: a bytes-like object is required, not 'int'
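On the "Is this the right way to do it?" question in the quoted handler: `ParseFromString` fills in the message it is called on, and the statefun handler in the traceback calls `ParseFromString` itself on whatever it receives (request_reply.py, line 38). So, as in the greeter example, the Flask route can hand `request.data` to `handler` unchanged. In the traceback it was `data`, the value returned by `my_row.ParseFromString(...)`, an int, that reached `handler`. Below is a minimal sketch of the parse-in-place pattern. It uses the `Struct` well-known type in place of `MyRow`, and `stand_in_handler` is a hypothetical stand-in, not the SDK's handler.

```python
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct


def parse_payload(raw: bytes) -> Struct:
    """Decode Protobuf bytes into a new Struct and return that message."""
    msg = Struct()
    # ParseFromString fills `msg` in place. Its return value is not the
    # message, so keep using `msg` rather than whatever the call returns.
    msg.ParseFromString(raw)
    return msg


def stand_in_handler(request_bytes: bytes) -> dict:
    """Hypothetical stand-in for an SDK handler that parses its own input.

    Like the statefun handler in the traceback, it calls ParseFromString on
    what it receives, so it must be given the raw request bytes.
    """
    return json_format.MessageToDict(parse_payload(request_bytes))
```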