This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a2e682b  [Improve] DataFrame CSV Stream Load optimization (#137)
a2e682b is described below

commit a2e682bddbb6ef81592d66ab8cb9eff692bc0014
Author: huanccwang <wanghuan0...@outlook.com>
AuthorDate: Tue Nov 7 10:54:40 2023 +0800

    [Improve] DataFrame CSV Stream Load optimization (#137)
---
 .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 69287d8..f473636 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 
 /**
@@ -162,12 +163,17 @@ public class DorisStreamLoad implements Serializable {
         return httpClientBuilder.build();
     }
 
-    private HttpPut getHttpPut(String label, String loadUrlStr, Boolean 
enable2PC) {
+    private HttpPut getHttpPut(String label, String loadUrlStr, Boolean 
enable2PC, StructType schema) {
         HttpPut httpPut = new HttpPut(loadUrlStr);
         addCommonHeader(httpPut);
         httpPut.setHeader("label", label);
         if (StringUtils.isNotBlank(columns)) {
             httpPut.setHeader("columns", columns);
+        } else {
+            if (schema != null && !schema.isEmpty()) {
+                String dfColumns = 
Arrays.stream(schema.fieldNames()).collect(Collectors.joining(","));
+                httpPut.setHeader("columns", dfColumns);
+            }
         }
         if (StringUtils.isNotBlank(maxFilterRatio)) {
             httpPut.setHeader("max_filter_ratio", maxFilterRatio);
@@ -210,7 +216,7 @@ public class DorisStreamLoad implements Serializable {
         try (CloseableHttpClient httpClient = getHttpClient()) {
             String loadUrlStr = String.format(loadUrlPattern, getBackend(), 
db, tbl);
             this.loadUrlStr = loadUrlStr;
-            HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
+            HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
             RecordBatchInputStream recodeBatchInputStream = new 
RecordBatchInputStream(RecordBatch.newBuilder(rows)
                     .format(fileType)
                     .sep(FIELD_DELIMITER)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to