cmccabe commented on code in PR #12837:
URL: https://github.com/apache/kafka/pull/12837#discussion_r1022030312


##########
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##########
@@ -159,53 +218,30 @@ public KafkaClusterTestKit build() throws Exception {
                 executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
                     ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
                 for (ControllerNode node : nodes.controllerNodes().values()) {
-                    Map<String, String> props = new HashMap<>(configProps);
-                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
-                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
-                        Integer.toString(node.id()));
-                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
-                        node.metadataDirectory());
-                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-                    props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-                        nodes.interBrokerListenerName().value());
-                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-                        "CONTROLLER");
-                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
-                    // yet know what ports each controller will pick.  Set it to a dummy string \
-                    // for now as a placeholder.
-                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
-
-                    // reduce log cleaner offset map memory usage
-                    props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152");
-
                     setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
-                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
-
-                    String threadNamePrefix = String.format("controller%d_", node.id());
-                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
-                    TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
                     BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
                         fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
-                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
-                        metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
-                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
-                    ControllerServer controller = new ControllerServer(
-                        nodes.controllerProperties(node.id()),
-                        config,
-                        raftManager,
-                        Time.SYSTEM,
-                        new Metrics(),
-                        new MockControllerMetrics(),
-                        Option.apply(threadNamePrefix),
-                        connectFutureManager.future,
-                        KafkaRaftServer.configSchema(),
-                        raftManager.apiVersions(),
-                        bootstrapMetadata,
-                        metadataFaultHandler,
-                        fatalFaultHandler
-                    );
+                    String threadNamePrefix = (nodes.brokerNodes().containsKey(node.id())) ?
+                            String.format("colocated%d", node.id()) :
+                            String.format("controller%d", node.id());
+                    JointServer jointServer = new JointServer(createNodeConfig(node),

Review Comment:
   It's not simple at all to use KafkaRaftServer in most of our tests. Let me give an example: if someone shuts down a broker by calling BrokerServer#shutdown, and the broker was a standalone broker, you have to somehow shut down the associated KafkaRaftServer, the associated snapshot generator, and the associated metadata loader. Maybe KafkaRaftServer is "already doing this", but BrokerServer doesn't have a reference to KafkaRaftServer. We also don't want to refactor all of our tests in terms of KafkaRaftServer rather than BrokerServer (that would be a huge undertaking).
   
   So I don't really see a way to use KafkaRaftServer here, unless we wanted to turn it into basically what JointServer is.
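   To make the ownership problem concrete, here is a minimal toy sketch (all `Toy*` names are hypothetical illustrations, not Kafka's actual API): the broker role has no reference to the shared Raft layer, so its shutdown has to be routed through a joint owner that can tear down the shared components, which is the role JointServer plays.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the shared Raft layer (raft manager, snapshot
// generator, metadata loader) that both roles depend on.
class ToyRaftManager {
    final AtomicBoolean running = new AtomicBoolean(true);
    void shutdown() { running.set(false); }
}

// Hypothetical stand-in for BrokerServer. It deliberately holds no reference
// to ToyRaftManager, mirroring the situation described in the comment above.
class ToyBroker {
    private final ToyJointServer owner;
    final AtomicBoolean running = new AtomicBoolean(true);
    ToyBroker(ToyJointServer owner) { this.owner = owner; }
    void shutdown() {
        running.set(false);
        // The broker can't shut down the raft layer itself; it notifies the
        // joint owner, which knows what else depends on it.
        owner.brokerStopped();
    }
}

// Hypothetical stand-in for JointServer: the single owner of the shared
// Raft layer and the per-node roles built on top of it.
class ToyJointServer {
    final ToyRaftManager raftManager = new ToyRaftManager();
    private final ToyBroker broker = new ToyBroker(this);
    ToyBroker broker() { return broker; }
    void brokerStopped() {
        // In this sketch the broker is standalone (no colocated controller
        // role), so the shared raft layer is torn down with it.
        raftManager.shutdown();
    }
}

public class JointServerSketch {
    public static void main(String[] args) {
        ToyJointServer js = new ToyJointServer();
        js.broker().shutdown();
        System.out.println("raft running: " + js.raftManager.running.get());
        // prints "raft running: false"
    }
}
```

   The point of the sketch is only the ownership direction: tests keep calling shutdown on the broker role, and the joint owner decides whether the shared pieces must go down too.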



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
