Hi, I have encountered a problem with Flink SQL.
My code:

    DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );
    tableEnv.registerDataSet( "table0", dataSet0 );
    String sql = "select closePrice from table0";
    Table table = tableEnv.sql( sql );
    tableEnv.registerTable( tableName, table );
    DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class );

This works fine. But when I change the SQL to "select distinct closePrice from table0", "tableEnv.toDataSet" throws an exception:

java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetAggregateRule
    at org.apache.calcite.util.Util.newInternal(Util.java:792)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
    at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253)
    at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
    at com.streamingedge.marketreport.analytics.flink.FlinkDataSetAnalytics.analize(FlinkDataSetAnalytics.java:96)
    at com.streamingedge.marketreport.webserver.AnalyticsServlet.processRequest(AnalyticsServlet.java:117)
    at com.streamingedge.marketreport.webserver.AnalyticsServlet.doPost(AnalyticsServlet.java:40)
    at com.streamingedge.marketreport.webserver.AnalyticsServlet.doGet(AnalyticsServlet.java:35)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:229)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:427)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:193)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
    at org.eclipse.jetty.server.Server.handle(Server.java:366)
    at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
    at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:973)
    at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1035)
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:641)
    at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:231)
    at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:696)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:53)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSize(DataSetRel.scala:53)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSize(DataSetAggregate.scala:38)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCost(DataSetAggregate.scala:80)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1134)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1838)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1774)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1950)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:137)
    ... 35 more