This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c576832631 [fix][broker]Failed to create partitions after the 
partitions were deleted because topic GC (#24651)
7c576832631 is described below

commit 7c576832631525e6baee264bb485e7fa625db2e3
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 28 16:04:29 2025 +0800

    [fix][broker]Failed to create partitions after the partitions were deleted 
because topic GC (#24651)
---
 .../pulsar/broker/service/BrokerService.java       | 15 ++++--
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../BrokerServiceAutoTopicCreationTest.java        | 53 ++++++++++++++++++++++
 .../nonpersistent/NonPersistentTopicTest.java      |  2 +-
 .../service/persistent/PersistentTopicTest.java    |  2 +-
 .../pulsar/client/api/ConsumerCreationTest.java    | 16 ++-----
 .../pulsar/client/api/ProducerCreationTest.java    | 19 ++++----
 7 files changed, 81 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a149abea50f..1f2e02da77e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3614,11 +3614,20 @@ public class BrokerService implements Closeable {
             allowed = pulsar.getConfiguration().isAllowAutoTopicCreation();
         }
 
-        if (allowed && topicName.isPartitioned()) {
+        if (topicName.isPartitioned()) {
+            TopicName partitionedTopic = 
TopicName.get(topicName.getPartitionedTopicName());
             // cannot re-create topic while it is being deleted
             return 
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                    .isPartitionedTopicBeingDeletedAsync(topicName)
-                    .thenApply(beingDeleted -> !beingDeleted);
+                .getPartitionedTopicMetadataAsync(partitionedTopic, true)
+                .thenApply(partitionedTopicMetadata -> {
+                    if (partitionedTopicMetadata.isEmpty()) {
+                        return allowed;
+                    }
+                    if (partitionedTopicMetadata.get().deleted) {
+                        return false;
+                    }
+                    return partitionedTopicMetadata.get().partitions > 
topicName.getPartitionIndex();
+                });
         } else {
             return CompletableFuture.completedFuture(allowed);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index c14c3d4e64d..bb23c6cf43a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3498,7 +3498,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             admin.topics().createSubscription(partitionedTopicName + 
"-partition-" + startPartitions, subName1,
                     MessageId.earliest);
             fail("Unexpected behaviour");
-        } catch (PulsarAdminException.ConflictException ex) {
+        } catch (PulsarAdminException.PreconditionFailedException ex) {
             // OK
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 16ddc38b3eb..dda26d7d631 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -30,7 +30,9 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
@@ -41,6 +43,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.awaitility.Awaitility;
@@ -615,4 +619,53 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         producer.close();
     }
 
+    @Test
+    public void testCreateTopicAfterGC() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+        final String topicP0 = TopicName.get(topic).getPartition(0).toString();
+        // Disable topic auto-creation.
+        boolean originalAllowAutoTopicCreation = 
pulsar.getConfiguration().isAllowAutoTopicCreation();
+        boolean originalDeleteInactiveTopics =
+                
pulsar.getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+        boolean originalDeleteInactivePartitionedTopicMetadataE =
+                
pulsar.getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(true);
+        
pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
+        // Create partitioned topic.
+        Assert.assertThrows(PulsarClientException.NotFoundException.class,
+                () -> pulsarClient.newProducer().topic(topic).create());
+        admin.topics().createPartitionedTopic(topic, 1);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicP0, 
false).join().get();
+        // Enable topic GC.
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true);
+        admin.topicPolicies().setInactiveTopicPolicies(topic, 
inactiveTopicPolicies);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            
assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+                    .getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+            
assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+                    .getMaxInactiveDurationSeconds(), 1);
+            
assertTrue(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+                    .isDeleteWhileInactive());
+        });
+        // Trigger topic GC.
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            persistentTopic.checkGC();
+            assertFalse(pulsar.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(TopicName.get(topicP0)).get());
+        });
+
+        // Verify: the missed partition can be loaded up since the partitioned 
topic metadata exists.
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        // cleanup.
+        admin.topics().deletePartitionedTopic(topic);
+        
pulsar.getConfiguration().setAllowAutoTopicCreation(originalAllowAutoTopicCreation);
+        
pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(originalDeleteInactiveTopics);
+        
pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(
+                originalDeleteInactivePartitionedTopicMetadataE);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index d902434f9bd..12b6cd2761a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -116,7 +116,7 @@ public class NonPersistentTopicTest extends BrokerTestBase {
         final String topicName = 
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
         admin.topics().createPartitionedTopic(topicName, 4);
         TopicName partition = TopicName.get(topicName).getPartition(4);
-        assertThrows(PulsarClientException.NotAllowedException.class, () -> {
+        assertThrows(PulsarClientException.NotFoundException.class, () -> {
             @Cleanup
             Producer<byte[]> ignored = pulsarClient.newProducer()
                     .topic(partition.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index f6bf6aa38e8..364360573da 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -575,7 +575,7 @@ public class PersistentTopicTest extends BrokerTestBase {
                     .topic(partition.toString())
                     .create();
             fail("unexpected behaviour");
-        } catch (PulsarClientException.NotAllowedException ex) {
+        } catch (PulsarClientException.NotFoundException ex) {
         }
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
index 195485739e0..b4199c9ef08 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
@@ -86,7 +86,7 @@ public class ConsumerCreationTest extends 
ProducerConsumerBase {
         }
 
         // Partition index is out of range.
-        assertThrows(NotAllowedException.class, () -> {
+        assertThrows(PulsarClientException.NotFoundException.class, () -> {
             @Cleanup
             Consumer<byte[]> ignored =
                     
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
@@ -112,16 +112,8 @@ public class ConsumerCreationTest extends 
ProducerConsumerBase {
         
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
 
         // Non-persistent topic only have the metadata, and no partition, so 
it works fine.
-        if (allowAutoTopicCreation || 
domain.equals(TopicDomain.non_persistent)) {
-            @Cleanup
-            Consumer<byte[]> ignored =
-                    
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
-        } else {
-            assertThrows(PulsarClientException.NotFoundException.class, () -> {
-                @Cleanup
-                Consumer<byte[]> ignored =
-                        
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
-            });
-        }
+        @Cleanup
+        Consumer<byte[]> ignored =
+                
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index e13423a2131..c742d1b86d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -231,7 +231,13 @@ public class ProducerCreationTest extends 
ProducerConsumerBase {
         }
 
         // Partition index is out of range.
-        assertThrows(NotAllowedException.class, () -> {
+        assertThrows(PulsarClientException.NotFoundException.class, () -> {
+            @Cleanup
+            Producer<byte[]> ignored =
+                    
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(3).toString())
+                            .create();
+        });
+        assertThrows(PulsarClientException.NotFoundException.class, () -> {
             @Cleanup
             Producer<byte[]> ignored =
                     
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
@@ -257,14 +263,7 @@ public class ProducerCreationTest extends 
ProducerConsumerBase {
         
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
 
         // Non-persistent topic only have the metadata, and no partition, so 
it works fine.
-        if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) {
-            @Cleanup
-            Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
-        } else {
-            assertThrows(PulsarClientException.NotFoundException.class, () -> {
-                @Cleanup
-                Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
-            });
-        }
+        @Cleanup
+        Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
     }
 }

Reply via email to