Efrat19 commented on code in PR #237:
URL: 
https://github.com/apache/flink-connector-kafka/pull/237#discussion_r2971381124


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
##########
@@ -139,43 +137,8 @@ private void tryDelete(AdminClient adminClient, String 
topic) throws Exception {
     @Override
     public void createTestTopic(
             String topic, int numberOfPartitions, int replicationFactor, 
Properties properties) {
-        createNewTopic(topic, numberOfPartitions, replicationFactor, 
getStandardProperties());
-    }
-
-    public static void createNewTopic(
-            String topic, int numberOfPartitions, int replicationFactor, 
Properties properties) {
-        LOG.info("Creating topic {}", topic);
-        try (AdminClient adminClient = AdminClient.create(properties)) {
-            NewTopic topicObj = new NewTopic(topic, numberOfPartitions, 
(short) replicationFactor);
-            
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
-            CommonTestUtils.waitUtil(
-                    () -> {
-                        try {
-                            // Ensure all partitions have a leader elected and 
logs initialized
-                            Map<TopicPartition, OffsetSpec> offsetSpecs = new 
HashMap<>();
-                            for (int i = 0; i < numberOfPartitions; i++) {
-                                offsetSpecs.put(
-                                        new TopicPartition(topic, i), 
OffsetSpec.earliest());
-                            }
-                            adminClient
-                                    .listOffsets(offsetSpecs)
-                                    .all()
-                                    .get(REQUEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-                            return true;
-                        } catch (Exception e) {
-                            LOG.warn(
-                                    "Partitions for topic {} not yet ready to 
serve requests",
-                                    topic,
-                                    e);
-                            return false;
-                        }
-                    },
-                    Duration.ofSeconds(30),
-                    String.format("New topic \"%s\" is not ready within 
timeout", topicObj));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Create test topic : " + topic + " failed, " + 
e.getMessage());
-        }
+        createNewTopicAndWaitForPartitionAssignment(
+                topic, numberOfPartitions, replicationFactor, properties);

Review Comment:
   Good catch, thanks!
   Looking closely I think the change is for the good though
   This `createTestTopic` implementation takes in a `properties` argument but 
uses `getStandardProperties()` instead, which is a little counterintuitive
   ```
       @Override
       public void createTestTopic(
               String topic, int numberOfPartitions, int replicationFactor, 
Properties properties) {
           createNewTopicAndWaitForPartitionAssignment(
                   topic, numberOfPartitions, replicationFactor, 
getStandardProperties());
   ```
   I assume it was meant to satisfy the contract.
   To fix this I think we can change the abstract call in 
`KafkaTestEnvironment` to default to standardProperties instead of the empty 
ones.
   This will cover all usages except 
   `DynamicKafkaSourceExternalContext.setupSplits` which originally sent 
properties - imo the correct behavior would be to use them, I added a note 
about it in the commit but if you have concerns I can create a separate PR.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to