This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ec6c0  [feature] add stream load config to add double quotes for field when csv format. (#119)
32ec6c0 is described below

commit 32ec6c09ee86bad708b93ea3c8d00c668f690585
Author: Chuang Li <64473732+codecooke...@users.noreply.github.com>
AuthorDate: Thu Sep 14 18:31:28 2023 +0800

    [feature] add stream load config to add double quotes for field when csv format. (#119)
---
 .../org/apache/doris/spark/load/DorisStreamLoad.java   | 10 +++++++++-
 .../java/org/apache/doris/spark/load/RecordBatch.java  | 18 ++++++++++++++++--
 .../doris/spark/load/RecordBatchInputStream.java       |  6 +++++-
 .../java/org/apache/doris/spark/util/DataUtil.java     | 16 ++++++++++++++++
 4 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9ecfa40..0b506b0 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -70,6 +70,7 @@ import java.util.concurrent.TimeUnit;
  * DorisStreamLoad
  **/
 public class DorisStreamLoad implements Serializable {
+    private static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -88,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
     private final String columns;
     private final String maxFilterRatio;
     private final Map<String, String> streamLoadProp;
+    private boolean addDoubleQuotes;
     private static final long cacheExpireTimeout = 4 * 60;
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
     private final String fileType;
@@ -111,6 +113,11 @@ public class DorisStreamLoad implements Serializable {
         fileType = streamLoadProp.getOrDefault("format", "csv");
         if ("csv".equals(fileType)) {
             FIELD_DELIMITER = 
escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+            this.addDoubleQuotes = 
Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
+            if (addDoubleQuotes) {
+                LOG.info("set add_double_quotes for csv mode, add 
trim_double_quotes to true for prop.");
+                streamLoadProp.put("trim_double_quotes", "true");
+            }
         } else if ("json".equalsIgnoreCase(fileType)) {
             streamLoadProp.put("read_json_by_line", "true");
         }
@@ -189,7 +196,8 @@ public class DorisStreamLoad implements Serializable {
                     .format(fileType)
                     .sep(FIELD_DELIMITER)
                     .delim(LINE_DELIMITER)
-                    .schema(schema).build(), streamingPassthrough);
+                    .schema(schema)
+                    .addDoubleQuotes(addDoubleQuotes).build(), 
streamingPassthrough);
             httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
index 779c057..4ce297f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
@@ -61,14 +61,17 @@ public class RecordBatch {
      */
     private final StructType schema;
 
+    private final boolean addDoubleQuotes;
+
     private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String 
format, String sep, byte[] delim,
-                        StructType schema) {
+                        StructType schema, boolean addDoubleQuotes) {
         this.iterator = iterator;
         this.batchSize = batchSize;
         this.format = format;
         this.sep = sep;
         this.delim = delim;
         this.schema = schema;
+        this.addDoubleQuotes = addDoubleQuotes;
     }
 
     public Iterator<InternalRow> getIterator() {
@@ -94,6 +97,10 @@ public class RecordBatch {
     public StructType getSchema() {
         return schema;
     }
+
+    public boolean getAddDoubleQuotes(){
+        return addDoubleQuotes;
+    }
     public static Builder newBuilder(Iterator<InternalRow> iterator) {
         return new Builder(iterator);
     }
@@ -115,6 +122,8 @@ public class RecordBatch {
 
         private StructType schema;
 
+        private boolean addDoubleQuotes;
+
         public Builder(Iterator<InternalRow> iterator) {
             this.iterator = iterator;
         }
@@ -144,8 +153,13 @@ public class RecordBatch {
             return this;
         }
 
+        public Builder addDoubleQuotes(boolean addDoubleQuotes) {
+            this.addDoubleQuotes = addDoubleQuotes;
+            return this;
+        }
+
         public RecordBatch build() {
-            return new RecordBatch(iterator, batchSize, format, sep, delim, 
schema);
+            return new RecordBatch(iterator, batchSize, format, sep, delim, 
schema, addDoubleQuotes);
         }
 
     }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index 9444c1d..d705501 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -200,7 +200,11 @@ public class RecordBatchInputStream extends InputStream {
 
         switch (recordBatch.getFormat().toLowerCase()) {
             case "csv":
-                bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), 
recordBatch.getSep());
+                if (recordBatch.getAddDoubleQuotes()) {
+                    bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, 
recordBatch.getSchema(), recordBatch.getSep());
+                } else {
+                    bytes = DataUtil.rowToCsvBytes(row, 
recordBatch.getSchema(), recordBatch.getSep());
+                }
                 break;
             case "json":
                 try {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index aea6dde..3f53d45 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -51,6 +51,22 @@ public class DataUtil {
         return builder.toString().getBytes(StandardCharsets.UTF_8);
     }
 
+    public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, 
StructType schema, String sep) {
+        StringBuilder builder = new StringBuilder();
+        StructField[] fields = schema.fields();
+        int n = row.numFields();
+        if (n > 0) {
+            builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, 
fields[0].dataType())).append("\"");
+            int i = 1;
+            while (i < n) {
+                builder.append(sep);
+                builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, 
fields[i].dataType())).append("\"");
+                i++;
+            }
+        }
+        return builder.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
             throws JsonProcessingException {
         StructField[] fields = schema.fields();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to