This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b8b7d6c8 add thrfit max messahe size (#593)
b8b7d6c8 is described below

commit b8b7d6c83095e030801dafd915cd290e0ff873d6
Author: wudi <[email protected]>
AuthorDate: Fri May 9 10:07:17 2025 +0800

    add thrfit max messahe size (#593)
---
 .../apache/doris/flink/backend/BackendClient.java  | 14 +++++++--
 .../doris/flink/cfg/ConfigurationOptions.java      |  2 ++
 .../apache/doris/flink/cfg/DorisReadOptions.java   | 35 ++++++++++++++++++----
 .../doris/flink/table/DorisConfigOptions.java      |  9 ++++++
 .../flink/table/DorisDynamicTableFactory.java      |  4 +++
 .../doris/flink/source/DorisSourceITCase.java      |  1 +
 .../flink/table/DorisDynamicTableFactoryTest.java  |  5 +++-
 7 files changed, 60 insertions(+), 10 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index be976eff..c97ed202 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -54,6 +54,7 @@ public class BackendClient {
     private final int retries;
     private final int socketTimeout;
     private final int connectTimeout;
+    private final int thriftMaxMessageSize;
 
     public BackendClient(Routing routing, DorisReadOptions readOptions) {
         this.routing = routing;
@@ -69,11 +70,16 @@ public class BackendClient {
                 readOptions.getRequestRetries() == null
                         ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
                         : readOptions.getRequestRetries();
+        this.thriftMaxMessageSize =
+                readOptions.getRequestRetries() == null
+                        ? 
ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT
+                        : readOptions.getThriftMaxMessageSize();
         logger.trace(
-                "connect timeout set to '{}'. socket timeout set to '{}'. 
retries set to '{}'.",
+                "connect timeout set to '{}'. socket timeout set to '{}'. 
retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
                 this.connectTimeout,
                 this.socketTimeout,
-                this.retries);
+                this.retries,
+                this.thriftMaxMessageSize);
         open();
     }
 
@@ -84,9 +90,11 @@ public class BackendClient {
             logger.debug("Attempt {} to connect {}.", attempt, routing);
             try {
                 TBinaryProtocol.Factory factory = new 
TBinaryProtocol.Factory();
+                TConfiguration.Builder configBuilder = TConfiguration.custom();
+                configBuilder.setMaxMessageSize(thriftMaxMessageSize);
                 transport =
                         new TSocket(
-                                new TConfiguration(),
+                                configBuilder.build(),
                                 routing.getHost(),
                                 routing.getPort(),
                                 socketTimeout,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index 930bc273..bfca9b05 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -52,6 +52,8 @@ public interface ConfigurationOptions {
     Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
     String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
     Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+    String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
+    Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;
 
     String USE_FLIGHT_SQL = "source.use-flight-sql";
     Boolean USE_FLIGHT_SQL_DEFAULT = true;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index ea69a9b5..2b50f253 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -41,6 +41,7 @@ public class DorisReadOptions implements Serializable {
     private Integer flightSqlPort;
     // for flink sql limit push down
     private Long rowLimit;
+    private Integer thriftMaxMessageSize;
 
     public DorisReadOptions(
             String readFields,
@@ -57,7 +58,8 @@ public class DorisReadOptions implements Serializable {
             boolean useOldApi,
             boolean useFlightSql,
             Integer flightSqlPort,
-            Long rowLimit) {
+            Long rowLimit,
+            Integer thriftMaxMessageSize) {
         this.readFields = readFields;
         this.filterQuery = filterQuery;
         this.requestTabletSize = requestTabletSize;
@@ -73,6 +75,7 @@ public class DorisReadOptions implements Serializable {
         this.useFlightSql = useFlightSql;
         this.flightSqlPort = flightSqlPort;
         this.rowLimit = rowLimit;
+        this.thriftMaxMessageSize = thriftMaxMessageSize;
     }
 
     public String getReadFields() {
@@ -151,6 +154,10 @@ public class DorisReadOptions implements Serializable {
         this.rowLimit = rowLimit;
     }
 
+    public Integer getThriftMaxMessageSize() {
+        return thriftMaxMessageSize;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -182,7 +189,8 @@ public class DorisReadOptions implements Serializable {
                 && Objects.equals(deserializeArrowAsync, 
that.deserializeArrowAsync)
                 && Objects.equals(useFlightSql, that.useFlightSql)
                 && Objects.equals(flightSqlPort, that.flightSqlPort)
-                && Objects.equals(rowLimit, that.rowLimit);
+                && Objects.equals(rowLimit, that.rowLimit)
+                && Objects.equals(thriftMaxMessageSize, 
that.thriftMaxMessageSize);
     }
 
     @Override
@@ -202,7 +210,8 @@ public class DorisReadOptions implements Serializable {
                 useOldApi,
                 useFlightSql,
                 flightSqlPort,
-                rowLimit);
+                rowLimit,
+                thriftMaxMessageSize);
     }
 
     public DorisReadOptions copy() {
@@ -221,7 +230,8 @@ public class DorisReadOptions implements Serializable {
                 useOldApi,
                 useFlightSql,
                 flightSqlPort,
-                rowLimit);
+                rowLimit,
+                thriftMaxMessageSize);
     }
 
     /** Builder of {@link DorisReadOptions}. */
@@ -247,7 +257,8 @@ public class DorisReadOptions implements Serializable {
         private Boolean useFlightSql = 
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
         private Integer flightSqlPort = 
ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
         private Long rowLimit;
-
+        private Integer thriftMaxMessageSize =
+                ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
         /**
          * Sets the readFields for doris table to push down projection, such 
as name,age.
          *
@@ -406,6 +417,17 @@ public class DorisReadOptions implements Serializable {
             return this;
         }
 
+        /**
+         * Sets the thriftMaxMessageSize for DorisReadOptions.
+         *
+         * @param thriftMaxMessageSize
+         * @return
+         */
+        public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) {
+            this.thriftMaxMessageSize = thriftMaxMessageSize;
+            return this;
+        }
+
         /**
          * Build the {@link DorisReadOptions}.
          *
@@ -427,7 +449,8 @@ public class DorisReadOptions implements Serializable {
                     useOldApi,
                     useFlightSql,
                     flightSqlPort,
-                    rowLimit);
+                    rowLimit,
+                    thriftMaxMessageSize);
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index ffd3ec92..56f96153 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -38,6 +38,7 @@ import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUER
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
 
 /** Options for the Doris connector. */
 @PublicEvolving
@@ -151,6 +152,14 @@ public class DorisConfigOptions {
                     .memoryType()
                     
.defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
                     .withDescription("Memory limit for a single query. The 
default is 8192mb.");
+
+    public static final ConfigOption<Integer> DORIS_THRIFT_MAX_MESSAGE_SIZE =
+            ConfigOptions.key("doris.thrift.max.message.size")
+                    .intType()
+                    .defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT)
+                    .withDescription(
+                            "The maximum message size for thrift protocol. The 
default is Integer.MAX_VALUE.");
+
     public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
             ConfigOptions.key("source.use-old-api")
                     .booleanType()
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 2559e1f0..6495eadc 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -51,6 +51,7 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUER
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
 import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT;
 import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
@@ -130,6 +131,8 @@ public final class DorisDynamicTableFactory
         options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
         options.add(DORIS_BATCH_SIZE);
         options.add(DORIS_EXEC_MEM_LIMIT);
+        options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE);
+
         options.add(LOOKUP_CACHE_MAX_ROWS);
         options.add(LOOKUP_CACHE_TTL);
         options.add(LOOKUP_MAX_RETRIES);
@@ -210,6 +213,7 @@ public final class DorisDynamicTableFactory
         
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
                 
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
                 
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
+                
.setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE))
                 .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
                 .setReadFields(readableConfig.get(DORIS_READ_FIELD))
                 .setRequestQueryTimeoutS(
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 73f6e03a..31a94598 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -291,6 +291,7 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
                                 + " 'doris.batch.size' = '1024',"
                                 + " 'doris.exec.mem.limit' = '2048mb',"
                                 + " 'doris.deserialize.arrow.async' = 'true',"
+                                + " 'doris.thrift.max.message.size' = 
'10485760',"
                                 + " 'doris.deserialize.queue.size' = '32',"
                                 + " 'source.use-flight-sql' = '%s'"
                                 + ")",
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 7f6213f7..a3c21097 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -40,6 +40,7 @@ import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUER
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
 import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA;
@@ -59,6 +60,7 @@ public class DorisDynamicTableFactoryTest {
         properties.put("doris.exec.mem.limit", "8192mb");
         properties.put("doris.deserialize.arrow.async", "false");
         properties.put("doris.deserialize.queue.size", "64");
+        properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE + 
"");
 
         properties.put("lookup.cache.max-rows", "100");
         properties.put("lookup.cache.ttl", "20s");
@@ -103,7 +105,8 @@ public class DorisDynamicTableFactoryTest {
                 .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
                 .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
                 .setUseFlightSql(USE_FLIGHT_SQL_DEFAULT)
-                .setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT);
+                .setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT)
+                
.setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
         DorisDynamicTableSource expected =
                 new DorisDynamicTableSource(
                         options,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to