This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit 628cf758e723b468050194dd9a6a7b7fa130dc07
Author: Youngwb <yangwenbo_mail...@163.com>
AuthorDate: Sat Jan 18 00:14:39 2020 +0800

    Support param exec_mem_limit for spark-doris-connector (#2775)
---
 README.md                                                        | 1 +
 .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java    | 3 +++
 src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 9 +++++++++
 src/main/thrift/doris/DorisExternalService.thrift                | 3 +++
 4 files changed, 16 insertions(+)

diff --git a/README.md b/README.md
index 2b0d2ef..d32db83 100644
--- a/README.md
+++ b/README.md
@@ -101,6 +101,7 @@ dorisSparkRDD.collect()
 | doris.request.query.timeout.s    | 3600              | 
查询doris的超时时间,默认值为1小时,-1表示无超时限制             |
 | doris.request.tablet.size        | Integer.MAX_VALUE | 一个RDD 
Partition对应的Doris Tablet个数。<br />此数值设置越小,则会生成越多的Partition。<br 
/>从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 |
 | doris.batch.size                 | 1024              | 一次从BE读取数据的最大行数。<br 
/>增大此数值可减少Spark与Doris之间建立连接的次数。<br />从而减轻网络延迟所带来的的额外时间开销。 |
+| doris.exec.mem.limit             | 2147483648        | 单个查询的内存限制。默认为 
2GB,单位为字节                      |
 
 ### SQL and Dataframe Only
 
diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java 
b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index d9b4231..742c3eb 100644
--- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -53,5 +53,8 @@ public interface ConfigurationOptions {
     String DORIS_BATCH_SIZE = "doris.batch.size";
     int DORIS_BATCH_SIZE_DEFAULT = 1024;
 
+    String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
+    long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
+
     String DORIS_VALUE_READER_CLASS = "doris.value.reader.class";
 }
diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala 
b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 163ee16..13a955a 100644
--- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -71,8 +71,16 @@ class ScalaValueReader(partition: PartitionDefinition, 
settings: Settings) {
       DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
     }
 
+    val execMemLimit = Try {
+      settings.getProperty(DORIS_EXEC_MEM_LIMIT, 
DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
+    } getOrElse {
+      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, 
DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
+      DORIS_EXEC_MEM_LIMIT_DEFAULT
+    }
+
     params.setBatch_size(batchSize)
     params.setQuery_timeout(queryDorisTimeout)
+    params.setMem_limit(execMemLimit)
     params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
     params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))
 
@@ -83,6 +91,7 @@ class ScalaValueReader(partition: PartitionDefinition, 
settings: Settings) {
         s"tabletId: ${params.getTablet_ids}, " +
         s"batch size: $batchSize, " +
         s"query timeout: $queryDorisTimeout, " +
+        s"execution memory limit: $execMemLimit, " +
         s"user: ${params.getUser}, " +
         s"query plan: ${params.opaqued_query_plan}")
 
diff --git a/src/main/thrift/doris/DorisExternalService.thrift 
b/src/main/thrift/doris/DorisExternalService.thrift
index d3f7e8e..c169874 100644
--- a/src/main/thrift/doris/DorisExternalService.thrift
+++ b/src/main/thrift/doris/DorisExternalService.thrift
@@ -56,6 +56,9 @@ struct TScanOpenParams {
   11: optional i16 keep_alive_min
 
   12: optional i32 query_timeout
+
+  // memory limit for a single query
+  13: optional i64 mem_limit
 }
 
 struct TScanColumnDesc {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to