This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 628cf758e723b468050194dd9a6a7b7fa130dc07 Author: Youngwb <yangwenbo_mail...@163.com> AuthorDate: Sat Jan 18 00:14:39 2020 +0800 Support param exec_mem_limit for spark-doris-connector (#2775) --- README.md | 1 + .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 3 +++ src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 9 +++++++++ src/main/thrift/doris/DorisExternalService.thrift | 3 +++ 4 files changed, 16 insertions(+) diff --git a/README.md b/README.md index 2b0d2ef..d32db83 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ dorisSparkRDD.collect() | doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 | | doris.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。<br />此数值设置越小,则会生成越多的Partition。<br />从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 | | doris.batch.size | 1024 | 一次从BE读取数据的最大行数。<br />增大此数值可减少Spark与Doris之间建立连接的次数。<br />从而减轻网络延迟所带来的的额外时间开销。 | +| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | ### SQL and Dataframe Only diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index d9b4231..742c3eb 100644 --- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -53,5 +53,8 @@ public interface ConfigurationOptions { String DORIS_BATCH_SIZE = "doris.batch.size"; int DORIS_BATCH_SIZE_DEFAULT = 1024; + String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit"; + long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; + String DORIS_VALUE_READER_CLASS = "doris.value.reader.class"; } diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 163ee16..13a955a 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -71,8 +71,16 
@@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT } + val execMemLimit = Try { + settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong + } getOrElse { + logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)) + DORIS_EXEC_MEM_LIMIT_DEFAULT + } + params.setBatch_size(batchSize) params.setQuery_timeout(queryDorisTimeout) + params.setMem_limit(execMemLimit) params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) @@ -83,6 +91,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { s"tabletId: ${params.getTablet_ids}, " + s"batch size: $batchSize, " + s"query timeout: $queryDorisTimeout, " + + s"execution memory limit: $execMemLimit, " + s"user: ${params.getUser}, " + s"query plan: ${params.opaqued_query_plan}") diff --git a/src/main/thrift/doris/DorisExternalService.thrift b/src/main/thrift/doris/DorisExternalService.thrift index d3f7e8e..c169874 100644 --- a/src/main/thrift/doris/DorisExternalService.thrift +++ b/src/main/thrift/doris/DorisExternalService.thrift @@ -56,6 +56,9 @@ struct TScanOpenParams { 11: optional i16 keep_alive_min 12: optional i32 query_timeout + + // memory limit for a single query + 13: optional i64 mem_limit } struct TScanColumnDesc { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org