This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b3abfaba6d8 [feat](refactor-param) refactor routineLoad's code about
fileformat (#50552)
b3abfaba6d8 is described below
commit b3abfaba6d847e12e3b35a1284dd5b68fd077e4a
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed May 14 10:03:47 2025 +0800
[feat](refactor-param) refactor routineLoad's code about fileformat (#50552)
Issue Number: #50238
Problem Summary:
Previously, we refactored the code for the fileFormat attribute (#50225).
However, that change only added the new code without updating the business
logic. This pull request updates the code of the `RoutineLoad` feature
that is related to the file format.
---
.../doris/analysis/CreateRoutineLoadStmt.java | 105 +++------------------
.../org/apache/doris/analysis/OutFileClause.java | 2 +-
.../doris/common/util/FileFormatConstants.java | 4 -
.../fileformat/AvroFileFormatProperties.java | 2 +-
.../fileformat/CsvFileFormatProperties.java | 29 ++++--
.../property/fileformat/FileFormatProperties.java | 18 ++--
.../fileformat/JsonFileFormatProperties.java | 8 +-
.../fileformat/OrcFileFormatProperties.java | 2 +-
.../fileformat/ParquetFileFormatProperties.java | 2 +-
.../fileformat/WalFileFormatProperties.java | 2 +-
.../doris/load/routineload/RoutineLoadJob.java | 100 +++++++-------------
.../plans/commands/info/CreateRoutineLoadInfo.java | 91 ++++--------------
.../fileformat/CsvFileFormatPropertiesTest.java | 2 +-
.../load_p0/routine_load/test_routine_load.groovy | 4 +-
14 files changed, 115 insertions(+), 256 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 403612f989c..804d9647a53 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -169,22 +170,8 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private int sendBatchParallelism = 1;
private boolean loadToSingleTablet = false;
- /**
- * RoutineLoad support json data.
- * Require Params:
- * 1) dataFormat = "json"
- * 2) jsonPaths = "$.XXX.xxx"
- */
- private String format = ""; //default is csv.
- private String jsonPaths = "";
- private String jsonRoot = ""; // MUST be a jsonpath string
- private boolean stripOuterArray = false;
- private boolean numAsString = false;
- private boolean fuzzyParse = false;
- private byte enclose;
-
- private byte escape;
+ private FileFormatProperties fileFormatProperties;
private String workloadGroupName = "";
@@ -229,6 +216,8 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
if (comment != null) {
this.comment = comment;
}
+ String format =
jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+ fileFormatProperties =
FileFormatProperties.createFileFormatProperties(format);
}
/*
@@ -239,9 +228,9 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
Map<String, String> jobProperties, String typeName,
RoutineLoadDesc routineLoadDesc,
int desireTaskConcurrentNum, long maxErrorNum, double
maxFilterRatio, long maxBatchIntervalS,
long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int
sendBatchParallelism, String timezone,
- String format, String jsonPaths, String jsonRoot, byte enclose,
byte escape, String workloadGroupName,
- boolean loadToSingleTablet, boolean strictMode, boolean
isPartialUpdate, boolean stripOuterArray,
- boolean numAsString, boolean fuzzyParse,
AbstractDataSourceProperties dataSourceProperties) {
+ String workloadGroupName, boolean loadToSingleTablet, boolean
strictMode,
+ boolean isPartialUpdate, AbstractDataSourceProperties
dataSourceProperties,
+ FileFormatProperties fileFormatProperties) {
this.labelName = labelName;
this.dbName = dbName;
this.name = name;
@@ -261,19 +250,12 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
this.execMemLimit = execMemLimit;
this.sendBatchParallelism = sendBatchParallelism;
this.timezone = timezone;
- this.format = format;
- this.jsonPaths = jsonPaths;
- this.jsonRoot = jsonRoot;
- this.enclose = enclose;
- this.escape = escape;
this.workloadGroupName = workloadGroupName;
this.loadToSingleTablet = loadToSingleTablet;
this.strictMode = strictMode;
this.isPartialUpdate = isPartialUpdate;
- this.stripOuterArray = stripOuterArray;
- this.numAsString = numAsString;
- this.fuzzyParse = fuzzyParse;
this.dataSourceProperties = dataSourceProperties;
+ this.fileFormatProperties = fileFormatProperties;
}
public String getName() {
@@ -340,42 +322,14 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
return timezone;
}
- public String getFormat() {
- return format;
- }
-
- public boolean isStripOuterArray() {
- return stripOuterArray;
- }
-
- public boolean isNumAsString() {
- return numAsString;
- }
-
- public boolean isFuzzyParse() {
- return fuzzyParse;
- }
-
- public String getJsonPaths() {
- return jsonPaths;
- }
-
- public byte getEnclose() {
- return enclose;
- }
-
- public byte getEscape() {
- return escape;
- }
-
- public String getJsonRoot() {
- return jsonRoot;
- }
-
public LoadTask.MergeType getMergeType() {
return mergeType;
}
+ public FileFormatProperties getFileFormatProperties() {
+ return fileFormatProperties;
+ }
+
public AbstractDataSourceProperties getDataSourceProperties() {
return dataSourceProperties;
}
@@ -564,23 +518,6 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
- String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
- if (encloseStr != null) {
- if (encloseStr.length() != 1) {
- throw new AnalysisException("enclose must be single-char");
- } else {
- enclose = encloseStr.getBytes()[0];
- }
- }
- String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
- if (escapeStr != null) {
- if (escapeStr.length() != 1) {
- throw new AnalysisException("enclose must be single-char");
- } else {
- escape = escapeStr.getBytes()[0];
- }
- }
-
String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
ConnectContext tmpCtx = new ConnectContext();
@@ -603,23 +540,7 @@ public class CreateRoutineLoadStmt extends DdlStmt
implements NotFallbackInParse
}
timezone =
TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE,
timezone));
- format = jobProperties.get(FORMAT);
- if (format != null) {
- if (format.equalsIgnoreCase("csv")) {
- format = ""; // if it's not json, then it's mean csv and set
empty
- } else if (format.equalsIgnoreCase("json")) {
- format = "json";
- jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
- jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
- stripOuterArray =
Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
- numAsString =
Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
- fuzzyParse =
Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
- } else {
- throw new UserException("Format type is invalid. format=`" +
format + "`");
- }
- } else {
- format = "csv"; // default csv
- }
+ fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
}
private void checkDataSourceProperties() throws UserException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 744442955c8..6ff7ab6c5df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -731,7 +731,7 @@ public class OutFileClause {
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ")
- .append(fileFormatProperties.getFileFormatType());
+ .append(fileFormatProperties.getFormatName());
if (properties != null && !properties.isEmpty()) {
sb.append(" PROPERTIES(");
sb.append(new PrintableMap<>(properties, " = ", true, false));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index 7050bec9d77..2cd5852dea4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -20,10 +20,6 @@ package org.apache.doris.common.util;
import java.util.regex.Pattern;
public class FileFormatConstants {
- public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
- public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
- public static final String DEFAULT_LINE_DELIMITER = "\n";
-
public static final String FORMAT_CSV = "csv";
public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES =
"csv_with_names_and_types";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
index 6d2b799ea00..7622cd19644 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
@@ -27,7 +27,7 @@ import java.util.Map;
public class AvroFileFormatProperties extends FileFormatProperties {
public AvroFileFormatProperties() {
- super(TFileFormatType.FORMAT_AVRO);
+ super(TFileFormatType.FORMAT_AVRO, FileFormatProperties.FORMAT_AVRO);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
index 0efea98a5c3..097ea5ab5a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -53,6 +53,7 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
public static final String PROP_ENCLOSE = "enclose";
+ public static final String PROP_ESCAPE = "escape";
private String headerType = "";
private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
@@ -62,24 +63,26 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
private int skipLines;
private byte enclose;
+ private byte escape;
+
// used by tvf
// User specified csv columns, it will override columns got from file
private final List<Column> csvSchema = Lists.newArrayList();
String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
- public CsvFileFormatProperties() {
- super(TFileFormatType.FORMAT_CSV_PLAIN);
+ public CsvFileFormatProperties(String formatName) {
+ super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
}
- public CsvFileFormatProperties(String defaultColumnSeparator,
TTextSerdeType textSerdeType) {
- super(TFileFormatType.FORMAT_CSV_PLAIN);
+ public CsvFileFormatProperties(String defaultColumnSeparator,
TTextSerdeType textSerdeType, String formatName) {
+ super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
this.defaultColumnSeparator = defaultColumnSeparator;
this.textSerdeType = textSerdeType;
}
- public CsvFileFormatProperties(String headerType) {
- super(TFileFormatType.FORMAT_CSV_PLAIN);
+ public CsvFileFormatProperties(String headerType, String formatName) {
+ super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
this.headerType = headerType;
}
@@ -115,6 +118,16 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
}
}
+ String escapeStr = getOrDefault(formatProperties, PROP_ESCAPE,
+ "", isRemoveOriginProperty);
+ if (!Strings.isNullOrEmpty(escapeStr)) {
+ if (escapeStr.length() != 1) {
+ throw new AnalysisException("escape must be single-char");
+ } else {
+ escape = escapeStr.getBytes()[0];
+ }
+ }
+
trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
.booleanValue();
@@ -186,6 +199,10 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
return enclose;
}
+ public byte getEscape() {
+ return escape;
+ }
+
public List<Column> getCsvSchema() {
return csvSchema;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
index bd0ecc214c6..81cf090fa22 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -40,12 +40,14 @@ public abstract class FileFormatProperties {
public static final String FORMAT_ARROW = "arrow";
public static final String PROP_COMPRESS_TYPE = "compress_type";
+ protected String formatName;
protected TFileFormatType fileFormatType;
protected TFileCompressType compressionType;
- public FileFormatProperties(TFileFormatType fileFormatType) {
+ public FileFormatProperties(TFileFormatType fileFormatType, String
formatName) {
this.fileFormatType = fileFormatType;
+ this.formatName = formatName;
}
/**
@@ -73,16 +75,14 @@ public abstract class FileFormatProperties {
public static FileFormatProperties createFileFormatProperties(String
formatString) {
switch (formatString) {
case FORMAT_CSV:
- return new CsvFileFormatProperties();
+ return new CsvFileFormatProperties(formatString);
case FORMAT_HIVE_TEXT:
return new
CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
- TTextSerdeType.HIVE_TEXT_SERDE);
+ TTextSerdeType.HIVE_TEXT_SERDE, formatString);
case FORMAT_CSV_WITH_NAMES:
- return new CsvFileFormatProperties(
- FORMAT_CSV_WITH_NAMES);
+ return new CsvFileFormatProperties(FORMAT_CSV_WITH_NAMES,
formatString);
case FORMAT_CSV_WITH_NAMES_AND_TYPES:
- return new CsvFileFormatProperties(
- FORMAT_CSV_WITH_NAMES_AND_TYPES);
+ return new
CsvFileFormatProperties(FORMAT_CSV_WITH_NAMES_AND_TYPES, formatString);
case FORMAT_PARQUET:
return new ParquetFileFormatProperties();
case FORMAT_ORC:
@@ -121,4 +121,8 @@ public abstract class FileFormatProperties {
public TFileCompressType getCompressionType() {
return compressionType;
}
+
+ public String getFormatName() {
+ return formatName;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
index 3431d366f8b..238844bee22 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
@@ -37,14 +37,14 @@ public class JsonFileFormatProperties extends
FileFormatProperties {
// from ExternalFileTableValuedFunction:
private String jsonRoot = "";
private String jsonPaths = "";
- private boolean stripOuterArray;
+ private boolean stripOuterArray = false;
private boolean readJsonByLine;
- private boolean numAsString;
- private boolean fuzzyParse;
+ private boolean numAsString = false;
+ private boolean fuzzyParse = false;
public JsonFileFormatProperties() {
- super(TFileFormatType.FORMAT_JSON);
+ super(TFileFormatType.FORMAT_JSON, FileFormatProperties.FORMAT_JSON);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
index 412c3d237e8..ac88b225181 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
@@ -41,7 +41,7 @@ public class OrcFileFormatProperties extends
FileFormatProperties {
private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
public OrcFileFormatProperties() {
- super(TFileFormatType.FORMAT_ORC);
+ super(TFileFormatType.FORMAT_ORC, FileFormatProperties.FORMAT_ORC);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
index 8063b25964a..18d1484e596 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -62,7 +62,7 @@ public class ParquetFileFormatProperties extends
FileFormatProperties {
private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
public ParquetFileFormatProperties() {
- super(TFileFormatType.FORMAT_PARQUET);
+ super(TFileFormatType.FORMAT_PARQUET,
FileFormatProperties.FORMAT_PARQUET);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
index 0c6b1777cf6..af8dfd5b70f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
@@ -27,7 +27,7 @@ import java.util.Map;
public class WalFileFormatProperties extends FileFormatProperties {
public WalFileFormatProperties() {
- super(TFileFormatType.FORMAT_WAL);
+ super(TFileFormatType.FORMAT_WAL, FileFormatProperties.FORMAT_WAL);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a363d50b505..af4a977840a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -48,6 +48,9 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
+import
org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
@@ -227,20 +230,6 @@ public abstract class RoutineLoadJob
protected boolean memtableOnSinkNode = false;
- /**
- * RoutineLoad support json data.
- * Require Params:
- * 1) format = "json"
- * 2) jsonPath = "$.XXX.xxx"
- */
- private static final String PROPS_FORMAT = "format";
- private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array";
- private static final String PROPS_NUM_AS_STRING = "num_as_string";
- private static final String PROPS_JSONPATHS = "jsonpaths";
- private static final String PROPS_JSONROOT = "json_root";
- private static final String PROPS_FUZZY_PARSE = "fuzzy_parse";
-
-
protected int currentTaskConcurrentNum;
@SerializedName("pg")
protected RoutineLoadProgress progress;
@@ -395,47 +384,30 @@ public abstract class RoutineLoadJob
this.isPartialUpdate = true;
}
jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY,
String.valueOf(maxFilterRatio));
- if (Strings.isNullOrEmpty(stmt.getFormat()) ||
stmt.getFormat().equals("csv")) {
- jobProperties.put(PROPS_FORMAT, "csv");
- } else if (stmt.getFormat().equals("json")) {
- jobProperties.put(PROPS_FORMAT, "json");
+
+ FileFormatProperties fileFormatProperties =
stmt.getFileFormatProperties();
+ if (fileFormatProperties instanceof CsvFileFormatProperties) {
+ CsvFileFormatProperties csvFileFormatProperties =
(CsvFileFormatProperties) fileFormatProperties;
+ jobProperties.put(FileFormatProperties.PROP_FORMAT, "csv");
+ jobProperties.put(LoadStmt.KEY_ENCLOSE,
String.valueOf(csvFileFormatProperties.getEnclose()));
+ jobProperties.put(LoadStmt.KEY_ESCAPE,
String.valueOf(csvFileFormatProperties.getEscape()));
+ this.enclose = csvFileFormatProperties.getEnclose();
+ this.escape = csvFileFormatProperties.getEscape();
+ } else if (fileFormatProperties instanceof JsonFileFormatProperties) {
+ JsonFileFormatProperties jsonFileFormatProperties =
(JsonFileFormatProperties) fileFormatProperties;
+ jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
+ jobProperties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
jsonFileFormatProperties.getJsonPaths());
+ jobProperties.put(JsonFileFormatProperties.PROP_JSON_ROOT,
jsonFileFormatProperties.getJsonRoot());
+ jobProperties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
+
String.valueOf(jsonFileFormatProperties.isStripOuterArray()));
+ jobProperties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING,
+ String.valueOf(jsonFileFormatProperties.isNumAsString()));
+ jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE,
+ String.valueOf(jsonFileFormatProperties.isFuzzyParse()));
} else {
throw new UserException("Invalid format type.");
}
- if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
- jobProperties.put(PROPS_JSONPATHS, stmt.getJsonPaths());
- } else {
- jobProperties.put(PROPS_JSONPATHS, "");
- }
- if (!Strings.isNullOrEmpty(stmt.getJsonRoot())) {
- jobProperties.put(PROPS_JSONROOT, stmt.getJsonRoot());
- } else {
- jobProperties.put(PROPS_JSONROOT, "");
- }
- if (stmt.isStripOuterArray()) {
- jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "true");
- } else {
- jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
- }
- if (stmt.isNumAsString()) {
- jobProperties.put(PROPS_NUM_AS_STRING, "true");
- } else {
- jobProperties.put(PROPS_NUM_AS_STRING, "false");
- }
- if (stmt.isFuzzyParse()) {
- jobProperties.put(PROPS_FUZZY_PARSE, "true");
- } else {
- jobProperties.put(PROPS_FUZZY_PARSE, "false");
- }
- if (String.valueOf(stmt.getEnclose()) != null) {
- this.enclose = stmt.getEnclose();
- jobProperties.put(LoadStmt.KEY_ENCLOSE,
String.valueOf(stmt.getEnclose()));
- }
- if (String.valueOf(stmt.getEscape()) != null) {
- this.escape = stmt.getEscape();
- jobProperties.put(LoadStmt.KEY_ESCAPE,
String.valueOf(stmt.getEscape()));
- }
if (!StringUtils.isEmpty(stmt.getWorkloadGroupName())) {
jobProperties.put(WORKLOAD_GROUP, stmt.getWorkloadGroupName());
}
@@ -683,7 +655,7 @@ public abstract class RoutineLoadJob
}
public String getFormat() {
- String value = jobProperties.get(PROPS_FORMAT);
+ String value = jobProperties.get(FileFormatProperties.PROP_FORMAT);
if (value == null) {
return "csv";
}
@@ -692,17 +664,17 @@ public abstract class RoutineLoadJob
@Override
public boolean isStripOuterArray() {
- return
Boolean.parseBoolean(jobProperties.get(PROPS_STRIP_OUTER_ARRAY));
+ return
Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY));
}
@Override
public boolean isNumAsString() {
- return Boolean.parseBoolean(jobProperties.get(PROPS_NUM_AS_STRING));
+ return
Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_NUM_AS_STRING));
}
@Override
public boolean isFuzzyParse() {
- return Boolean.parseBoolean(jobProperties.get(PROPS_FUZZY_PARSE));
+ return
Boolean.parseBoolean(jobProperties.get(JsonFileFormatProperties.PROP_FUZZY_PARSE));
}
@Override
@@ -750,7 +722,7 @@ public abstract class RoutineLoadJob
}
public String getJsonPaths() {
- String value = jobProperties.get(PROPS_JSONPATHS);
+ String value =
jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
if (value == null) {
return "";
}
@@ -758,7 +730,7 @@ public abstract class RoutineLoadJob
}
public String getJsonRoot() {
- String value = jobProperties.get(PROPS_JSONROOT);
+ String value =
jobProperties.get(JsonFileFormatProperties.PROP_JSON_ROOT);
if (value == null) {
return "";
}
@@ -1808,15 +1780,15 @@ public abstract class RoutineLoadJob
appendProperties(sb,
CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS,
false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY,
maxBatchRows, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY,
maxBatchSizeBytes, false);
- appendProperties(sb, PROPS_FORMAT, getFormat(), false);
+ appendProperties(sb, FileFormatProperties.PROP_FORMAT, getFormat(),
false);
if (isPartialUpdate) {
appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS,
isPartialUpdate, false);
}
- appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
- appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(),
false);
- appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
- appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false);
- appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), false);
+ appendProperties(sb, JsonFileFormatProperties.PROP_JSON_PATHS,
getJsonPaths(), false);
+ appendProperties(sb, JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
isStripOuterArray(), false);
+ appendProperties(sb, JsonFileFormatProperties.PROP_NUM_AS_STRING,
isNumAsString(), false);
+ appendProperties(sb, JsonFileFormatProperties.PROP_FUZZY_PARSE,
isFuzzyParse(), false);
+ appendProperties(sb, JsonFileFormatProperties.PROP_JSON_ROOT,
getJsonRoot(), false);
appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
@@ -1890,7 +1862,7 @@ public abstract class RoutineLoadJob
jobProperties.put("precedingFilter", precedingFilter == null ?
STAR_STRING : precedingFilter.toSql());
jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING :
whereExpr.toSql());
if (getFormat().equalsIgnoreCase("json")) {
- jobProperties.put(PROPS_FORMAT, "json");
+ jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
} else {
jobProperties.put(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR,
columnSeparator == null ? "\t" :
columnSeparator.toString());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index daaed3215c2..eecbbf871b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -38,6 +38,9 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
+import
org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -86,12 +89,6 @@ public class CreateRoutineLoadInfo {
public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size";
public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
- public static final String FORMAT = "format"; // the value is csv or json,
default is csv
- public static final String STRIP_OUTER_ARRAY = "strip_outer_array";
- public static final String JSONPATHS = "jsonpaths";
- public static final String JSONROOT = "json_root";
- public static final String NUM_AS_STRING = "num_as_string";
- public static final String FUZZY_PARSE = "fuzzy_parse";
public static final String PARTIAL_COLUMNS = "partial_columns";
public static final String WORKLOAD_GROUP = "workload_group";
public static final String ENDPOINT_REGEX =
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
@@ -116,12 +113,6 @@ public class CreateRoutineLoadInfo {
.add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
.add(MAX_BATCH_ROWS_PROPERTY)
.add(MAX_BATCH_SIZE_PROPERTY)
- .add(FORMAT)
- .add(JSONPATHS)
- .add(STRIP_OUTER_ARRAY)
- .add(NUM_AS_STRING)
- .add(FUZZY_PARSE)
- .add(JSONROOT)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.add(EXEC_MEM_LIMIT_PROPERTY)
@@ -129,8 +120,14 @@ public class CreateRoutineLoadInfo {
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
.add(WORKLOAD_GROUP)
- .add(LoadStmt.KEY_ENCLOSE)
- .add(LoadStmt.KEY_ESCAPE)
+ .add(FileFormatProperties.PROP_FORMAT)
+ .add(JsonFileFormatProperties.PROP_JSON_PATHS)
+ .add(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY)
+ .add(JsonFileFormatProperties.PROP_NUM_AS_STRING)
+ .add(JsonFileFormatProperties.PROP_FUZZY_PARSE)
+ .add(JsonFileFormatProperties.PROP_JSON_ROOT)
+ .add(CsvFileFormatProperties.PROP_ENCLOSE)
+ .add(CsvFileFormatProperties.PROP_ESCAPE)
.build();
private static final Logger LOG =
LogManager.getLogger(CreateRoutineLoadInfo.class);
@@ -157,22 +154,7 @@ public class CreateRoutineLoadInfo {
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private int sendBatchParallelism = 1;
private boolean loadToSingleTablet = false;
- /**
- * RoutineLoad support json data.
- * Require Params:
- * 1) dataFormat = "json"
- * 2) jsonPaths = "$.XXX.xxx"
- */
- private String format = ""; //default is csv.
- private String jsonPaths = "";
- private String jsonRoot = ""; // MUST be a jsonpath string
- private boolean stripOuterArray = false;
- private boolean numAsString = false;
- private boolean fuzzyParse = false;
-
- private byte enclose;
-
- private byte escape;
+ private FileFormatProperties fileFormatProperties;
private String workloadGroupName;
@@ -392,23 +374,6 @@ public class CreateRoutineLoadInfo {
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
- String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
- if (encloseStr != null) {
- if (encloseStr.length() != 1) {
- throw new AnalysisException("enclose must be single-char");
- } else {
- enclose = encloseStr.getBytes()[0];
- }
- }
- String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
- if (escapeStr != null) {
- if (escapeStr.length() != 1) {
- throw new AnalysisException("enclose must be single-char");
- } else {
- escape = escapeStr.getBytes()[0];
- }
- }
-
String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
ConnectContext tmpCtx = new ConnectContext();
@@ -431,23 +396,9 @@ public class CreateRoutineLoadInfo {
}
timezone =
TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE,
timezone));
- format = jobProperties.get(FORMAT);
- if (format != null) {
- if (format.equalsIgnoreCase("csv")) {
- format = ""; // if it's not json, then it's mean csv and set
empty
- } else if (format.equalsIgnoreCase("json")) {
- format = "json";
- jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
- jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
- stripOuterArray =
Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
- numAsString =
Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
- fuzzyParse =
Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
- } else {
- throw new UserException("Format type is invalid. format=`" +
format + "`");
- }
- } else {
- format = "csv"; // default csv
- }
+ String format =
jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+ fileFormatProperties =
FileFormatProperties.createFileFormatProperties(format);
+ fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
}
private void checkDataSourceProperties() throws UserException {
@@ -461,13 +412,11 @@ public class CreateRoutineLoadInfo {
*/
public CreateRoutineLoadStmt translateToLegacyStmt(ConnectContext ctx) {
return new CreateRoutineLoadStmt(labelNameInfo.transferToLabelName(),
dbName, name, tableName, null,
- ctx.getStatementContext().getOriginStatement(),
ctx.getCurrentUserIdentity(),
- jobProperties, typeName, routineLoadDesc,
- desiredConcurrentNum, maxErrorNum, maxFilterRatio,
maxBatchIntervalS, maxBatchRows, maxBatchSizeBytes,
- execMemLimit, sendBatchParallelism, timezone, format,
jsonPaths, jsonRoot, enclose, escape,
- workloadGroupName,
- loadToSingleTablet, strictMode, isPartialUpdate,
stripOuterArray, numAsString, fuzzyParse,
- dataSourceProperties
+ ctx.getStatementContext().getOriginStatement(),
ctx.getCurrentUserIdentity(),
+ jobProperties, typeName, routineLoadDesc,
+ desiredConcurrentNum, maxErrorNum, maxFilterRatio,
maxBatchIntervalS, maxBatchRows, maxBatchSizeBytes,
+ execMemLimit, sendBatchParallelism, timezone, workloadGroupName,
loadToSingleTablet,
+ strictMode, isPartialUpdate, dataSourceProperties,
fileFormatProperties
);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
index a496378b5e5..4b2550cfa52 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -33,7 +33,7 @@ public class CsvFileFormatPropertiesTest {
@Before
public void setUp() {
- csvFileFormatProperties = new CsvFileFormatProperties();
+ csvFileFormatProperties = new CsvFileFormatProperties("csv");
}
@Test
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 5603b9dd25a..82f0367772c 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -258,7 +258,7 @@ suite("test_routine_load","p0") {
continue;
}
log.info("reason of state changed:
${res[0][17].toString()}".toString())
- assertEquals(res[0][8].toString(), "RUNNING")
+ assertEquals("RUNNING", res[0][8].toString())
break;
}
@@ -1301,7 +1301,7 @@ suite("test_routine_load","p0") {
sql "sync"
}catch (Exception e) {
log.info("create routine load failed: ${e.getMessage()}")
- assertEquals(e.getMessage(), "errCode = 2, detailMessage =
Format type is invalid. format=`test`")
+ assertEquals(e.getMessage(), "errCode = 2, detailMessage =
format:test is not supported.")
}
i++
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]