[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138764#comment-15138764 ]
ASF GitHub Bot commented on FLINK-3226: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1595#discussion_r52291201 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -0,0 +1,661 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.codegen + +import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable._ +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction} +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenUtils._ +import org.apache.flink.api.table.codegen.Indenter._ +import org.apache.flink.api.table.codegen.OperatorCodeGen._ +import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo +import org.apache.flink.api.table.typeinfo.RowTypeInfo + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +class CodeGenerator( + config: TableConfig, + input1: TypeInformation[Any], + input2: Option[TypeInformation[Any]] = None) + extends RexVisitor[GeneratedExpression] { + + // set of member statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableMemberStatements = mutable.LinkedHashSet[String]() + + // set of constructor statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableInitStatements = mutable.LinkedHashSet[String]() + + // map of initial input unboxing expressions that will be added only once + // (inputTerm, index) -> expr + private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]() + + def reuseMemberCode(): String = { + reusableMemberStatements.mkString("", "\n", "\n") + } + + def reuseInitCode(): String = { + reusableInitStatements.mkString("", "\n", "\n") + } + + def reuseInputUnboxingCode(): String = { + 
reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n") + } + + def input1Term = "in1" + + def input2Term = "in2" + + def collectorTerm = "c" + + def outRecordTerm = "out" + + def nullCheck: Boolean = config.getNullCheck + + def generateExpression(rex: RexNode): GeneratedExpression = { + rex.accept(this) + } + + def generateFunction[T <: Function]( + name: String, + clazz: Class[T], + bodyCode: String, + returnType: TypeInformation[Any]) + : GeneratedFunction[T] = { + val funcName = newName(name) + + // Janino does not support generics, that's why we need + // manual casting here + val samHeader = + if (clazz == classOf[FlatMapFunction[_,_]]) { + val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)", + s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") + } else if (clazz == classOf[MapFunction[_,_]]) { + val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + ("Object map(Object _in1)", + s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") + } else { + // TODO more functions + throw new CodeGenException("Unsupported Function.") + } + + val funcCode = j""" + public class $funcName + implements ${clazz.getCanonicalName} { + + ${reuseMemberCode()} + + public $funcName() { + ${reuseInitCode()} + } + + @Override + public ${samHeader._1} { + ${samHeader._2} + ${reuseInputUnboxingCode()} + $bodyCode + } + } + """.stripMargin + + GeneratedFunction(funcName, returnType, funcCode) + } + + def generateConverterResultExpression( + returnType: TypeInformation[_ <: Any]) + : GeneratedExpression = { + val input1AccessExprs = for (i <- 0 until input1.getArity) + yield generateInputAccess(input1, input1Term, i) + + val input2AccessExprs = input2 match { + case Some(ti) => for (i <- 0 until ti.getArity) + yield generateInputAccess(ti, input2Term, i) + case None => Seq() // add nothing + } + + generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType) + } + + def 
generateResultExpression( + returnType: TypeInformation[_ <: Any], + rexNodes: Seq[RexNode]) + : GeneratedExpression = { + val fieldExprs = rexNodes.map(generateExpression) + generateResultExpression(fieldExprs, returnType) + } + + def generateResultExpression( + fieldExprs: Seq[GeneratedExpression], + returnType: TypeInformation[_ <: Any]) + : GeneratedExpression = { + // initial type check + if (returnType.getArity != fieldExprs.length) { + throw new CodeGenException("Arity of result type does not match number of expressions.") + } + // type check + returnType match { + case ct: CompositeType[_] => + fieldExprs.zipWithIndex foreach { + case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) => + throw new CodeGenException("Incompatible types of expression and result type.") + case _ => // ok + } + case at: AtomicType[_] if at != fieldExprs.head.resultType => + throw new CodeGenException("Incompatible types of expression and result type.") + case _ => // ok + } + + val returnTypeTerm = boxedTypeTermForTypeInfo(returnType) + + // generate result expression + returnType match { + case ri: RowTypeInfo => + addReusableOutRecord(ri) + val resultSetters: String = fieldExprs.zipWithIndex map { + case (fieldExpr, i) => + if (nullCheck) { + s""" + |${fieldExpr.code} + |if (${fieldExpr.nullTerm}) { + | $outRecordTerm.setField($i, null); + |} + |else { + | $outRecordTerm.setField($i, ${fieldExpr.resultTerm}); + |} + |""".stripMargin + } + else { + s""" + |${fieldExpr.code} + |$outRecordTerm.setField($i, ${fieldExpr.resultTerm}); + |""".stripMargin + } + } mkString "\n" + + GeneratedExpression(outRecordTerm, "false", resultSetters, returnType) + + case pj: PojoTypeInfo[_] => + addReusableOutRecord(pj) + val resultSetters: String = fieldExprs.zip(pj.getFieldNames) map { + case (fieldExpr, fieldName) => + if (nullCheck) { + s""" + |${fieldExpr.code} + |if (${fieldExpr.nullTerm}) { + | $outRecordTerm.$fieldName = null; + |} + |else { + | $outRecordTerm.$fieldName = 
${fieldExpr.resultTerm}; + |} + |""".stripMargin + } + else { + s""" + |${fieldExpr.code} + |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm}; + |""".stripMargin + } + } mkString "\n" + + GeneratedExpression(outRecordTerm, "false", resultSetters, returnType) + + case tup: TupleTypeInfo[_] => + addReusableOutRecord(tup) + val resultSetters: String = fieldExprs.zipWithIndex map { + case (fieldExpr, i) => + val fieldName = "f" + i + if (nullCheck) { + s""" + |${fieldExpr.code} + |if (${fieldExpr.nullTerm}) { + | throw new NullPointerException("Null result cannot be stored in a Tuple."); + |} + |else { + | $outRecordTerm.$fieldName = ${fieldExpr.resultTerm}; + |} + |""".stripMargin + } + else { + s""" + |${fieldExpr.code} + |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm}; + |""".stripMargin + } + } mkString "\n" + + GeneratedExpression(outRecordTerm, "false", resultSetters, returnType) + + case cc: CaseClassTypeInfo[_] => + val fieldCodes: String = fieldExprs.map(_.code).mkString("\n") + val constructorParams: String = fieldExprs.map(_.resultTerm).mkString(", ") + val resultTerm = newName(outRecordTerm) + + val nullCheckCode = if (nullCheck) { + fieldExprs map { (fieldExpr) => + s""" + |if (${fieldExpr.nullTerm}) { + | throw new NullPointerException("Null result cannot be stored in a Case Class."); + |} + |""".stripMargin + } mkString "\n" + } else { + "" + } + + val resultCode = + s""" + |$fieldCodes + |$nullCheckCode + |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams); + |""".stripMargin + + GeneratedExpression(resultTerm, "false", resultCode, returnType) + + case a: AtomicType[_] => + val fieldExpr = fieldExprs.head + val nullCheckCode = if (nullCheck) { + s""" + |if (${fieldExpr.nullTerm}) { + | throw new NullPointerException("Null result cannot be used for atomic types."); + |} + |""".stripMargin + } else { + "" + } + val resultCode = + s""" + |${fieldExpr.code} + |$nullCheckCode + |""".stripMargin + + 
GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, returnType) + + case _ => + throw new CodeGenException(s"Unsupported result type: $returnType") + } + } + + // ---------------------------------------------------------------------------------------------- + + override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = { + // if inputRef index is within size of input1 we work with input1, input2 otherwise + val input = if (inputRef.getIndex < input1.getArity) { + (input1, input1Term) + } else { + (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term) + } + + val index = if (input._1 == input1) { + inputRef.getIndex + } else { + inputRef.getIndex - input1.getArity + } + + generateInputAccess(input._1, input._2, index) + } + + override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = ??? + + override def visitLiteral(literal: RexLiteral): GeneratedExpression = { + val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName) + val value = literal.getValue3 + literal.getType.getSqlTypeName match { + case BOOLEAN => + generateNonNullLiteral(resultType, literal.getValue3.toString) + case TINYINT => + val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) + if (decimal.isValidByte) { + generateNonNullLiteral(resultType, decimal.byteValue().toString) + } + else { + throw new CodeGenException("Decimal can not be converted to byte.") + } + case SMALLINT => + val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) + if (decimal.isValidShort) { + generateNonNullLiteral(resultType, decimal.shortValue().toString) + } + else { + throw new CodeGenException("Decimal can not be converted to short.") + } + case INTEGER => + val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) + if (decimal.isValidShort) { + generateNonNullLiteral(resultType, decimal.intValue().toString) + } + else { + throw new CodeGenException("Decimal can not be converted to integer.") + } + 
case BIGINT => + val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) + if (decimal.isValidLong) { + generateNonNullLiteral(resultType, decimal.longValue().toString) + } + else { + throw new CodeGenException("Decimal can not be converted to long.") + } + case FLOAT => + val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) + if (decimal.isValidFloat) { + generateNonNullLiteral(resultType, decimal.floatValue().toString + "f") + } + else { + throw new CodeGenException("Decimal can not be converted to float.") + } + case DOUBLE => + val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) + if (decimal.isValidDouble) { + generateNonNullLiteral(resultType, decimal.doubleValue().toString) + } + else { + throw new CodeGenException("Decimal can not be converted to double.") + } + case VARCHAR | CHAR => + generateNonNullLiteral(resultType, value.toString) --- End diff -- Do we need quotes around the string value? > Translate optimized logical Table API plans into physical plans representing > DataSet programs > --------------------------------------------------------------------------------------------- > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API > Reporter: Fabian Hueske > Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. 
> - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. We start with a straightforward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. The rules Timo implemented for the first SQL > implementation can be used as a starting point. > - integrate the translation rules into the Calcite optimization process. -- This message was sent by Atlassian JIRA (v6.3.4#6332)