[ https://issues.apache.org/jira/browse/FLINK-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660438#comment-14660438 ]
ASF GitHub Bot commented on FLINK-2386:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/996

    [WIP][FLINK-2386] Add new Kafka Consumers

I'm opening a WIP pull request (against our rules) to get some feedback on my ongoing work. Please note that I'm on vacation next week (until August 17).

**Why this rework?**

The current `PersistentKafkaSource` does not always provide exactly-once processing guarantees because it is built on Kafka's high-level consumer API. We chose that API because it handles all the corner cases, such as leader election, leader failover, and other low-level details. The problem is that the API does not allow us to:
- commit offsets manually
- consistently (across restarts) assign partitions to Flink instances

The Kafka community is aware of these issues and is actively working on a new consumer API. See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design and https://issues.apache.org/jira/browse/KAFKA-1326. The release of Kafka 0.8.3 is scheduled for October 2015 (see the plan at https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

Therefore, I decided on the following approach: copy the code of the unreleased, new Kafka consumer into the Flink consumer and use it. The new API has all the bells and whistles we need (manual committing, per-partition subscriptions, nice APIs), but it is not completely backwards compatible:
- We can retrieve topic metadata with the new API from Kafka 0.8.1 and 0.8.2 (and of course 0.8.3).
- We can retrieve data from Kafka 0.8.2 (and 0.8.3).
- We can only commit offsets to Kafka 0.8.3.

Therefore, this pull request contains three different user-facing classes, `FlinkKafkaConsumer081`, `FlinkKafkaConsumer082`, and `FlinkKafkaConsumer083`, for the different possible combinations.
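To illustrate the second bullet point above (consistent partition assignment across restarts), one way a deterministic mapping from Kafka partitions to Flink subtasks could look is a round-robin over partition ids. This is a hypothetical sketch with invented names, not the actual code from this pull request:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: deterministically map Kafka partitions to Flink
// subtasks. Because the mapping depends only on partition id, parallelism,
// and subtask index, each subtask re-reads the same partitions after a
// restart, which is a prerequisite for exactly-once offset tracking.
class PartitionAssignment {

    /** Returns the partition ids the given subtask should consume. */
    static List<Integer> assignPartitions(int numPartitions,
                                          int numSubtasks,
                                          int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            // Round-robin by partition id: stable for a fixed parallelism.
            if (p % numSubtasks == subtaskIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }
}
```

Note that with fewer partitions than subtasks, some subtasks receive no partitions at all (the "partitions < instances" case one of the commits below refers to).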
For 0.8.1 we are using a hand-crafted implementation against the SimpleConsumer API (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example), so we had to do what we originally wanted to avoid. I tried to make that implementation as robust and efficient as possible. I'm intentionally not handling any broker failures in the code; for these cases, I'm relying on Flink's fault tolerance mechanisms (which effectively means redeploying the Kafka sources against other online brokers).

For reviewing the pull request, there are only a few important classes to look at:
- `FlinkKafkaConsumerBase`
- `IncludedFetcher`
- `LegacyFetcher` (the one implementing the SimpleConsumer API)

I also fixed a small bug in the stream graph generator: it was ignoring the "number of execution retries" when checkpointing is disabled.

Known issues:
- this pull request contains at least one failing test
- the KafkaConsumer contains at least one known, yet untested bug
- missing documentation

I will also open a pull request for using the new producer API. It provides much better performance and usability.

Open questions:
- Do we really want to copy 20k+ lines of code into our code base (for now)? If there are concerns about this, I could also manually implement the missing pieces. It's probably 100 lines of code for getting the partition infos for a topic, and we would then use the SimpleConsumer also for reading from 0.8.2.
- Do we want to use the packaging I'm suggesting here (an additional Maven module, `flink-connector-kafka-083`)? We would need to introduce it anyway when Kafka releases 0.8.3, because the dependencies are not compatible, but it adds confusion for our users. I will write more documentation for guidance.
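The fault-tolerance approach described above (Flink tracks offsets itself and recovers via restart, instead of the consumer handling broker failures) can be sketched as follows. This is a minimal, hypothetical illustration; the class and method names are invented and are not the PR's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of checkpoint-based offset tracking: the consumer
// records the last-emitted offset per partition, hands a copy of that map
// to Flink's checkpoint on snapshot, and on restore seeks back to the
// checkpointed offsets. Combined with deterministic partition assignment,
// this yields exactly-once semantics without committing to the broker.
class OffsetCheckpointing {

    private final Map<Integer, Long> currentOffsets = new HashMap<>();

    /** Called after emitting a record downstream. */
    void recordEmitted(int partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    /** Called on checkpoint: an immutable-by-copy view for the snapshot. */
    Map<Integer, Long> snapshotOffsets() {
        return new HashMap<>(currentOffsets);
    }

    /** Called on restore (e.g. after a broker or task failure). */
    void restore(Map<Integer, Long> snapshot) {
        currentOffsets.clear();
        currentOffsets.putAll(snapshot);
    }

    /** The first offset to fetch for a partition after a restore. */
    long nextOffsetFor(int partition) {
        // -1 means "nothing emitted yet", so the next offset is 0.
        return currentOffsets.getOrDefault(partition, -1L) + 1;
    }
}
```

The key design point is that a broker failure simply triggers Flink's normal recovery path: the source is redeployed, `restore` is called with the last snapshot, and reading resumes at `nextOffsetFor` against whichever brokers are online.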
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink2386

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/996.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #996

----

commit 177e0bbc6bc613b67111ba038e0ded4fae8474f1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-20T19:39:46Z

    wip

commit 70cb8f1ecb7df7a98313796609d2fa0dbade86bf
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-21T15:21:45Z

    [FLINK-2386] Add initial code for the new kafka connector, with everything unreleased copied from the kafka sources

commit a4a2847908a8c2f118b8667d7cb66693c065c38d
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-21T17:58:13Z

    wip

commit b02cde37c2120ff6f0fcf1c233391a1d8804e594
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-22T15:29:58Z

    wip

commit 54a05c39d150b016e0a089daedb3492d986b93bd
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-22T19:56:41Z

    wip

commit 393fd6766a5df4bf14ef0c13864b8a4abdb62bb4
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-22T20:20:20Z

    we are good for a test drive

commit 3d66332e61665df9bafa05d2644b4fe1032da694
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T09:55:02Z

    wip

commit b3e0c82c098d1aa27e418adb552f1b218c0f9550
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T11:49:05Z

    fixed deserialization

commit 409eb8091fce6caa3d3c9cc1ffb0dc42c3f5e130
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T12:29:24Z

    this one test seems to pass

commit d1bcac9886521a0b8b05b9c4b7c37a4667c7dcce
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T12:35:22Z

    rat check

commit abd0f5b57610d0d8dcc3d530c816afc616cbf30b
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T12:45:13Z

    successful build

commit ec28a40145f987450ab8938d7f1b6443be53d3b6
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T14:02:40Z

    added a lot of debuggign stuff

commit 4df31ba2f0c53228f1a26307aa221ad4b9d5db68
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T14:17:21Z

    another fix

commit 7d5f283a73b76ec2e81339dbdd619fe01988aaf8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T16:26:55Z

    support for partitions < instances

commit 1c9ee9d09c3584ff168ce12a8438542032895953
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-24T15:21:01Z

    Improve error handling, fix offset handling

commit 9dad95e40cdc9acd6bc64ea5c7d54296320381ea
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-24T16:24:29Z

    wip

commit 12d24cb2c3d6a5b16bbde7619f0789e05118d5ac
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-30T14:09:08Z

    wip

commit 9fcd8b5af8c5e6e018d8c020ba0b9f9295c6da2b
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-04T10:17:19Z

    wip

commit bc18a0242319954f82158c059b94898bbea6a33c
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-05T16:13:30Z

    TODO: it seems that sinks are not participating in snapshots

commit ba7546aebae9d6067f915e8b55290148ccd4a4f5
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T09:39:34Z

    wip

commit 70cf8e4f0e6c4355040ee50472c252abca275799
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T12:30:08Z

    Tests should run

commit 19adb8668c10eb7db6c99273ab8bc4bfcdc11da8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T13:10:22Z

    restore old behaivor

commit bd5595b213515dad30cfc5b2e9f3e1c6cfbd64b2
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T15:01:20Z

    Looks like the tests are working

commit 8dde02b17bd32fbc53cfdb67c5a43fa88b2e882c
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T15:32:16Z

    lets see whether the tests are working

----

> Implement Kafka connector using the new Kafka Consumer API
> ----------------------------------------------------------
>
>          Key: FLINK-2386
>          URL: https://issues.apache.org/jira/browse/FLINK-2386
>      Project: Flink
>   Issue Type: Improvement
>   Components: Kafka Connector
>     Reporter: Robert Metzger
>     Assignee: Robert Metzger
>
> Once Kafka has released its new consumer API, we should provide a connector
> for that version. The release will probably be called 0.9 or 0.8.3.
> The connector will be mostly compatible with Kafka 0.8.2.x, except for
> committing offsets to the broker (the new connector expects a coordinator to
> be available on Kafka). To work around that, we can provide a configuration
> option to commit offsets to zookeeper (managed by flink code).
> For 0.9/0.8.3 it will be fully compatible.
> It will not be compatible with 0.8.1 because of mismatching Kafka messages.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)