cmccabe commented on code in PR #12837: URL: https://github.com/apache/kafka/pull/12837#discussion_r1022030312
########## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ########## @@ -159,53 +218,30 @@ public KafkaClusterTestKit build() throws Exception { executorService = Executors.newFixedThreadPool(numOfExecutorThreads, ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false)); for (ControllerNode node : nodes.controllerNodes().values()) { - Map<String, String> props = new HashMap<>(configProps); - props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id())); - props.put(KafkaConfig$.MODULE$.NodeIdProp(), - Integer.toString(node.id())); - props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), - node.metadataDirectory()); - props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), - "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); - props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); - props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), - nodes.interBrokerListenerName().value()); - props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), - "CONTROLLER"); - // Note: we can't accurately set controller.quorum.voters yet, since we don't - // yet know what ports each controller will pick. Set it to a dummy string \ - // for now as a placeholder. - props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString); - - // reduce log cleaner offset map memory usage - props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152"); - setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList()); - KafkaConfig config = new KafkaConfig(props, false, Option.empty()); - - String threadNamePrefix = String.format("controller%d_", node.id()); - MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id()); - TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0); BootstrapMetadata bootstrapMetadata = BootstrapMetadata. fromVersion(nodes.bootstrapMetadataVersion(), "testkit"); - KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>( - metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(), - Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future); - ControllerServer controller = new ControllerServer( - nodes.controllerProperties(node.id()), - config, - raftManager, - Time.SYSTEM, - new Metrics(), - new MockControllerMetrics(), - Option.apply(threadNamePrefix), - connectFutureManager.future, - KafkaRaftServer.configSchema(), - raftManager.apiVersions(), - bootstrapMetadata, - metadataFaultHandler, - fatalFaultHandler - ); + String threadNamePrefix = (nodes.brokerNodes().containsKey(node.id())) ? + String.format("colocated%d", node.id()) : + String.format("controller%d", node.id()); + JointServer jointServer = new JointServer(createNodeConfig(node), Review Comment: It's not simple at all to use KafkaRaftServer in most of our tests. Let me give an example. If someone shuts down a broker by calling BrokerServer#shutdown, and the broker was a standalone broker, you have to somehow shut down the associated KafkaRaftServer, the associated snapshot generator, and the associated metadata loader. Maybe KafkaRaftServer is "already doing this" but BrokerServer doesn't have a reference to KafkaRaftServer. We also don't want to refactor all of our tests in terms of KafkaRaftServer rather than BrokerSever (that would be a huge undertaking) So I don't really see a way to use KafkaRaftServer here. Unless we wanted to turn it into basically what JointServer is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org