This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c576832631 [fix][broker]Failed to create partitions after the
partitions were deleted because topic GC (#24651)
7c576832631 is described below
commit 7c576832631525e6baee264bb485e7fa625db2e3
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 28 16:04:29 2025 +0800
[fix][broker]Failed to create partitions after the partitions were deleted
because topic GC (#24651)
---
.../pulsar/broker/service/BrokerService.java | 15 ++++--
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../BrokerServiceAutoTopicCreationTest.java | 53 ++++++++++++++++++++++
.../nonpersistent/NonPersistentTopicTest.java | 2 +-
.../service/persistent/PersistentTopicTest.java | 2 +-
.../pulsar/client/api/ConsumerCreationTest.java | 16 ++-----
.../pulsar/client/api/ProducerCreationTest.java | 19 ++++----
7 files changed, 81 insertions(+), 28 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a149abea50f..1f2e02da77e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3614,11 +3614,20 @@ public class BrokerService implements Closeable {
allowed = pulsar.getConfiguration().isAllowAutoTopicCreation();
}
- if (allowed && topicName.isPartitioned()) {
+ if (topicName.isPartitioned()) {
+ TopicName partitionedTopic =
TopicName.get(topicName.getPartitionedTopicName());
// cannot re-create topic while it is being deleted
return
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .isPartitionedTopicBeingDeletedAsync(topicName)
- .thenApply(beingDeleted -> !beingDeleted);
+ .getPartitionedTopicMetadataAsync(partitionedTopic, true)
+ .thenApply(partitionedTopicMetadata -> {
+ if (partitionedTopicMetadata.isEmpty()) {
+ return allowed;
+ }
+ if (partitionedTopicMetadata.get().deleted) {
+ return false;
+ }
+ return partitionedTopicMetadata.get().partitions >
topicName.getPartitionIndex();
+ });
} else {
return CompletableFuture.completedFuture(allowed);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index c14c3d4e64d..bb23c6cf43a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3498,7 +3498,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
admin.topics().createSubscription(partitionedTopicName +
"-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
- } catch (PulsarAdminException.ConflictException ex) {
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
// OK
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 16ddc38b3eb..dda26d7d631 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -30,7 +30,9 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
@@ -41,6 +43,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.awaitility.Awaitility;
@@ -615,4 +619,53 @@ public class BrokerServiceAutoTopicCreationTest extends
BrokerTestBase{
producer.close();
}
+ @Test
+ public void testCreateTopicAfterGC() throws Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+ final String topicP0 = TopicName.get(topic).getPartition(0).toString();
+ // Disable topic auto-creation.
+ boolean originalAllowAutoTopicCreation =
pulsar.getConfiguration().isAllowAutoTopicCreation();
+ boolean originalDeleteInactiveTopics =
+
pulsar.getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+ boolean originalDeleteInactivePartitionedTopicMetadataE =
+
pulsar.getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(true);
+
pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
+ // Create partitioned topic.
+ Assert.assertThrows(PulsarClientException.NotFoundException.class,
+ () -> pulsarClient.newProducer().topic(topic).create());
+ admin.topics().createPartitionedTopic(topic, 1);
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topicP0,
false).join().get();
+ // Enable topic GC.
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
true);
+ admin.topicPolicies().setInactiveTopicPolicies(topic,
inactiveTopicPolicies);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+
assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+ .getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+
assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+ .getMaxInactiveDurationSeconds(), 1);
+
assertTrue(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+ .isDeleteWhileInactive());
+ });
+ // Trigger topic GC.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ persistentTopic.checkGC();
+ assertFalse(pulsar.getPulsarResources().getTopicResources()
+ .persistentTopicExists(TopicName.get(topicP0)).get());
+ });
+
+ // Verify: the missing partition can be loaded since the partitioned
topic metadata exists.
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ // cleanup.
+ admin.topics().deletePartitionedTopic(topic);
+
pulsar.getConfiguration().setAllowAutoTopicCreation(originalAllowAutoTopicCreation);
+
pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(originalDeleteInactiveTopics);
+
pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(
+ originalDeleteInactivePartitionedTopicMetadataE);
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index d902434f9bd..12b6cd2761a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -116,7 +116,7 @@ public class NonPersistentTopicTest extends BrokerTestBase {
final String topicName =
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
admin.topics().createPartitionedTopic(topicName, 4);
TopicName partition = TopicName.get(topicName).getPartition(4);
- assertThrows(PulsarClientException.NotAllowedException.class, () -> {
+ assertThrows(PulsarClientException.NotFoundException.class, () -> {
@Cleanup
Producer<byte[]> ignored = pulsarClient.newProducer()
.topic(partition.toString())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index f6bf6aa38e8..364360573da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -575,7 +575,7 @@ public class PersistentTopicTest extends BrokerTestBase {
.topic(partition.toString())
.create();
fail("unexpected behaviour");
- } catch (PulsarClientException.NotAllowedException ex) {
+ } catch (PulsarClientException.NotFoundException ex) {
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
index 195485739e0..b4199c9ef08 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
@@ -86,7 +86,7 @@ public class ConsumerCreationTest extends
ProducerConsumerBase {
}
// Partition index is out of range.
- assertThrows(NotAllowedException.class, () -> {
+ assertThrows(PulsarClientException.NotFoundException.class, () -> {
@Cleanup
Consumer<byte[]> ignored =
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
@@ -112,16 +112,8 @@ public class ConsumerCreationTest extends
ProducerConsumerBase {
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
// Non-persistent topic only have the metadata, and no partition, so
it works fine.
- if (allowAutoTopicCreation ||
domain.equals(TopicDomain.non_persistent)) {
- @Cleanup
- Consumer<byte[]> ignored =
-
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
- } else {
- assertThrows(PulsarClientException.NotFoundException.class, () -> {
- @Cleanup
- Consumer<byte[]> ignored =
-
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
- });
- }
+ @Cleanup
+ Consumer<byte[]> ignored =
+
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index e13423a2131..c742d1b86d6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -231,7 +231,13 @@ public class ProducerCreationTest extends
ProducerConsumerBase {
}
// Partition index is out of range.
- assertThrows(NotAllowedException.class, () -> {
+ assertThrows(PulsarClientException.NotFoundException.class, () -> {
+ @Cleanup
+ Producer<byte[]> ignored =
+
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(3).toString())
+ .create();
+ });
+ assertThrows(PulsarClientException.NotFoundException.class, () -> {
@Cleanup
Producer<byte[]> ignored =
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
@@ -257,14 +263,7 @@ public class ProducerCreationTest extends
ProducerConsumerBase {
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
// Non-persistent topic only have the metadata, and no partition, so
it works fine.
- if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) {
- @Cleanup
- Producer<byte[]> ignored =
pulsarClient.newProducer().topic(partitionedTopic).create();
- } else {
- assertThrows(PulsarClientException.NotFoundException.class, () -> {
- @Cleanup
- Producer<byte[]> ignored =
pulsarClient.newProducer().topic(partitionedTopic).create();
- });
- }
+ @Cleanup
+ Producer<byte[]> ignored =
pulsarClient.newProducer().topic(partitionedTopic).create();
}
}