Hi Affe,

Regarding the implementation: the interface of
`DeserializationSchema#deserialize(byte[], Collector<T>)` allows a format to
emit multiple rows, so this is just the more generic implementation rather
than hard-coding the dropping of extra rows.
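
For illustration, here is a minimal sketch of how a format could emit several
rows from one record through the Collector-based method. It is not a real
built-in format; the class name and the comma-splitting logic are made up:

```
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Hypothetical key format that packs several comma-separated keys
// into a single key-value entry and emits one row per key.
public class MultiRowKeyDeserializer implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) {
        // Not used; the Collector-based variant below is the entry point.
        throw new UnsupportedOperationException(
                "use deserialize(byte[], Collector)");
    }

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        for (String key : new String(message).split(",")) {
            out.collect(key); // one Kafka record in, multiple rows out
        }
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```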

Even though there is currently no built-in key format that emits multiple
rows, we can't assume there is no use case for it. I can imagine special
formats that group the same values into one key-value entry. In terms of
implementation, I think it makes sense to do the cartesian product.
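
To make the semantics concrete, here is a tiny standalone sketch (plain Java,
not the connector code) of what the cartesian product produces when both
sides decode to more than one row:

```
import java.util.List;

public class CartesianProductSketch {
    public static void main(String[] args) {
        // hypothetical decoded rows: 2 key rows and 2 value rows
        List<String> keyRows = List.of("k1", "k2");
        List<String> valueRows = List.of("v1", "v2");
        // every key row is paired with every value row -> 4 output rows
        for (String value : valueRows) {
            // "otherwise emit a value for each key"
            for (String key : keyRows) {
                System.out.println(key + " | " + value);
            }
        }
    }
}
```

So a record that decodes into m key rows and n value rows yields m * n joined
rows downstream.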

Best,
Jark

On Mon, 9 May 2022 at 13:57, Yufei Zhang <affei...@gmail.com> wrote:

> I was reading the source code of the Kafka SQL source connector and noticed
> that in DynamicKafkaDeserializationSchema [1], when the key format emits
> multiple rows, the code does a cartesian product of the key rows and value
> rows. I know that in CDC a format can emit multiple rows (UPDATE_BEFORE and
> UPDATE_AFTER) for a single message, but I'm wondering:
> 1. In what case will a key emit multiple rows?
> 2. What does the cartesian product of key and value rows represent? (If
> there is only one keyRow it makes sense, but when both keyRows and
> valueRows have more than one row, I fail to infer the possible use case.)
>
> ```
> // otherwise emit a value for each key
> for (RowData physicalKeyRow : physicalKeyRows) {
>     emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
> }
> ```
>
>
> I searched the docs but failed to understand the design here. Any hints
> would be appreciated~ Thanks~
>
> [1]
>
> https://github.com/apache/flink/blob/1fc26192cf794961af4f6933c155daf86eb8a0fe/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L224
>
>
> Cheers,
> Affe
>
