This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8fd963831f1 [fix][broker] Fix cannot shutdown broker gracefully by admin api (#24731)
8fd963831f1 is described below
commit 8fd963831f14d733ac00aa89f18688acb5563735
Author: Yike Xiao <[email protected]>
AuthorDate: Fri Sep 12 16:12:37 2025 +0800
[fix][broker] Fix cannot shutdown broker gracefully by admin api (#24731)
(cherry picked from commit 4169395d1aea6d2a12101ae4ef2cfcca2c6cc48c)
---
.../org/apache/pulsar/broker/PulsarService.java | 16 ++++++-
.../pulsar/broker/admin/impl/BrokersBase.java | 2 +-
.../org/apache/pulsar/broker/web/WebService.java | 50 +++++++++++++++-------
.../apache/pulsar/broker/PulsarServiceTest.java | 27 ++++++++++++
4 files changed, 78 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index dd726ee9a56..9bf51c7e7cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -430,8 +430,22 @@ public class PulsarService implements AutoCloseable, ShutdownService {
/**
* Close the current pulsar service. All resources are released.
+ * <p>
+ * This method is equivalent with {@code closeAsync(true)}.
+ *
+ * @see PulsarService#closeAsync(boolean)
*/
public CompletableFuture<Void> closeAsync() {
+ return closeAsync(true);
+ }
+
+ /**
+ * Close the current pulsar service.
+ *
+ * @param waitForWebServiceToStop if true, waits for the web service to
stop before returning from this method.
+ * @return a future which will be completed when the service is fully
closed.
+ */
+ public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop)
{
mutex.lock();
try {
if (closeFuture != null) {
@@ -468,7 +482,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (this.webService != null) {
try {
- this.webService.close();
+ this.webService.close(waitForWebServiceToStop);
this.webService = null;
} catch (Exception e) {
LOG.error("Web service closing failed", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3be502fd764..c9207adbd44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -447,7 +447,7 @@ public class BrokersBase extends AdminResource {
private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int
maxConcurrentUnloadPerSec,
boolean
forcedTerminateTopic) {
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic);
- return pulsar().closeAsync();
+ return pulsar().closeAsync(false);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 89c12c6771e..a16dbfea6bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -411,28 +411,48 @@ public class WebService implements AutoCloseable {
@Override
public void close() throws PulsarServerException {
+ close(true);
+ }
+
+ public void close(boolean waitUtilServerStopped) throws
PulsarServerException {
try {
- server.stop();
- // unregister statistics from Prometheus client's default
CollectorRegistry singleton
- // to prevent memory leaks in tests
- if (jettyStatisticsCollector != null) {
- try {
-
CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector);
- } catch (Exception e) {
- // ignore any exception happening in unregister
- // exception will be thrown for 2. instance of WebService
in tests since
- // the register supports a single JettyStatisticsCollector
- }
- jettyStatisticsCollector = null;
+ if (waitUtilServerStopped) {
+ doClose();
+ } else {
+ Thread webServiceTerminator = new Thread(() -> {
+ try {
+ doClose();
+ } catch (Exception e) {
+ log.error("Error while closing web service", e);
+ }
+ });
+ webServiceTerminator.setName("pulsar-web-service-terminator");
+ webServiceTerminator.start();
}
- webServiceExecutor.join();
- this.executorStats.close();
- log.info("Web service closed");
} catch (Exception e) {
throw new PulsarServerException(e);
}
}
+ private void doClose() throws Exception {
+ server.stop();
+ // unregister statistics from Prometheus client's default
CollectorRegistry singleton
+ // to prevent memory leaks in tests
+ if (jettyStatisticsCollector != null) {
+ try {
+
CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector);
+ } catch (Exception e) {
+ // ignore any exception happening in unregister
+ // exception will be thrown for 2. instance of WebService in
tests since
+ // the register supports a single JettyStatisticsCollector
+ }
+ jettyStatisticsCollector = null;
+ }
+ webServiceExecutor.join();
+ this.executorStats.close();
+ log.info("Web service closed");
+ }
+
public Optional<Integer> getListenPortHTTP() {
if (httpConnector != null) {
return Optional.of(httpConnector.getLocalPort());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index a515890dd30..bf7604eadf5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -23,12 +23,16 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
@@ -262,4 +266,27 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
assertFalse(e.getCause() instanceof IllegalArgumentException);
}
}
+
+ @Test
+ public void testShutdownViaAdminApi() throws Exception {
+ super.internalSetup();
+ super.setupDefaultTenantAndNamespace();
+ String topic = "persistent://public/default/testShutdownViaAdminApi";
+ admin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .create();
+ producer.send("message 1".getBytes());
+ admin.brokers()
+ .shutDownBrokerGracefully(0, false)
+ .get(30, TimeUnit.SECONDS);
+ try {
+ producer.send("message 2".getBytes());
+ fail("sending msg should timeout, because broker is down and there
is only one broker");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarClientException.TimeoutException);
+ }
+ }
}