This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1050f48035c [improve][client/broker] Add DnsResolverGroup to share DNS
cache across multiple PulsarClient instances (#24784)
1050f48035c is described below
commit 1050f48035c97a14094b812467f046f6d37ff6f4
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Sep 26 01:04:28 2025 +0300
[improve][client/broker] Add DnsResolverGroup to share DNS cache across
multiple PulsarClient instances (#24784)
---
.../org/apache/pulsar/broker/PulsarService.java | 23 ++++--
.../pulsar/client/impl/ConnectionPoolTest.java | 4 +-
.../apache/pulsar/client/impl/ConnectionPool.java | 52 ++++++-------
.../pulsar/client/impl/DnsResolverGroupImpl.java | 86 ++++++++++++++++++++++
.../pulsar/client/impl/PulsarClientImpl.java | 51 ++++++++++---
.../pulsar/proxy/server/ProxyConnection.java | 2 +-
6 files changed, 171 insertions(+), 47 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5fd63502f8e..aa5c0b0ad2d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -143,6 +143,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.impl.DnsResolverGroupImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -266,6 +267,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private final ScheduledExecutorProvider
brokerClientSharedScheduledExecutorProvider;
private final Timer brokerClientSharedTimer;
private final ExecutorProvider brokerClientSharedLookupExecutorProvider;
+ private final DnsResolverGroupImpl brokerClientSharedDnsResolverGroup;
private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
@@ -398,6 +400,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
new HashedWheelTimer(new
DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
this.brokerClientSharedLookupExecutorProvider =
new ScheduledExecutorProvider(1,
"broker-client-shared-lookup-executor");
+ this.brokerClientSharedDnsResolverGroup =
+ new DnsResolverGroupImpl(this.ioEventLoopGroup,
+ loadBrokerClientProperties(new
ClientConfigurationData()));
// here in the constructor we don't have the offloader scheduler yet
this.offloaderStats = LedgerOffloaderStats.create(false, false, null,
0);
@@ -697,6 +702,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
brokerClientSharedLookupExecutorProvider.shutdownNow();
+ brokerClientSharedDnsResolverGroup.close();
brokerClientSharedTimer.stop();
if (monotonicClock instanceof AutoCloseable c) {
c.close();
@@ -1711,7 +1717,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
-
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider);
+
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
+ .dnsResolverGroup(brokerClientSharedDnsResolverGroup);
if (customizer != null) {
customizer.accept(pulsarClientImplBuilder);
}
@@ -1740,10 +1747,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
// Apply all arbitrary configuration. This must be called before
setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way
they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more
information.
- Map<String, Object> overrides = PropertiesUtils
-
.filterAndMapProperties(this.getConfiguration().getProperties(),
"brokerClient_");
- ClientConfigurationData conf =
- ConfigurationDataUtils.loadData(overrides, initialConf,
ClientConfigurationData.class);
+ ClientConfigurationData conf = loadBrokerClientProperties(initialConf);
// Disabled auto release useless connections
// The automatic release connection feature is not yet perfect for
transaction scenarios, so turn it
@@ -1789,6 +1793,15 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return conf;
}
+ /*
+ * Loads plain brokerClient_ properties without complete initialization.
+ * The overrides that PropertiesUtils.filterAndMapProperties derives from the broker configuration
+ * properties for the "brokerClient_" prefix are applied to initialConf through
+ * ConfigurationDataUtils.loadData, and the resulting ClientConfigurationData is returned.
+ * No other client settings are touched here.
+ */
+ private ClientConfigurationData
loadBrokerClientProperties(ClientConfigurationData initialConf) {
+ Map<String, Object> overrides = PropertiesUtils
+
.filterAndMapProperties(this.getConfiguration().getProperties(),
"brokerClient_");
+ ClientConfigurationData conf =
+ ConfigurationDataUtils.loadData(overrides, initialConf,
ClientConfigurationData.class);
+ return conf;
+ }
+
public synchronized PulsarAdmin getAdminClient() throws
PulsarServerException {
if (this.adminClient == null) {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 5cc04d63108..d739d93f1bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AbstractAddressResolver;
+import io.netty.resolver.AddressResolver;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
@@ -260,7 +261,8 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class,
InstrumentProvider.NOOP, conf, eventLoop,
(Supplier<ClientCnx>) () -> new
ClientCnx(InstrumentProvider.NOOP, conf, eventLoop),
- Optional.of(resolver), scheduledExecutorService);
+
Optional.<Supplier<AddressResolver<InetSocketAddress>>>of(() -> resolver),
+ scheduledExecutorService);
ClientCnx cnx = pool.getConnection(
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 59baccfbb99..f8412b5bf29 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -27,9 +27,6 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolver;
-import io.netty.resolver.dns.DnsAddressResolverGroup;
-import io.netty.resolver.dns.DnsNameResolverBuilder;
-import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import io.opentelemetry.api.common.Attributes;
@@ -51,6 +48,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -61,7 +60,6 @@ import
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +78,7 @@ public class ConnectionPool implements AutoCloseable {
private final boolean isSniProxy;
protected final AddressResolver<InetSocketAddress> addressResolver;
+ private DnsResolverGroupImpl dnsResolverGroup;
private final boolean shouldCloseDnsResolver;
@@ -106,8 +105,7 @@ public class ConnectionPool implements AutoCloseable {
public ConnectionPool(InstrumentProvider instrumentProvider,
ClientConfigurationData conf, EventLoopGroup
eventLoopGroup,
ScheduledExecutorService scheduledExecutorService)
throws PulsarClientException {
- this(instrumentProvider, conf, eventLoopGroup, () -> new
ClientCnx(instrumentProvider, conf, eventLoopGroup),
- scheduledExecutorService);
+ this(instrumentProvider, conf, eventLoopGroup, null,
scheduledExecutorService);
}
public ConnectionPool(InstrumentProvider instrumentProvider,
@@ -118,12 +116,16 @@ public class ConnectionPool implements AutoCloseable {
scheduledExecutorService);
}
- public ConnectionPool(InstrumentProvider instrumentProvider,
- ClientConfigurationData conf, EventLoopGroup
eventLoopGroup,
+ @Builder(builderClassName = "ConnectionPoolBuilder")
+ public ConnectionPool(@NonNull InstrumentProvider instrumentProvider,
+ @NonNull ClientConfigurationData conf, @NonNull
EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier,
- Optional<AddressResolver<InetSocketAddress>>
addressResolver,
+ @NonNull
Optional<Supplier<AddressResolver<InetSocketAddress>>> addressResolverSupplier,
ScheduledExecutorService scheduledExecutorService)
throws PulsarClientException {
+ if (clientCnxSupplier == null) {
+ clientCnxSupplier = () -> new ClientCnx(instrumentProvider, conf,
eventLoopGroup);
+ }
this.eventLoopGroup = eventLoopGroup;
this.clientConfig = conf;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -152,8 +154,9 @@ public class ConnectionPool implements AutoCloseable {
throw new PulsarClientException(e);
}
- this.shouldCloseDnsResolver = !addressResolver.isPresent();
- this.addressResolver = addressResolver.orElseGet(() ->
createAddressResolver(conf, eventLoopGroup));
+ this.shouldCloseDnsResolver = !addressResolverSupplier.isPresent();
+ this.addressResolver =
+ addressResolverSupplier.orElseGet(() ->
createAddressResolver(conf, eventLoopGroup)).get();
// Auto release useless connections. see:
https://github.com/apache/pulsar/issues/15516.
this.connectionMaxIdleSeconds = conf.getConnectionMaxIdleSeconds();
this.autoReleaseIdleConnectionsEnabled = connectionMaxIdleSeconds > 0;
@@ -185,26 +188,12 @@ public class ConnectionPool implements AutoCloseable {
Attributes.builder().put("pulsar.failure.type",
"handshake").build());
}
- private static AddressResolver<InetSocketAddress>
createAddressResolver(ClientConfigurationData conf,
-
EventLoopGroup eventLoopGroup) {
- DnsNameResolverBuilder dnsNameResolverBuilder = new
DnsNameResolverBuilder()
- .traceEnabled(true)
-
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
-
.socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup),
true);
- if (conf.getDnsLookupBindAddress() != null) {
- InetSocketAddress addr = new
InetSocketAddress(conf.getDnsLookupBindAddress(),
- conf.getDnsLookupBindPort());
- dnsNameResolverBuilder.localAddress(addr);
- }
- List<InetSocketAddress> serverAddresses = conf.getDnsServerAddresses();
- if (serverAddresses != null && !serverAddresses.isEmpty()) {
- dnsNameResolverBuilder.nameServerProvider(new
SequentialDnsServerAddressStreamProvider(serverAddresses));
+ private Supplier<AddressResolver<InetSocketAddress>>
createAddressResolver(ClientConfigurationData conf,
+
EventLoopGroup eventLoopGroup) {
+ if (dnsResolverGroup == null) {
+ dnsResolverGroup = new DnsResolverGroupImpl(eventLoopGroup, conf);
}
- DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
- // use DnsAddressResolverGroup to create the AddressResolver since it
contains a solution
- // to prevent cache stampede / thundering herds problem when a DNS
entry expires while the system
- // is under high load
- return new
DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next());
+ return () -> dnsResolverGroup.createAddressResolver(eventLoopGroup);
}
private static final Random random = new Random();
@@ -479,6 +468,9 @@ public class ConnectionPool implements AutoCloseable {
if (shouldCloseDnsResolver) {
addressResolver.close();
}
+ if (dnsResolverGroup != null) {
+ dnsResolverGroup.close();
+ }
if (asyncReleaseUselessConnectionsTask != null &&
!asyncReleaseUselessConnectionsTask.isCancelled()) {
asyncReleaseUselessConnectionsTask.cancel(false);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
new file mode 100644
index 00000000000..61af7968f81
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.resolver.dns.DnsServerAddressStreamProvider;
+import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+/**
+ * An abstraction to manage a group of Netty {@link AddressResolver} instances that are all backed by a
+ * single {@link io.netty.resolver.dns.DnsAddressResolverGroup}.
+ * The resolver group is used to create the {@link AddressResolver} instances since it contains a shared
+ * DNS cache and a solution that prevents the cache stampede / thundering herd problem
+ * when a DNS entry expires while the system is under high load.
+ * Closing this instance closes the underlying resolver group.
+ */
+public class DnsResolverGroupImpl implements AutoCloseable {
+ private final DnsAddressResolverGroup dnsAddressResolverGroup;
+ /**
+ * Creates the resolver group from client configuration.
+ * A local bind address is only set when {@code conf.getDnsLookupBindAddress()} is non-null (combined
+ * with {@code conf.getDnsLookupBindPort()}); the configured DNS servers are only used, through a
+ * {@link SequentialDnsServerAddressStreamProvider}, when the list is non-null and non-empty.
+ *
+ * @param eventLoopGroup event loop group used to select the channel types for DNS lookups
+ * @param conf client configuration that supplies the optional DNS settings
+ */
+ public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup,
ClientConfigurationData conf) {
+ Optional<InetSocketAddress> bindAddress =
Optional.ofNullable(conf.getDnsLookupBindAddress())
+ .map(addr -> new InetSocketAddress(addr,
conf.getDnsLookupBindPort()));
+ Optional<DnsServerAddressStreamProvider> dnsServerAddresses =
Optional.ofNullable(conf.getDnsServerAddresses())
+ .filter(Predicate.not(List::isEmpty)) // an empty list is handled the same way as an unset one
+ .map(SequentialDnsServerAddressStreamProvider::new);
+ this.dnsAddressResolverGroup =
createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+ }
+ /**
+ * Creates the resolver group from explicitly provided optional settings.
+ *
+ * @param eventLoopGroup event loop group used to select the channel types for DNS lookups
+ * @param bindAddress local address passed to the resolver builder when present
+ * @param dnsServerAddresses name server provider passed to the resolver builder when present
+ */
+ public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup,
Optional<InetSocketAddress> bindAddress,
+ Optional<DnsServerAddressStreamProvider>
dnsServerAddresses) {
+ this.dnsAddressResolverGroup =
createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+ }
+ /**
+ * Builds the {@link DnsAddressResolverGroup}: starts from the builder returned by
+ * createDnsNameResolverBuilder and applies the bind address and the name server provider only
+ * when the respective Optional is present.
+ */
+ private static DnsAddressResolverGroup
createAddressResolverGroup(EventLoopGroup eventLoopGroup,
+
Optional<InetSocketAddress> bindAddress,
+
Optional<DnsServerAddressStreamProvider>
+
dnsServerAddresses) {
+ DnsNameResolverBuilder dnsNameResolverBuilder =
createDnsNameResolverBuilder(eventLoopGroup);
+ bindAddress.ifPresent(dnsNameResolverBuilder::localAddress);
+
dnsServerAddresses.ifPresent(dnsNameResolverBuilder::nameServerProvider);
+
+ return new DnsAddressResolverGroup(dnsNameResolverBuilder);
+ }
+ /**
+ * Creates the base resolver builder: tracing enabled, datagram and client socket channel classes
+ * looked up through EventLoopUtil for the given event loop group, and
+ * DnsResolverUtil.applyJdkDnsCacheSettings applied to the builder before it is returned.
+ */
+ private static DnsNameResolverBuilder
createDnsNameResolverBuilder(EventLoopGroup eventLoopGroup) {
+ DnsNameResolverBuilder dnsNameResolverBuilder = new
DnsNameResolverBuilder()
+ .traceEnabled(true)
+
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
+
.socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup),
true);
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+ return dnsNameResolverBuilder;
+ }
+ /** Closes the underlying {@link DnsAddressResolverGroup}. */
+ @Override
+ public void close() {
+ this.dnsAddressResolverGroup.close();
+ }
+ /**
+ * Returns the resolver that the underlying group provides for the next event loop of the given group.
+ * NOTE(review): Netty's AddressResolverGroup appears to reuse one resolver per executor, so callers
+ * that share an event loop presumably receive the same instance - confirm before a single caller
+ * closes the returned resolver.
+ *
+ * @param eventLoopGroup group whose {@code next()} event loop selects the resolver
+ * @return the address resolver obtained from the underlying resolver group
+ */
+ public AddressResolver<InetSocketAddress>
createAddressResolver(EventLoopGroup eventLoopGroup) {
+ return dnsAddressResolverGroup.getResolver(eventLoopGroup.next());
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 01b27f491e1..d38d35926a4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -24,6 +24,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.AddressResolver;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.io.IOException;
@@ -129,6 +130,8 @@ public class PulsarClientImpl implements PulsarClient {
private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
private final boolean createdCnxPool;
+ private final DnsResolverGroupImpl dnsResolverGroupLocalInstance;
+ private final AddressResolver<InetSocketAddress> addressResolver;
public enum State {
Open, Closing, Closed
@@ -168,22 +171,22 @@ public class PulsarClientImpl implements PulsarClient {
private TransactionCoordinatorClientImpl tcClient;
public PulsarClientImpl(ClientConfigurationData conf) throws
PulsarClientException {
- this(conf, null, null, null, null, null, null, null);
+ this(conf, null, null, null, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
- this(conf, eventLoopGroup, null, null, null, null, null, null);
+ this(conf, eventLoopGroup, null, null, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
+ this(conf, eventLoopGroup, cnxPool, null, null, null, null, null,
null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
+ this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null,
null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool connectionPool,
@@ -192,7 +195,7 @@ public class PulsarClientImpl implements PulsarClient {
ScheduledExecutorProvider
scheduledExecutorProvider)
throws PulsarClientException {
this(conf, eventLoopGroup, connectionPool, timer,
externalExecutorProvider, internalExecutorProvider,
- scheduledExecutorProvider, null);
+ scheduledExecutorProvider, null, null);
}
@Builder(builderClassName = "PulsarClientImplBuilder")
@@ -200,7 +203,8 @@ public class PulsarClientImpl implements PulsarClient {
Timer timer, ExecutorProvider
externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider
scheduledExecutorProvider,
- ExecutorProvider lookupExecutorProvider) throws
PulsarClientException {
+ ExecutorProvider lookupExecutorProvider,
+ DnsResolverGroupImpl dnsResolverGroup) throws
PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
@@ -225,10 +229,29 @@ public class PulsarClientImpl implements PulsarClient {
conf.getAuthentication().start();
this.scheduledExecutorProvider = scheduledExecutorProvider != null
? scheduledExecutorProvider :
new ScheduledExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-scheduled");
- connectionPoolReference =
- connectionPool != null ? connectionPool :
- new ConnectionPool(instrumentProvider, conf,
this.eventLoopGroup,
- (ScheduledExecutorService)
this.scheduledExecutorProvider.getExecutor());
+ if (connectionPool != null) {
+ connectionPoolReference = connectionPool;
+ dnsResolverGroupLocalInstance = null;
+ addressResolver = null;
+ } else {
+ DnsResolverGroupImpl dnsResolverGroupReference;
+ if (dnsResolverGroup == null) {
+ dnsResolverGroupReference =
+ dnsResolverGroupLocalInstance = new
DnsResolverGroupImpl(eventLoopGroupReference, conf);
+ } else {
+ dnsResolverGroupReference = dnsResolverGroup;
+ dnsResolverGroupLocalInstance = null;
+ }
+ addressResolver =
dnsResolverGroupReference.createAddressResolver(eventLoopGroupReference);
+ connectionPoolReference = ConnectionPool.builder()
+ .instrumentProvider(instrumentProvider)
+ .conf(conf)
+ .eventLoopGroup(eventLoopGroupReference)
+ .addressResolverSupplier(Optional.of(() ->
addressResolver))
+ .scheduledExecutorService(
+ (ScheduledExecutorService)
this.scheduledExecutorProvider.getExecutor())
+ .build();
+ }
this.cnxPool = connectionPoolReference;
this.externalExecutorProvider = externalExecutorProvider != null ?
externalExecutorProvider :
new ExecutorProvider(conf.getNumListenerThreads(),
"pulsar-external-listener");
@@ -919,6 +942,14 @@ public class PulsarClientImpl implements PulsarClient {
conf.getServiceUrlProvider().close();
}
+ if (addressResolver != null) {
+ addressResolver.close();
+ }
+
+ if (dnsResolverGroupLocalInstance != null) {
+ dnsResolverGroupLocalInstance.close();
+ }
+
try {
// Shutting down eventLoopGroup separately because in some
cases, cnxPool might be using different
// eventLoopGroup.
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 372d45ffe60..e479b8ee622 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -404,7 +404,7 @@ public class ProxyConnection extends PulsarHandler {
if (this.connectionPool == null) {
this.connectionPool = new
ConnectionPool(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(),
clientCnxSupplier,
-
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())),
null);
+ Optional.of(() ->
dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null);
} else {
LOG.error("BUG! Connection Pool has already been created for
proxy connection to {} state {} role {}",
remoteAddress, state, maybeAnonymizedClientAuthRole);