[ 
https://issues.apache.org/jira/browse/FLINK-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660438#comment-14660438
 ] 

ASF GitHub Bot commented on FLINK-2386:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/996

    [WIP][FLINK-2386] Add new Kafka Consumers

    I'm opening a WIP pull request (against our rules) to get some feedback on my ongoing work.
    Please note that I'm on vacation next week (until August 17).
    
    **Why this rework?**
    
    The current `PersistentKafkaSource` does not always provide exactly-once processing guarantees because it uses Kafka's high-level Consumer API.
    We chose that API because it handles all the corner cases such as leader election, leader failover, and other low-level details.
    The problem is that the API does not allow us to
    - commit offsets manually
    - assign partitions to Flink instances consistently across restarts
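    
    For contrast, the new consumer API supports both operations directly. An illustrative sketch in Java (method names follow the 0.9 consumer design documents and may still change before release):
    
    ```java
    // Sketch only: method names follow the 0.9 consumer design and may change.
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    // Deterministic, manual partition assignment (no group management involved),
    // so partitions can be re-assigned identically across restarts.
    consumer.subscribe(new TopicPartition("my-topic", 0));
    while (running) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
        process(records);
        // Commit offsets manually, e.g. aligned with a Flink checkpoint.
        consumer.commitSync();
    }
    ```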
    
    The Kafka community is aware of these issues and is actively working on a new Consumer API. See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design and https://issues.apache.org/jira/browse/KAFKA-1326
    The release of Kafka 0.8.3 is scheduled for October 2015 (see the release plan: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan)
    
    Therefore, I decided on the following approach: copy the code of the unreleased, new Kafka Consumer into the Flink connector and use it.
    The new API has all the bells and whistles we need (manual committing, per-partition subscriptions, clean APIs), but it is not fully backwards compatible.
    
    With the new API we can:
    - retrieve topic metadata from Kafka 0.8.1 and 0.8.2 (and of course 0.8.3)
    - retrieve data from Kafka 0.8.2 (and 0.8.3)
    - commit offsets only to Kafka 0.8.3
    
    Therefore, this pull request contains three different user-facing classes, `FlinkKafkaConsumer081`, `FlinkKafkaConsumer082` and `FlinkKafkaConsumer083`, for the different possible combinations.
    For 0.8.1 we use a hand-crafted implementation against the SimpleConsumer API (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example), so we had to do what we originally wanted to avoid.
    I tried to make that implementation as robust and efficient as possible. I intentionally do not handle broker failures in that code; for those cases I rely on Flink's fault-tolerance mechanisms (which effectively means redeploying the Kafka sources against other online brokers).
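    
    From the user's perspective, all three classes would be used the same way. A hypothetical usage sketch (the exact constructor signature and configuration keys in this PR may differ):
    
    ```java
    // Hypothetical usage sketch; constructor arguments and property
    // names are illustrative, not taken verbatim from this PR.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");
    props.setProperty("group.id", "my-flink-job");
    DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));
    ```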
    
    For reviewing the pull request, there are only a few important classes to look at:
    - FlinkKafkaConsumerBase
    - IncludedFetcher
    - LegacyFetcher (the implementation against the SimpleConsumer API)
    I also fixed a small bug in the stream graph generator: it was ignoring the "number of execution retries" setting when checkpointing is disabled.
    
    
    Known issues:
    - this pull request contains at least one failing test
    - the KafkaConsumer contains at least one known, yet untested bug
    - missing documentation
    
    I will also open a pull request for using the new Producer API, which provides much better performance and usability.
    
    Open questions:
    - Do we really want to copy 20k+ lines of code into our code base (for now)?
    If there are concerns about this, I could instead implement the missing pieces manually. It's probably 100 lines of code to get the partition info for a topic, and we would then use the SimpleConsumer for reading from 0.8.2 as well.
    
    - Do we want to use the packaging I'm suggesting here (an additional Maven module, `flink-connector-kafka-083`)? We would need to introduce it anyway once Kafka releases 0.8.3, because the dependencies are not compatible.
    But it adds confusion for our users.
    I will write more documentation for guidance.
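    
    For reference, the module split would look roughly like this in the connectors' parent pom (module names are illustrative, not final):
    
    ```xml
    <!-- Illustrative sketch of the proposed Maven module layout -->
    <modules>
      <module>flink-connector-kafka</module>      <!-- 0.8.1 / 0.8.2 consumers -->
      <module>flink-connector-kafka-083</module>  <!-- 0.8.3, incompatible dependencies -->
    </modules>
    ```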


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink2386

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/996.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #996
    
----
commit 177e0bbc6bc613b67111ba038e0ded4fae8474f1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-20T19:39:46Z

    wip

commit 70cb8f1ecb7df7a98313796609d2fa0dbade86bf
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-21T15:21:45Z

    [FLINK-2386] Add initial code for the new kafka connector, with everything 
unreleased copied from the kafka sources

commit a4a2847908a8c2f118b8667d7cb66693c065c38d
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-21T17:58:13Z

    wip

commit b02cde37c2120ff6f0fcf1c233391a1d8804e594
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-22T15:29:58Z

    wip

commit 54a05c39d150b016e0a089daedb3492d986b93bd
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-22T19:56:41Z

    wip

commit 393fd6766a5df4bf14ef0c13864b8a4abdb62bb4
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-22T20:20:20Z

    we are good for a test drive

commit 3d66332e61665df9bafa05d2644b4fe1032da694
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T09:55:02Z

    wip

commit b3e0c82c098d1aa27e418adb552f1b218c0f9550
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T11:49:05Z

    fixed deserialization

commit 409eb8091fce6caa3d3c9cc1ffb0dc42c3f5e130
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T12:29:24Z

    this one test seems to pass

commit d1bcac9886521a0b8b05b9c4b7c37a4667c7dcce
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T12:35:22Z

    rat check

commit abd0f5b57610d0d8dcc3d530c816afc616cbf30b
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T12:45:13Z

    successful build

commit ec28a40145f987450ab8938d7f1b6443be53d3b6
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T14:02:40Z

    added a lot of debuggign stuff

commit 4df31ba2f0c53228f1a26307aa221ad4b9d5db68
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T14:17:21Z

    another fix

commit 7d5f283a73b76ec2e81339dbdd619fe01988aaf8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-23T16:26:55Z

    support for partitions < instances

commit 1c9ee9d09c3584ff168ce12a8438542032895953
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-24T15:21:01Z

    Improve error handling, fix offset handling

commit 9dad95e40cdc9acd6bc64ea5c7d54296320381ea
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-24T16:24:29Z

    wip

commit 12d24cb2c3d6a5b16bbde7619f0789e05118d5ac
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-07-30T14:09:08Z

    wip

commit 9fcd8b5af8c5e6e018d8c020ba0b9f9295c6da2b
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-04T10:17:19Z

    wip

commit bc18a0242319954f82158c059b94898bbea6a33c
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-05T16:13:30Z

    TODO: it seems that sinks are not participating in snapshots

commit ba7546aebae9d6067f915e8b55290148ccd4a4f5
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T09:39:34Z

    wip

commit 70cf8e4f0e6c4355040ee50472c252abca275799
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T12:30:08Z

    Tests should run

commit 19adb8668c10eb7db6c99273ab8bc4bfcdc11da8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T13:10:22Z

    restore old behaivor

commit bd5595b213515dad30cfc5b2e9f3e1c6cfbd64b2
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T15:01:20Z

    Looks like the tests are working

commit 8dde02b17bd32fbc53cfdb67c5a43fa88b2e882c
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-08-06T15:32:16Z

    lets see whether the tests are working

----


> Implement Kafka connector using the new Kafka Consumer API
> ----------------------------------------------------------
>
>                 Key: FLINK-2386
>                 URL: https://issues.apache.org/jira/browse/FLINK-2386
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Once Kafka has released its new consumer API, we should provide a connector 
> for that version.
> The release will probably be called 0.9 or 0.8.3.
> The connector will be mostly compatible with Kafka 0.8.2.x, except for 
> committing offsets to the broker (the new consumer expects a coordinator to 
> be available on Kafka). To work around that, we can provide a configuration 
> option to commit offsets to ZooKeeper (managed by Flink code).
> For 0.9/0.8.3 it will be fully compatible.
> It will not be compatible with 0.8.1 because of the mismatching Kafka message format.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
