This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed10ec33dc5 [improve][broker]Part-2 Add Admin API to delete topic
policies (#24602)
ed10ec33dc5 is described below
commit ed10ec33dc554d1bf9722d419f83e95fc0122157
Author: fengyubiao <[email protected]>
AuthorDate: Sat Aug 9 12:18:28 2025 +0800
[improve][broker]Part-2 Add Admin API to delete topic policies (#24602)
---
.../pulsar/broker/admin/v2/PersistentTopics.java | 36 +++++++++++++++
.../broker/service/OneWayReplicatorTestBase.java | 22 +++++++---
.../service/OneWayReplicatorUsingGlobalZKTest.java | 51 +++++++++++++++++++++-
.../broker/service/TopicPolicyTestUtils.java | 8 ++++
.../apache/pulsar/client/admin/TopicPolicies.java | 11 +++++
.../client/admin/internal/TopicPoliciesImpl.java | 11 +++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 16 ++++++-
7 files changed, 146 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c539dad7426..5acb21f15a8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1203,6 +1203,42 @@ public class PersistentTopics extends
PersistentTopicsBase {
}
+ /**
+  * Deletes the topic policies of a topic.
+  *
+  * <p>Validates the topic name, checks that the caller is allowed to perform a WRITE topic-policy
+  * operation, and then asks the topic policies service to delete the policies of the topic (the second
+  * argument passed to {@code deleteTopicPoliciesAsync} is {@code false}). On success the response is
+  * resumed with 204 No Content.
+  *
+  * <p>An {@link IllegalStateException} raised by the chain is reported to the client as
+  * 422 (Unprocessable Entity). Every failure is handed to {@code resumeAsyncResponseExceptionally};
+  * it is additionally logged unless it is a 307 redirect or a 4xx error.
+  *
+  * <p>NOTE(review): {@code authoritative} is accepted but never read in this method, and the 422 mapping
+  * has no matching {@code @ApiResponse} entry - confirm both are intended.
+  *
+  * @param asyncResponse response handle resumed once the deletion completes or fails
+  * @param tenant tenant of the topic
+  * @param namespace namespace of the topic
+  * @param encodedTopic URL-encoded topic name
+  * @param authoritative redirect marker sent by another broker (unused here)
+  */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/policies")
+ @ApiOperation(value = "Delete policies for a topic.")
+ @ApiResponses(value = {
+         @ApiResponse(code = 204, message = "Operation successful"),
+         @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+         @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+         @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Namespace or topic does not exist"),
+         @ApiResponse(code = 500, message = "Internal server error")})
+ public void deleteTopicPolicies(
+         @Suspended AsyncResponse asyncResponse,
+         @ApiParam(value = "Specify the tenant", required = true)
+         @PathParam("tenant") String tenant,
+         @ApiParam(value = "Specify the namespace", required = true)
+         @PathParam("namespace") String namespace,
+         @ApiParam(value = "Specify topic name", required = true)
+         @PathParam("topic") @Encoded String encodedTopic,
+         @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+     validateTopicName(tenant, namespace, encodedTopic);
+     validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+         .thenCompose(__ -> pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName, false))
+         .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+         .exceptionally(ex -> {
+             Throwable t = FutureUtil.unwrapCompletionException(ex);
+             if (t instanceof IllegalStateException) {
+                 // Surface an illegal-state rejection as a client-visible 422 instead of a generic error.
+                 ex = new RestException(422/* Unprocessable entity*/, t.getMessage());
+             } else if (isNot307And4xxException(ex)) {
+                 // This endpoint deletes the policies, not the topic itself; say so in the log.
+                 log.error("[{}] Failed to delete topic policies for topic {}", clientAppId(), topicName, t);
+             }
+             resumeAsyncResponseExceptionally(asyncResponse, ex);
+             return null;
+         });
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/subscriptions")
@ApiOperation(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 58d8bcf7045..0196100b8ba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -467,27 +467,34 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
protected void setTopicLevelClusters(String topic, List<String> clusters,
PulsarAdmin admin,
PulsarService pulsar) throws
Exception {
+ setTopicLevelClusters(topic, clusters, admin, pulsar, false);
+ }
+
+ protected void setTopicLevelClusters(String topic, List<String> clusters,
PulsarAdmin admin,
+ PulsarService pulsar, boolean global)
throws Exception {
Set<String> expected = new HashSet<>(clusters);
TopicName topicName =
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
int partitions = ensurePartitionsAreSame(topic);
- admin.topics().setReplicationClusters(topic, clusters);
+ admin.topicPolicies(global).setReplicationClusters(topic, clusters);
Awaitility.await().untilAsserted(() -> {
- TopicPolicies policies =
TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(),
topicName);
+ TopicPolicies policies =
TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(),
topicName,
+ global);
assertEquals(new HashSet<>(policies.getReplicationClusters()),
expected);
if (partitions == 0) {
- checkNonPartitionedTopicLevelClusters(topicName.toString(),
clusters, admin, pulsar.getBrokerService());
+ checkNonPartitionedTopicLevelClusters(topicName.toString(),
clusters, admin, pulsar,
+ global);
} else {
for (int i = 0; i < partitions; i++) {
checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(),
clusters, admin,
- pulsar.getBrokerService());
+ pulsar, global);
}
}
});
}
protected void checkNonPartitionedTopicLevelClusters(String topic,
List<String> clusters, PulsarAdmin admin,
- BrokerService broker) throws
Exception {
- CompletableFuture<Optional<Topic>> future = broker.getTopic(topic,
false);
+ PulsarService pulsar,
boolean global) throws Exception {
+ CompletableFuture<Optional<Topic>> future =
pulsar.getBrokerService().getTopic(topic, false);
if (future == null) {
return;
}
@@ -497,7 +504,8 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
}
PersistentTopic persistentTopic = (PersistentTopic) optional.get();
Set<String> expected = new HashSet<>(clusters);
- Set<String> act = new
HashSet<>(TopicPolicyTestUtils.getTopicPolicies(persistentTopic)
+ Set<String> act = new HashSet<>(TopicPolicyTestUtils
+ .getTopicPolicies(pulsar.getTopicPoliciesService(),
TopicName.get(persistentTopic.topic), global)
.getReplicationClusters());
assertEquals(act, expected);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index e14cc5045d6..5837de8c809 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -317,9 +319,56 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.testDeleteNonPartitionedTopic();
}
+ @Override
@Test
public void testDeletePartitionedTopic() throws Exception {
- super.testDeletePartitionedTopic();
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
+ admin1.topics().createPartitionedTopic(topicName, 2);
+
+ // Verify replicator works.
+ verifyReplicationWorks(topicName);
+
+ // Remove the remote cluster from the topic's global replication clusters, keeping only cluster1.
+ setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1,
pulsar1, true);
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(pulsar1.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+ assertTrue(pulsar1.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+ assertFalse(pulsar2.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+ assertFalse(pulsar2.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+ });
+
+
+ // Delete topic.
+ admin1.topics().deletePartitionedTopic(topicName);
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(pulsar1.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+ assertFalse(pulsar1.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+ assertFalse(pulsar2.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+ assertFalse(pulsar2.getPulsarResources().getTopicResources()
+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ Optional<TopicPolicies> op1 = pulsar1.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(topicName),
GLOBAL_ONLY).join();
+ assertFalse(op1.isPresent());
+ Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(topicName),
GLOBAL_ONLY).join();
+ assertTrue(op2.isPresent());
+ });
+ admin2.topicPolicies().deleteTopicPolicies(topicName);
+ Awaitility.await().untilAsserted(() -> {
+ Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(topicName),
GLOBAL_ONLY).join();
+ assertFalse(op2.isPresent());
+ });
}
@Test(enabled = false)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 0aa8e070d31..b5adaeccad7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -48,6 +48,14 @@ public class TopicPolicyTestUtils {
.orElse(null);
}
+ /**
+  * Reads the policies of a topic from the given service, either the global or the local ones.
+  *
+  * @param topicPoliciesService service to query
+  * @param topicName topic whose policies are requested
+  * @param global {@code true} queries with {@code GLOBAL_ONLY}, {@code false} with {@code LOCAL_ONLY}
+  * @return the policies found, or {@code null} when the service returns an empty result
+  */
+ public static TopicPolicies getTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName,
+         boolean global) throws ExecutionException, InterruptedException {
+     final TopicPoliciesService.GetType scope;
+     if (global) {
+         scope = TopicPoliciesService.GetType.GLOBAL_ONLY;
+     } else {
+         scope = TopicPoliciesService.GetType.LOCAL_ONLY;
+     }
+     // Block until the lookup finishes; an empty result is mapped to null for the callers' convenience.
+     return topicPoliciesService.getTopicPoliciesAsync(topicName, scope).get().orElse(null);
+ }
+
public static TopicPolicies getLocalTopicPolicies(TopicPoliciesService
topicPoliciesService, TopicName topicName)
throws ExecutionException, InterruptedException {
return topicPoliciesService.getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY).get()
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 7a5623f849f..3e985dd7281 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -1936,7 +1936,18 @@ public interface TopicPolicies {
*/
CompletableFuture<Void> setReplicationClusters(String topic, List<String>
clusterIds);
+ /**
+ * Get the replication clusters for the topic.
+ *
+ * @param topic the topic name
+ * @param applied NOTE(review): presumably selects the effective (inherited) value rather than only the
+ *                value set on the topic itself - confirm against the implementation
+ * @return the replication clusters of the topic
+ * @throws PulsarAdminException if the admin call fails
+ */
Set<String> getReplicationClusters(String topic, boolean applied) throws
PulsarAdminException;
+ /**
+ * Remove the replication clusters for the topic.
+ *
+ * @param topic the topic name
+ * @throws PulsarAdminException if the admin call fails
+ */
void removeReplicationClusters(String topic) throws PulsarAdminException;
+
+ /**
+ * Delete the topic policies of a topic; this works even if the topic has been deleted.
+ *
+ * @param topic the topic name
+ * @throws PulsarAdminException if the admin call fails
+ */
+ void deleteTopicPolicies(String topic) throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 0a4a816640f..6cfa981f1c4 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -1312,6 +1312,17 @@ public class TopicPoliciesImpl extends BaseResource
implements TopicPolicies {
return asyncDeleteRequest(path);
}
+ /**
+  * Deletes the policies of the given topic by running {@link #deleteTopicPoliciesAsync(String)}
+  * through the {@code sync} helper.
+  *
+  * @param topic the topic name
+  * @throws PulsarAdminException declared by the interface method this overrides
+  */
+ @Override
+ public void deleteTopicPolicies(String topic) throws PulsarAdminException {
+     sync(() -> this.deleteTopicPoliciesAsync(topic));
+ }
+
+ /**
+  * Issues an asynchronous DELETE request against the {@code policies} path of the given topic.
+  *
+  * @param topic the topic name; it is passed through {@code validateTopic} first
+  * @return the future returned by {@code asyncDeleteRequest}
+  */
+ public CompletableFuture<Void> deleteTopicPoliciesAsync(String topic) {
+     final WebTarget policiesTarget = topicPath(validateTopic(topic), "policies");
+     return asyncDeleteRequest(policiesTarget);
+ }
+
/*
* returns topic name with encoded Local Name
*/
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index b49c4d40a53..5730722a486 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -60,7 +60,7 @@ public class CmdTopicPolicies extends CmdBase {
public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
super("topicPolicies", admin);
-
+ addCommand("delete", new DeletePolicies());
addCommand("get-message-ttl", new GetMessageTTL());
addCommand("set-message-ttl", new SetMessageTTL());
addCommand("remove-message-ttl", new RemoveMessageTTL());
@@ -2058,6 +2058,20 @@ public class CmdTopicPolicies extends CmdBase {
}
}
+ /**
+  * CLI command that deletes the policies of a topic.
+  *
+  * <p>The topic argument is validated with {@code validatePersistentTopic} and the deletion is sent through
+  * the non-global topic policies client ({@code getTopicPolicies(false)}).
+  */
+ @Command(description = "Remove all policies for a topic, it will not remove policies from the remote "
+         + "cluster")
+ private class DeletePolicies extends CliCommand {
+
+     @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+     private String topicName;
+
+     /** Validates the topic name and requests deletion of its policies. */
+     @Override
+     void run() throws PulsarAdminException {
+         String persistentTopic = validatePersistentTopic(topicName);
+         getTopicPolicies(false).deleteTopicPolicies(persistentTopic);
+     }
+ }
+
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}