showuon commented on code in PR #17524: URL: https://github.com/apache/kafka/pull/17524#discussion_r1850021684
########## core/src/main/scala/kafka/server/KafkaApis.scala: ##########
@@ -3611,12 +3611,17 @@ class KafkaApis(val requestChannel: RequestChannel, clusterId, () => { val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection() - metadataCache.getAliveBrokerNodes(request.context.listenerName).foreach { node => + val describeClusterRequest = request.body[DescribeClusterRequest] + metadataCache.getBrokerNodes(request.context.listenerName).foreach { node => + if (node.isFenced && !describeClusterRequest.data().includeFencedBrokers()) { + } else {

Review Comment:
What do we want to do in the if block? log something? Or remove if / else?

########## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ##########
@@ -354,6 +354,10 @@ class ZkMetadataCache( metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName)) } + override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = { + getAliveBrokerNodes(listenerName) + } +

Review Comment:
Since we're targeting v4.0.0, this can be removed.

########## tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java: ##########
@@ -69,6 +69,28 @@ public void testUnregister(ClusterInstance clusterInstance) { } } + @ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT}) + public void testListEndpointsWithBootstrapServer(ClusterInstance clusterInstance) { + String output = ToolsTestUtils.captureStandardOut(() -> + assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers()))); + String port = clusterInstance.bootstrapServers().split(":")[1]; + int id = clusterInstance.brokerIds().iterator().next(); + String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s"; + String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE", id, "localhost", port, "null", "unfenced", "broker"); + assertTrue(output.equals(expected)); + } + + @ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT}) + public void testListEndpointsArgumentWithBootstrapServer(ClusterInstance clusterInstance) {

Review Comment:
Could we add a test to make sure the "fenced" broker state can be correctly returned and output?

########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ##########
@@ -3194,6 +3194,25 @@ public void testListGroupsWithTypesOlderBrokerVersion() throws Exception { } } + @Test + public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers() { + ApiVersion describeClusterV1 = new ApiVersion() + .setApiKey(ApiKeys.DESCRIBE_CLUSTER.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(describeClusterV1))); + + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof DescribeClusterRequest); + + final DescribeClusterResult result = env.adminClient().describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)); + TestUtils.assertFutureThrows(result.nodes(), + UnsupportedVersionException.class, "Including fenced broker endpoints is not supported with version 1"); +

Review Comment:
nit: additional empty line.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org