Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89680317 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -444,6 +445,134 @@ public void run() { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** + * This test ensures that when explicitly set to start from earliest record, the consumer + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. + */ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + /** + * This test ensures that when explicitly set to start from latest record, the consumer + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. + */ + public void runStartFromLatestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + consumeThread.start(); + + Thread.sleep(5000); --- End diff -- Actually, the sleep here isn't waiting for the readSequence call to finish. I'm waiting a bit to make sure that the consume job has fully started. It won't be able to read anything until new latest data is generated afterwards, which is done below by `DataGenerators.generateRandomizedIntegerSequence`. So, what the test is doing is: 1. Write 50 records to each partition. 2. Commit some random offsets. 3. Start a job to read from latest in a separate thread. (should not read any of the previous data, offsets also ignored). The `readSequence` is expected to read 30 more records from each partition 4. Make sure the job has started by waiting 5 seconds. 5. Generate 30 records to each partition. 6. The consume job should return from `readSequence` before the test expires. Is there a better way to do step 4. instead of sleeping?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---