Hi Jun,

Thanks for opening this discussion.

From my point of view, I don't know whether object-reuse Kafka deserialization schemas are widely used. If a user implements a customized object-reuse KafkaRecordDeserializationSchema like the one in [1], we cannot deserialize the records and put them into an element queue, since the queued elements would all refer to the same reused object. However, I think most users would not implement an object-reuse Kafka deserialization schema, and the current solution in FLINK-25132 [2] introduces an obvious performance regression for most Kafka deserialization schemas.

On the other hand, I don't think another parameter would work well in this case. Introducing another parameter adds complexity, and users could still make mistakes with it.

To be honest, I don't have a better idea at the moment. Maybe we need to refactor the interfaces of KafkaDeserializationSchema to avoid such object-reuse bad cases.

[1] https://github.com/apache/flink/blob/c21979833f50c48b498ba5d6812ea52d12e9a7f7/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L424-L459
[2] https://issues.apache.org/jira/browse/FLINK-25132

Best
Yun Tang

________________________________
From: 何军 <xuehaijux...@gmail.com>
Sent: Wednesday, April 19, 2023 11:30
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [discuss] FLINK-25132: KafkaSource cannot work with object-reusing DeserializationSchema result in a performance regression

Hi everyone,

I would like to start a discussion on FLINK-25132: KafkaSource cannot work with object-reusing DeserializationSchema.

The implementation of FLINK-25132 moves the deserialization of Kafka records from the SplitFetcher thread to the thread that runs KafkaRecordEmitter, namely Flink's main task thread. In our experience, even without triggering this object-reusing bug in KafkaDeserializerSchema, it still caused a performance regression of 20% to 30%.

I am inclined to add a parameter to the Flink configuration that indicates where deserialization should run, that is, in the fetcher thread or in the main thread, since not all deserializers will encounter the object-reusing bug mentioned above.

Looking forward to your reply.

Best,
Jun He
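
For readers following the thread, the object-reuse hazard discussed above can be illustrated with a minimal sketch in plain Java. It does not use any Flink or Kafka classes: Event, ReusingDeserializer, deserializeThenQueue and queueThenDeserialize are hypothetical names chosen for illustration, and a simple in-memory queue stands in for the element queue between the fetcher and the emitter. It is only meant to show why eager deserialization into a queue breaks with a reused object, not how KafkaSource is actually implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

public class ObjectReuseQueueSketch {

    /** Hypothetical mutable record type standing in for a user's deserialized event. */
    static final class Event {
        String value;
    }

    /** Hypothetical object-reusing deserializer: every call returns the same instance. */
    static final class ReusingDeserializer implements Function<byte[], Event> {
        private final Event reused = new Event();

        @Override
        public Event apply(byte[] bytes) {
            reused.value = new String(bytes, StandardCharsets.UTF_8);
            return reused;
        }
    }

    /** Deserialize eagerly and buffer the results, as a fetcher-side approach would. */
    static List<String> deserializeThenQueue(List<byte[]> raw, Function<byte[], Event> deserializer) {
        Queue<Event> queue = new ArrayDeque<>();
        for (byte[] record : raw) {
            queue.add(deserializer.apply(record)); // the same Event instance is queued each time
        }
        List<String> emitted = new ArrayList<>();
        for (Event event : queue) {
            emitted.add(event.value); // every entry now shows the last value written
        }
        return emitted;
    }

    /** Buffer raw bytes and deserialize one record at a time when emitting. */
    static List<String> queueThenDeserialize(List<byte[]> raw, Function<byte[], Event> deserializer) {
        Queue<byte[]> queue = new ArrayDeque<>(raw);
        List<String> emitted = new ArrayList<>();
        while (!queue.isEmpty()) {
            // The reused object is read before the next record overwrites it.
            emitted.add(deserializer.apply(queue.poll()).value);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> raw = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8));
        System.out.println(deserializeThenQueue(raw, new ReusingDeserializer())); // [c, c, c]
        System.out.println(queueThenDeserialize(raw, new ReusingDeserializer())); // [a, b, c]
    }
}
```

With a deserializer that allocates a new Event per record, both methods would return the same result, which is why only object-reusing schemas are affected by where deserialization runs.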