This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b8b7d6c8 add thrfit max messahe size (#593)
b8b7d6c8 is described below
commit b8b7d6c83095e030801dafd915cd290e0ff873d6
Author: wudi <[email protected]>
AuthorDate: Fri May 9 10:07:17 2025 +0800
add thrfit max messahe size (#593)
---
.../apache/doris/flink/backend/BackendClient.java | 14 +++++++--
.../doris/flink/cfg/ConfigurationOptions.java | 2 ++
.../apache/doris/flink/cfg/DorisReadOptions.java | 35 ++++++++++++++++++----
.../doris/flink/table/DorisConfigOptions.java | 9 ++++++
.../flink/table/DorisDynamicTableFactory.java | 4 +++
.../doris/flink/source/DorisSourceITCase.java | 1 +
.../flink/table/DorisDynamicTableFactoryTest.java | 5 +++-
7 files changed, 60 insertions(+), 10 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index be976eff..c97ed202 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -54,6 +54,7 @@ public class BackendClient {
private final int retries;
private final int socketTimeout;
private final int connectTimeout;
+ private final int thriftMaxMessageSize;
public BackendClient(Routing routing, DorisReadOptions readOptions) {
this.routing = routing;
@@ -69,11 +70,16 @@ public class BackendClient {
readOptions.getRequestRetries() == null
? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
: readOptions.getRequestRetries();
+ this.thriftMaxMessageSize =
+ readOptions.getThriftMaxMessageSize() == null
+ ?
ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT
+ : readOptions.getThriftMaxMessageSize();
logger.trace(
- "connect timeout set to '{}'. socket timeout set to '{}'.
retries set to '{}'.",
+ "connect timeout set to '{}'. socket timeout set to '{}'.
retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
this.connectTimeout,
this.socketTimeout,
- this.retries);
+ this.retries,
+ this.thriftMaxMessageSize);
open();
}
@@ -84,9 +90,11 @@ public class BackendClient {
logger.debug("Attempt {} to connect {}.", attempt, routing);
try {
TBinaryProtocol.Factory factory = new
TBinaryProtocol.Factory();
+ TConfiguration.Builder configBuilder = TConfiguration.custom();
+ configBuilder.setMaxMessageSize(thriftMaxMessageSize);
transport =
new TSocket(
- new TConfiguration(),
+ configBuilder.build(),
routing.getHost(),
routing.getPort(),
socketTimeout,
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index 930bc273..bfca9b05 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -52,6 +52,8 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+ String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
+ Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;
String USE_FLIGHT_SQL = "source.use-flight-sql";
Boolean USE_FLIGHT_SQL_DEFAULT = true;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index ea69a9b5..2b50f253 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -41,6 +41,7 @@ public class DorisReadOptions implements Serializable {
private Integer flightSqlPort;
// for flink sql limit push down
private Long rowLimit;
+ private Integer thriftMaxMessageSize;
public DorisReadOptions(
String readFields,
@@ -57,7 +58,8 @@ public class DorisReadOptions implements Serializable {
boolean useOldApi,
boolean useFlightSql,
Integer flightSqlPort,
- Long rowLimit) {
+ Long rowLimit,
+ Integer thriftMaxMessageSize) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
@@ -73,6 +75,7 @@ public class DorisReadOptions implements Serializable {
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
this.rowLimit = rowLimit;
+ this.thriftMaxMessageSize = thriftMaxMessageSize;
}
public String getReadFields() {
@@ -151,6 +154,10 @@ public class DorisReadOptions implements Serializable {
this.rowLimit = rowLimit;
}
+ public Integer getThriftMaxMessageSize() {
+ return thriftMaxMessageSize;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -182,7 +189,8 @@ public class DorisReadOptions implements Serializable {
&& Objects.equals(deserializeArrowAsync,
that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
&& Objects.equals(flightSqlPort, that.flightSqlPort)
- && Objects.equals(rowLimit, that.rowLimit);
+ && Objects.equals(rowLimit, that.rowLimit)
+ && Objects.equals(thriftMaxMessageSize,
that.thriftMaxMessageSize);
}
@Override
@@ -202,7 +210,8 @@ public class DorisReadOptions implements Serializable {
useOldApi,
useFlightSql,
flightSqlPort,
- rowLimit);
+ rowLimit,
+ thriftMaxMessageSize);
}
public DorisReadOptions copy() {
@@ -221,7 +230,8 @@ public class DorisReadOptions implements Serializable {
useOldApi,
useFlightSql,
flightSqlPort,
- rowLimit);
+ rowLimit,
+ thriftMaxMessageSize);
}
/** Builder of {@link DorisReadOptions}. */
@@ -247,7 +257,8 @@ public class DorisReadOptions implements Serializable {
private Boolean useFlightSql =
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
private Integer flightSqlPort =
ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
private Long rowLimit;
-
+ private Integer thriftMaxMessageSize =
+ ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
/**
* Sets the readFields for doris table to push down projection, such
as name,age.
*
@@ -406,6 +417,17 @@ public class DorisReadOptions implements Serializable {
return this;
}
+ /**
+ * Sets the maximum thrift message size for DorisReadOptions.
+ *
+ * @param thriftMaxMessageSize the maximum thrift message size to use
+ * @return this builder, so that calls can be chained
+ */
+ public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) {
+ this.thriftMaxMessageSize = thriftMaxMessageSize;
+ return this;
+ }
+
/**
* Build the {@link DorisReadOptions}.
*
@@ -427,7 +449,8 @@ public class DorisReadOptions implements Serializable {
useOldApi,
useFlightSql,
flightSqlPort,
- rowLimit);
+ rowLimit,
+ thriftMaxMessageSize);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index ffd3ec92..56f96153 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -38,6 +38,7 @@ import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUER
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
/** Options for the Doris connector. */
@PublicEvolving
@@ -151,6 +152,14 @@ public class DorisConfigOptions {
.memoryType()
.defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
.withDescription("Memory limit for a single query. The
default is 8192mb.");
+
+ public static final ConfigOption<Integer> DORIS_THRIFT_MAX_MESSAGE_SIZE =
+ ConfigOptions.key("doris.thrift.max.message.size")
+ .intType()
+ .defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT)
+ .withDescription(
+ "The maximum message size for thrift protocol. The
default is Integer.MAX_VALUE.");
+
public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions.key("source.use-old-api")
.booleanType()
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 2559e1f0..6495eadc 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -51,6 +51,7 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUER
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
@@ -130,6 +131,8 @@ public final class DorisDynamicTableFactory
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
+ options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE);
+
options.add(LOOKUP_CACHE_MAX_ROWS);
options.add(LOOKUP_CACHE_TTL);
options.add(LOOKUP_MAX_RETRIES);
@@ -210,6 +213,7 @@ public final class DorisDynamicTableFactory
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
+
.setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 73f6e03a..31a94598 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -291,6 +291,7 @@ public class DorisSourceITCase extends
AbstractITCaseService {
+ " 'doris.batch.size' = '1024',"
+ " 'doris.exec.mem.limit' = '2048mb',"
+ " 'doris.deserialize.arrow.async' = 'true',"
+ + " 'doris.thrift.max.message.size' =
'10485760',"
+ " 'doris.deserialize.queue.size' = '32',"
+ " 'source.use-flight-sql' = '%s'"
+ ")",
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 7f6213f7..a3c21097 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -40,6 +40,7 @@ import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUER
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA;
@@ -59,6 +60,7 @@ public class DorisDynamicTableFactoryTest {
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
+ properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE +
"");
properties.put("lookup.cache.max-rows", "100");
properties.put("lookup.cache.ttl", "20s");
@@ -103,7 +105,8 @@ public class DorisDynamicTableFactoryTest {
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
.setUseFlightSql(USE_FLIGHT_SQL_DEFAULT)
- .setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT);
+ .setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT)
+
.setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
DorisDynamicTableSource expected =
new DorisDynamicTableSource(
options,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]