dongwoo6kim commented on code in PR #100:
URL: https://github.com/apache/flink-connector-kafka/pull/100#discussion_r1759153861

##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -369,6 +370,38 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
                         WatermarkStrategy.noWatermarks(),
                         "testConsumingTopicWithEmptyPartitions"));
     }
+
+    @Test
+    @Timeout(value = 10)

Review Comment:
   I added the Timeout so that the test fails instead of blocking forever if it enters an infinite loop.
   I'm not familiar with the project's global timeout. Could you elaborate on it? Where is it configured?
   If a global timeout is set, that should be enough, but when I ran this method on the current main branch without the Timeout, it did not fail; it just hung.

##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -369,6 +370,38 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
                         WatermarkStrategy.noWatermarks(),
                         "testConsumingTopicWithEmptyPartitions"));
     }
+
+    @Test
+    @Timeout(value = 10)
+    public void testConsumingTransactionalMessage() throws Throwable {
+        String transactionalTopic = "transactionalTopic-" + UUID.randomUUID();
+        KafkaSourceTestEnv.createTestTopic(
+                transactionalTopic, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+        List<ProducerRecord<String, Integer>> records =
+                KafkaSourceTestEnv.getRecordsForTopic(transactionalTopic);
+        // Prepare records for executeAndVerify method
+        records.removeIf(record -> record.partition() > record.value());

Review Comment:
   This is a preprocessing step before using the [executeAndVerify](https://github.com/apache/flink-connector-kafka/blob/7929b16dcfe648da30b6cc9755f63de2ed3d5319/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L482) method to verify the test result.
   That method expects the records from partition `P` to form an integer sequence from `P` to `NUM_RECORDS_PER_PARTITION`.
   So I removed the records whose value is less than the partition number.
   This is similar to the approach used [here](https://github.com/apache/flink-connector-kafka/blob/7929b16dcfe648da30b6cc9755f63de2ed3d5319/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L348).
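
To make the effect of the `removeIf` preprocessing step concrete, here is a minimal standalone sketch. It does not use the connector's test utilities: `PartitionRecordFilter`, `Rec`, `sampleRecords`, and `valuesOf` are hypothetical names introduced only for illustration, `Rec` is a simplified stand-in for `ProducerRecord<String, Integer>`, and the input shape (values `0` up to, but not including, a per-partition record count) is an assumption rather than what `KafkaSourceTestEnv.getRecordsForTopic` necessarily produces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the connector's actual test helper code. */
class PartitionRecordFilter {

    /** Simplified stand-in for ProducerRecord<String, Integer>: partition and value only. */
    static final class Rec {
        final int partition;
        final int value;

        Rec(int partition, int value) {
            this.partition = partition;
            this.value = value;
        }
    }

    /** Assumed input shape: values 0..recordsPerPartition-1 for every partition. */
    static List<Rec> sampleRecords(int numPartitions, int recordsPerPartition) {
        List<Rec> records = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            for (int v = 0; v < recordsPerPartition; v++) {
                records.add(new Rec(p, v));
            }
        }
        return records;
    }

    /** Same predicate as in the test: drop records whose value is below the partition number. */
    static void dropValuesBelowPartition(List<Rec> records) {
        records.removeIf(r -> r.partition > r.value);
    }

    /** Values remaining for one partition, in insertion order. */
    static List<Integer> valuesOf(List<Rec> records, int partition) {
        List<Integer> values = new ArrayList<>();
        for (Rec r : records) {
            if (r.partition == partition) {
                values.add(r.value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Rec> records = sampleRecords(3, 5);
        dropValuesBelowPartition(records);
        for (int p = 0; p < 3; p++) {
            System.out.println("partition " + p + " -> " + valuesOf(records, p));
        }
    }
}
```

With 3 partitions and 5 records per partition in this sketch, partition `P` keeps only the values starting at `P`, which is the per-partition sequence shape the comment above says `executeAndVerify` expects.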