This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e716658fbad [branch-2.1](arrow-flight-sql) Fix exceed user property
max connection cause Reach limit of connections (#39836)
e716658fbad is described below
commit e716658fbadf4bcbecc778acfd11553c8df8b803
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Aug 23 17:27:34 2024 +0800
[branch-2.1](arrow-flight-sql) Fix exceed user property max connection
cause Reach limit of connections (#39836)
pick #39127
pick #39802
---
.../service/arrowflight/DorisFlightSqlService.java | 4 ++
.../arrowflight/tokens/FlightTokenManagerImpl.java | 53 +++++++++++++++++++---
regression-test/pipeline/external/conf/fe.conf | 2 +
regression-test/pipeline/p1/conf/be.conf | 1 +
regression-test/pipeline/p1/conf/fe.conf | 2 +
5 files changed, 56 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
index 85377788097..df9099c6816 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
@@ -57,6 +57,10 @@ public class DorisFlightSqlService {
DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location,
flightSessionsManager);
flightServer = FlightServer.builder(allocator, location, producer)
.headerAuthenticator(new
FlightBearerTokenAuthenticator(flightTokenManager)).build();
+ LOG.info("Arrow Flight SQL service is created, port: {},
token_cache_size: {}"
+ + ", qe_max_connection: {}, token_alive_time: {}",
+ port, Config.arrow_flight_token_cache_size,
Config.qe_max_connection,
+ Config.arrow_flight_token_alive_time);
}
// start Arrow Flight SQL service, return true if success, otherwise false
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
index cd1b492de06..57101d995e0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
@@ -19,6 +19,7 @@
package org.apache.doris.service.arrowflight.tokens;
+import org.apache.doris.catalog.Env;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
@@ -31,9 +32,12 @@ import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import java.math.BigInteger;
import java.security.SecureRandom;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -46,7 +50,9 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
private final int cacheSize;
private final int cacheExpiration;
- private LoadingCache<String, FlightTokenDetails> tokenCache;
+ private final LoadingCache<String, FlightTokenDetails> tokenCache;
+ // <username, <token, 1>>
+ private final ConcurrentHashMap<String, LoadingCache<String, Integer>>
usersTokenLRU = new ConcurrentHashMap<>();
public FlightTokenManagerImpl(final int cacheSize, final int
cacheExpiration) {
this.cacheSize = cacheSize;
@@ -56,17 +62,19 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
.expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
.removalListener(new RemovalListener<String,
FlightTokenDetails>() {
@Override
- public void onRemoval(RemovalNotification<String,
FlightTokenDetails> notification) {
+ public void onRemoval(@NotNull RemovalNotification<String,
FlightTokenDetails> notification) {
// TODO: broadcast this message to other FE
- LOG.info("evict bearer token: " +
notification.getKey() + ", reason: "
+ String token = notification.getKey();
+ FlightTokenDetails tokenDetails =
notification.getValue();
+ LOG.info("evict bearer token: " + token + ", reason:
token number exceeded, "
+ notification.getCause());
ConnectContext context =
ExecuteEnv.getInstance().getScheduler()
- .getContext(notification.getKey());
+ .getContext(token);
if (context != null) {
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
- LOG.info("unregister flight connect context after
evict bearer token: "
- + notification.getKey());
+ LOG.info("unregister flight connect context after
evict bearer token: " + token);
}
+
usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
}
}).build(new CacheLoader<String, FlightTokenDetails>() {
@Override
@@ -96,6 +104,29 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
flightAuthResult.getUserIdentity(),
flightAuthResult.getRemoteIp());
tokenCache.put(token, flightTokenDetails);
+ if (!usersTokenLRU.containsKey(username)) {
+ // TODO Modify usersTokenLRU size when user property maxConn
changes. but LoadingCache currently not
+ // support modify.
+ usersTokenLRU.put(username,
+
CacheBuilder.newBuilder().maximumSize(Env.getCurrentEnv().getAuth().getMaxConn(username)
/ 2)
+ .removalListener(new RemovalListener<String,
Integer>() {
+ @Override
+ public void onRemoval(@NotNull
RemovalNotification<String, Integer> notification) {
+ // TODO: broadcast this message to other FE
+ assert notification.getKey() != null;
+
tokenCache.invalidate(notification.getKey());
+ LOG.info("evict bearer token: " +
notification.getKey()
+ + ", reason: user connection
exceeded, " + notification.getCause());
+ }
+ }).build(new CacheLoader<String, Integer>() {
+ @NotNull
+ @Override
+ public Integer load(@NotNull String key) {
+ return 1;
+ }
+ }));
+ }
+ usersTokenLRU.get(username).put(token, 1);
LOG.info("Created flight token for user: {}, token: {}", username,
token);
return flightTokenDetails;
}
@@ -114,6 +145,16 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
throw new IllegalArgumentException("bearer token expired: " +
token + ", try reconnect, "
+ "currently in fe.conf, `arrow_flight_token_alive_time`="
+ this.cacheExpiration);
}
+ if (usersTokenLRU.containsKey(value.getUsername())) {
+ try {
+ usersTokenLRU.get(value.getUsername()).get(token);
+ } catch (ExecutionException ignored) {
+ throw new IllegalArgumentException("usersTokenLRU not exist
bearer token: " + token);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "bearer token not created: " + token + ", username: " +
value.getUsername());
+ }
LOG.info("Validated bearer token for user: {}", value.getUsername());
return value;
}
diff --git a/regression-test/pipeline/external/conf/fe.conf
b/regression-test/pipeline/external/conf/fe.conf
index bdbc56564a1..e325acfbb1c 100644
--- a/regression-test/pipeline/external/conf/fe.conf
+++ b/regression-test/pipeline/external/conf/fe.conf
@@ -55,6 +55,8 @@ catalog_trash_expire_second=1
# priority_networks = 10.10.10.0/24;192.168.0.0/16
priority_networks=172.19.0.0/24
+arrow_flight_sql_port = 8081
+
# Advanced configurations
# log_roll_size_mb = 1024
# sys_log_dir = ${DORIS_HOME}/log
diff --git a/regression-test/pipeline/p1/conf/be.conf
b/regression-test/pipeline/p1/conf/be.conf
index 3b7c4530d40..225b0ad84d6 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -31,6 +31,7 @@ be_port = 9162
webserver_port = 8142
heartbeat_service_port = 9152
brpc_port = 8162
+arrow_flight_sql_port = 8181
mem_limit = 90%
disable_minidump = true
diff --git a/regression-test/pipeline/p1/conf/fe.conf
b/regression-test/pipeline/p1/conf/fe.conf
index eb8ab90af08..48d658d39ee 100644
--- a/regression-test/pipeline/p1/conf/fe.conf
+++ b/regression-test/pipeline/p1/conf/fe.conf
@@ -47,6 +47,8 @@
sys_log_verbose_modules=org.apache.doris.service.FrontendServiceImpl
# Default value is ${DORIS_HOME}/doris-meta
# meta_dir = ${DORIS_HOME}/doris-meta
+arrow_flight_sql_port = 8081
+
disable_decimalv2 = false
disable_datev1 = false
catalog_trash_expire_second=1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]