This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 46a76e98d6a [improve][broker] Replace isServiceUnitActiveAsync with
checkTopicNsOwnership (#24780)
46a76e98d6a is described below
commit 46a76e98d6a6f2a86839d93467c5337bb205f851
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Sep 25 17:02:53 2025 +0800
[improve][broker] Replace isServiceUnitActiveAsync with
checkTopicNsOwnership (#24780)
---
.../pulsar/broker/namespace/NamespaceService.java | 31 ------------
.../pulsar/broker/service/BrokerService.java | 55 +++++++++-------------
.../PersistentDispatcherFailoverConsumerTest.java | 2 -
.../service/PersistentTopicConcurrentTest.java | 2 -
.../pulsar/broker/service/PersistentTopicTest.java | 2 -
.../pulsar/broker/service/ServerCnxTest.java | 6 +--
.../client/api/OrphanPersistentTopicTest.java | 6 +--
7 files changed, 28 insertions(+), 76 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 32fe4ca1449..a92acf2177e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -43,9 +43,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -1225,35 +1223,6 @@ public class NamespaceService implements AutoCloseable {
new IllegalArgumentException("Invalid class of
NamespaceBundle: " + suName.getClass().getName()));
}
- /**
- * @deprecated This method is only used in test now.
- */
- @Deprecated
- public boolean isServiceUnitActive(TopicName topicName) {
- try {
- return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
- .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- LOG.warn("Unable to find OwnedBundle for topic in time - [{}]",
topicName, e);
- throw new RuntimeException(e);
- }
- }
-
- public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName
topicName) {
- // TODO: Add unit tests cover it.
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
- return getBundleAsync(topicName)
- .thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
- }
- return getBundleAsync(topicName).thenCompose(bundle -> {
- Optional<CompletableFuture<OwnedBundle>> optionalFuture =
ownershipCache.getOwnedBundleAsync(bundle);
- if (optionalFuture.isEmpty()) {
- return CompletableFuture.completedFuture(false);
- }
- return optionalFuture.get().thenApply(ob -> ob != null &&
ob.isActive());
- });
- }
-
private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName
fqnn) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6976b4f2fce..9bd107f3320 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1736,38 +1736,29 @@ public class BrokerService implements Closeable {
CompletableFuture<Optional<Topic>>
topicFuture,
Map<String, String> properties) {
TopicName topicName = TopicName.get(topic);
- pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
- .thenAccept(isActive -> {
- if (isActive) {
- CompletableFuture<Map<String, String>>
propertiesFuture;
- if (properties == null) {
- //Read properties from storage when loading topic.
- propertiesFuture =
fetchTopicPropertiesAsync(topicName);
- } else {
- propertiesFuture =
CompletableFuture.completedFuture(properties);
- }
- propertiesFuture.thenAccept(finalProperties ->
- //TODO add topicName in properties?
- createPersistentTopic0(topic, createIfMissing,
topicFuture,
- finalProperties)
- ).exceptionally(throwable -> {
- log.warn("[{}] Read topic property failed", topic,
throwable);
- pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
- topicFuture.completeExceptionally(throwable);
- return null;
- });
- } else {
- // namespace is being unloaded
- String msg = String.format("Namespace is being
unloaded, cannot add topic %s", topic);
- log.warn(msg);
- pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
- topicFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg));
- }
- }).exceptionally(ex -> {
- pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
- topicFuture.completeExceptionally(ex);
- return null;
- });
+ checkTopicNsOwnership(topic).thenRun(() -> {
+ CompletableFuture<Map<String, String>> propertiesFuture;
+ if (properties == null) {
+ //Read properties from storage when loading topic.
+ propertiesFuture = fetchTopicPropertiesAsync(topicName);
+ } else {
+ propertiesFuture =
CompletableFuture.completedFuture(properties);
+ }
+ propertiesFuture.thenAccept(finalProperties ->
+ //TODO add topicName in properties?
+ createPersistentTopic0(topic, createIfMissing, topicFuture,
+ finalProperties)
+ ).exceptionally(throwable -> {
+ log.warn("[{}] Read topic property failed", topic, throwable);
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
+ topicFuture.completeExceptionally(throwable);
+ return null;
+ });
+ }).exceptionally(e -> {
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
+ topicFuture.completeExceptionally(e.getCause());
+ return null;
+ });
}
@VisibleForTesting
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 96ca2d90f06..37cf75d84ca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -79,7 +79,6 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
@@ -162,7 +161,6 @@ public class PersistentDispatcherFailoverConsumerTest {
NamespaceService nsSvc =
pulsarTestContext.getPulsarService().getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
- doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 20f58f277a3..2f8a9246351 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -51,7 +51,6 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +102,6 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
- doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ca8f762adc4..3741c4093fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -224,8 +224,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
- doReturn(true).when(nsSvc).isServiceUnitActive(any());
-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4d734081e43..2cfbac35bfc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -231,8 +231,6 @@ public class ServerCnxTest {
.getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(),
any());
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
- doReturn(true).when(namespaceService).isServiceUnitActive(any());
-
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"),
CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
@@ -1601,8 +1599,8 @@ public class ServerCnxTest {
setChannelConnected();
// Force the case where the broker doesn't own any topic
-
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
- .isServiceUnitActiveAsync(any(TopicName.class));
+ doReturn(CompletableFuture.failedFuture(new
ServiceUnitNotReadyException("failed"))).when(brokerService)
+ .checkTopicNsOwnership(any(String.class));
// test PRODUCER failure case
ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /*
producer id */, 1 /* request id */,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index b7c323af5bc..3613ba51625 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -245,7 +245,7 @@ public class OrphanPersistentTopicTest extends
ProducerConsumerBase {
admin.topics().createNonPartitionedTopic(tpName);
admin.namespaces().unload(ns);
- // Inject an error when calling
"NamespaceService.isServiceUnitActiveAsync".
+ // Inject an error when loading the topic
AtomicInteger failedTimes = new AtomicInteger();
NamespaceService namespaceService = pulsar.getNamespaceService();
doAnswer(invocation -> {
@@ -258,7 +258,7 @@ public class OrphanPersistentTopicTest extends
ProducerConsumerBase {
return CompletableFuture.failedFuture(new
RuntimeException("mocked error"));
}
return invocation.callRealMethod();
-
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+ }).when(namespaceService).checkBundleOwnership(any(TopicName.class),
any());
// Verify: the consumer can create successfully eventually.
Consumer consumer =
pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
@@ -295,7 +295,7 @@ public class OrphanPersistentTopicTest extends
ProducerConsumerBase {
pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
}
return invocation.callRealMethod();
-
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+ }).when(namespaceService).checkBundleOwnership(any(TopicName.class),
any());
// Verify: the consumer create failed due to pulsar does not allow to
create topic automatically.
try {