Hey Dian,

Though my HTTP call's response is indeed JSON, I needed to serialize the data into Avro for the Kafka sink.

Since you said it supports Row inside Row, I investigated further and found that, because the Row class *sorts fields by key name*, I had to strictly follow that sorted order in the `output_type` parameter of the `process()` function!

For example, if my ProcessFunction yields a structure of this shape:

Row(
    relatedPlaylists=Row(
        likes=1,
        uploads="1",
    )
)

then in `ds = ds.process(MyProcessFunction(), output_type=output_type)` the output type must be:

Types.ROW_NAMED(
    ["relatedPlaylists"],
    [Types.ROW_NAMED(["likes", "uploads"], [Types.INT(), Types.STRING()])]
)

and I cannot swap the order of "likes" and "uploads". That is, the following is wrong:

Types.ROW_NAMED(
    ["relatedPlaylists"],
    [Types.ROW_NAMED(["uploads", "likes"], [Types.STRING(), Types.INT()])]
)

After making these changes, things indeed work out well: I am able to convert back to the Table API and sink into Kafka with Confluent Avro serialization. Though I find this ordering requirement very cumbersome :D

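In case it helps anyone else who hits this, here is a minimal, self-contained sketch of the shape my pipeline ended up with (the environment setup and the body of MyProcessFunction below are illustrative placeholders, not my actual job):

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction
from pyflink.table import StreamTableEnvironment


class MyProcessFunction(ProcessFunction):
    def process_element(self, value, ctx):
        # Row sorts keyword arguments by field name, so the effective
        # field order here is ("likes", "uploads") regardless of how the
        # kwargs are written.
        yield Row(relatedPlaylists=Row(likes=1, uploads="1"))


# The nested field names must appear in that same sorted order;
# swapping "likes" and "uploads" breaks the conversion.
output_type = Types.ROW_NAMED(
    ["relatedPlaylists"],
    [Types.ROW_NAMED(["likes", "uploads"], [Types.INT(), Types.STRING()])],
)

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Placeholder source; the real job builds the stream from a Kafka-backed
# table instead.
ds = env.from_collection(
    [Row(id=1)],
    type_info=Types.ROW_NAMED(["id"], [Types.INT()]),
)
ds = ds.process(MyProcessFunction(), output_type=output_type)

# With output_type declared, converting back to the Table API keeps the
# expected schema, and the resulting table can be sunk to Kafka with
# Confluent Avro serialization.
table = t_env.from_data_stream(ds)
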
On Wed, 11 May 2022 at 11:35, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be
> known. Otherwise, it's impossible to infer the schema of the converted
> table; as a result, the `raw` type will be used, which makes the schema of
> the resulting table not as expected. You could either declare the type of
> the `ds` in the DataStream API via `output_type`, or declare the type via
> `t_env.from_data_stream(ds, schema)` when converting the DataStream to a
> table.
>
> >> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
> It doesn't necessarily have to be the Row type. You could also use other
> types, e.g. Integer, String, etc.
>
> >> 2. What should my schema (for output_type) look like given the
> >> structure of the message: https://pastebin.com/kEFBJSBS
> Is the output data a JSON string? If this is the case, I guess you could
> just declare the output_type as Types.STRING().
>
> >> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of
> >> weird) error: https://pastebin.com/KpkZEHtd - here, it seems to me
> >> that having `Row` inside `Row` is causing some issues.
> It supports `Row` inside `Row`. Could you share an example which could
> reproduce this issue?
>
> Regards,
> Dian
>
>
> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
> wrote:
>
>> Hello,
>>
>> I am consuming Kafka messages with the Table API connector. I cannot use
>> the DataStream API because it does not support Confluent Avro.
>>
>> After consuming the messages, I am converting to the DataStream API and
>> using a ProcessFunction. The ProcessFunction makes async HTTP calls and
>> emits results with a completely different structure. This works well: if
>> I don't do anything else and just call `ds.print()`, things are printed
>> as expected.
>>
>> The issues start when I convert from the DataStream API back to the
>> Table API. To do this effectively and sink back into Kafka (with
>> Confluent Avro serialization), I am specifying `output_type` while
>> calling `process` on the stream. I have gathered (there might be other
>> ways I am unaware of) that if I don't specify the schema here, converting
>> back to the Table API (with `t_env.from_data_stream(ds)`) makes it very
>> difficult to serialize the data.
>>
>> What I am not sure about is the following:
>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>> 2. What should my schema (for output_type) look like given the structure
>> of the message: https://pastebin.com/kEFBJSBS
>>
>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
>> error: https://pastebin.com/KpkZEHtd - here, it seems to me that having
>> `Row` inside `Row` is causing some issues.
>>
>> Thanks!
>>
>> --
>> Dhavan