Hi Jark,

Thanks for the explanation, it answered my question well ~
Only one thing: if the keyRow count is N and the value row count is M (N, M > 1), the cartesian product might not cover all use cases. But I think we don't need to worry about it for now, since this case is rare and we can discuss it later if we do encounter such a use case. Thanks~

Yufei

On Mon, May 9, 2022 at 2:35 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Affe,
>
> Regarding the implementation, the interface of
> `DeserializationSchema#deserialize(byte[], Collector<T>)` allows a format
> to emit multiple rows, so this is just a more generic implementation
> instead of hard-coding the dropping of rows.
>
> Currently, there is no built-in key format that emits multiple rows.
> However, we can't assume there is no use case for it. I can imagine some
> special formats that group the same values into one key-value entry. In
> terms of implementation, I think it makes sense to do the cartesian
> product.
>
> Best,
> Jark
>
> On Mon, 9 May 2022 at 13:57, Yufei Zhang <affei...@gmail.com> wrote:
>
> > I was reading the source code of the Kafka SQL source connector, and I
> > noticed that in DynamicKafkaDeserializationSchema [1], when the schema
> > emits multiple keys, the code takes the cartesian product of the key
> > rows and value rows. I know that in CDC, a format can emit multiple
> > rows (UPDATE_BEFORE and UPDATE_AFTER rows) for a single message, but
> > I'm wondering:
> >
> > 1. In what case will a key emit multiple rows?
> > 2. What does the cartesian product of key and value rows represent?
> > (If there is only 1 keyRow, it makes sense, but when both keyRows and
> > valueRows have more than 1, I failed to infer the possible use case.)
> >
> > ```
> > // otherwise emit a value for each key
> > for (RowData physicalKeyRow : physicalKeyRows) {
> >     emitRow((GenericRowData) physicalKeyRow,
> >             (GenericRowData) physicalValueRow);
> > }
> > ```
> >
> > I searched in the docs but failed to understand the design here. Any
> > hints would be appreciated~ Thanks~
> >
> > [1]
> > https://github.com/apache/flink/blob/1fc26192cf794961af4f6933c155daf86eb8a0fe/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L224
> >
> > Cheers,
> > Affe
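For anyone following along, here is a minimal standalone sketch of what the cartesian-product emission discussed above amounts to: N key rows and M value rows produce N×M output rows. `Row` and `emitRows` are hypothetical stand-ins for illustration only, not Flink's actual `GenericRowData` and `emitRow`; the real connector loops over key rows for each value row the collector receives.

```java
import java.util.ArrayList;
import java.util.List;

public class CartesianEmit {
    // Hypothetical stand-in for a deserialized row (the real code uses GenericRowData).
    public record Row(String payload) {}

    // Emit one output row per (keyRow, valueRow) pair -- the cartesian product.
    public static List<String> emitRows(List<Row> keyRows, List<Row> valueRows) {
        List<String> out = new ArrayList<>();
        for (Row value : valueRows) {
            // "otherwise emit a value for each key" -- inner loop over key rows
            for (Row key : keyRows) {
                out.add(key.payload() + "|" + value.payload());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> keys = List.of(new Row("k1"), new Row("k2"));
        List<Row> values = List.of(new Row("v1"), new Row("v2"), new Row("v3"));
        List<String> joined = emitRows(keys, values);
        // 2 key rows x 3 value rows -> 6 output rows
        System.out.println(joined.size()); // prints 6
    }
}
```

This makes Yufei's point concrete: with N = M = 1 each output row pairs the key with its value unambiguously, but once both N and M exceed 1, the product asserts that every key row belongs to every value row, which may not hold for all formats.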