Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128754182

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
                         LOG.debug("Using the following offsets: {}", restoredState);
                     }
                 }
    -            if (restoredState != null && restoredState.isEmpty()) {
    -                restoredState = null;
    -            }
             } else {
                 LOG.info("No restore state for FlinkKafkaConsumer.");
             }
         }

         @Override
    -    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    --- End diff --

    The concern here is that leaving these methods non-final makes it easy for contributors to accidentally override them. We don't have specific unit tests for the 0.9 `FlinkKafkaConsumer` or the 0.10 `FlinkKafkaConsumer`; we only test the base `FlinkKafkaConsumerBase`. This is OK as long as the specific implementations don't override important methods. If `FlinkKafkaConsumer09` did override the `snapshot()`/`restore()` methods, for example, no test would catch it.

    @tzulitai I don't want to discuss these methods too much here, since we want to get the fixes in for release 1.3.2.

    A way around the problem is to turn the `FlinkKafkaConsumerBaseTest` into an abstract `FlinkKafkaConsumerBaseTestBase` with an abstract method `createTestingConsumer(List<KafkaTopicPartition> mockFetchedPartitions)` that creates a "dummy" consumer for a specific Kafka version. Then we would have individual `FlinkKafkaConsumer09Test`, `FlinkKafkaConsumer010Test` and so on that derive from the abstract test base and just implement the method for creating the testing consumer.

    What do you think?
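To make the proposal concrete, here is a minimal, dependency-free sketch of the abstract-test-base pattern described above. It is not the actual Flink code: `ConsumerBase`, `Consumer09`, `Consumer010` and the test classes are hypothetical stand-ins, `String` replaces `KafkaTopicPartition`, and the single shared check is invented for illustration. Only the method name `createTestingConsumer` and its `mockFetchedPartitions` parameter come from the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for FlinkKafkaConsumerBase (not the real Flink class).
abstract class ConsumerBase {
    private final List<String> partitions;

    ConsumerBase(List<String> partitions) {
        this.partitions = new ArrayList<>(partitions);
    }

    // 'final' stops version-specific subclasses from overriding snapshot logic.
    public final List<String> snapshotState() {
        return new ArrayList<>(partitions);
    }
}

// Hypothetical version-specific consumers; they add nothing in this sketch.
class Consumer09 extends ConsumerBase {
    Consumer09(List<String> partitions) { super(partitions); }
}

class Consumer010 extends ConsumerBase {
    Consumer010(List<String> partitions) { super(partitions); }
}

// Abstract test base: holds the shared checks and defers consumer creation.
abstract class ConsumerTestBase {
    // String stands in for KafkaTopicPartition (an assumption of this sketch).
    abstract ConsumerBase createTestingConsumer(List<String> mockFetchedPartitions);

    // Shared check, run unchanged against every concrete consumer.
    boolean snapshotContainsFetchedPartitions() {
        List<String> fetched = Arrays.asList("topic-0", "topic-1");
        ConsumerBase consumer = createTestingConsumer(fetched);
        return consumer.snapshotState().equals(fetched);
    }
}

// Each version-specific test only says how to build its consumer.
class Consumer09Test extends ConsumerTestBase {
    @Override
    ConsumerBase createTestingConsumer(List<String> mockFetchedPartitions) {
        return new Consumer09(mockFetchedPartitions);
    }
}

class Consumer010Test extends ConsumerTestBase {
    @Override
    ConsumerBase createTestingConsumer(List<String> mockFetchedPartitions) {
        return new Consumer010(mockFetchedPartitions);
    }
}

class TestBaseSketch {
    public static void main(String[] args) {
        System.out.println("0.9:  " + new Consumer09Test().snapshotContainsFetchedPartitions());
        System.out.println("0.10: " + new Consumer010Test().snapshotContainsFetchedPartitions());
    }
}
```

In a real test suite the shared check would presumably be a JUnit `@Test` method on the abstract base, so every subclass inherits it. With this layout, a version-specific consumer that changed snapshot behaviour would fail its own test class rather than go unnoticed, which complements making the methods `final`.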