lindong28 opened a new pull request #15161: URL: https://github.com/apache/flink/pull/15161
## What is the purpose of the change

This PR fixed a few bugs related to `KafkaSource`. See "Brief change log" for the list of bug fixes made in this PR.

## Brief change log

Bug fixes:
- `KafkaSourceReader` should not commit offsets for partitions whose offsets have not been initialized.
- `SourceCoordinatorContext` should not log and fail job again if it receives InterruptedException after it is closed.
- `SourceOperatorStreamTask` should be closed synchronously to avoid ClassNotFoundException.
- `PartitionOffsetsRetrieverImpl.committedOffsets()` should handle the case without committed offsets.
- `SourceOperatorStreamTask` should check the committed offset first before using OffsetResetStrategy.
- Auto offset commit should be disabled by default.
- `SourceCoordinatorContext` should fail job if it fails to send event to subtasks.

Usability improvements:
- Reduce the offset commit logging verbosity from INFO to DEBUG.
- `SourceOperatorStreamTask` should report numRecordsOutCount.
- `KafkaSourceEnumerator` should close the admin client early if periodic partition discovery is disabled.
- Remove the unused `close.timeout.ms` config.

Tests:
- Add IT cases for KafkaSource by migrating IT cases from FlinkKafkaConsumer.

## Verifying this change

- Added tests in `KafkaSourceITCase`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented?
(not applicable)
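
For readers unfamiliar with the first bug fix in the change log, the idea of skipping partitions whose offsets have not been initialized can be sketched roughly as follows. This is only an illustration and not the code in this PR: the class `OffsetCommitFilter`, the sentinel `UNINITIALIZED_OFFSET`, and the use of plain string keys for partitions are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitFilter {
    // Hypothetical sentinel for a partition whose starting offset is not yet known.
    static final long UNINITIALIZED_OFFSET = -1L;

    // Returns a copy of the offsets map that keeps only partitions
    // with an initialized offset, i.e. the ones that are safe to commit.
    static Map<String, Long> committableOffsets(Map<String, Long> currentOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            if (entry.getValue() != UNINITIALIZED_OFFSET) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);                  // initialized, will be committed
        offsets.put("topic-1", UNINITIALIZED_OFFSET); // not initialized, skipped
        System.out.println(committableOffsets(offsets)); // prints {topic-0=42}
    }
}
```

Filtering before the commit, rather than committing the sentinel value, avoids writing a meaningless offset for a partition that has not been read yet.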