chia7712 commented on code in PR #21230:
URL: https://github.com/apache/kafka/pull/21230#discussion_r2666320031
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -1192,6 +1213,487 @@ public void testCreateClusterAndCreateTopicWithRemoteLogManagerInstantiation() t
}
}
+ @Test
+ public void testCreateClusterAndRestartControllerNode() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().stream()
+ .filter(c -> c.controller().isActive())
+ .findFirst()
+ .get();
+ var port = controller.socketServer().boundPort(
+ ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+ // shutdown active controller
+ controller.shutdown();
+ // Rewrite the `listeners` config to avoid the controller socket server initializing with a different port
+ var config = controller.sharedServer().controllerConfig().props();
+ ((java.util.HashMap<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
+ "CONTROLLER://localhost:" + port);
+ controller.sharedServer().controllerConfig().updateCurrentConfig(config);
+
+ // restart controller
+ controller.startup();
+ TestUtils.waitForCondition(() -> cluster.controllers().values().stream()
+ .anyMatch(c -> c.controller().isActive()),
+ "Timeout waiting for new controller election");
+ }
+ }
+
+ @Test
+ public void testSnapshotCount() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(0)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
+ .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this
low to generate metadata
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var metaLog = FileSystems.getDefault().getPath(
+ cluster.controllers().get(3000).config().metadataLogDir(),
+ "__cluster_metadata-0");
+ TestUtils.waitForCondition(() -> {
+ var files = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ return files != null && files.length > 0;
+ }, "Failed to see at least one snapshot");
+ Thread.sleep(500 * 10); // Sleep for 10 snapshot intervals
+ var filesAfterTenIntervals = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ int countAfterTenIntervals = filesAfterTenIntervals != null ? filesAfterTenIntervals.length : 0;
+ assertTrue(countAfterTenIntervals > 1,
+ "Expected to see at least one more snapshot, saw " + countAfterTenIntervals);
+ assertTrue(countAfterTenIntervals < 20,
+ "Did not expect to see more than twice as many snapshots as snapshot intervals, saw " + countAfterTenIntervals);
+ TestUtils.waitForCondition(() -> {
+ var emitterMetrics = cluster.controllers().values().iterator().next()
+ .sharedServer().snapshotEmitter().metrics();
+ return emitterMetrics.latestSnapshotGeneratedBytes() > 0;
+ }, "Failed to see latestSnapshotGeneratedBytes > 0");
+ }
+ }
+
+ /**
+ * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
+ * that we can function without having periodic NoOpRecords written.
+ */
+ @Test
+ public void testSingleControllerSingleBrokerCluster() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ }
+ }
+
+ @Test
+ public void testOverlyLargeCreateTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var newTopics = new ArrayList<NewTopic>();
+ for (int i = 0; i <= 10000; i++) {
+ newTopics.add(new NewTopic("foo" + i, 100000, (short) 1));
+ }
+ var executionException = assertThrows(ExecutionException.class,
+ () -> admin.createTopics(newTopics).all().get());
+ assertNotNull(executionException.getCause());
+ assertEquals(PolicyViolationException.class, executionException.getCause().getClass());
+ assertEquals("Excessively large number of partitions per request.",
+ executionException.getCause().getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testTimedOutHeartbeats() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
+ .setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "1000")
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ controller.controller().waitForReadyBrokers(3).get();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var latch = QuorumControllerIntegrationTestUtils.pause((QuorumController) controller.controller());
+ Thread.sleep(1001);
+ latch.countDown();
+ assertEquals(0, controller.sharedServer().controllerServerMetrics().fencedBrokerCount());
+ assertTrue(controller.quorumControllerMetrics().timedOutHeartbeats() > 0,
+ "Expected timedOutHeartbeats to be greater than 0.");
+ });
+ }
+ }
+
+ @Test
+ public void testRegisteredControllerEndpoints() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var controller = cluster.controllers().values().iterator().next();
+ var registeredControllers = controller.registrationsPublisher().controllers();
+ assertEquals(3, registeredControllers.size(), "Expected 3 controller registrations");
+ registeredControllers.values().forEach(registration -> {
+ assertNotNull(registration.listeners().get("CONTROLLER"));
+ assertNotEquals(0, registration.listeners().get("CONTROLLER").port());
+ });
+ });
+ }
+ }
+
+ @Test
+ public void testDirectToControllerCommunicationFailsOnOlderMetadataVersion() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin(Map.of(), true)) {
+ var exception = assertThrows(ExecutionException.class,
+ () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(UnsupportedVersionException.class, exception.getCause().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testStartupWithNonDefaultKControllerDynamicConfiguration() throws Exception {
+ var bootstrapRecords = List.of(
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), (short) 0),
+ new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setResourceName("")
+ .setName("num.io.threads")
+ .setValue("9"), (short) 0));
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords"))
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ assertNotNull(controller.controllerApisHandlerPool());
+ assertEquals(9, controller.controllerApisHandlerPool().threadPoolSize().get());
+ });
+ }
+ }
+
+ @Test
+ public void testTopicDeletedAndRecreatedWhileBrokerIsDown() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined());
+ });
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ // Modify foo-0 so that it has the wrong topic ID.
+ var logDir = broker0.logManager().getLog(foo0, false).get().dir();
+ var partitionMetadataFile = new File(logDir, "partition.metadata");
+ Files.write(partitionMetadataFile.toPath(),
+ "version: 0\ntopic_id: AAAAAAAAAAAAA7SrBWaJ7g\n".getBytes(StandardCharsets.UTF_8));
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(2)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () ->
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ // Modify foo-0 so that it refers to a future replica.
+ // This is equivalent to a failure during the promotion of the future replica and a restart with the
+ // directory for the main replica being offline
+ var log = broker0.logManager().getLog(foo0, false).get();
+ log.renameDir(UnifiedLog.logFutureDirName(foo0), false);
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ assertTrue(broker0.logManager().getLog(foo0, true).isEmpty());
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(2)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () ->
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ var log = broker0.logManager().getLog(foo0, false).get();
+
+ // Copy foo-0 to targetParentDir
+ // This is so that we can rename the main replica to a future down below
+ var parentDir = log.parentDir();
+ var targetParentDir = broker0.config().logDirs().stream()
+ .filter(l -> !l.equals(parentDir))
+ .findFirst()
+ .orElseThrow();
+ var targetDirFile = new File(targetParentDir, log.dir().getName());
+ targetDirFile.mkdir();
+ Files.walk(Paths.get(log.dir().toString())).forEach(p -> {
+ var out = Paths.get(targetDirFile.toString(),
+ p.toString().substring(log.dir().toString().length()));
+ if (!p.toString().equals(log.dir().toString())) {
+ try {
+ Files.copy(p, out);
Review Comment:
```java
assertDoesNotThrow(() -> Files.copy(p, out));
```
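   Wrapping the copy in `assertDoesNotThrow` both satisfies the checked `IOException` that `Files.copy` declares (which a bare `forEach` lambda cannot rethrow) and fails the test with a clear message if the copy goes wrong.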
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -1192,6 +1213,487 @@ public void testCreateClusterAndCreateTopicWithRemoteLogManagerInstantiation() t
}
}
+ @Test
+ public void testCreateClusterAndRestartControllerNode() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().stream()
+ .filter(c -> c.controller().isActive())
+ .findFirst()
+ .get();
+ var port = controller.socketServer().boundPort(
+ ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+ // shutdown active controller
+ controller.shutdown();
+ // Rewrite the `listeners` config to avoid the controller socket server initializing with a different port
+ var config = controller.sharedServer().controllerConfig().props();
+ ((java.util.HashMap<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
+ "CONTROLLER://localhost:" + port);
+ controller.sharedServer().controllerConfig().updateCurrentConfig(config);
+
+ // restart controller
+ controller.startup();
+ TestUtils.waitForCondition(() -> cluster.controllers().values().stream()
+ .anyMatch(c -> c.controller().isActive()),
+ "Timeout waiting for new controller election");
+ }
+ }
+
+ @Test
+ public void testSnapshotCount() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(0)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
+ .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this
low to generate metadata
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var metaLog = FileSystems.getDefault().getPath(
+ cluster.controllers().get(3000).config().metadataLogDir(),
+ "__cluster_metadata-0");
+ TestUtils.waitForCondition(() -> {
+ var files = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ return files != null && files.length > 0;
+ }, "Failed to see at least one snapshot");
+ Thread.sleep(500 * 10); // Sleep for 10 snapshot intervals
+ var filesAfterTenIntervals = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ int countAfterTenIntervals = filesAfterTenIntervals != null ? filesAfterTenIntervals.length : 0;
+ assertTrue(countAfterTenIntervals > 1,
+ "Expected to see at least one more snapshot, saw " + countAfterTenIntervals);
+ assertTrue(countAfterTenIntervals < 20,
+ "Did not expect to see more than twice as many snapshots as snapshot intervals, saw " + countAfterTenIntervals);
+ TestUtils.waitForCondition(() -> {
+ var emitterMetrics = cluster.controllers().values().iterator().next()
+ .sharedServer().snapshotEmitter().metrics();
+ return emitterMetrics.latestSnapshotGeneratedBytes() > 0;
+ }, "Failed to see latestSnapshotGeneratedBytes > 0");
+ }
+ }
+
+ /**
+ * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
+ * that we can function without having periodic NoOpRecords written.
+ */
+ @Test
+ public void testSingleControllerSingleBrokerCluster() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ }
+ }
+
+ @Test
+ public void testOverlyLargeCreateTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var newTopics = new ArrayList<NewTopic>();
+ for (int i = 0; i <= 10000; i++) {
+ newTopics.add(new NewTopic("foo" + i, 100000, (short) 1));
+ }
+ var executionException = assertThrows(ExecutionException.class,
+ () -> admin.createTopics(newTopics).all().get());
+ assertNotNull(executionException.getCause());
+ assertEquals(PolicyViolationException.class, executionException.getCause().getClass());
+ assertEquals("Excessively large number of partitions per request.",
+ executionException.getCause().getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testTimedOutHeartbeats() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
+ .setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "1000")
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ controller.controller().waitForReadyBrokers(3).get();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var latch = QuorumControllerIntegrationTestUtils.pause((QuorumController) controller.controller());
Review Comment:
I'm not a fan of adding an entire dependency for such a minor reference.
Maybe we can duplicate a bit of code as a trade-off?
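   For instance, the duplicated helper could look roughly like this (a sketch, assuming `QuorumController#appendControlEvent` is accessible from this test, which is what the existing util relies on):
   ```java
   // Hypothetical inlined copy of QuorumControllerIntegrationTestUtils.pause().
   // Blocks the controller's event queue until the returned latch is counted down.
   static CountDownLatch pause(QuorumController controller) {
       CountDownLatch latch = new CountDownLatch(1);
       controller.appendControlEvent("pause", () -> {
           try {
               latch.await(); // hold the event queue so broker heartbeats time out meanwhile
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       });
       return latch;
   }
   ```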
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -1192,6 +1213,487 @@ public void testCreateClusterAndCreateTopicWithRemoteLogManagerInstantiation() t
}
}
+ @Test
+ public void testCreateClusterAndRestartControllerNode() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().stream()
+ .filter(c -> c.controller().isActive())
+ .findFirst()
+ .get();
+ var port = controller.socketServer().boundPort(
+ ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+ // shutdown active controller
+ controller.shutdown();
+ // Rewrite the `listeners` config to avoid the controller socket server initializing with a different port
+ var config = controller.sharedServer().controllerConfig().props();
+ ((java.util.HashMap<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
Review Comment:
`Map<String, String>`
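   That is, the cast presumably only needs the interface type:
   ```java
   ((Map<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
       "CONTROLLER://localhost:" + port);
   ```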
##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -1192,6 +1213,487 @@ public void testCreateClusterAndCreateTopicWithRemoteLogManagerInstantiation() t
}
}
+ @Test
+ public void testCreateClusterAndRestartControllerNode() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().stream()
+ .filter(c -> c.controller().isActive())
+ .findFirst()
+ .get();
+ var port = controller.socketServer().boundPort(
+ ListenerName.normalised(controller.config().controllerListeners().head().listener()));
+
+ // shutdown active controller
+ controller.shutdown();
+ // Rewrite the `listeners` config to avoid the controller socket server initializing with a different port
+ var config = controller.sharedServer().controllerConfig().props();
+ ((java.util.HashMap<String, String>) config).put(SocketServerConfigs.LISTENERS_CONFIG,
+ "CONTROLLER://localhost:" + port);
+ controller.sharedServer().controllerConfig().updateCurrentConfig(config);
+
+ // restart controller
+ controller.startup();
+ TestUtils.waitForCondition(() -> cluster.controllers().values().stream()
+ .anyMatch(c -> c.controller().isActive()),
+ "Timeout waiting for new controller election");
+ }
+ }
+
+ @Test
+ public void testSnapshotCount() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(0)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp("metadata.log.max.snapshot.interval.ms", "500")
+ .setConfigProp("metadata.max.idle.interval.ms", "50") // Set this
low to generate metadata
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var metaLog = FileSystems.getDefault().getPath(
+ cluster.controllers().get(3000).config().metadataLogDir(),
+ "__cluster_metadata-0");
+ TestUtils.waitForCondition(() -> {
+ var files = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ return files != null && files.length > 0;
+ }, "Failed to see at least one snapshot");
+ Thread.sleep(500 * 10); // Sleep for 10 snapshot intervals
+ var filesAfterTenIntervals = metaLog.toFile().listFiles((dir, name) ->
+ name.toLowerCase(Locale.ROOT).endsWith("checkpoint")
+ );
+ int countAfterTenIntervals = filesAfterTenIntervals != null ? filesAfterTenIntervals.length : 0;
+ assertTrue(countAfterTenIntervals > 1,
+ "Expected to see at least one more snapshot, saw " + countAfterTenIntervals);
+ assertTrue(countAfterTenIntervals < 20,
+ "Did not expect to see more than twice as many snapshots as snapshot intervals, saw " + countAfterTenIntervals);
+ TestUtils.waitForCondition(() -> {
+ var emitterMetrics = cluster.controllers().values().iterator().next()
+ .sharedServer().snapshotEmitter().metrics();
+ return emitterMetrics.latestSnapshotGeneratedBytes() > 0;
+ }, "Failed to see latestSnapshotGeneratedBytes > 0");
+ }
+ }
+
+ /**
+ * Test a single broker, single controller cluster at the minimum bootstrap level. This tests
+ * that we can function without having periodic NoOpRecords written.
+ */
+ @Test
+ public void testSingleControllerSingleBrokerCluster() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.MINIMUM_VERSION)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ }
+ }
+
+ @Test
+ public void testOverlyLargeCreateTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var newTopics = new ArrayList<NewTopic>();
+ for (int i = 0; i <= 10000; i++) {
+ newTopics.add(new NewTopic("foo" + i, 100000, (short) 1));
+ }
+ var executionException = assertThrows(ExecutionException.class,
+ () -> admin.createTopics(newTopics).all().get());
+ assertNotNull(executionException.getCause());
+ assertEquals(PolicyViolationException.class, executionException.getCause().getClass());
+ assertEquals("Excessively large number of partitions per request.",
+ executionException.getCause().getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testTimedOutHeartbeats() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .setConfigProp(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
+ .setConfigProp(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, "1000")
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ controller.controller().waitForReadyBrokers(3).get();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var latch = QuorumControllerIntegrationTestUtils.pause((QuorumController) controller.controller());
+ Thread.sleep(1001);
+ latch.countDown();
+ assertEquals(0, controller.sharedServer().controllerServerMetrics().fencedBrokerCount());
+ assertTrue(controller.quorumControllerMetrics().timedOutHeartbeats() > 0,
+ "Expected timedOutHeartbeats to be greater than 0.");
+ });
+ }
+ }
+
+ @Test
+ public void testRegisteredControllerEndpoints() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var controller = cluster.controllers().values().iterator().next();
+ var registeredControllers = controller.registrationsPublisher().controllers();
+ assertEquals(3, registeredControllers.size(), "Expected 3 controller registrations");
+ registeredControllers.values().forEach(registration -> {
+ assertNotNull(registration.listeners().get("CONTROLLER"));
+ assertNotEquals(0, registration.listeners().get("CONTROLLER").port());
+ });
+ });
+ }
+ }
+
+ @Test
+ public void testDirectToControllerCommunicationFailsOnOlderMetadataVersion() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin(Map.of(), true)) {
+ var exception = assertThrows(ExecutionException.class,
+ () -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
+ assertNotNull(exception.getCause());
+ assertEquals(UnsupportedVersionException.class, exception.getCause().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testStartupWithNonDefaultKControllerDynamicConfiguration() throws Exception {
+ var bootstrapRecords = List.of(
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), (short) 0),
+ new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setResourceName("")
+ .setName("num.io.threads")
+ .setValue("9"), (short) 0));
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, "testRecords"))
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ var controller = cluster.controllers().values().iterator().next();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ assertNotNull(controller.controllerApisHandlerPool());
+ assertEquals(9, controller.controllerApisHandlerPool().threadPoolSize().get());
+ });
+ }
+ }
+
+ @Test
+ public void testTopicDeletedAndRecreatedWhileBrokerIsDown() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2)
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined());
+ });
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ // Modify foo-0 so that it has the wrong topic ID.
+ var logDir = broker0.logManager().getLog(foo0, false).get().dir();
+ var partitionMetadataFile = new File(logDir, "partition.metadata");
+ Files.write(partitionMetadataFile.toPath(),
+ "version: 0\ntopic_id: AAAAAAAAAAAAA7SrBWaJ7g\n".getBytes(StandardCharsets.UTF_8));
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(2)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () ->
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ // Modify foo-0 so that it refers to a future replica.
+ // This is equivalent to a failure during the promotion of the future replica and a restart with the
+ // directory for the main replica being offline
+ var log = broker0.logManager().getLog(foo0, false).get();
+ log.renameDir(UnifiedLog.logFutureDirName(foo0), false);
+
+ // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+ broker0.startup();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(0, 1, 2), new HashSet<>(info.get().isr()));
+ assertTrue(broker0.logManager().getLog(foo0, true).isEmpty());
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(2)
+ .setNumControllerNodes(1)
+ .build())
+ .build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = cluster.admin()) {
+ var broker0 = cluster.brokers().get(0);
+ var broker1 = cluster.brokers().get(1);
+ var foo0 = new TopicPartition("foo", 0);
+
+ admin.createTopics(List.of(
+ new NewTopic("foo", 3, (short) 3))).all().get();
+
+ // Wait until foo-0 is created on broker0.
+ TestUtils.retryOnExceptionWithTimeout(60000, () ->
+ assertTrue(broker0.logManager().getLog(foo0, false).isDefined()));
+
+ // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+ broker0.shutdown();
+ TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+ var info = broker1.metadataCache().getLeaderAndIsr("foo", 0);
+ assertTrue(info.isPresent());
+ assertEquals(Set.of(1, 2), new HashSet<>(info.get().isr()));
+ });
+
+ var log = broker0.logManager().getLog(foo0, false).get();
+
+ // Copy foo-0 to targetParentDir
+ // This is so that we can rename the main replica to a future down below
+ var parentDir = log.parentDir();
+ var targetParentDir = broker0.config().logDirs().stream()
+ .filter(l -> !l.equals(parentDir))
+ .findFirst()
+ .orElseThrow();
+ var targetDirFile = new File(targetParentDir, log.dir().getName());
+ targetDirFile.mkdir();
+ Files.walk(Paths.get(log.dir().toString())).forEach(p -> {
Review Comment:
The `Stream` returned by `Files.walk` needs to be closed
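   For example, something along these lines (a sketch reusing the variables from the snippet above):
   ```java
   try (var paths = Files.walk(Paths.get(log.dir().toString()))) {
       paths.forEach(p -> {
           var out = Paths.get(targetDirFile.toString(),
               p.toString().substring(log.dir().toString().length()));
           if (!p.toString().equals(log.dir().toString())) {
               assertDoesNotThrow(() -> Files.copy(p, out));
           }
       });
   }
   ```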
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]