This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 41ef9cc  (#5224)some little fix for spark load (#5233)
41ef9cc is described below

commit 41ef9ccda977b22c8d9f234869194f2be0462afb
Author: wangbo <506340...@qq.com>
AuthorDate: Wed Jan 27 11:16:59 2021 +0800

    (#5224)some little fix for spark load (#5233)

    * (#5224)some little fix for spark load

    * 1 use yyyy-MM-dd instead of YYYY-MM-DD
      2 unify lower case for bitmap column name
---
 .../apache/doris/load/loadv2/dpp/ColumnParser.java | 12 +++--
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 55 ++++++++++++++++++----
 .../apache/doris/load/loadv2/etl/SparkEtlJob.java  |  9 +++-
 3 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index c9d6a42..1547191 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -21,18 +21,22 @@
 import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread safe formatter
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd");
+    public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+
     public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException {
         String columnType = etlColumn.columnType;
         if (columnType.equalsIgnoreCase("TINYINT")) {
@@ -158,7 +162,7 @@ class DateParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Date.parse(value);
+            DATE_FORMATTER.parseDateTime(value);
         } catch (IllegalArgumentException e) {
             return false;
         }
@@ -170,7 +174,7 @@ class DatetimeParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            DateTime.parse(value);
+            DATE_TIME_FORMATTER.parseDateTime(value);
         } catch (IllegalArgumentException e) {
             return false;
         }
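A note on the ColumnParser change above: the deprecated Date.parse/DateTime.parse calls are replaced by shared Joda-Time formatters. DateTimeFormat.forPattern returns an immutable, thread-safe formatter, so a single static instance can be reused across Spark executor tasks, and in Joda-Time patterns "Y" is year-of-era and "D" is day-of-year, so switching from YYYY-MM-DD to yyyy-MM-dd selects the intended year/month/day-of-month fields. Below is a small standalone sketch of the parse-and-catch behavior the new DateParser/DatetimeParser rely on (illustration only, not part of this commit; the class name DateParseDemo is made up):

    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class DateParseDemo {
        // one shared, thread-safe formatter, mirroring ColumnParser.DATE_FORMATTER
        private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd");

        // returns false when the value does not match the pattern or is out of range
        static boolean isValidDate(String value) {
            try {
                DATE_FORMATTER.parseDateTime(value);
                return true;
            } catch (IllegalArgumentException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(isValidDate("2021-01-27"));  // true
            System.out.println(isValidDate("2021-13-40"));  // false: month and day out of range
            System.out.println(isValidDate("not-a-date"));  // false: does not match yyyy-MM-dd
        }
    }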
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 2af4de4..b30d102 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -108,13 +108,17 @@ public final class SparkDpp implements java.io.Serializable {
     // we need to wrap it so that we can use it in executor.
     private SerializableConfiguration serializableHadoopConf;
     private DppResult dppResult = new DppResult();
+    Map<Long, Set<String>> tableToBitmapDictColumns = new HashMap<>();
 
     // just for ut
     public SparkDpp() {}
 
-    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig, Map<Long, Set<String>> tableToBitmapDictColumns) {
         this.spark = spark;
         this.etlJobConfig = etlJobConfig;
+        if (tableToBitmapDictColumns != null) {
+            this.tableToBitmapDictColumns = tableToBitmapDictColumns;
+        }
     }
 
     public void init() {
@@ -543,7 +547,9 @@ public final class SparkDpp implements java.io.Serializable {
             }
         }
         if (column.columnType.equalsIgnoreCase("DATE")) {
-            dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
+            dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.DateType));
+        } else if (column.columnType.equalsIgnoreCase("DATETIME")) {
+            dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.TimestampType));
         } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
             dataframe = dataframe.withColumn(dstField.name(),
                     functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
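The hunk above changes DATE columns from the string type alias cast("date") to cast(DataTypes.DateType) and adds an explicit TimestampType cast for DATETIME columns. A minimal standalone sketch of the same Column.cast(DataType) usage follows (illustration only, not repository code; it assumes a local SparkSession and the class name CastDemo is made up):

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;

    public class CastDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("cast-demo").master("local[1]").getOrCreate();

            // two string columns standing in for a DATE and a DATETIME source value
            Dataset<Row> df = spark.sql("select '2021-01-27' as d, '2021-01-27 11:16:59' as dt");

            // DATE -> DateType, DATETIME -> TimestampType, mirroring the change above
            Dataset<Row> casted = df
                    .withColumn("d", col("d").cast(DataTypes.DateType))
                    .withColumn("dt", col("dt").cast(DataTypes.TimestampType));

            casted.printSchema();   // d: date, dt: timestamp
            casted.show(false);
            spark.stop();
        }
    }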
row:" + row.toString()); + break; } else { columnIndexNeedToRepalceNull.add(fieldIndex); } @@ -909,7 +943,7 @@ public final class SparkDpp implements java.io.Serializable { if (abnormalRowAcc.value() <= 5) { invalidRows.add(row.toString()); } - } if (columnIndexNeedToRepalceNull.size() != 0) { + } else if (columnIndexNeedToRepalceNull.size() != 0) { Object[] newRow = new Object[row.size()]; for (int i = 0; i < row.size(); i++) { if (columnIndexNeedToRepalceNull.contains(i)) { @@ -926,7 +960,8 @@ public final class SparkDpp implements java.io.Serializable { } }); - return spark.createDataFrame(result, dstTableSchema); + // here we just check data but not do cast, so data type should be same with src schema which is hive table schema + return spark.createDataFrame(result, srcSchema); } private void process() throws Exception { @@ -934,6 +969,7 @@ public final class SparkDpp implements java.io.Serializable { for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) { Long tableId = entry.getKey(); EtlJobConfig.EtlTable etlTable = entry.getValue(); + Set<String> dictBitmapColumnSet = tableToBitmapDictColumns.getOrDefault(tableId, new HashSet<>()); // get the base index meta EtlJobConfig.EtlIndex baseIndex = null; @@ -982,7 +1018,8 @@ public final class SparkDpp implements java.io.Serializable { if (sourceType == EtlJobConfig.SourceType.FILE) { fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema); } else if (sourceType == EtlJobConfig.SourceType.HIVE) { - fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema); + fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema, + dictBitmapColumnSet); } else { throw new RuntimeException("Unknown source type: " + sourceType.name()); } diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java index 6fe8291..86bae31 100644 --- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2.etl; +import org.apache.doris.common.SparkDppException; import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder; import org.apache.doris.load.loadv2.dpp.SparkDpp; import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn; @@ -53,6 +54,7 @@ public class SparkEtlJob { private static final String BITMAP_DICT_FUNC = "bitmap_dict"; private static final String TO_BITMAP_FUNC = "to_bitmap"; + private static final String BITMAP_HASH = "bitmap_hash"; private String jobConfigFilePath; private EtlJobConfig etlJobConfig; @@ -112,8 +114,11 @@ public class SparkEtlJob { String columnName = mappingEntry.getKey(); String exprStr = mappingEntry.getValue().toDescription(); String funcName = functions.expr(exprStr).expr().prettyName(); + if (funcName.equalsIgnoreCase(BITMAP_HASH)) { + throw new SparkDppException("spark load not support bitmap_hash now"); + } if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) { - bitmapDictColumns.add(columnName); + bitmapDictColumns.add(columnName.toLowerCase()); } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) { newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue()); } @@ -137,7 +142,7 @@ public class SparkEtlJob { } private void processDpp() throws Exception { - SparkDpp 
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 6fe8291..86bae31 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -17,6 +17,7 @@
 package org.apache.doris.load.loadv2.etl;
 
+import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
 import org.apache.doris.load.loadv2.dpp.SparkDpp;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
@@ -53,6 +54,7 @@ public class SparkEtlJob {
 
     private static final String BITMAP_DICT_FUNC = "bitmap_dict";
     private static final String TO_BITMAP_FUNC = "to_bitmap";
+    private static final String BITMAP_HASH = "bitmap_hash";
 
     private String jobConfigFilePath;
     private EtlJobConfig etlJobConfig;
@@ -112,8 +114,11 @@ public class SparkEtlJob {
                 String columnName = mappingEntry.getKey();
                 String exprStr = mappingEntry.getValue().toDescription();
                 String funcName = functions.expr(exprStr).expr().prettyName();
+                if (funcName.equalsIgnoreCase(BITMAP_HASH)) {
+                    throw new SparkDppException("spark load not support bitmap_hash now");
+                }
                 if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
-                    bitmapDictColumns.add(columnName);
+                    bitmapDictColumns.add(columnName.toLowerCase());
                 } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
                     newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue());
                 }
@@ -137,7 +142,7 @@ public class SparkEtlJob {
     }
 
     private void processDpp() throws Exception {
-        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
+        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig, tableToBitmapDictColumns);
         sparkDpp.init();
         sparkDpp.doDpp();
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org