becketqin opened a new pull request #13574:
URL: https://github.com/apache/flink/pull/13574


   ## What is the purpose of the change
   This patch adds an implementation of a Kafka source based on FLIP-27.
   
   A few clarifications:
   1. Package paths. The patch reuses the existing 
`flink-connectors/flink-connector-kafka` module, but with a new package path 
`org.apache.flink.connector.kafka.source`.
   2. The deserialization schema. This patch creates a new 
`KafkaRecordDeserializer` class instead of reusing 
`org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema`, which 
makes it easier to deprecate the old `KafkaDeserializationSchema` in the future. 
We will migrate the existing implementations of `KafkaDeserializationSchema` in a 
separate patch. Ideally, `KafkaRecordDeserializer` should also be unified with 
the `DeserializationSchema` interface. However, `DeserializationSchema` currently 
assumes each serialized record is a single byte array, which does not fit the 
Kafka case, where a serialized record is a `ConsumerRecord<byte[], byte[]>`.
   3. This patch does not yet add some necessary APIs to `SplitEnumerator`, 
`SplitReader` and `SourceReader`, namely: 1) a `close()` method in 
`SplitReader`; 2) an `onCheckpointComplete()` callback in `SplitEnumerator` and 
`SourceReader`. These API changes will be made in a separate patch.
   4. This patch still needs more integration tests. I'll add them either while 
the PR is under review or in follow-up patches.
   5. The docs will be added later in a separate commit in this PR once the 
code is reviewed.
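To illustrate the mismatch described in item 2, here is a minimal sketch that uses no Kafka or Flink classes. `RawRecord` is a hypothetical stand-in for `ConsumerRecord<byte[], byte[]>`, and `ValueOnlyDeserializer` and `RecordDeserializer` are hypothetical interfaces, not the actual signatures of `DeserializationSchema` or the new `KafkaRecordDeserializer`. It shows that a byte-array-only deserializer can be wrapped into a record-level one, but only the record-level one can see the key, topic, partition and offset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RecordDeserializerSketch {

    /** Hypothetical stand-in for Kafka's ConsumerRecord<byte[], byte[]>: key, value and metadata. */
    static final class RawRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] key; // may be null in real Kafka records
        final byte[] value;

        RawRecord(String topic, int partition, long offset, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical byte-array view in the spirit of DeserializationSchema: one byte[] in, one element out. */
    interface ValueOnlyDeserializer<T> {
        T deserialize(byte[] message);
    }

    /** Hypothetical record-level view: sees the whole record and may emit zero or more elements. */
    interface RecordDeserializer<T> {
        void deserialize(RawRecord rec, List<T> out);
    }

    /** A value-only deserializer can be lifted to the record level; the reverse would lose key and metadata. */
    static <T> RecordDeserializer<T> fromValueOnly(ValueOnlyDeserializer<T> inner) {
        return (r, out) -> out.add(inner.deserialize(r.value));
    }

    static List<String> run() {
        RawRecord sample = new RawRecord("orders", 3, 42L,
                "user-7".getBytes(StandardCharsets.UTF_8),
                "paid".getBytes(StandardCharsets.UTF_8));

        // Key, topic, partition and offset are only reachable through the record-level interface.
        RecordDeserializer<String> withMetadata = (r, out) -> out.add(
                r.topic + "-" + r.partition + "@" + r.offset + " "
                        + new String(r.key, StandardCharsets.UTF_8) + "="
                        + new String(r.value, StandardCharsets.UTF_8));

        // A value-only deserializer wrapped to fit the record-level interface.
        RecordDeserializer<String> valueOnly =
                fromValueOnly(bytes -> new String(bytes, StandardCharsets.UTF_8));

        List<String> out = new ArrayList<>();
        withMetadata.deserialize(sample, out);
        valueOnly.deserialize(sample, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This asymmetry suggests why a record-level deserializer is the more general shape for Kafka: existing value-only schemas can be adapted to it, while Kafka-specific deserializers need the full record.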
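Item 3 mentions a missing `onCheckpointComplete()` callback and a `close()` method. One common use of such a callback in a Kafka source is committing consumed offsets back to Kafka only after the checkpoint containing them has succeeded. The sketch below assumes a simplified, hypothetical reader: `CheckpointAwareReader`, `OffsetCommittingReader`, `snapshotState` and `advance` are illustrative names, the callback's signature is assumed, and no real `KafkaConsumer` is involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class CheckpointCommitSketch {

    /** Hypothetical lifecycle hooks; NOT Flink's actual SourceReader/SplitReader interfaces. */
    interface CheckpointAwareReader extends AutoCloseable {
        /** Captures the current position (partition -> next offset) for the given checkpoint. */
        Map<Integer, Long> snapshotState(long checkpointId);

        /** Assumed signature: called once the given checkpoint has completed. */
        void onCheckpointComplete(long checkpointId);

        /** Narrowed to throw no checked exception. */
        @Override
        void close();
    }

    static final class OffsetCommittingReader implements CheckpointAwareReader {
        private final Map<Integer, Long> currentOffsets = new HashMap<>();
        private final SortedMap<Long, Map<Integer, Long>> pending = new TreeMap<>();
        /** Stand-in for offsets committed back to Kafka; a real reader would call its consumer here. */
        final Map<Integer, Long> committed = new HashMap<>();

        /** Hypothetical helper: simulates consuming a partition up to (but excluding) nextOffset. */
        void advance(int partition, long nextOffset) {
            currentOffsets.put(partition, nextOffset);
        }

        @Override
        public Map<Integer, Long> snapshotState(long checkpointId) {
            Map<Integer, Long> snapshot = new HashMap<>(currentOffsets);
            pending.put(checkpointId, snapshot);
            return snapshot;
        }

        @Override
        public void onCheckpointComplete(long checkpointId) {
            Map<Integer, Long> offsets = pending.get(checkpointId);
            if (offsets == null) {
                return; // unknown checkpoint, or already subsumed by a newer one
            }
            committed.putAll(offsets);
            // A completed checkpoint subsumes all older ones, so drop them as well.
            pending.headMap(checkpointId + 1).clear();
        }

        @Override
        public void close() {
            pending.clear(); // a real reader would also close its KafkaConsumer here
        }
    }

    static Map<Integer, Long> run() {
        OffsetCommittingReader reader = new OffsetCommittingReader();
        reader.advance(0, 10L);
        reader.snapshotState(1L);         // checkpoint 1 holds {0=10}
        reader.advance(0, 25L);
        reader.advance(1, 7L);
        reader.snapshotState(2L);         // checkpoint 2 holds {0=25, 1=7}
        // Nothing is committed yet: neither checkpoint has completed.
        reader.onCheckpointComplete(2L);  // commits {0=25, 1=7} and discards checkpoint 1
        reader.onCheckpointComplete(1L);  // late notification for a subsumed checkpoint: no-op
        reader.close();
        return reader.committed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Committing only on completion keeps the offsets visible in Kafka consistent with state that Flink can actually restore, and ignoring late notifications for subsumed checkpoints keeps the committed offsets from moving backwards. `close()` is where such a reader would release its per-reader resources.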
   
   ## Brief change log
   80ff3b43afa00f011 [FLINK-18323][connector/kafka] Add Kafka Source based on 
FLIP-27.
   
   
   ## Verifying this change
   The changes in this patch are mostly in the 
`flink-connectors/flink-connector-kafka/` module. Unit tests and basic IT cases 
are added under the `org.apache.flink.connector.kafka.source` package in the test 
directory.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
     - The serializers: (yes)
     - The runtime per-record code paths (performance sensitive): (yes)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   

