This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new efa43007ad Add configs to specify keepAlive and shutdownTimeout for
GrpcQueryClient. (#13546)
efa43007ad is described below
commit efa43007adc1dd7736580d882f33956e359b0678
Author: Mayank Shrivastava <[email protected]>
AuthorDate: Mon Jul 8 21:27:14 2024 -0700
Add configs to specify keepAlive and shutdownTimeout for GrpcQueryClient.
(#13546)
- Added `channelShutdownTimeoutSeconds` config for `GrpcQueryClient` with a
default of 10s.
- Added configs for keep-alive:
- Keep-alive is disabled by default; there is no separate enable flag, it is
enabled by setting `channelKeepAliveTimeSeconds` to a value > 0.
- `channelKeepAliveTimeSeconds` configures the interval for sending
keep-alive pings,
default -1 (keep-alive disabled).
- `channelKeepAliveTimeoutSeconds` configures the timeout for waiting for
a ping acknowledgment,
default 20 seconds.
- `channelKeepAliveWithoutCalls` ensures pings are sent even when there
are no active calls,
keeping the connection alive during idle periods, default true.
---
.../org/apache/pinot/common/config/GrpcConfig.java | 31 ++++++++++++++++++++++
.../pinot/common/utils/grpc/GrpcQueryClient.java | 25 ++++++++++++-----
2 files changed, 50 insertions(+), 6 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
index 68a6b790a3..ed2d32a736 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -28,6 +28,20 @@ public class GrpcConfig {
public static final String GRPC_TLS_PREFIX = "tls";
public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
public static final String CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE =
"maxInboundMessageSizeBytes";
+
+ private static final String CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS =
"channelShutdownTimeoutSeconds";
+ private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS = 10;
+
+ // KeepAlive configs
+ private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS =
"channelKeepAliveTimeSeconds";
+ private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS = -1; //
Set value > 0 to enable keep alive
+
+ private static final String CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS =
"channelKeepAliveTimeoutSeconds";
+ private static final int DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 20; //
20 seconds
+
+ private static final String CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS =
"channelKeepAliveWithoutCalls";
+ private static final boolean DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS = true;
+
// Default max message size to 128MB
public static final int DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE = 128 * 1024
* 1024;
// Default use plain text for transport
@@ -69,6 +83,23 @@ public class GrpcConfig {
return
Boolean.parseBoolean(_pinotConfig.getProperty(CONFIG_USE_PLAIN_TEXT,
DEFAULT_IS_USE_PLAIN_TEXT));
}
+ public int getChannelShutdownTimeoutSecond() {
+ return _pinotConfig.getProperty(CONFIG_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS,
DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECONDS);
+ }
+
+ public int getChannelKeepAliveTimeSeconds() {
+ return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIME_SECONDS,
DEFAULT_CHANNEL_KEEP_ALIVE_TIME_SECONDS);
+ }
+
+ public int getChannelKeepAliveTimeoutSeconds() {
+ return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS,
+ DEFAULT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS);
+ }
+
+ public boolean isChannelKeepAliveWithoutCalls() {
+ return _pinotConfig.getProperty(CONFIG_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS,
DEFAULT_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS);
+ }
+
public TlsConfig getTlsConfig() {
return _tlsConfig;
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index a41a30c5d4..9987876b95 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class GrpcQueryClient implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcQueryClient.class);
- private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10;
+
// the key is the hashCode of the TlsConfig, the value is the SslContext
// We don't use TlsConfig as the map key because the TlsConfig is mutable,
which means the hashCode can change. If the
// hashCode changes and the map is resized, the SslContext of the old
hashCode will be lost.
@@ -53,22 +53,35 @@ public class GrpcQueryClient implements Closeable {
private final ManagedChannel _managedChannel;
private final PinotQueryServerGrpc.PinotQueryServerBlockingStub
_blockingStub;
+ private final int _channelShutdownTimeoutSeconds;
public GrpcQueryClient(String host, int port) {
this(host, port, new GrpcConfig(Collections.emptyMap()));
}
public GrpcQueryClient(String host, int port, GrpcConfig config) {
+ ManagedChannelBuilder<?> channelBuilder;
if (config.isUsePlainText()) {
- _managedChannel =
+ channelBuilder =
ManagedChannelBuilder.forAddress(host,
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
- .usePlaintext().build();
+ .usePlaintext();
} else {
- _managedChannel =
+ channelBuilder =
NettyChannelBuilder.forAddress(host,
port).maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
- .sslContext(buildSslContext(config.getTlsConfig())).build();
+ .sslContext(buildSslContext(config.getTlsConfig()));
+ }
+
+ // Set keep alive configs, if enabled
+ int channelKeepAliveTimeSeconds = config.getChannelKeepAliveTimeSeconds();
+ if (channelKeepAliveTimeSeconds > 0) {
+ channelBuilder.keepAliveTime(channelKeepAliveTimeSeconds,
TimeUnit.SECONDS)
+ .keepAliveTimeout(config.getChannelKeepAliveTimeoutSeconds(),
TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(config.isChannelKeepAliveWithoutCalls());
}
+
+ _managedChannel = channelBuilder.build();
_blockingStub = PinotQueryServerGrpc.newBlockingStub(_managedChannel);
+ _channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
}
private SslContext buildSslContext(TlsConfig tlsConfig) {
@@ -103,7 +116,7 @@ public class GrpcQueryClient implements Closeable {
if (!_managedChannel.isShutdown()) {
try {
_managedChannel.shutdownNow();
- if
(!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND,
TimeUnit.SECONDS)) {
+ if (!_managedChannel.awaitTermination(_channelShutdownTimeoutSeconds,
TimeUnit.SECONDS)) {
LOGGER.warn("Timed out forcefully shutting down connection: {}. ",
_managedChannel);
}
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]