[ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xueyu reassigned FLINK-10845:
-----------------------------

    Assignee: xueyu

> Support DISTINCT aggregates for batch
> -------------------------------------
>
>                 Key: FLINK-10845
>                 URL: https://issues.apache.org/jira/browse/FLINK-10845
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>
> Currently, we support distinct aggregates for streaming. However, executing
> the same query on batch like the following test:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val sqlQuery =
>   "SELECT b, " +
>   " SUM(DISTINCT (a / 3)), " +
>   " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>   " COUNT(DISTINCT c) " +
>   "FROM MyTable " +
>   "GROUP BY b"
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
> data.+=((5, 3L, "I am fine."))
> data.+=((6, 3L, "Luke Skywalker"))
> data.+=((7, 4L, "Comment#1"))
> data.+=((8, 4L, "Comment#2"))
> data.+=((9, 4L, "Comment#3"))
> data.+=((10, 4L, "Comment#4"))
> data.+=((11, 5L, "Comment#5"))
> data.+=((12, 5L, "Comment#6"))
> data.+=((13, 5L, "Comment#7"))
> data.+=((14, 5L, "Comment#8"))
> data.+=((15, 5L, "Comment#9"))
> data.+=((16, 6L, "Comment#10"))
> data.+=((17, 6L, "Comment#11"))
> data.+=((18, 6L, "Comment#12"))
> data.+=((19, 6L, "Comment#13"))
> data.+=((20, 6L, "Comment#14"))
> data.+=((21, 6L, "Comment#15"))
> val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("MyTable", t)
> tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> Fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
> If you think this function should be supported, you can create an issue and start a discussion for it.
>     at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>     at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
>     at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>     at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>     at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
>     at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
>     at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
>     at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
>     at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
>     at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
>     at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
>     at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
>     at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
>     at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)