
ASF GitHub Bot commented on FLINK-4280:

Github user rmetzger commented on a diff in the pull request:

    --- Diff: 
    @@ -444,6 +445,134 @@ public void run() {
    +   /**
    +    * This test ensures that when explicitly set to start from earliest 
record, the consumer
    +    * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
    +    */
    +   public void runStartFromEarliestOffsets() throws Exception {
    +           // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
    +           final int parallelism = 3;
    +           final int recordsInEachPartition = 50;
    +           final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +           env.setParallelism(parallelism);
    +           Properties readProps = new Properties();
    +           readProps.putAll(standardProps);
    +           readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
    +           // the committed offsets should be ignored
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +           readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
    +           kafkaOffsetHandler.close();
    +           deleteTestTopic(topicName);
    +   }
    +   /**
    +    * This test ensures that when explicitly set to start from latest 
record, the consumer
    +    * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
    +    */
    +   public void runStartFromLatestOffsets() throws Exception {
    +           // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
    +           final int parallelism = 3;
    +           final int recordsInEachPartition = 50;
    +           final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +           env.setParallelism(parallelism);
    +           final Properties readProps = new Properties();
    +           readProps.putAll(standardProps);
    +           readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
    +           // the committed offsets should be ignored
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +           Thread consumeThread = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
    +                           } catch (Exception e) {
    +                                   throw new RuntimeException(e);
    --- End diff --
    This exception will not fail the test.
    You need to define a Throwable field, set it in the thread and check it 
once the thread has finished.
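
A minimal sketch of that pattern, not the actual test code: the class name `ThreadErrorPropagation` and the helper `runAndCapture` are hypothetical, and an `AtomicReference` stands in for the Throwable field so the write from the worker thread is safely visible to the test thread.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadErrorPropagation {

    /**
     * Runs the task in a separate thread, waits for it to finish, and
     * returns whatever it threw (or null if it completed normally).
     */
    static Throwable runAndCapture(final Runnable task) throws InterruptedException {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Throwable t) {
                    // Rethrowing here would only terminate this thread;
                    // record the failure so the caller can inspect it.
                    error.set(t);
                }
            }
        });

        worker.start();
        worker.join(); // the failure is only checked once the thread has finished
        return error.get();
    }

    public static void main(String[] args) throws Exception {
        Throwable failure = runAndCapture(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("consumer failed");
            }
        });
        if (failure != null) {
            // In a test, this is where the failure would be rethrown or asserted on.
            System.out.println("captured: " + failure.getMessage());
        }
    }
}
```

In the test, the calling thread would then fail explicitly when the captured Throwable is non-null, for example by rethrowing it wrapped in an exception.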

> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
> Currently, to start reading from the "earliest" or "latest" position in 
> topics with the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works can be misleading for 
> users who are looking for a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to ZK / the brokers are checked; only if none exist 
> is {{auto.offset.reset}} respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can become a user option, so that it is retained 
> for Kafka users who need to interoperate with existing non-Flink Kafka 
> consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // this won't have effect on the starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we need to decide is what the default value of 
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it was 
> not supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" becomes more 
> firmly established. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this further decouples Flink's internal offset checkpointing 
> from Kafka's external offset store.
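
A minimal sketch of the start-position resolution the issue describes, under stated assumptions: `StartPositionResolver`, `resolve`, and the `EXTERNAL_OFFSETS` constant are hypothetical names chosen for illustration (only `EARLIEST` and `LATEST` appear in the diff above), and offsets are passed in directly rather than fetched from Kafka.

```java
class StartPositionResolver {

    // EXTERNAL_OFFSETS is a hypothetical constant mirroring the proposed
    // "external-offsets" value of flink.starting-position.
    enum StartupMode { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    /**
     * Picks the offset a partition should start reading from.
     *
     * @param mode            the Flink-specific starting position
     * @param committedOffset the group offset committed in ZK / broker, or null if none
     * @param autoOffsetReset the value of "auto.offset.reset" ("earliest" or "latest")
     * @param earliestOffset  the first available offset of the partition
     * @param latestOffset    the offset that the next written record will get
     */
    static long resolve(StartupMode mode, Long committedOffset, String autoOffsetReset,
                        long earliestOffset, long latestOffset) {
        switch (mode) {
            case EARLIEST:
                // Committed offsets and auto.offset.reset are both ignored.
                return earliestOffset;
            case LATEST:
                return latestOffset;
            default:
                // Original behaviour: committed offsets first,
                // auto.offset.reset only as a fallback.
                if (committedOffset != null) {
                    return committedOffset;
                }
                return "earliest".equals(autoOffsetReset) ? earliestOffset : latestOffset;
        }
    }
}
```

With the values from the tests in the diff (50 records per partition and a committed offset of 23 for partition 0), `EARLIEST` resolves to 0 and `LATEST` to 50 regardless of the committed offset, while the external-offsets mode resolves to 23.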

