This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 32ec6c0 [feature] add stream load config to add double quotes for field when csv format. (#119) 32ec6c0 is described below commit 32ec6c09ee86bad708b93ea3c8d00c668f690585 Author: Chuang Li <64473732+codecooke...@users.noreply.github.com> AuthorDate: Thu Sep 14 18:31:28 2023 +0800 [feature] add stream load config to add double quotes for field when csv format. (#119) --- .../org/apache/doris/spark/load/DorisStreamLoad.java | 10 +++++++++- .../java/org/apache/doris/spark/load/RecordBatch.java | 18 ++++++++++++++++-- .../doris/spark/load/RecordBatchInputStream.java | 6 +++++- .../java/org/apache/doris/spark/util/DataUtil.java | 16 ++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 9ecfa40..0b506b0 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -70,6 +70,7 @@ import java.util.concurrent.TimeUnit; * DorisStreamLoad **/ public class DorisStreamLoad implements Serializable { + private static final String NULL_VALUE = "\\N"; private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); @@ -88,6 +89,7 @@ public class DorisStreamLoad implements Serializable { private final String columns; private final String maxFilterRatio; private final Map<String, String> streamLoadProp; + private boolean addDoubleQuotes; private static final long cacheExpireTimeout = 4 * 60; private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache; private final String fileType; @@ -111,6 +113,11 @@ public class DorisStreamLoad implements Serializable { fileType = streamLoadProp.getOrDefault("format", "csv"); if ("csv".equals(fileType)) { 
FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t")); + this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false")); + if (addDoubleQuotes) { + LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop."); + streamLoadProp.put("trim_double_quotes", "true"); + } } else if ("json".equalsIgnoreCase(fileType)) { streamLoadProp.put("read_json_by_line", "true"); } @@ -189,7 +196,8 @@ public class DorisStreamLoad implements Serializable { .format(fileType) .sep(FIELD_DELIMITER) .delim(LINE_DELIMITER) - .schema(schema).build(), streamingPassthrough); + .schema(schema) + .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough); httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream)); HttpResponse httpResponse = httpClient.execute(httpPut); loadResponse = new LoadResponse(httpResponse); diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java index 779c057..4ce297f 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java @@ -61,14 +61,17 @@ public class RecordBatch { */ private final StructType schema; + private final boolean addDoubleQuotes; + private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim, - StructType schema) { + StructType schema, boolean addDoubleQuotes) { this.iterator = iterator; this.batchSize = batchSize; this.format = format; this.sep = sep; this.delim = delim; this.schema = schema; + this.addDoubleQuotes = addDoubleQuotes; } public Iterator<InternalRow> getIterator() { @@ -94,6 +97,10 @@ public class RecordBatch { public StructType getSchema() { return schema; } + + public boolean getAddDoubleQuotes(){ + return addDoubleQuotes; + } public static 
Builder newBuilder(Iterator<InternalRow> iterator) { return new Builder(iterator); } @@ -115,6 +122,8 @@ public class RecordBatch { private StructType schema; + private boolean addDoubleQuotes; + public Builder(Iterator<InternalRow> iterator) { this.iterator = iterator; } @@ -144,8 +153,13 @@ public class RecordBatch { return this; } + public Builder addDoubleQuotes(boolean addDoubleQuotes) { + this.addDoubleQuotes = addDoubleQuotes; + return this; + } + public RecordBatch build() { - return new RecordBatch(iterator, batchSize, format, sep, delim, schema); + return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes); } } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java index 9444c1d..d705501 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java @@ -200,7 +200,11 @@ public class RecordBatchInputStream extends InputStream { switch (recordBatch.getFormat().toLowerCase()) { case "csv": - bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep()); + if (recordBatch.getAddDoubleQuotes()) { + bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep()); + } else { + bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep()); + } break; case "json": try { diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java index aea6dde..3f53d45 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -51,6 +51,22 @@ public class DataUtil { return 
builder.toString().getBytes(StandardCharsets.UTF_8); } + public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) { + StringBuilder builder = new StringBuilder(); + StructField[] fields = schema.fields(); + int n = row.numFields(); + if (n > 0) { + builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())).append("\""); + int i = 1; + while (i < n) { + builder.append(sep); + builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\""); + i++; + } + } + return builder.toString().getBytes(StandardCharsets.UTF_8); + } + public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException { StructField[] fields = schema.fields(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org