becketqin commented on pull request #14239: URL: https://github.com/apache/flink/pull/14239#issuecomment-735924235
The patch migrates all the existing IT cases from `KafkaConsumerTestBase` to the new KafkaSource, except `runEndOfStreamTest()`, which is no longer applicable. In order to migrate the IT cases, the following two commits have to be cherry-picked from FLINK-20379:
- Add a `GenericDeserializationSchema` that suits the new Source API. The `GenericDeserializationSchema` is intended to take care of deserialization from an arbitrary record type, which is usually a `byte[]` with additional information.
- Let the `KafkaRecordDeserializer` implement `GenericDeserializationSchema`. Add bridge methods to the existing `KafkaDeserializationSchema` implementations.

Other than that, the following three commits are made:
- 03173a7 fixes a bug where an NPE was thrown when the committed offset does not exist for a partition.
- e1fe5fd adjusts the behavior of `SpecifiedOffsetsInitializer` to keep it the same as the legacy `FlinkKafkaConsumer`.
- df413b4 disables offsets auto commit by default.

I have verified the patch with a modified `StateMachineExample`, using the following steps:
1. Start a Kafka cluster and create a topic with 32 partitions.
2. Start a Flink standalone cluster with 3 TMs.
3. Submit the `KafkaEventsGeneratorJob` to generate records.
4. Submit the `StateMachineExample` to process the records.
   - The job runs without exceptions.
   - Checkpointing works fine.
   - The offsets are committed back to Kafka correctly.
   - The WebUI shows the metrics correctly.
5. `kill -9` the TM that runs the `StateMachineExample`.
   - The task fails over correctly and resumes consumption from the last checkpointed offsets.
6. Cancel the `StateMachineExample` with a savepoint.
   - The job stops without an exception.
7. Resume the `StateMachineExample` from the savepoint.
   - The job resumes correctly from the offsets at which it was canceled.
   - NOTE: The following exception may be seen in the TM log. It is a known issue in Kafka, caused by the same client-id being registered twice with JMX.
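The duplicate-registration behavior can be reproduced with plain JDK JMX, independent of Kafka. The sketch below is illustrative: the `Demo`/`DemoMBean` classes are made up for the example, and only the `kafka.consumer:type=app-info,id=...` ObjectName pattern is taken from the log (it is what Kafka's `AppInfoParser` registers per client-id):

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {
    // Standard MBean convention: implementation class "Demo" exposes interface "DemoMBean".
    public interface DemoMBean { String getId(); }
    public static class Demo implements DemoMBean {
        public String getId() { return "myGroup-0"; }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Same ObjectName pattern as in the exception message from the TM log.
        ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=myGroup-0");

        server.registerMBean(new Demo(), name); // first registration succeeds
        try {
            // Registering a second MBean under the exact same ObjectName fails,
            // which is what happens when two KafkaConsumers share a client-id.
            server.registerMBean(new Demo(), name);
        } catch (InstanceAlreadyExistsException e) {
            System.out.println("duplicate registration: " + e.getMessage());
        }
    }
}
```

This suggests the warning is harmless for correctness: the second consumer still works, it just cannot register its `app-info` metrics bean.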
It is a known issue in Kafka, but I am not familiar enough with the MBeanServer to tell whether Flink should expect this.
```
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=myGroup-0
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_172]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_172]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_172]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_172]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_172]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_172]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:84) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
	at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$0(KafkaSource.java:121) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:129) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:59) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:218) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:220) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org