becketqin opened a new pull request #13574: URL: https://github.com/apache/flink/pull/13574
## What is the purpose of the change

This patch adds an implementation of the Kafka source based on FLIP-27. A few clarifications:

1. Package paths. The patch reuses the existing `flink-connectors/flink-connector-kafka` module, but with a new package path, `org.apache.flink.connector.kafka.source`.
2. The serialization schema. This patch currently creates a new `KafkaRecordDeserializer` class instead of reusing `org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema`, which makes the future deprecation of the old `KafkaDeserializationSchema` easier. We will migrate the existing `KafkaDeserializationSchema` implementations in a separate patch.
   Ideally, we should also unify `KafkaRecordDeserializer` with the `DeserializationSchema` interface. However, `DeserializationSchema` currently assumes each serialized record is a single byte array, so it does not suit the Kafka case, where a serialized record is a `ConsumerRecord<byte[], byte[]>`.
3. This patch does not yet add some necessary APIs to `SplitEnumerator` and `SplitReader`, namely:
   1) a `close()` method in `SplitReader`;
   2) the `onCheckpointComplete()` callback in `SplitEnumerator` and `SourceReader`.
   These API changes will be made in a separate patch.
4. This patch still needs more integration tests. I'll add them either while the PR is in review or in follow-up patches.
5. The docs will be added later, in a separate commit in this PR, once the code is reviewed.

## Brief change log

- 80ff3b43afa00f011 [FLINK-18323][connector/kafka] Add Kafka Source based on FLIP-27.

## Verifying this change

The changes in this patch are mostly in the `flink-connectors/flink-connector-kafka/` module. Unit tests and basic IT cases are added under the `org.apache.flink.connector.kafka.source` package in the test directory.
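To make the shape mismatch in point 2 concrete, here is a minimal, self-contained Java sketch. It deliberately does not use the real Flink or Kafka classes: `RawRecord` is a hypothetical stand-in for `ConsumerRecord<byte[], byte[]>`, and `ByteArrayDeserializer` and `RecordDeserializer` are hypothetical interfaces, the first modeled loosely on the single-`byte[]` `deserialize` method of `DeserializationSchema`, the second on a record-level deserializer. They are illustrations only, not the signatures introduced by this patch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DeserializerShapes {

    // Hypothetical stand-in for Kafka's ConsumerRecord<byte[], byte[]>: besides the value bytes,
    // a Kafka record carries a key and metadata such as topic, partition and offset.
    static final class RawRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] key; // may be null
        final byte[] value;

        RawRecord(String topic, int partition, long offset, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical interface with a single-byte-array shape: it only ever sees one byte[].
    interface ByteArrayDeserializer<T> {
        T deserialize(byte[] message);
    }

    // Hypothetical record-level interface: it sees the whole record and may emit zero or more values.
    interface RecordDeserializer<T> {
        void deserialize(RawRecord r, Consumer<T> out);
    }

    // Wraps a byte-array deserializer as a record-level one by passing it the value bytes only;
    // the key and the metadata are necessarily dropped.
    static <T> RecordDeserializer<T> valueOnly(ByteArrayDeserializer<T> inner) {
        return (r, out) -> out.accept(inner.deserialize(r.value));
    }

    // A record-level deserializer that uses the key and metadata, which the byte-array shape cannot see.
    static final RecordDeserializer<String> KEY_VALUE_WITH_OFFSET = (r, out) -> {
        String key = r.key == null ? "null" : new String(r.key, StandardCharsets.UTF_8);
        String value = new String(r.value, StandardCharsets.UTF_8);
        out.accept(r.topic + "-" + r.partition + "@" + r.offset + ": " + key + "=" + value);
    };

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        RawRecord rec = new RawRecord("orders", 3, 42L, utf8("user-1"), utf8("apple"));
        List<String> out = new ArrayList<>();

        RecordDeserializer<String> plain = valueOnly(bytes -> new String(bytes, StandardCharsets.UTF_8));
        plain.deserialize(rec, out::add);
        KEY_VALUE_WITH_OFFSET.deserialize(rec, out::add);

        out.forEach(System.out::println);
    }
}
```

Running `main` prints `apple` followed by `orders-3@42: user-1=apple`. The adapter illustrates the trade-off: a byte-array interface can always be wrapped into a record-level one, but only by discarding the key and metadata, which is one reason a record-level interface is the more general starting point for a Kafka source.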
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)