This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 0bb76ed  [improvement] Adapt to the load format of csv (#65)
0bb76ed is described below

commit 0bb76ed0480398583b2541596fece161ea402984
Author: Hong Liu <844981...@qq.com>
AuthorDate: Sat Jan 28 11:16:40 2023 +0800

    [improvement] Adapt to the load format of csv (#65)

    Co-authored-by: smallhibiscus <844981280>
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 56 ++++++++++++++--------
 .../doris/spark/sql/DorisSourceProvider.scala      |  2 +-
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 351ef23..3ada398 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,10 +50,10 @@ import java.util.concurrent.TimeUnit;
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable {
-    public static final String FIELD_DELIMITER = "\t";
-    public static final String LINE_DELIMITER = "\n";
-    public static final String NULL_VALUE = "\\N";
+public class DorisStreamLoad implements Serializable{
+    private String FIELD_DELIMITER;
+    private String LINE_DELIMITER;
+    private String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -71,6 +71,7 @@ public class DorisStreamLoad implements Serializable {
     private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
     private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+    private String fileType;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.db = db;
@@ -114,6 +115,11 @@ public class DorisStreamLoad implements Serializable {
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
+        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+        if (fileType.equals("csv")){
+            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+        }
     }
 
     public String getLoadUrlStr() {
@@ -156,8 +162,10 @@ public class DorisStreamLoad implements Serializable {
                 conn.addRequestProperty(k, v);
             });
         }
-        conn.addRequestProperty("format", "json");
-        conn.addRequestProperty("strip_outer_array", "true");
+        if (fileType.equals("json")){
+            conn.addRequestProperty("format", "json");
+            conn.addRequestProperty("strip_outer_array", "true");
+        }
         return conn;
     }
 
@@ -200,24 +208,30 @@ public class DorisStreamLoad implements Serializable {
     }
 
     public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
-        List<Map<Object, Object>> dataList = new ArrayList<>();
-        try {
-            for (List<Object> row : rows) {
-                Map<Object, Object> dataMap = new HashMap<>();
-                if (dfColumns.length == row.size()) {
-                    for (int i = 0; i < dfColumns.length; i++) {
-                        dataMap.put(dfColumns[i], row.get(i));
+        if (fileType.equals("csv")) {
+            load(listToString(rows));
+        } else if(fileType.equals("json")) {
+            List<Map<Object, Object>> dataList = new ArrayList<>();
+            try {
+                for (List<Object> row : rows) {
+                    Map<Object, Object> dataMap = new HashMap<>();
+                    if (dfColumns.length == row.size()) {
+                        for (int i = 0; i < dfColumns.length; i++) {
+                            dataMap.put(dfColumns[i], row.get(i));
+                        }
                     }
+                    dataList.add(dataMap);
                 }
-                dataList.add(dataMap);
-            }
-        } catch (Exception e) {
-            throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
-        }
-        // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
-        List<String> serializedList = ListUtils.getSerializedList(dataList);
-        for (String serializedRows : serializedList) {
-            load(serializedRows);
+            } catch (Exception e) {
+                throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
+            }
+            // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
+            List<String> serializedList = ListUtils.getSerializedList(dataList);
+            for (String serializedRows : serializedList) {
+                load(serializedRows);
+            }
+        } else {
+            throw new StreamLoadException("Not supoort the file format in stream load.");
        }
     }
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 2922d63..0399acf 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -83,7 +83,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
             line.add(field.asInstanceOf[AnyRef])
           }
           rowsBuffer.add(line)
-          if (rowsBuffer.size > maxRowCount) {
+          if (rowsBuffer.size > maxRowCount - 1 ) {
             flush
           }
         })
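[Editor's note] For context, a minimal sketch of how a Spark job could exercise the new csv path. The commit itself only shows that DorisStreamLoad reads "format", "column_separator", and "line_delimiter" from streamLoadProp (defaulting to csv, "\t", and "\n"); the "doris" format short name, the "doris.sink.properties.*" option prefix, and all connection settings below are assumptions for illustration and are not part of this commit.

import org.apache.spark.sql.SparkSession

// Sketch: write a small DataFrame through the connector using the csv
// stream load format adapted by this commit. The FE address, table name,
// credentials, and option prefix are hypothetical placeholders.
object CsvLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("doris-csv-load-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

    df.write
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")      // placeholder
      .option("doris.table.identifier", "db.tbl")   // placeholder
      .option("user", "root")                       // placeholder
      .option("password", "")                       // placeholder
      // Assumed to be forwarded into streamLoadProp; per this commit,
      // "csv" is the default format, so only non-default separators
      // strictly need to be set.
      .option("doris.sink.properties.format", "csv")
      .option("doris.sink.properties.column_separator", ",")
      .option("doris.sink.properties.line_delimiter", "\n")
      .save()

    spark.stop()
  }
}

Behavior visible in the diff: with format=csv the connector joins each row with the configured delimiters and streams the text directly (load(listToString(rows))), skipping JSON serialization and the strip_outer_array header, and the Scala writer now flushes once rowsBuffer reaches maxRowCount rows instead of one row later.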
"\n" : this.streamLoadProp.get("line_delimiter"); + } } public String getLoadUrlStr() { @@ -156,8 +162,10 @@ public class DorisStreamLoad implements Serializable { conn.addRequestProperty(k, v); }); } - conn.addRequestProperty("format", "json"); - conn.addRequestProperty("strip_outer_array", "true"); + if (fileType.equals("json")){ + conn.addRequestProperty("format", "json"); + conn.addRequestProperty("strip_outer_array", "true"); + } return conn; } @@ -200,24 +208,30 @@ public class DorisStreamLoad implements Serializable { public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException { - List<Map<Object, Object>> dataList = new ArrayList<>(); - try { - for (List<Object> row : rows) { - Map<Object, Object> dataMap = new HashMap<>(); - if (dfColumns.length == row.size()) { - for (int i = 0; i < dfColumns.length; i++) { - dataMap.put(dfColumns[i], row.get(i)); + if (fileType.equals("csv")) { + load(listToString(rows)); + } else if(fileType.equals("json")) { + List<Map<Object, Object>> dataList = new ArrayList<>(); + try { + for (List<Object> row : rows) { + Map<Object, Object> dataMap = new HashMap<>(); + if (dfColumns.length == row.size()) { + for (int i = 0; i < dfColumns.length; i++) { + dataMap.put(dfColumns[i], row.get(i)); + } } + dataList.add(dataMap); } - dataList.add(dataMap); + } catch (Exception e) { + throw new StreamLoadException("The number of configured columns does not match the number of data columns."); } - } catch (Exception e) { - throw new StreamLoadException("The number of configured columns does not match the number of data columns."); - } - // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception - List<String> serializedList = ListUtils.getSerializedList(dataList); - for (String serializedRows : serializedList) { - load(serializedRows); + // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception + List<String> serializedList = ListUtils.getSerializedList(dataList); + for (String serializedRows : serializedList) { + load(serializedRows); + } + } else { + throw new StreamLoadException("Not supoort the file format in stream load."); } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index 2922d63..0399acf 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -83,7 +83,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister line.add(field.asInstanceOf[AnyRef]) } rowsBuffer.add(line) - if (rowsBuffer.size > maxRowCount) { + if (rowsBuffer.size > maxRowCount - 1 ) { flush } }) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org