dongwoo6kim commented on code in PR #100:
URL: https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1762233642

##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -369,6 +370,38 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
                 WatermarkStrategy.noWatermarks(),
                 "testConsumingTopicWithEmptyPartitions"));
     }
+
+    @Test
+    @Timeout(value = 10)
+    public void testConsumingTransactionalMessage() throws Throwable {
+        String transactionalTopic = "transactionalTopic-" + UUID.randomUUID();
+        KafkaSourceTestEnv.createTestTopic(
+                transactionalTopic, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+        List<ProducerRecord<String, Integer>> records =
+                KafkaSourceTestEnv.getRecordsForTopic(transactionalTopic);
+        // Prepare records for executeAndVerify method
+        records.removeIf(record -> record.partition() > record.value());

Review Comment:
I've made two changes.
1. Replaced `records.removeIf(record -> record.partition() > record.value())` with `KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic)`, as it serves a similar purpose.
2. Added a comment explaining how the data looks after the setup.

> I'm assuming this will preserve one record per partition?

After the data modification, each partition p will contain records from p to NUM_RECORDS_PER_PARTITION (which is 10). For example, partition 1 has records 1 to 10, and partition 5 has records 5 to 10.

> what would happen if we retain all records originally generated

If we retain all records, we need to write new assertion logic for the generated records. The main purpose of this data modification setup is to reuse the `executeAndVerify` method. As you can see [here](https://github.com/apache/flink-connector-kafka/blob/7929b16dcfe648da30b6cc9755f63de2ed3d5319/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L505), the `executeAndVerify` method expects the input data to be modified in this way.
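
To make the resulting layout concrete, here is a minimal standalone sketch that applies the same `partition > value` filter from the diff to in-memory data. It does not use Kafka or `KafkaSourceTestEnv`: `PartitionLayoutSketch`, `SimpleRecord`, `generate`, and `retainedValues` are hypothetical names introduced only for illustration, and the sketch assumes each partition initially holds consecutive values starting at 0. The upper bound therefore depends on how the real test environment numbers its values; the point is only that the first value kept in partition p is p.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionLayoutSketch {

    /** Hypothetical stand-in for ProducerRecord<String, Integer>: only the fields the filter reads. */
    static final class SimpleRecord {
        final int partition;
        final int value;

        SimpleRecord(int partition, int value) {
            this.partition = partition;
            this.value = value;
        }
    }

    /** Assumption: every partition starts with the values 0 .. recordsPerPartition - 1. */
    static List<SimpleRecord> generate(int numPartitions, int recordsPerPartition) {
        List<SimpleRecord> records = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            for (int v = 0; v < recordsPerPartition; v++) {
                records.add(new SimpleRecord(p, v));
            }
        }
        return records;
    }

    /** Applies the filter from the diff and returns the values left in one partition. */
    static List<Integer> retainedValues(int partition, int numPartitions, int recordsPerPartition) {
        List<SimpleRecord> records = generate(numPartitions, recordsPerPartition);
        // Same predicate as in the diff: drop records whose value is below the partition index.
        records.removeIf(r -> r.partition > r.value);
        List<Integer> values = new ArrayList<>();
        for (SimpleRecord r : records) {
            if (r.partition == partition) {
                values.add(r.value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Print the retained values for each of 10 partitions with 10 generated values each.
        for (int p = 0; p < 10; p++) {
            System.out.println("partition " + p + " -> " + retainedValues(p, 10, 10));
        }
    }
}
```

Under these assumptions, partition 0 keeps every generated value, and each higher partition loses one more value from the front, which is the shape a per-partition "values are consecutive starting at the partition index" check would rely on.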

I intended to reuse the existing test utility functions and follow the test code conventions, but if you think this is causing unnecessary confusion, I can change the test code to use custom assertion logic.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org