Hi,

The Kafka record values must be valid serialized Protobuf messages [1], but
in your attached code snippet you are sending UTF-8 encoded JSON strings.
You can take a look at this example of a generator that produces Protobuf
messages [2][3].
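
For example, here is a minimal sketch of the encoding step. It assumes
`messages_pb2` is the module generated by `protoc --python_out=.` from your
`MyRow` definition, and `split_record` and `encode_row` are hypothetical
helper names. It keeps `id` in its own field and puts the remaining JSON
fields into the `google.protobuf.Struct`. That is one reasonable split;
storing the whole record in the Struct would also work.

```python
from typing import Tuple

from google.protobuf import struct_pb2


def split_record(record: dict) -> Tuple[str, struct_pb2.Struct]:
    """Split a JSON-style dict into its `id` and a Struct holding the other fields."""
    fields = dict(record)        # copy so the caller's dict is not modified
    row_id = fields.pop("id")    # `id` is the one field that is always present
    payload = struct_pb2.Struct()
    payload.update(fields)       # JSON-compatible values: str, number, bool, None, list, dict
    return row_id, payload


def encode_row(record: dict) -> bytes:
    """Serialize a record as a MyRow message, ready to be used as a Kafka value."""
    # Assumption: messages_pb2 was generated by protoc from the MyRow definition.
    # Imported here so split_record() can be used without the generated module.
    from messages_pb2 import MyRow

    row_id, payload = split_record(record)
    row = MyRow(id=row_id)
    row.message.CopyFrom(payload)  # copy the free-form fields into MyRow.message
    return row.SerializeToString()
```

In `produce()` you would then send `value=encode_row(request)` instead of
`json.dumps(request).encode('utf-8')`.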

[1] https://developers.google.com/protocol-buffers/docs/pythontutorial
[2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
[3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25

On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattir...@gmail.com> wrote:

> Hi, based on the example from
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> I am trying to ingest JSON data into Kafka, but I have not been able to get
> it working by following the examples.
>
> event-generator.py
>
> import json
> import sys
> import time
>
> from kafka import KafkaProducer
>
> # KAFKA_BROKER and random_requests_dict() are defined elsewhere in the
> # script (not shown).
>
> def produce():
>     # Example of the record shape:
>     # {'id': "abc-123", 'field1': "field1-1", 'field2': "field2-2", 'field3': "field3-3"}
>     if len(sys.argv) == 2:
>         delay_seconds = int(sys.argv[1])
>     else:
>         delay_seconds = 1
>     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
>     for request in random_requests_dict():
>         # Sends each record as UTF-8 encoded JSON.
>         producer.send(topic='test-topic',
>                       value=json.dumps(request).encode('utf-8'))
>         producer.flush()
>         time.sleep(delay_seconds)
>
> Below is the proto definition of the JSON data (I don't always know all
> the fields, but I know the id field always exists):
> message.proto
>
> syntax = "proto3";
>
> import "google/protobuf/struct.proto";
>
> message MyRow {
>     string id = 1;
>     google.protobuf.Struct message = 2;
> }
>
> Below is the handler that receives the data, tokenizer.py (like greeter.py,
> but it saves the state of each id instead of counting):
>
>
> @app.route('/statefun', methods=['POST'])
> def handle():
>     my_row = MyRow()
>     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
>     response_data = handler(request.data)
>     response = make_response(response_data)
>     response.headers.set('Content-Type', 'application/octet-stream')
>     return response
>
>
> But I get the error below. I am new to Protobuf and would appreciate any
> help.
>
> 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
>     raise value
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/app/tokenizer.py", line 101, in handle
>     response_data = handler(data)
>   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
>     request.ParseFromString(request_bytes)
>   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
>     return self.MergeFromString(serialized)
>   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
>     serialized = memoryview(serialized)
> TypeError: memoryview: a bytes-like object is required, not 'int'
>
>
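
On the `ParseFromString` question in `handle()`: in the Python Protobuf
library, `ParseFromString` fills the message in place and returns the number
of bytes it read, which is an `int`. So `data` ends up holding an `int`, and
passing it to `handler(data)` produces the `memoryview: ... not 'int'` error
in the traceback. As in greeter.py, `handler` should get the raw
`request.data` bytes, and the incoming message is handled in the bound
function. The short sketch below illustrates the return value; it uses the
well-known `Struct` type only so that it runs without generated code, and a
generated `MyRow` behaves the same way.

```python
from google.protobuf import struct_pb2

# Serialize a small Struct so there are real Protobuf bytes to parse.
original = struct_pb2.Struct()
original.update({"id": "abc-123", "field1": "field1-1"})
payload = original.SerializeToString()

parsed = struct_pb2.Struct()
result = parsed.ParseFromString(payload)  # fills `parsed` in place

# `result` is a byte count, not a message and not bytes, so it must not be
# passed on where serialized bytes are expected.
print(type(result).__name__, parsed["id"])
```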
