dongwoo6kim commented on code in PR #100:
URL: 
https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1762233642


##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -369,6 +370,38 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
                             WatermarkStrategy.noWatermarks(),
                             "testConsumingTopicWithEmptyPartitions"));
         }
+
+        @Test
+        @Timeout(value = 10)
+        public void testConsumingTransactionalMessage() throws Throwable {
+            String transactionalTopic = "transactionalTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(
+                    transactionalTopic, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    KafkaSourceTestEnv.getRecordsForTopic(transactionalTopic);
+            // Prepare records for executeAndVerify method
+            records.removeIf(record -> record.partition() > record.value());

Review Comment:
   I've made two changes. 
   1. Replaced `records.removeIf(record -> record.partition() > 
record.value())` with 
`KafkaSourceTestEnv.setupEarliestOffsets(transactionalTopic)`, as it serves a 
similar purpose.
   2. Added a comment explaining how the data looks after the setup.
   
   > I'm assuming this will preserve one record per partition?
   
   After the data modification, each partition p will contain records with values from p up to NUM_RECORDS_PER_PARTITION (which is 10). For example, partition 1 has records 1 to 10, and partition 5 has records 5 to 10.
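
   The data shape described above can be sketched roughly as follows. This is a minimal illustration, not the actual `KafkaSourceTestEnv` code: the value range 1..NUM_RECORDS_PER_PARTITION and the partition count are assumed for illustration, and the filtering mirrors the original `records.removeIf(record -> record.partition() > record.value())` predicate.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Sketch of the per-partition record values after the setup step:
   // partition p keeps only records whose value is >= p.
   public class PartitionDataSketch {
       static final int NUM_PARTITIONS = 6;            // assumed for illustration
       static final int NUM_RECORDS_PER_PARTITION = 10;

       /** Values remaining in each partition after the filtering step. */
       static List<List<Integer>> valuesAfterSetup() {
           List<List<Integer>> partitions = new ArrayList<>();
           for (int p = 0; p < NUM_PARTITIONS; p++) {
               List<Integer> values = new ArrayList<>();
               for (int v = 1; v <= NUM_RECORDS_PER_PARTITION; v++) {
                   // keep a record only when its partition <= its value,
                   // mirroring records.removeIf(r -> r.partition() > r.value())
                   if (p <= v) {
                       values.add(v);
                   }
               }
               partitions.add(values);
           }
           return partitions;
       }

       public static void main(String[] args) {
           List<List<Integer>> data = valuesAfterSetup();
           System.out.println("partition 1: " + data.get(1)); // values 1..10
           System.out.println("partition 5: " + data.get(5)); // values 5..10
       }
   }
   ```

   With this shape, a single verification routine can compute the expected contents of every partition from its index alone, which is what makes `executeAndVerify` reusable here.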
    
   > what would happen if we retain all records originally generated
   
   If we retained all records, we would need new assertion logic for the generated records.
   The main purpose of this data modification setup is to reuse the `executeAndVerify` method.
   As you can see [here](https://github.com/apache/flink-connector-kafka/blob/7929b16dcfe648da30b6cc9755f63de2ed3d5319/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L505), the `executeAndVerify` method expects the input data to be modified in this way.
   
   
   I intended to reuse the existing test util functions and follow the test code conventions, but if you think this is causing unnecessary confusion, I can change the test code to use custom assertion logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
