Hi Dhavan,

The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be known. Otherwise, it's impossible to infer the schema of the converted table; as a result, the `raw` type will be used and the schema of the resulting table will not be what you expect. You could either declare the type of `ds` on the DataStream API side via `output_type`, or declare the schema via `t_env.from_data_stream(ds, schema)` when converting the DataStream to a table.
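To make this concrete, here is a rough sketch of both options. The field names and `MyAsyncHttpProcessFunction` below are made up for illustration (I don't know your actual message structure), so adapt them to your data:

    from pyflink.common import Types
    from pyflink.table import DataTypes, Schema

    # Option 1: declare the type on the DataStream side via `output_type`.
    # Nested structures are expressed with a nested Types.ROW_NAMED(...),
    # i.e. Row inside Row. (MyAsyncHttpProcessFunction is a placeholder
    # for your own ProcessFunction.)
    output_type = Types.ROW_NAMED(
        ['id', 'payload'],
        [Types.STRING(),
         Types.ROW_NAMED(['score', 'label'],
                         [Types.DOUBLE(), Types.STRING()])])
    ds = ds.process(MyAsyncHttpProcessFunction(), output_type=output_type)

    # With the type known, the schema can be inferred:
    table = t_env.from_data_stream(ds)

    # Option 2: declare the schema explicitly when converting back:
    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
            .column('id', DataTypes.STRING())
            .column('payload', DataTypes.ROW(
                [DataTypes.FIELD('score', DataTypes.DOUBLE()),
                 DataTypes.FIELD('label', DataTypes.STRING())]))
            .build())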
>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?

It doesn't necessarily have to be the Row type. You could also use other types, e.g. Integer, String, etc.

>> 2. What should my schema (for output_type) look like given the structure
>> of the message: https://pastebin.com/kEFBJSBS

Is the output data a JSON string? If that's the case, I guess you could just declare the output_type as Types.STRING().

>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
>> error: https://pastebin.com/KpkZEHtd - here, it seems to me that having
>> `Row` inside `Row` is causing some issues.

`Row` inside `Row` is supported. Could you share an example that reproduces this issue?

Regards,
Dian

On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:

> Hello,
>
> I am consuming Kafka messages with the Table API connector. I cannot use
> the DataStream API because it does not support Confluent Avro.
>
> After consuming the messages, I am converting to the DataStream API and
> using a ProcessFunction. The ProcessFunction makes async HTTP calls and
> emits results with a completely different structure. This works well. If I
> don't do anything and just do `ds.print()`, things are printed as expected.
>
> The issues start happening when I convert from the DataStream back to the
> Table API. To do this effectively and sink back to Kafka (with Confluent
> Avro serialization), I am specifying `output_type` while calling `process`
> on the stream. I have gathered (there might be other ways I am unaware of)
> that if I don't specify the schema here, converting back to the Table API
> (with `t_env.from_data_stream(ds)`) makes it very difficult to serialize
> the data.
>
> What I am not sure about are the following things:
> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
> 2. What should my schema (for output_type) look like given the structure
> of the message: https://pastebin.com/kEFBJSBS
>
> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
> error: https://pastebin.com/KpkZEHtd - here, it seems to me that having
> `Row` inside `Row` is causing some issues.
>
> Thanks!
>
> --
> Dhavan