This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new b672f277161 branch-4.1: [fix](auth)Fix arrow flight client ip auth
#63506 (#63592)
b672f277161 is described below
commit b672f277161b6a8ee63de9fba5d5e1e70579ade7
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 25 19:16:44 2026 +0800
branch-4.1: [fix](auth)Fix arrow flight client ip auth #63506 (#63592)
Cherry-picked from #63506
Co-authored-by: Calvin Kirs <[email protected]>
---
.../service/arrowflight/DorisFlightSqlService.java | 6 ++
.../auth2/FlightCredentialValidator.java | 3 +-
.../auth2/FlightRemoteIpServerStreamTracer.java | 103 +++++++++++++++++++++
.../FlightRemoteIpServerStreamTracerTest.java | 92 ++++++++++++++++++
.../arrow_flight_sql_p0/test_auth_remote_ip.groovy | 78 ++++++++++++++++
5 files changed, 280 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
index 5c47941b291..22f9466e7db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
@@ -20,11 +20,13 @@ package org.apache.doris.service.arrowflight;
import org.apache.doris.common.Config;
import org.apache.doris.service.FrontendOptions;
import
org.apache.doris.service.arrowflight.auth2.FlightBearerTokenAuthenticator;
+import
org.apache.doris.service.arrowflight.auth2.FlightRemoteIpServerStreamTracer;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import
org.apache.doris.service.arrowflight.sessions.FlightSessionsWithTokenManager;
import org.apache.doris.service.arrowflight.tokens.FlightTokenManager;
import org.apache.doris.service.arrowflight.tokens.FlightTokenManagerImpl;
+import io.grpc.ServerBuilder;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
@@ -33,12 +35,14 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.util.function.Consumer;
/**
* flight sql protocol implementation based on nio.
*/
public class DorisFlightSqlService {
private static final Logger LOG =
LogManager.getLogger(DorisFlightSqlService.class);
+ private static final String GRPC_BUILDER_CONSUMER = "grpc.builderConsumer";
private final FlightServer flightServer;
private final FlightTokenManager flightTokenManager;
private final FlightSessionsManager flightSessionsManager;
@@ -56,6 +60,8 @@ public class DorisFlightSqlService {
DorisFlightSqlProducer producer = new DorisFlightSqlProducer(
Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port),
flightSessionsManager);
flightServer = FlightServer.builder(allocator,
Location.forGrpcInsecure("0.0.0.0", port), producer)
+ .transportHint(GRPC_BUILDER_CONSUMER,
(Consumer<ServerBuilder<?>>) builder ->
+ builder.addStreamTracerFactory(new
FlightRemoteIpServerStreamTracer.Factory()))
.headerAuthenticator(new
FlightBearerTokenAuthenticator(flightTokenManager)).build();
LOG.info("Arrow Flight SQL service is created, port: {},
arrow_flight_max_connections: {},"
+ "arrow_flight_token_alive_time_second: {}", port,
Config.arrow_flight_max_connections,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java
index 6676e8526ef..259d5083446 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java
@@ -48,8 +48,7 @@ public class FlightCredentialValidator implements
BasicCallHeaderAuthenticator.C
*/
@Override
public AuthResult validate(String username, String password) {
- // TODO Add ClientAddress information while creating a Token
- String remoteIp = "0.0.0.0";
+ String remoteIp = FlightRemoteIpServerStreamTracer.getRemoteIp();
FlightAuthResult flightAuthResult =
FlightAuthUtils.authenticateCredentials(username, password, remoteIp, LOG);
return getAuthResultWithBearerToken(flightAuthResult);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightRemoteIpServerStreamTracer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightRemoteIpServerStreamTracer.java
new file mode 100644
index 00000000000..5f5deee49bc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightRemoteIpServerStreamTracer.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.auth2;
+
+import io.grpc.Attributes;
+import io.grpc.Context;
+import io.grpc.Grpc;
+import io.grpc.Metadata;
+import io.grpc.ServerStreamTracer;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * Captures the gRPC peer address before Arrow Flight header authentication
runs.
+ * Arrow registers header authentication ahead of user interceptors, so use
ServerStreamTracer to
+ * seed the remote IP into the gRPC Context for Basic credential validation.
+ */
+public class FlightRemoteIpServerStreamTracer extends ServerStreamTracer {
+ static final String UNKNOWN_REMOTE_IP = "0.0.0.0";
+ private static final Context.Key<RemoteIpHolder> REMOTE_IP_CONTEXT_KEY =
+ Context.key("doris.arrow.flight.remote_ip");
+
+ @Override
+ public Context filterContext(Context context) {
+ return context.withValue(REMOTE_IP_CONTEXT_KEY, new RemoteIpHolder());
+ }
+
+ @Override
+ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
+ RemoteIpHolder holder = REMOTE_IP_CONTEXT_KEY.get();
+ if (holder == null) {
+ return;
+ }
+
+ Attributes attributes = callInfo.getAttributes();
+ SocketAddress remoteAddress = attributes == null ? null :
attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ holder.setRemoteIp(extractRemoteIp(remoteAddress));
+ }
+
+ public static String getRemoteIp() {
+ RemoteIpHolder holder = REMOTE_IP_CONTEXT_KEY.get();
+ if (holder == null) {
+ return UNKNOWN_REMOTE_IP;
+ }
+ return holder.getRemoteIp();
+ }
+
+ static String extractRemoteIp(SocketAddress remoteAddress) {
+ if (!(remoteAddress instanceof InetSocketAddress)) {
+ return UNKNOWN_REMOTE_IP;
+ }
+
+ InetSocketAddress inetSocketAddress = (InetSocketAddress)
remoteAddress;
+ InetAddress address = inetSocketAddress.getAddress();
+ if (address != null && isNotEmpty(address.getHostAddress())) {
+ return address.getHostAddress();
+ }
+ if (isNotEmpty(inetSocketAddress.getHostString())) {
+ return inetSocketAddress.getHostString();
+ }
+ return UNKNOWN_REMOTE_IP;
+ }
+
+ private static boolean isNotEmpty(String value) {
+ return value != null && !value.isEmpty();
+ }
+
+ public static class Factory extends ServerStreamTracer.Factory {
+ @Override
+ public ServerStreamTracer newServerStreamTracer(String fullMethodName,
Metadata headers) {
+ return new FlightRemoteIpServerStreamTracer();
+ }
+ }
+
+ private static class RemoteIpHolder {
+ private volatile String remoteIp = UNKNOWN_REMOTE_IP;
+
+ String getRemoteIp() {
+ return remoteIp;
+ }
+
+ void setRemoteIp(String remoteIp) {
+ this.remoteIp = isNotEmpty(remoteIp) ? remoteIp :
UNKNOWN_REMOTE_IP;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/service/arrowflight/auth2/FlightRemoteIpServerStreamTracerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/service/arrowflight/auth2/FlightRemoteIpServerStreamTracerTest.java
new file mode 100644
index 00000000000..250cd9f26ec
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/service/arrowflight/auth2/FlightRemoteIpServerStreamTracerTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.auth2;
+
+import io.grpc.Attributes;
+import io.grpc.Context;
+import io.grpc.Grpc;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerStreamTracer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class FlightRemoteIpServerStreamTracerTest {
+
+ @Test
+ public void testGetRemoteIpFromServerCallAttributes() {
+ FlightRemoteIpServerStreamTracer tracer = new
FlightRemoteIpServerStreamTracer();
+ Context context = tracer.filterContext(Context.current());
+ Context previous = context.attach();
+ try {
+ tracer.serverCallStarted(new TestServerCallInfo(new
InetSocketAddress("10.26.20.3", 12345)));
+
+ Assert.assertEquals("10.26.20.3",
FlightRemoteIpServerStreamTracer.getRemoteIp());
+ } finally {
+ context.detach(previous);
+ }
+ }
+
+ @Test
+ public void testFallbackRemoteIpWithoutServerCallAttributes() {
+ FlightRemoteIpServerStreamTracer tracer = new
FlightRemoteIpServerStreamTracer();
+ Context context = tracer.filterContext(Context.current());
+ Context previous = context.attach();
+ try {
+ tracer.serverCallStarted(new TestServerCallInfo(null));
+
+ Assert.assertEquals("0.0.0.0",
FlightRemoteIpServerStreamTracer.getRemoteIp());
+ } finally {
+ context.detach(previous);
+ }
+ }
+
+ @Test
+ public void testFallbackRemoteIpWithoutFlightContext() {
+ Assert.assertEquals("0.0.0.0",
FlightRemoteIpServerStreamTracer.getRemoteIp());
+ }
+
+ private static class TestServerCallInfo extends
ServerStreamTracer.ServerCallInfo<Object, Object> {
+ private final Attributes attributes;
+
+ TestServerCallInfo(SocketAddress remoteAddress) {
+ Attributes.Builder builder = Attributes.newBuilder();
+ if (remoteAddress != null) {
+ builder.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddress);
+ }
+ this.attributes = builder.build();
+ }
+
+ @Override
+ public MethodDescriptor<Object, Object> getMethodDescriptor() {
+ return null;
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public String getAuthority() {
+ return null;
+ }
+ }
+}
diff --git
a/regression-test/suites/arrow_flight_sql_p0/test_auth_remote_ip.groovy
b/regression-test/suites/arrow_flight_sql_p0/test_auth_remote_ip.groovy
new file mode 100644
index 00000000000..1b58a31fb63
--- /dev/null
+++ b/regression-test/suites/arrow_flight_sql_p0/test_auth_remote_ip.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+
+suite("test_auth_remote_ip", "arrow_flight_sql") {
+ String user = "flight_auth_remote_ip_user"
+ String password = "flight_auth_remote_ip_pwd"
+ String wrongPassword = "wrong_flight_auth_remote_ip_pwd"
+ List<String> remoteIpHosts = [
+ "127.%",
+ "10.%",
+ "172.%",
+ "192.168.%",
+ "::1",
+ "0:0:0:0:0:0:0:1"
+ ]
+ List<String> allHosts = ["0.0.0.0"] + remoteIpHosts
+
+ try {
+ allHosts.each { host ->
+ jdbc_sql """DROP USER IF EXISTS '${user}'@'${host}'"""
+ }
+
+ jdbc_sql """CREATE USER '${user}'@'0.0.0.0' IDENTIFIED BY
'${wrongPassword}'"""
+ String validComputeGroup = null
+ if (isCloudMode()) {
+ def computeGroups = sql """SHOW COMPUTE GROUPS"""
+ assertTrue(!computeGroups.isEmpty())
+ validComputeGroup = computeGroups[0][0]
+ }
+ remoteIpHosts.each { host ->
+ jdbc_sql """CREATE USER '${user}'@'${host}' IDENTIFIED BY
'${password}'"""
+ jdbc_sql """GRANT SELECT_PRIV ON *.* TO '${user}'@'${host}'"""
+ if (validComputeGroup != null) {
+ jdbc_sql """GRANT USAGE_PRIV ON COMPUTE GROUP
'${validComputeGroup}' TO '${user}'@'${host}'"""
+ }
+ }
+
+ Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver")
+ String arrowFlightSqlHost =
context.config.otherConfigs.get("extArrowFlightSqlHost")
+ String arrowFlightSqlPort =
context.config.otherConfigs.get("extArrowFlightSqlPort")
+ String arrowFlightSqlUrl =
"jdbc:arrow-flight-sql://${arrowFlightSqlHost}:${arrowFlightSqlPort}" +
+ "/?useServerPrepStmts=false&useSSL=false&useEncryption=false"
+
+ Connection conn = DriverManager.getConnection(arrowFlightSqlUrl, user,
password)
+ try {
+ List<List<Object>> result = sql_impl(conn, "SELECT 1")
+ assertEquals(1, result.size())
+ assertEquals(1, (result[0][0] as Number).intValue())
+ } finally {
+ conn.close()
+ }
+ } finally {
+ allHosts.each { host ->
+ try {
+ jdbc_sql """DROP USER IF EXISTS '${user}'@'${host}'"""
+ } catch (Throwable t) {
+ logger.warn("Failed to drop test user '${user}'@'${host}'", t)
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]