Hello,

I am consuming Kafka messages with the Table API Kafka connector. I cannot
use the DataStream API connector because it does not support Confluent Avro.
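
For context, my source table is defined roughly like this (heavily
simplified; the topic, servers, and field names below are placeholders, and
the avro-confluent option key may differ between Flink versions):

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    t_env.execute_sql("""
        CREATE TABLE input_events (
            id STRING,
            payload STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'input-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'avro-confluent',
            'avro-confluent.url' = 'http://localhost:8081'
        )
    """)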

After consuming the messages, I convert the table to a DataStream and apply a
ProcessFunction. The ProcessFunction makes async HTTP calls and emits results
with a completely different structure. This works well: if I don't do
anything else and just do `ds.print()`, things are printed as expected.
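
A simplified version of that step (the real function is more involved; the
HTTP call and field names here are just placeholders):

    from pyflink.datastream import ProcessFunction

    class EnrichFn(ProcessFunction):
        def process_element(self, value, ctx):
            # placeholder for the real async HTTP call; the emitted result
            # has a completely different structure than the input row
            yield {'id': value['id'], 'status': 'OK'}

    ds = t_env.to_data_stream(t_env.from_path('input_events'))
    ds = ds.process(EnrichFn())
    ds.print()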

The issues start when I convert the DataStream back to the Table API. To do
this and sink back to Kafka (with Confluent Avro serialization), I am
specifying `output_type` when calling `process` on the stream. From what I
have gathered (there might be other ways I am unaware of), if I don't specify
the schema here, converting back to the Table API with
`t_env.from_data_stream(ds)` makes it very difficult to serialize the data.
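
Concretely, the conversion looks roughly like this (same placeholder names as
above; my real schema has more fields and nesting):

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types

    # placeholder schema, not my real one
    output_type = Types.ROW_NAMED(['id', 'status'],
                                  [Types.STRING(), Types.STRING()])

    # instead of the plain .process(EnrichFn()) above; with output_type set,
    # EnrichFn yields Row objects rather than dicts, e.g. Row(id='abc', status='OK')
    ds = t_env.to_data_stream(t_env.from_path('input_events')) \
        .process(EnrichFn(), output_type=output_type)

    result_table = t_env.from_data_stream(ds)
    # result_table would then be inserted into an avro-confluent Kafka sink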

What I am not sure about are the following things:
1. Must I emit objects of Flink's data types, like `pyflink.common.Row`, from
the ProcessFunction?
2. What should my schema (for `output_type`) look like, given the structure of
the message: https://pastebin.com/kEFBJSBS

I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
error: https://pastebin.com/KpkZEHtd. It looks to me like having a `Row`
inside a `Row` is causing the problem.
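
To illustrate the shape I mean (placeholder field names, not my real schema),
the nested part of the output_type is something like:

    # the outer row contains a field whose value is itself a Row
    nested_output_type = Types.ROW_NAMED(
        ['id', 'details'],
        [Types.STRING(),
         Types.ROW_NAMED(['city', 'zip'], [Types.STRING(), Types.STRING()])])

    # with a matching value emitted from the ProcessFunction, e.g.
    # Row(id='abc', details=Row(city='some-city', zip='12345'))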

Thanks!

--
Dhavan
