Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89363213
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
    @@ -444,6 +445,134 @@ public void run() {
                kafkaOffsetHandler.close();
                deleteTestTopic(topicName);
        }
    +
    +   /**
    +    * This test ensures that when explicitly set to start from earliest 
record, the consumer
    +    * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
    +    */
    +   public void runStartFromEarliestOffsets() throws Exception {
    +           // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
    +           final int parallelism = 3;
    +           final int recordsInEachPartition = 50;
    +
    +           final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +           env.setParallelism(parallelism);
    +
    +           Properties readProps = new Properties();
    +           readProps.putAll(standardProps);
    +           readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
    +
    +           // the committed offsets should be ignored
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +           readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
    +
    +           kafkaOffsetHandler.close();
    +           deleteTestTopic(topicName);
    +   }
    +
    +   /**
    +    * This test ensures that when explicitly set to start from latest 
record, the consumer
    +    * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
    +    */
    +   public void runStartFromLatestOffsets() throws Exception {
    +           // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
    +           final int parallelism = 3;
    +           final int recordsInEachPartition = 50;
    +
    +           final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +           env.setParallelism(parallelism);
    +
    +           final Properties readProps = new Properties();
    +           readProps.putAll(standardProps);
    +           readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
    +
    +           // the committed offsets should be ignored
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    +           kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
    +
    +           Thread consumeThread = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
    +                           } catch (Exception e) {
    +                                   throw new RuntimeException(e);
    --- End diff --
    
    Ah, right! Will fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to