刘钊 created KYLIN-5200:
-------------------------

             Summary: Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent
                 Key: KYLIN-5200
                 URL: https://issues.apache.org/jira/browse/KYLIN-5200
             Project: Kylin
          Issue Type: Bug
          Components: Metadata
    Affects Versions: v4.0.1
            Reporter: 刘钊

I created a cube on Kylin 4.0.1 in which one of the measures is defined as RAW. When I query after building, the Parquet schema and the Spark schema are inconsistent.

During the cube build, the RAW measure written to Parquet is aggregated with Spark's max, and the data type of Max is the child's data type. In my cube the child data type is decimal(19,4). At query time, however, RAW is always mapped to BinaryType in TableScanPlan. Should the StructType for RAW in TableScanPlan therefore also use the child data type?

At build time, the RAW type is child.dataType. See org.apache.kylin.engine.spark.job.CuboidAggregator:

{code:java}
  measure.expression.toUpperCase(Locale.ROOT) match {
    case "MAX" =>
      max(columns.head).as(id.toString)
    case "MIN" =>
      min(columns.head).as(id.toString)
    case "SUM" =>
      sum(columns.head).as(id.toString)
    case "COUNT" =>
      if (reuseLayout) {
        sum(columns.head).as(id.toString)
      } else {
        count(columns.head).as(id.toString)
      }
    case "COUNT_DISTINCT" =>
      // for test
      if (isSparkSql) {
        countDistinct(columns.head).as(id.toString)
      } else {
        val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
        new Column(cdAggregate.toAggregateExpression()).as(id.toString)
      }
    case "TOP_N" =>
      // Uses new TopN aggregate function
      // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
      val schema = StructType(measure.pra.map { col =>
        val dateType = col.dataType
        if (col == measure) {
          StructField(s"MEASURE_${col.columnName}", dateType)
        } else {
          StructField(s"DIMENSION_${col.columnName}", dateType)
        }
      })

      if (reuseLayout) {
        new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
          .toAggregateExpression()).as(id.toString)
      } else {
        new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
          .toAggregateExpression()).as(id.toString)
      }
    case "PERCENTILE_APPROX" =>
      val udfName = UdfManager.register(measure.returnType.toKylinDataType,
        measure.expression, null, !reuseLayout)
      if (!reuseLayout) {
        callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
      } else {
        callUDF(udfName, columns.head).as(id.toString)
      }
    case _ =>
      max(columns.head).as(id.toString) // RAW is matched here, but the data type of max is child.dataType
  }
}.toSeq
{code}

At query time, however, the RAW StructType is BinaryType. See org.apache.kylin.query.runtime.plans.TableScanPlan and org.apache.spark.sql.utils.SparkTypeUtil:

{code:java}
def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
  dataTp.getName match {
    // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
    case "decimal" =>
      if (isSum) {
        val i = dataTp.getPrecision + 10
        DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
      }
      else DecimalType(dataTp.getPrecision, dataTp.getScale)
    case "date" => DateType
    case "time" => DateType
    case "timestamp" => TimestampType
    case "datetime" => DateType
    case "tinyint" => if (isSum) LongType else ByteType
    case "smallint" => if (isSum) LongType else ShortType
    case "integer" => if (isSum) LongType else IntegerType
    case "int4" => if (isSum) LongType else IntegerType
    case "bigint" => LongType
    case "long8" => LongType
    case "float" => if (isSum) DoubleType else FloatType
    case "double" => DoubleType
    case tp if tp.startsWith("varchar") => StringType
    case tp if tp.startsWith("char") => StringType
    case "dim_dc" => LongType
    case "boolean" => BooleanType
    case tp if tp.startsWith("hllc") => BinaryType
    case tp if tp.startsWith("bitmap") => BinaryType
    case tp if tp.startsWith("extendedcolumn") => BinaryType
    case tp if tp.startsWith("percentile") => BinaryType
    case tp if tp.startsWith("raw") => BinaryType
    case _ => throw new IllegalArgumentException(dataTp.toString)
  }
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
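
The mismatch described in this issue can be reproduced outside Kylin with plain Spark. The following is only a minimal sketch, assuming a local Spark session is available; the object name RawMeasureTypeCheck, the helper builtMeasureType, and the sample columns dim and price are hypothetical and not part of Kylin. It aggregates a decimal(19,4) column with max, as the default branch of the build-side match does, and compares the resulting column type with the BinaryType that toSparkType returns for names starting with "raw".

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType}

object RawMeasureTypeCheck {

  // Hypothetical helper (not Kylin code): aggregates a decimal(19,4) column
  // with max and returns the data type of the aggregated column.
  def builtMeasureType(spark: SparkSession): DataType = {
    import spark.implicits._

    // Illustrative sample data; "dim" and "price" are made-up column names.
    val source = Seq((1, "12.3400"), (1, "56.7800"))
      .toDF("dim", "price")
      .withColumn("price", col("price").cast(DecimalType(19, 4)))

    // Same shape as the build-side fallback: max(columns.head).as(id.toString)
    val built = source.groupBy("dim").agg(max(col("price")).as("0"))
    built.schema("0").dataType
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("raw-measure-type-check")
      .getOrCreate()
    try {
      val buildSide = builtMeasureType(spark)
      // Type assigned on the query side to any "raw*" measure, per toSparkType.
      val querySide: DataType = BinaryType
      println(s"build side: $buildSide, query side: $querySide, " +
        s"consistent: ${buildSide == querySide}")
    } finally {
      spark.stop()
    }
  }
}
```

Because max keeps the type of its child expression, the build-side type here is the decimal type of the source column rather than BinaryType, which is the inconsistency reported above.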