[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735286#comment-15735286
 ] 

ASF GitHub Bot commented on FLINK-5220:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2923#discussion_r91711313
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
 ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.rules.util
    +
    +import java.math.BigDecimal
    +
    +import org.apache.calcite.adapter.java.JavaTypeFactory
    +import org.apache.calcite.jdbc.JavaTypeFactoryImpl
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
    +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, 
VARCHAR}
    +import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram, 
RexProgramBuilder}
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable
    +
    +import scala.collection.JavaConverters._
    +import 
org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
    +import org.junit.{Assert, Before, Test}
    +
    +/**
    +  * This class is responsible for testing RexProgramProjectExtractor
    +  */
    +class RexProgramProjectExtractorTest {
    +  private var typeFactory: JavaTypeFactory = null
    +  private var rexBuilder: RexBuilder = null
    +  private var allFieldTypes: Seq[RelDataType] = null
    +  private val allFieldNames = List("name", "id", "amount", "price")
    +
    +  @Before
    +  def setUp: Unit = {
    +    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
    +    rexBuilder = new RexBuilder(typeFactory)
    +    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, 
DOUBLE).map(typeFactory.createSqlType(_))
    +  }
    +
    +  @Test
    +  def testExtractRefInputFields: Unit = {
    +    val usedFields = extractRefInputFields(buildRexProgram)
    +    Assert.assertArrayEquals(usedFields, Array(2, 1, 3))
    +  }
    +
    +  @Test
    +  def testRewriteRexProgram: Unit = {
    +    val originRexProgram = buildRexProgram
    +    
Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
    +      "$0",
    +      "$1",
    +      "$2",
    +      "$3",
    +      "*($t2, $t3)",
    +      "100",
    +      "<($t4, $t5)",
    +      "6",
    +      ">($t2, $t7)",
    +      "AND($t6, $t8)")))
    +    // use amount, id, price fields to create a new RexProgram
    +    val usedFields = Array(2, 1, 3)
    +    val types = usedFields.map(allFieldTypes(_)).toList.asJava
    +    val names = usedFields.map(allFieldNames(_)).toList.asJava
    +    val inputRowType = typeFactory.createStructType(types, names)
    +    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, 
usedFields, rexBuilder)
    +    Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
    +      "$0",
    +      "$1",
    +      "$2",
    +      "*($t0, $t2)",
    +      "100",
    +      "<($t3, $t4)",
    +      "6",
    +      ">($t0, $t6)",
    +      "AND($t5, $t7)")))
    +  }
    +
    +  private def buildRexProgram: RexProgram = {
    +    val types = allFieldTypes.asJava
    +    val names = allFieldNames.asJava
    +    val inputRowType = typeFactory.createStructType(types, names)
    +    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
    +    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
    +    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
    +    val t2 = rexBuilder.makeInputRef(types.get(3), 3)
    +    val t3 = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
    +    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
    +    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
    +    // project: amount, id, amount * price
    +    builder.addProject(t0, "amount")
    +    builder.addProject(t1, "id")
    +    builder.addProject(t3, "total")
    +    // condition: amount * price < 100 and amount > 6
    --- End diff --
    
    would be good to reference an input field in the condition that has not 
been referenced by the projection.


> Flink SQL projection pushdown
> -----------------------------
>
>                 Key: FLINK-5220
>                 URL: https://issues.apache.org/jira/browse/FLINK-5220
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> The jira is to do projection pushdown optimization. Please go forward to the 
> the design document for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to