This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new a2e682b  [Improve] DataFrame CSV Stream Load optimization (#137)
a2e682b is described below

commit a2e682bddbb6ef81592d66ab8cb9eff692bc0014
Author: huanccwang <wanghuan0...@outlook.com>
AuthorDate: Tue Nov 7 10:54:40 2023 +0800

    [Improve] DataFrame CSV Stream Load optimization (#137)
---
 .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 69287d8..f473636 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 
 /**
@@ -162,12 +163,17 @@ public class DorisStreamLoad implements Serializable {
         return httpClientBuilder.build();
     }
 
-    private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC) {
+    private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, StructType schema) {
         HttpPut httpPut = new HttpPut(loadUrlStr);
         addCommonHeader(httpPut);
         httpPut.setHeader("label", label);
         if (StringUtils.isNotBlank(columns)) {
             httpPut.setHeader("columns", columns);
+        } else {
+            if (schema != null && !schema.isEmpty()) {
+                String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(","));
+                httpPut.setHeader("columns", dfColumns);
+            }
         }
         if (StringUtils.isNotBlank(maxFilterRatio)) {
             httpPut.setHeader("max_filter_ratio", maxFilterRatio);
@@ -210,7 +216,7 @@ public class DorisStreamLoad implements Serializable {
         try (CloseableHttpClient httpClient = getHttpClient()) {
             String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
             this.loadUrlStr = loadUrlStr;
-            HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
+            HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
             RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
                     .format(fileType)
                     .sep(FIELD_DELIMITER)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org