[ 
https://issues.apache.org/jira/browse/FLINK-25095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452139#comment-17452139
 ] 

xiangqiao commented on FLINK-25095:
-----------------------------------

I verified that if the default dialect is used, the query executes successfully.

This is my unit test, which sets the dialect to default before executing the SQL:

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
{code:java}
@Test
public void testCodeGenFunctionArgumentType() throws Exception {
    TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
    tableEnv.loadModule("hive", new HiveModule());
    tableEnv.useModules("hive", "core");

    tableEnv.executeSql("create database db1");
    try {
        tableEnv.useDatabase("db1");
        tableEnv.executeSql("create table src1(key string, val string)");
        HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
                .addRow(new Object[] {"1", "val1"})
                .addRow(new Object[] {"2", "val2"})
                .addRow(new Object[] {"3", "val3"})
                .commit();
        
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        List<Row> results =
                CollectionUtil.iteratorToList(
                        tableEnv.executeSql(
                                        "select t.key, count(case when t.num=1 
then 1 else null end) from "
                                                + "(select key,count(case when 
key='1' then 1 else null end) as num from src1 group by key,val) t "
                                                + "group by t.key")
                                .collect());
        assertEquals("[+I[1, 1], +I[2, 0], +I[3, 0]]", results.toString());
    } finally {
        tableEnv.useDatabase("default");
        tableEnv.executeSql("drop database db1 cascade");
    }
} {code}
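As a side note, the same workaround can also be applied through configuration rather than the TableConfig setter; a minimal sketch, assuming the same tableEnv as in the test above:
{code:java}
// Equivalent workaround (sketch): switch back to the default dialect via the
// table.sql-dialect option before executing the query. This has the same
// effect as tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT).
tableEnv.getConfig().getConfiguration().setString("table.sql-dialect", "default");
{code}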
The logical plan when using the default dialect:
{code:java}
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalProject(key=[$0], $f1=[CASE(=($2, 1), 1, null:INTEGER)])
   +- LogicalAggregate(group=[{0, 1}], num=[COUNT($2)])
      +- LogicalProject(key=[$0], val=[$1], $f2=[CASE(=($0, _UTF-16LE'1'), 1, null:INTEGER)])
         +- LogicalTableScan(table=[[test-catalog, db1, src1]])

== Optimized Physical Plan ==
HashAggregate(isMerge=[true], groupBy=[key], select=[key, Final_COUNT(count$0) AS EXPR$1])
+- Exchange(distribution=[hash[key]])
   +- LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT($f1) AS count$0])
      +- Calc(select=[key, CASE(=(num, 1:BIGINT), 1, null:INTEGER) AS $f1])
         +- HashAggregate(isMerge=[true], groupBy=[key, val], select=[key, val, Final_COUNT(count$0) AS num])
            +- Exchange(distribution=[hash[key, val]])
               +- LocalHashAggregate(groupBy=[key, val], select=[key, val, Partial_COUNT($f2) AS count$0])
                  +- Calc(select=[key, val, CASE(=(key, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 1, null:INTEGER) AS $f2])
                     +- TableSourceScan(table=[[test-catalog, db1, src1]], fields=[key, val])
{code}
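For reference, the plan above can be printed with TableEnvironment#explainSql; a minimal sketch, assuming the same tableEnv and query as in the test:
{code:java}
// Print the AST and optimized physical plan for the query, which makes it
// easy to compare the default-dialect plan above with the hive-dialect plan
// that fails during code generation.
String query =
        "select t.key, count(case when t.num=1 then 1 else null end) from "
                + "(select key,count(case when key='1' then 1 else null end) as num from src1 group by key,val) t "
                + "group by t.key";
System.out.println(tableEnv.explainSql(query));
{code}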
 

> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'BOOLEAN NOT NULL' and actual argument type 
> 'BOOLEAN'.
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25095
>                 URL: https://issues.apache.org/jira/browse/FLINK-25095
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive, Table SQL / Planner
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: xiangqiao
>            Priority: Major
>              Labels: pull-request-available
>
> When we {*}use the blink planner{*}'s *batch mode* and set the {*}hive 
> dialect{*}, this exception is reported when a *subquery field* is used in 
> {*}case when{*}.
> {code:java}
> org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of 
> function's argument data type 'BOOLEAN NOT NULL' and actual argument type 
> 'BOOLEAN'.
>     at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:326)
>     at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:323)
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyArgumentTypes(BridgingFunctionGenUtil.scala:323)
>     at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:98)
>     at 
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:65)
>     at 
> org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:73)
>     at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:811)
>     at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:501)
>     at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>     at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
>     at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$2.apply(CalcCodeGenerator.scala:152)
>     at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$2.apply(CalcCodeGenerator.scala:152)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>     at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:177)
>     at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:50)
>     at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:95)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate.translateToPlanInternal(BatchExecHashAggregate.java:84)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.java:103)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate.translateToPlanInternal(BatchExecHashAggregate.java:84)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:83)
>     at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:82)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:82)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1524)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:797)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1233)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:734)
>     at 
> org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase.testCodeGenFunctionArgumentType(TableEnvHiveConnectorITCase.java:175)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>     at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Process finished with exit code 255
>  {code}
> This problem can be reproduced by adding a unit test 
> {*}TableEnvHiveConnectorITCase#testCodeGenFunctionArgumentType{*}:
> {code:java}
> @Test
> public void testCodeGenFunctionArgumentType() throws Exception {
>     TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
>     tableEnv.loadModule("hive", new HiveModule());
>     tableEnv.useModules("hive", "core");
>     tableEnv.executeSql("create database db1");
>     try {
>         tableEnv.useDatabase("db1");
>         tableEnv.executeSql("create table src1(key string, val string)");
>         HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
>                 .addRow(new Object[] {"1", "val1"})
>                 .addRow(new Object[] {"2", "val2"})
>                 .addRow(new Object[] {"3", "val3"})
>                 .commit();
>         List<Row> results =
>                 CollectionUtil.iteratorToList(
>                         tableEnv.executeSql(
>                                         "select t.key, count(case when t.num=1 then 1 else null end) from "
>                                                 + "(select key,count(case when key='1' then 1 else null end) as num from src1 group by key,val) t "
>                                                 + "group by t.key")
>                                 .collect());
>         assertEquals("[+I[1, 1], +I[2, 0], +I[3, 0]]", results.toString());
>     } finally {
>         tableEnv.useDatabase("default");
>         tableEnv.executeSql("drop database db1 cascade");
>     }
> } {code}


