Hi Affe,

Regarding the implementation: the interface of `DeserializationSchema#deserialize(byte[], Collector<T>)` allows a format to emit multiple rows, so the cartesian product is simply the more generic implementation, rather than hard-coding the assumption that a key produces at most one row and dropping the rest.
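As an illustration (a hypothetical sketch, not an existing Flink format; the name `GroupedKeyDeserializer` and the ';'-separated encoding are made up), a key format that groups several logical keys into one Kafka key could emit one row per logical key through the `Collector`:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

/**
 * Hypothetical key format: one Kafka key encodes several logical keys
 * separated by ';', so deserialize(...) emits one row per logical key.
 */
public class GroupedKeyDeserializer implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) {
        // Not used; the Collector-based variant below does the actual work.
        throw new UnsupportedOperationException();
    }

    @Override
    public void deserialize(byte[] message, Collector<String> out) throws IOException {
        String raw = new String(message, StandardCharsets.UTF_8);
        // A single key-value entry fans out into multiple key rows.
        for (String key : raw.split(";")) {
            out.collect(key);
        }
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```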
Currently, there is no built-in key format that emits multiple rows, but we can't assume there is no use case for it. I can imagine special formats that group several values into one key-value entry. In terms of implementation, I think it makes sense to do the cartesian product: each emitted key row is paired with each emitted value row (see the sketch after the quoted message below).

Best,
Jark

On Mon, 9 May 2022 at 13:57, Yufei Zhang <affei...@gmail.com> wrote:

> I was reading the source code of the Kafka SQL source connector, and I
> noticed that in DynamicKafkaDeserializationSchema [1], when the key schema
> emits multiple rows, the code takes a cartesian product of the key rows and
> value rows. I know that in CDC, a format can emit multiple rows
> (UPDATE_BEFORE and UPDATE_AFTER) for a single message, but I'm wondering:
>
> 1. In what case will a key emit multiple rows?
> 2. What does the cartesian product of key and value rows represent? (If
> there is only one key row, it makes sense, but when both the key rows and
> value rows have more than one entry, I failed to infer the possible use
> case.)
>
> ```
> // otherwise emit a value for each key
> for (RowData physicalKeyRow : physicalKeyRows) {
>     emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
> }
> ```
>
> I searched the docs but failed to understand the design here. Any hints
> would be appreciated~ Thanks~
>
> [1]
> https://github.com/apache/flink/blob/1fc26192cf794961af4f6933c155daf86eb8a0fe/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L224
>
> Cheers,
> Affe
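To make the pairing concrete, here is a rough sketch of what the connector effectively does. It is simplified, not the literal code from [1]: the lists `physicalKeyRows` and `physicalValueRows` are assumed to have been collected from the key and value formats, and `emitRow` is a stand-in for the connector's method of the same name.

```java
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

/** Simplified sketch of the pairing; not the literal connector code. */
class CartesianEmitSketch {

    /** Pairs every key row with every value row: K keys x V values = K*V output rows. */
    static void emitProduct(List<RowData> physicalKeyRows, List<RowData> physicalValueRows) {
        for (RowData physicalValueRow : physicalValueRows) {
            for (RowData physicalKeyRow : physicalKeyRows) {
                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
            }
        }
    }

    /** Stand-in for the connector's emitRow, which merges key and value columns. */
    static void emitRow(GenericRowData keyRow, GenericRowData valueRow) {
        // In the real connector this projects the key, value, and metadata columns
        // into one produced row and hands it to the downstream collector.
    }
}
```

So if the key format emits a single row, the loop degenerates to emitting each value row once with that key, which matches the intuition in the question; the nested loops are just the generalization of that case.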