This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 37effb80508 [fix][broker] Fix cannot shutdown broker gracefully by
admin api (#24731)
37effb80508 is described below
commit 37effb80508591bc0d9356e4874198a797a30163
Author: Yike Xiao <[email protected]>
AuthorDate: Fri Sep 12 16:12:37 2025 +0800
[fix][broker] Fix cannot shutdown broker gracefully by admin api (#24731)
(cherry picked from commit 4169395d1aea6d2a12101ae4ef2cfcca2c6cc48c)
---
.../org/apache/pulsar/broker/PulsarService.java | 16 ++++++-
.../pulsar/broker/admin/impl/BrokersBase.java | 2 +-
.../org/apache/pulsar/broker/web/WebService.java | 50 +++++++++++++++-------
.../apache/pulsar/broker/PulsarServiceTest.java | 27 ++++++++++++
4 files changed, 78 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aac0ae67f6a..7f3bbad579b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -460,8 +460,22 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
/**
* Close the current pulsar service. All resources are released.
+ * <p>
+ * This method is equivalent to {@code closeAsync(true)}.
+ *
+ * @see PulsarService#closeAsync(boolean)
*/
public CompletableFuture<Void> closeAsync() {
+ return closeAsync(true);
+ }
+
+ /**
+ * Close the current pulsar service.
+ *
+ * @param waitForWebServiceToStop if true, waits for the web service to
stop before returning from this method.
+ * @return a future which will be completed when the service is fully
closed.
+ */
+ public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop)
{
mutex.lock();
try {
// Close protocol handler before unloading namespace bundles
because protocol handlers might maintain
@@ -508,7 +522,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (this.webService != null) {
try {
- this.webService.close();
+ this.webService.close(waitForWebServiceToStop);
this.webService = null;
} catch (Exception e) {
LOG.error("Web service closing failed", e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 17bbead5771..1e4e4ff66dd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -473,7 +473,7 @@ public class BrokersBase extends AdminResource {
private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int
maxConcurrentUnloadPerSec,
boolean
forcedTerminateTopic) {
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic);
- return pulsar().closeAsync();
+ return pulsar().closeAsync(false);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 6118a850115..4b1fb93f780 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -423,28 +423,48 @@ public class WebService implements AutoCloseable {
@Override
public void close() throws PulsarServerException {
+ close(true);
+ }
+
+ public void close(boolean waitUtilServerStopped) throws
PulsarServerException {
try {
- server.stop();
- // unregister statistics from Prometheus client's default
CollectorRegistry singleton
- // to prevent memory leaks in tests
- if (jettyStatisticsCollector != null) {
- try {
-
CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector);
- } catch (Exception e) {
- // ignore any exception happening in unregister
- // exception will be thrown for 2. instance of WebService
in tests since
- // the register supports a single JettyStatisticsCollector
- }
- jettyStatisticsCollector = null;
+ if (waitUtilServerStopped) {
+ doClose();
+ } else {
+ Thread webServiceTerminator = new Thread(() -> {
+ try {
+ doClose();
+ } catch (Exception e) {
+ log.error("Error while closing web service", e);
+ }
+ });
+ webServiceTerminator.setName("pulsar-web-service-terminator");
+ webServiceTerminator.start();
}
- webServiceExecutor.join();
- this.executorStats.close();
- log.info("Web service closed");
} catch (Exception e) {
throw new PulsarServerException(e);
}
}
+ private void doClose() throws Exception {
+ server.stop();
+ // unregister statistics from Prometheus client's default
CollectorRegistry singleton
+ // to prevent memory leaks in tests
+ if (jettyStatisticsCollector != null) {
+ try {
+
CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector);
+ } catch (Exception e) {
+ // ignore any exception happening in unregister
+ // exception will be thrown for the second instance of WebService in
tests since
+ // the registry supports a single JettyStatisticsCollector
+ }
+ jettyStatisticsCollector = null;
+ }
+ webServiceExecutor.join();
+ this.executorStats.close();
+ log.info("Web service closed");
+ }
+
public Optional<Integer> getListenPortHTTP() {
if (httpConnector != null) {
return Optional.of(httpConnector.getLocalPort());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 3bbf423da6e..f6d4235958c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -23,14 +23,18 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -305,4 +309,27 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
pulsarService.close();
}
}
+
+ @Test
+ public void testShutdownViaAdminApi() throws Exception {
+ super.internalSetup();
+ super.setupDefaultTenantAndNamespace();
+ String topic = "persistent://public/default/testShutdownViaAdminApi";
+ admin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .create();
+ producer.send("message 1".getBytes());
+ admin.brokers()
+ .shutDownBrokerGracefully(0, false)
+ .get(30, TimeUnit.SECONDS);
+ try {
+ producer.send("message 2".getBytes());
+ fail("sending msg should timeout, because broker is down and there
is only one broker");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarClientException.TimeoutException);
+ }
+ }
}