wyb commented on a change in pull request #4524:
URL: https://github.com/apache/incubator-doris/pull/4524#discussion_r483936102
##########
File path:
fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
##########
@@ -186,4 +192,61 @@ public boolean parse(String value) {
throw new RuntimeException("string check failed ", e);
}
}
+}
+
+class DecimalParser extends ColumnParser {
+
+ public static int PRECISION = 27;
+ public static int SCALE = 9;
+
+ private BigDecimal maxValue;
+ private BigDecimal minValue;
+
+ public DecimalParser(EtlJobConfig.EtlColumn etlColumn) {
+ StringBuilder precisionStr = new StringBuilder();
+ for (int i = 0; i < etlColumn.precision; i++) {
+ precisionStr.append("9");
+ }
+ StringBuilder scaleStr = new StringBuilder();
+ for (int i = 0; i < etlColumn.scale; i++) {
+ scaleStr.append("9");
+ }
+ maxValue = new BigDecimal(precisionStr.toString() + "." +
scaleStr.toString());
+ minValue = new BigDecimal("-" + precisionStr.toString() + "." +
scaleStr.toString());
+ }
+
+ @Override
+ public boolean parse(String value) {
+ try {
+ BigDecimal bigDecimal = new BigDecimal(value);
+ return bigDecimal.precision() - bigDecimal.scale() <= PRECISION -
SCALE && bigDecimal.scale() <= SCALE;
+ } catch (NumberFormatException e) {
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("decimal parse failed ", e);
+ }
+ }
+
+ public BigDecimal getMaxValue() {
+ return maxValue;
+ }
+
+ public BigDecimal getMinValue() {
+ return minValue;
+ }
+}
+
+class LargeIntParser extends ColumnParser {
+
+ @Override
+ public boolean parse(String value) {
+ try {
+ BigInteger bigInteger = new BigInteger(value);
 Review comment:
       Check that the parsed value is within the LARGEINT type's range; `new BigInteger(value)` alone accepts arbitrarily large integers.
##########
File path:
fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
##########
@@ -507,17 +558,6 @@ private void processRollupTree(RollupTreeNode rootNode,
dataframe = dataframe.withColumn(mappingColumn,
functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
}
- // projection and reorder the columns
- dataframe.createOrReplaceTempView("src_table");
- StringBuilder selectSqlBuilder = new StringBuilder();
- selectSqlBuilder.append("select ");
- for (String name : dstColumnNames) {
- selectSqlBuilder.append(name + ",");
- }
- selectSqlBuilder.deleteCharAt(selectSqlBuilder.length() - 1);
- selectSqlBuilder.append(" from src_table");
- String selectSql = selectSqlBuilder.toString();
- dataframe = spark.sql(selectSql);
 Review comment:
       Don't delete this. The source data schema may differ from the load table
 schema. This code is used to project and reorder the columns according to the
 column order in the table schema.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]