This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new eef82db59f3 [fix][broker] Failed to create partitions after the
partitions were deleted because of topic GC (#24651)
eef82db59f3 is described below
commit eef82db59f37e0911c507171d2316c9685caca51
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 28 16:04:29 2025 +0800
[fix][broker] Failed to create partitions after the partitions were deleted
because of topic GC (#24651)
(cherry picked from commit 7c576832631525e6baee264bb485e7fa625db2e3)
---
.../pulsar/broker/service/BrokerService.java | 15 +++-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../BrokerServiceAutoTopicCreationTest.java | 100 +++++++++++++++++----
.../nonpersistent/NonPersistentTopicTest.java | 2 +-
.../service/persistent/PersistentTopicTest.java | 2 +-
5 files changed, 100 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2bdad2bcc7b..75da0b16d93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3593,11 +3593,20 @@ public class BrokerService implements Closeable {
allowed = pulsar.getConfiguration().isAllowAutoTopicCreation();
}
- if (allowed && topicName.isPartitioned()) {
+ if (topicName.isPartitioned()) {
+ TopicName partitionedTopic = TopicName.get(topicName.getPartitionedTopicName());
// cannot re-create topic while it is being deleted
return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .isPartitionedTopicBeingDeletedAsync(topicName)
- .thenApply(beingDeleted -> !beingDeleted);
+ .getPartitionedTopicMetadataAsync(partitionedTopic, true)
+ .thenApply(partitionedTopicMetadata -> {
+ if (partitionedTopicMetadata.isEmpty()) {
+ return allowed;
+ }
+ if (partitionedTopicMetadata.get().deleted) {
+ return false;
+ }
+ return partitionedTopicMetadata.get().partitions > topicName.getPartitionIndex();
+ });
} else {
return CompletableFuture.completedFuture(allowed);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 801b888f91c..38a8a374e2f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3273,7 +3273,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
admin.topics().createSubscription(partitionedTopicName +
"-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
- } catch (PulsarAdminException.ConflictException ex) {
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
// OK
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index a88e4b66d87..89f30d68e21 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -30,7 +30,9 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
@@ -41,6 +43,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.awaitility.Awaitility;
@@ -434,15 +438,19 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
}
@Test
- public void testDynamicConfigurationTopicAutoCreationDisable() throws
PulsarAdminException {
+ public void testDynamicConfigurationTopicAutoCreationDisable() throws
Exception {
// test disable AllowAutoTopicCreation
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation",
"false");
final String namespaceName = "prop/ns-abc";
final String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ UUID.randomUUID();
- Assert.assertThrows(PulsarClientException.NotFoundException.class,
- ()-> pulsarClient.newProducer().topic(topic).create());
+ try {
+ pulsarClient.newProducer().topic(topic).create();
+ fail("expected a topic not found exception");
+ } catch (PulsarClientException.NotFoundException e) {
+ // expected.
+ }
}
@Test
@@ -459,8 +467,12 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
- List<String> topics = admin.topics().getList(namespaceName);
- List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ List<String> topics =
admin.topics().getList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+ }).toList();
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+ }).toList();
assertEquals(topics.size(), 1);
assertEquals(partitionedTopicList.size(), 0);
producer.close();
@@ -489,8 +501,12 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
});
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
- List<String> topics = admin.topics().getList(namespaceName);
- List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ List<String> topics =
admin.topics().getList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+ }).toList();;
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+ }).toList();;
PartitionedTopicMetadata partitionedTopicMetadata =
admin.topics().getPartitionedTopicMetadata(topic);
assertEquals(topics.size(), 4);
assertEquals(partitionedTopicList.size(), 1);
@@ -508,15 +524,19 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
final String namespaceName = "prop/ns-abc";
- String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ final String topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ UUID.randomUUID();
// test enable AllowAutoTopicCreation,
// partitioned when maxNumPartitionsPerPartitionedTopic <
defaultNumPartitions
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic",
"2");
admin.brokers().updateDynamicConfiguration("defaultNumPartitions",
"6");
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
- List<String> topics = admin.topics().getList(namespaceName);
- List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ List<String> topics =
admin.topics().getList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+ }).toList();
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+ }).toList();
PartitionedTopicMetadata partitionedTopicMetadata =
admin.topics().getPartitionedTopicMetadata(topic);
assertEquals(topics.size(), 2);
assertEquals(partitionedTopicList.size(), 1);
@@ -532,13 +552,15 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
Awaitility.await().untilAsserted(() ->
assertEquals(admin.brokers().getAllDynamicConfigurations()
.get("maxNumPartitionsPerPartitionedTopic"), "1"));
- topic = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ final String topic2 = "persistent://" + namespaceName +
"/test-dynamicConfiguration-topic-auto-creation-"
+ UUID.randomUUID();
- producer = pulsarClient.newProducer().topic(topic).create();
- topics = admin.topics().getList(namespaceName);
- assertEquals(topics.size(), 1);
+ producer = pulsarClient.newProducer().topic(topic2).create();
+ List<String> topics2 =
admin.topics().getList(namespaceName).stream().filter(tp -> {
+ return TopicName.get(tp).getPartitionedTopicName().equals(topic2);
+ }).toList();
+ assertEquals(topics2.size(), 1);
producer.close();
- for (String t : topics) {
+ for (String t : topics2) {
admin.topics().delete(t);
}
}
@@ -596,4 +618,52 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
}
+ @Test
+ public void testCreateTopicAfterGC() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+ final String topicP0 = TopicName.get(topic).getPartition(0).toString();
+ // Disable topic auto-creation.
+ boolean originalAllowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation();
+ boolean originalDeleteInactiveTopics =
+ pulsar.getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+ boolean originalDeleteInactivePartitionedTopicMetadataE =
+ pulsar.getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(true);
+ pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
+ // Create partitioned topic.
+ Assert.assertThrows(PulsarClientException.NotFoundException.class,
+ () -> pulsarClient.newProducer().topic(topic).create());
+ admin.topics().createPartitionedTopic(topic, 1);
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topicP0, false).join().get();
+ // Enable topic GC.
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+ admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+ .getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+ .getMaxInactiveDurationSeconds(), 1);
+ assertTrue(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+ .isDeleteWhileInactive());
+ });
+ // Trigger topic GC.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ persistentTopic.checkGC();
+ assertFalse(pulsar.getPulsarResources().getTopicResources()
+ .persistentTopicExists(TopicName.get(topicP0)).get());
+ });
+
+ // Verify: the missed partition can be loaded up since the partitioned topic metadata exists.
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // cleanup.
+ admin.topics().deletePartitionedTopic(topic);
+ pulsar.getConfiguration().setAllowAutoTopicCreation(originalAllowAutoTopicCreation);
+ pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(originalDeleteInactiveTopics);
+ pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(
+ originalDeleteInactivePartitionedTopicMetadataE);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 54d411314f8..0acd574bb09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -122,7 +122,7 @@ public class NonPersistentTopicTest extends BrokerTestBase {
.topic(partition.toString())
.create();
fail("unexpected behaviour");
- } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+ } catch (PulsarClientException.NotFoundException ignored) {
}
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index bc33e6d6f39..462a5002bd3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -575,7 +575,7 @@ public class PersistentTopicTest extends BrokerTestBase {
.topic(partition.toString())
.create();
fail("unexpected behaviour");
- } catch (PulsarClientException.NotAllowedException ex) {
+ } catch (PulsarClientException.NotFoundException ex) {
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}