Hi Dhavan,

The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be
known. Otherwise, it's impossible to infer the schema of the converted
table; as a result, the `RAW` type will be used, which makes the schema of
the resulting table different from what you expect. You could either
declare the type of `ds` in the DataStream API via `output_type`, or
declare the schema via `t_env.from_data_stream(ds, schema)` when
converting the DataStream to a Table.
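
For example, a minimal sketch of both options (`MyProcessFunction` and
the field names are only placeholders for illustration):

from pyflink.common import Types
from pyflink.table import DataTypes, Schema

# Option 1: declare the result type when defining the transformation
ds = ds.process(
    MyProcessFunction(),
    output_type=Types.ROW_NAMED(
        ['id', 'name'], [Types.INT(), Types.STRING()]))
table = t_env.from_data_stream(ds)

# Option 2: declare the schema when converting the DataStream to a Table
table = t_env.from_data_stream(
    ds,
    Schema.new_builder()
        .column('id', DataTypes.INT())
        .column('name', DataTypes.STRING())
        .build())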

>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
It doesn't necessarily have to be a `Row` type. You could also use other
types, e.g. `Types.INT()`, `Types.STRING()`, etc.
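
For instance, a stream of plain strings (the mapping itself is just a
hypothetical example):

from pyflink.common import Types

ds = ds.map(lambda record: str(record), output_type=Types.STRING())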

>> 2. What should my schema (for output_type) look like given the structure
of the message: https://pastebin.com/kEFBJSBS
Is the output data a JSON string? If this is the case, I guess you could
just declare the `output_type` as `Types.STRING()`.
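
As a sketch, assuming the process function builds a Python dict and emits
it as JSON (the class name and fields below are only illustrative):

import json

from pyflink.common import Types
from pyflink.datastream import ProcessFunction

class JsonEmittingFunction(ProcessFunction):
    def process_element(self, value, ctx):
        # build whatever structure your output message needs
        result = {'user_id': value[0]}
        yield json.dumps(result)

ds = ds.process(JsonEmittingFunction(), output_type=Types.STRING())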

>> I tried https://pastebin.com/EPT0pB85 but I am getting (kind of weird)
error:
>> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
inside `Row` is causing some issues.
Nesting `Row` inside `Row` is supported. Could you share a minimal example
which could reproduce this issue?
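
For reference, a nested Row type can be declared like this (the field
names are only illustrative):

from pyflink.common import Types

output_type = Types.ROW_NAMED(
    ['id', 'payload'],
    [Types.STRING(),
     Types.ROW_NAMED(['score', 'label'],
                     [Types.DOUBLE(), Types.STRING()])])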

Regards,
Dian


On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
wrote:

> Hello,
>
> I am consuming Kafka messages with Table API connector. I cannot use
> DataStream API because it does not support Confluent Avro.
>
> After consuming the messages, I am converting to DataStream API and using
> ProcessFunction. The ProcessFunction makes async http calls and emits
> results with a completely different structure. This works well. If I don't
> do anything and just call `ds.print()`, things are printed as expected.
>
> The issues start happening when I convert from DataStream to Table API
> again. To do this effectively and sink back to Kafka (with Confluent Avro
> serialization), I am specifying `output_type` while calling `process` on
> the stream. I have gathered (there might be other ways I am unaware of)
> that if I don't specify the schema here, converting back to Table API (with
> `t_env.from_data_stream(ds)`) makes it very difficult to serialize the
> data.
>
> What I am not sure about are the following things:
> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
> 2. What should my schema (for output_type) look like given the structure
> of the message: https://pastebin.com/kEFBJSBS
>
> I tried https://pastebin.com/EPT0pB85 but I am getting (kind of weird)
> error:
> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
> inside `Row` is causing some issues.
>
> Thanks!
>
> --
> Dhavan
>