Hi, I have encountered a problem with Flink SQL.

 

My code:

 

DataSet<MarketDataInfo> dataSet0 = env.fromCollection( infos0 );
tableEnv.registerDataSet( "table0", dataSet0 );

String sql = "select closePrice from table0";
Table table = tableEnv.sql( sql );
tableEnv.registerTable( tableName, table );

DataSet<Row> redyData = tableEnv.toDataSet( table, Row.class );

 

This works fine. 

 

But when I change the SQL to "select distinct closePrice from table0",
tableEnv.toDataSet throws an exception:

 

java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetAggregateRule
    at org.apache.calcite.util.Util.newInternal(Util.java:792)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:118)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:214)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:825)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
    at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:253)
    at org.apache.flink.api.java.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
    at com.streamingedge.marketreport.analytics.flink.FlinkDataSetAnalytics.analize(FlinkDataSetAnalytics.java:96)
    at com.streamingedge.marketreport.webserver.AnalyticsServlet.processRequest(AnalyticsServlet.java:117)
    at com.streamingedge.marketreport.webserver.AnalyticsServlet.doPost(AnalyticsServlet.java:40)
    at com.streamingedge.marketreport.webserver.AnalyticsServlet.doGet(AnalyticsServlet.java:35)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:229)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:427)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:193)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
    at org.eclipse.jetty.server.Server.handle(Server.java:366)
    at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
    at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:973)
    at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1035)
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:641)
    at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:231)
    at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:696)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:53)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:65)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$$anonfun$estimateRowSize$2.apply(DataSetRel.scala:53)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetRel$class.estimateRowSize(DataSetRel.scala:53)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.estimateRowSize(DataSetAggregate.scala:38)
    at org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate.computeSelfCost(DataSetAggregate.scala:80)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1134)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1838)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1774)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1950)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:137)
    ... 35 more

 

 

 

 

 
