Hi,

Based on the example at
https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
I am trying to ingest JSON data through Kafka, but I have not been able to get
it working by following the examples.

event-generator.py

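import json
import sys
import time

from kafka import KafkaProducer  # kafka-python

# KAFKA_BROKER and random_requests_dict() are not shown in this post.


def produce():
    # Sample record; note that the loop variable below rebinds `request`.
    request = {}
    request['id'] = "abc-123"
    request['field1'] = "field1-1"
    request['field2'] = "field2-2"
    request['field3'] = "field3-3"
    # Optional first CLI argument: delay between messages, in seconds.
    if len(sys.argv) == 2:
        delay_seconds = int(sys.argv[1])
    else:
        delay_seconds = 1
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    for request in random_requests_dict():
        # Each record is sent as UTF-8 encoded JSON.
        producer.send(topic='test-topic',
                      value=json.dumps(request).encode('utf-8'))
        producer.flush()
        time.sleep(delay_seconds)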

Below is the proto definition for the JSON data (I don't always know all the
fields, but I know the id field always exists).
message.proto

syntax = "proto3";

import "google/protobuf/struct.proto";

message MyRow {
    string id = 1;
    google.protobuf.Struct message = 2;
}
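To make the intended mapping concrete, here is a minimal sketch in plain Python
of how one JSON record could be split into the two parts of MyRow: the id that
always exists, and everything else. The helper name split_record is
hypothetical and only the standard library is used. With the generated
protobuf module available, I assume a dict of this shape could be handed to
google.protobuf.json_format.ParseDict to fill a real MyRow.

```python
import json


def split_record(raw_json: bytes) -> dict:
    """Split a JSON record into the known `id` and the remaining fields.

    Hypothetical helper: the result mirrors the shape of MyRow
    (`id` as a string, everything else under `message`).
    """
    record = json.loads(raw_json)
    if "id" not in record:
        raise ValueError("record has no 'id' field")
    # Everything except `id` goes into the free-form part.
    rest = {key: value for key, value in record.items() if key != "id"}
    return {"id": str(record["id"]), "message": rest}


raw = json.dumps(
    {"id": "abc-123", "field1": "field1-1", "field2": "field2-2"}
).encode("utf-8")
row = split_record(raw)
print(row["id"])
print(sorted(row["message"]))
```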

Below is the function that receives the data.
tokenizer.py (same as greeter.py, but saving the id in state instead of counting)


@app.route('/statefun', methods=['POST'])
def handle():
    my_row = MyRow()
    data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
    response_data = handler(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response


However, I get the error below. I am new to protobuf and would appreciate any
help.

11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/app/tokenizer.py", line 101, in handle
    response_data = handler(data)
  File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
    request.ParseFromString(request_bytes)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
    return self.MergeFromString(serialized)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
    serialized = memoryview(serialized)
TypeError: memoryview: a bytes-like object is required, not 'int'
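
If I read the last frames correctly, handler(data) was called with the return
value of ParseFromString, and the final TypeError says that value is an int
rather than bytes. ParseFromString fills the message in place, so its return
value is not the parsed data. The sketch below reproduces the same error with
the standard library only; FakeMessage is a hypothetical stand-in that mimics
this in-place behaviour and is not the real protobuf class.

```python
class FakeMessage:
    """Hypothetical stand-in that parses in place, like a protobuf message."""

    def __init__(self):
        self.payload = b""

    def ParseFromString(self, serialized):
        # Same first step as the last traceback frame: needs bytes-like input.
        view = memoryview(serialized)
        self.payload = bytes(view)
        # The parsed data lives on `self`; only a byte count is returned.
        return len(view)


raw = b'{"id": "abc-123"}'

first = FakeMessage()
data = first.ParseFromString(raw)  # `data` is an int, not a message
print(type(data).__name__)

error = None
second = FakeMessage()
try:
    second.ParseFromString(data)  # passing the int on triggers the TypeError
except TypeError as exc:
    error = str(exc)
    print(error)
```

Under that reading, the route would pass the untouched request bytes on, as in
handler(request.data), and MyRow would be unpacked inside the bound function
rather than in the Flask route.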
