lindong28 opened a new pull request #15161:
URL: https://github.com/apache/flink/pull/15161


   ## What is the purpose of the change
   
   This PR fixes several bugs related to `KafkaSource` and makes a few 
usability improvements. See "Brief change log" for the full list.
   
   ## Brief change log
   
   Bug fixes:
   - `KafkaSourceReader` should not commit offsets for partitions whose offsets 
have not been initialized.
   - `SourceCoordinatorContext` should not log and fail the job again if it 
receives an `InterruptedException` after it has been closed.
   - `SourceOperatorStreamTask` should be closed synchronously to avoid a 
`ClassNotFoundException`.
   - `PartitionOffsetsRetrieverImpl.committedOffsets()` should handle the case 
where no offsets have been committed.
   - `SourceOperatorStreamTask` should check the committed offset before 
using `OffsetResetStrategy`.
   - Auto offset commit should be disabled by default.
   - `SourceCoordinatorContext` should fail the job if it fails to send an 
event to subtasks.
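
The first fix in the list above (skip committing offsets for partitions whose offsets have not been initialized) can be illustrated with a minimal, dependency-free sketch. This is not the code from this PR: the class `CommittableOffsets`, the method `filterCommittable`, the string partition keys, and the convention that a negative value marks an uninitialized offset are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class CommittableOffsets {

    /**
     * Returns only the entries that are safe to commit.
     * Assumption: a negative offset means "not yet initialized".
     */
    public static Map<String, Long> filterCommittable(Map<String, Long> currentOffsets) {
        // TreeMap keeps the result in a deterministic key order.
        Map<String, Long> committable = new TreeMap<>();
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            if (entry.getValue() >= 0) {
                committable.put(entry.getKey(), entry.getValue());
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new TreeMap<>();
        offsets.put("topic-0", 42L); // initialized, safe to commit
        offsets.put("topic-1", -1L); // hypothetical "uninitialized" marker
        System.out.println(filterCommittable(offsets)); // prints {topic-0=42}
    }
}
```

Filtering before the commit call, rather than inside it, keeps the commit path unaware of the sentinel value, which is one plausible way to avoid committing a meaningless offset.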
   
   Usability improvements:
   - Reduce the offset commit logging verbosity from INFO to DEBUG.
   - `SourceOperatorStreamTask` should report numRecordsOutCount.
   - `KafkaSourceEnumerator` should close the admin client early if periodic 
partition discovery is disabled.
   - Remove the unused `close.timeout.ms` config.
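
As a rough illustration of the early-close behavior described for `KafkaSourceEnumerator` above, the sketch below releases a client right after a one-time discovery when periodic discovery is off. Everything here is hypothetical: `FakeAdminClient` stands in for a real admin client, `discoverOnce` is an invented helper, and treating a non-positive interval as "periodic discovery disabled" is an assumption rather than something stated in this PR.

```java
public class DiscoverySketch {

    /** Hypothetical stand-in for an admin client; it only tracks whether it was closed. */
    public static class FakeAdminClient implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        public boolean isClosed() {
            return closed;
        }
    }

    /**
     * Runs a single (simulated) partition discovery.
     * Assumption: intervalMs <= 0 means periodic discovery is disabled,
     * so the client is no longer needed and can be closed immediately.
     */
    public static void discoverOnce(FakeAdminClient client, long intervalMs) {
        // A real implementation would query topic metadata here.
        if (intervalMs <= 0) {
            client.close();
        }
    }

    public static void main(String[] args) {
        FakeAdminClient oneShot = new FakeAdminClient();
        discoverOnce(oneShot, -1L);
        System.out.println(oneShot.isClosed()); // prints true

        FakeAdminClient periodic = new FakeAdminClient();
        discoverOnce(periodic, 1000L);
        System.out.println(periodic.isClosed()); // prints false
    }
}
```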
   
   Tests:
   - Add IT cases for KafkaSource by migrating IT cases from FlinkKafkaConsumer.
   
   ## Verifying this change
   
   - Added tests in `KafkaSourceITCase`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)

