yashmayya commented on code in PR #13375:
URL: https://github.com/apache/kafka/pull/13375#discussion_r1144234357


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -152,11 +154,6 @@ public void stop() {
         connectCluster.forEach(this::stopWorker);
         try {
             kafkaCluster.stop();
-        } catch (UngracefulShutdownException e) {
-            log.warn("Kafka did not shutdown gracefully");

Review Comment:
   It's being handled here - 
https://github.com/yashmayya/kafka/blob/f0f4bb9a30ed3fcb4692f16185df0dce4a032efd/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L175-L179



##########
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##########
@@ -174,13 +174,17 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
                 props.put(KafkaConfig$.MODULE$.LogDirsProp(),
                         String.join(",", brokerNode.logDataDirectories()));
             }
-            props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-            props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-            props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-                    nodes.interBrokerListenerName().value());
-            props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-                    "CONTROLLER");
+
+            // listeners could be defined via Builder::setConfigProp which shouldn't be overridden
+            if (!props.containsKey(KafkaConfig$.MODULE$.ListenersProp())) {

Review Comment:
   Hm, that's a good point, and I did ponder over this one as well. The 
alternative would be to parse the user-defined listeners here and make sure 
that the security protocol map, inter-broker listener, and controller listener 
are all set appropriately, or else set them ourselves. This would involve 
making sure that the protocols, SASL mechanisms, etc. all match up properly, 
which seems potentially error-prone (I tried a setup with broker listeners and 
controller listeners using different security protocols or mechanisms and I 
wasn't able to get it to work - not sure whether something like that is even 
currently supported). I'm assuming that users would most likely set custom 
listeners because they want non-plaintext listeners, at which point it isn't 
much extra work to also set up the controller listener accordingly (and there 
are multiple examples of how to do so in various integration tests). What do 
you think?
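
   To make the trade-off concrete, here is a minimal, standalone sketch of the 
"apply defaults only if the user didn't set listeners" behaviour from the diff 
above. It uses plain `java.util.Properties` and the string config keys rather 
than `KafkaConfig$`; the class name, method name and parameters are 
hypothetical and not part of the PR.

```java
import java.util.Properties;

public class ListenerDefaultsExample {

    // Hypothetical helper (not from the PR): fills in the four listener-related
    // configs together, but only when the caller has not supplied "listeners".
    static void applyListenerDefaults(Properties props,
                                      String defaultListeners,
                                      String interBrokerListenerName) {
        if (!props.containsKey("listeners")) {
            props.put("listener.security.protocol.map",
                    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
            props.put("listeners", defaultListeners);
            props.put("inter.broker.listener.name", interBrokerListenerName);
            props.put("controller.listener.names", "CONTROLLER");
        }
        // If "listeners" is already present, nothing is touched: the caller is
        // then responsible for the protocol map, inter-broker listener and
        // controller listener as well.
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        applyListenerDefaults(props,
                "EXTERNAL://localhost:0,CONTROLLER://localhost:0", "EXTERNAL");
        System.out.println(props);
    }
}
```

   The all-or-nothing check avoids having to reconcile a user-supplied 
listener with default values for the other three configs.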



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -195,7 +197,22 @@ public void testRestartFailedTask() throws Exception {
     public void testBrokerCoordinator() throws Exception {
         ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
         workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
-        connect = connectBuilder.workerProps(workerProps).build();
+        Properties brokerProps = new Properties();
+
+        // Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
+        // config to get a random free port because in this test we want to stop the Kafka broker and then bring it
+        // back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
+        // and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
+        // on a different random free port the second time it is started.

Review Comment:
   Sorry, my bad for not clarifying at the outset. There are a couple of 
reasons. One is that the previous implementation seemed fairly messy: we were 
checking the bound ports after broker startup, then modifying the broker 
config and using the static port in the listener configuration on broker 
restarts, but only if the user hadn't already configured listener configs. So, 
for instance, `startOnlyKafkaOnSamePorts` would not work as expected if the 
user defined a SASL or SSL listener with port 0. The other reason is that the 
`KafkaClusterTestKit` being leveraged here would need some refactoring to 
allow changing broker configs after it has been instantiated; currently, it 
only supports customising broker configs in its builder. All in all, it seems 
a lot cleaner to move the responsibility of using a fixed port in the 
listeners configuration to the clients of the embedded Kafka cluster, in case 
they want to test functionality involving offline brokers.
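
   For reference, a client-side sketch of picking a fixed port up front could 
look roughly like the following. This is only an illustration under stated 
assumptions: the class and method names are hypothetical, and the 
`PLAINTEXT://localhost:<port>` listener value is a placeholder rather than the 
exact value the test uses.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;

public class FixedPortListenerExample {

    // Binds to port 0 so the OS picks a free port, then releases it.
    // Note the small race: another process could claim the port before
    // the broker binds to it.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    // Builds broker properties that pin the listener to the given port, so a
    // restarted broker listens on the same address as before.
    // The listener name and host are assumptions made for this sketch.
    static Properties brokerPropsWithFixedPort(int port) {
        Properties brokerProps = new Properties();
        brokerProps.put("listeners", "PLAINTEXT://localhost:" + port);
        return brokerProps;
    }

    public static void main(String[] args) throws IOException {
        int port = findFreePort();
        System.out.println(brokerPropsWithFixedPort(port));
    }
}
```

   Since the port is chosen before the broker starts, the same properties can 
be reused across a stop and restart without inspecting the bound port 
afterwards.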



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
