Hey Dian,

Though my HTTP call's response is indeed JSON, I needed to serialize the data
into Avro for the Kafka sink.

Since you said it supports Row inside Row, I investigated further and found
that because the Row class *sorts fields by key name*, I had to strictly
follow that sorted order in the `output_type` parameter of the `process()`
function! For example, if my ProcessFunction's output is defined as follows
(that is, it yields a structure of this type):

Row(
  relatedPlaylists=Row(
    likes=1,
    uploads="1",
  )
)

Then, in `ds = ds.process(MyProcessFunction(), output_type=output_type)`, the
output type must be:
Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["likes",
"uploads"], [Types.INT(), Types.STRING()])])

And I cannot swap the order of "likes" and "uploads". That is, the
following is wrong:
Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["uploads",
"likes"], [Types.STRING(), Types.INT()])])

After making these changes, things indeed work out well - I am able to
convert back to the Table API and sink into Kafka with Confluent Avro
serialization, roughly as sketched below. I do find this ordering requirement
very cumbersome, though :D
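
In case it helps others on the list, the sink side is roughly this sketch (the
table name is a placeholder; I assume it is registered via CREATE TABLE with
the kafka connector and the avro-confluent format, whose option names differ
slightly between Flink versions):

table = t_env.from_data_stream(ds)  # schema is now inferred from output_type
table.execute_insert("my_kafka_sink").wait()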

On Wed, 11 May 2022 at 11:35, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be
> known. Otherwise, it's impossible to infer the schema of the converted
> table; as a result, the `raw` type will be used, which makes the schema of
> the resulting table not what you expect. You could either declare the type
> of the `ds` in the DataStream API via `output_type`, or declare the type via
> `t_env.from_data_stream(ds, schema)` when converting the DataStream to a table.
>
> >> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
> It doesn't necessarily have to be Row type. You could also use other
> types, e.g. Integer, String, etc.
>
> >> 2. What should my schema (for output_type) look like given the
> structure of the message: https://pastebin.com/kEFBJSBS
> Is the output data a JSON string? If this is the case, I guess you could
> just declare the output_type as Types.STRING().
>
> >> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
> error:
> >> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
> inside `Row` is causing some issues.
> It supports `Row` inside `Row`. Could you share an example which could
> reproduce this issue?
>
> Regards,
> Dian
>
>
> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
> wrote:
>
>> Hello,
>>
>> I am consuming Kafka messages with the Table API connector. I cannot use
>> the DataStream API because it does not support Confluent Avro.
>>
>> After consuming the messages, I am converting to the DataStream API and
>> using a ProcessFunction. The ProcessFunction makes async HTTP calls and
>> emits results with a completely different structure. This works well. If I
>> don't do anything else and just do `ds.print()`, things are printed as
>> expected.
>>
>> The issues start happening when I convert from DataStream to Table API
>> again. To do this effectively and sink back to Kafka (with Confluent Avro
>> serialization), I am specifying `output_type` while calling `process` on
>> the stream. I have gathered (there might be other ways I am unaware of)
>> that if I don't specify the schema here, converting back to Table API (with
>> `t_env.from_data_stream(ds)`) makes it very difficult to serialize the
>> data.
>>
>> What I am not sure about are the following things:
>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>> 2. What should my schema (for output_type) look like given the structure
>> of the message: https://pastebin.com/kEFBJSBS
>>
>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
>> error:
>> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
>> inside `Row` is causing some issues.
>>
>> Thanks!
>>
>> --
>> Dhavan
>>
>
