[ https://issues.apache.org/jira/browse/FLINK-20015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Satyam Shekhar updated FLINK-20015: ----------------------------------- Description: Hello, I have a table T0 with the following schema - root |-- amount: BIGINT |-- timestamp: TIMESTAMP(3) The following two queries fail to execute on the above table when executed in streaming mode using the Blink planner. WITH A AS ( SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE)) select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm WITH A AS ( SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) as tm FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE)) select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm The two queries are very similar and only differ in their use of the tumble_end and tumble_rowtime operators. Both queries use the timestamp column as their rowtime attribute. Casting the "tm" column to TIMESTAMP(3) makes both queries work - WITH A AS ( SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) as TIMESTAMP(3)) as tm FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE)) select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm This workaround, however, loses the rowtime attribute from the output result set for the second query. 
The first query fails with the following exception - java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap') at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at SinkConversion$166.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$163.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap') at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) The second one fails with - java.lang.RuntimeException: Error while applying rule StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#606:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, $3),joinType=left)] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213) at io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353) .... 
Caused by: java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72) at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217) was: Hello, I have a table T0 with the following schema - {{root}} {{ |-- amount: BIGINT}} {{ |-- timestamp: TIMESTAMP(3)}} {{ }} {{ The following two queries fail execution on the above table when executed in streaming mode using the Blink planner.}} {{ }} {{ WITH A AS (}} {{ SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm}} {{ FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}} {{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}} {{ }} {{ WITH A AS (}} {{ SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) as tm}} {{ FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}} {{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}} {{ }} {{ The two queries are very similar and only differ in their use of tumble_end and tumble_rowtime operator. Both queries use the timestamp column as their rowtime attribute. 
Casting "tm" column to timestamp makes both queries work -}} {{ }} {{ WITH A AS (}} {{ SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) as TIMESTAMP(3)) as tm}} {{ FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}} {{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}} {{ }} {{ This workaround, however, loses the rowtime attribute from the output resultset for the second query.}} {{ }} {{ The first query fails with the following exception -}} {{ }} {{ java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')}} {{ at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)}} {{ at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)}} {{ at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)}} {{ at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}} {{ at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}} {{ at SinkConversion$166.processElement(Unknown Source)}} {{ at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)}} {{ at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)}} {{ at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)}} {{ at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}} {{ at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}} {{ at StreamExecCalc$163.processElement(Unknown Source)}} {{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)}} {{ at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)}} {{ at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)}} {{ at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}} {{ at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}} {{ at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)}} {{ at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)}} {{ at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)}} {{ at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)}} {{ at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)}} {{ at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)}} {{ at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)}} {{ at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)}} {{ at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)}} {{ at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)}} {{ at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)}} {{ at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)}} {{ at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)}} {{ at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)}} {{ at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)}} {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)}} {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)}} {{ at java.base/java.lang.Thread.run(Thread.java:834)}} {{ Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')}} {{ at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)}} {{ at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192)}} {{ at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58)}} {{ at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)}} {{ at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)}} {{ at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)}} {{ at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)}} {{ at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)}} {{ at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)}} {{ at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)}} {{ at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)}} {{ at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)}} {{ }} {{ The second one fails with -}} {{ }} {{ java.lang.RuntimeException: Error while applying rule StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#606:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, $3),joinType=left)]}} {{ at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)}} {{ at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)}} {{ at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)}} {{ at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)}} {{ at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)}} {{ at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)}} {{ at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)}} {{ at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)}} {{ at scala.collection.Iterator$class.foreach(Iterator.scala:891)}} {{ at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)}} {{ at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)}} {{ at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}} {{ at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)}} {{ at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)}} {{ at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)}} {{ at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)}} {{ at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)}} {{ at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)}} {{ at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)}} {{ at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)}} {{ at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)}} {{ at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)}} {{ at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)}} {{ at io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353)}} {{ ....}} {{ Caused by: java.util.NoSuchElementException: None.get}} {{ at scala.None$.get(Option.scala:347)}} {{ at scala.None$.get(Option.scala:345)}} {{ at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72)}} {{ at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122)}} {{ at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144)}} {{ at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)}} > Failure to execute streaming query > ---------------------------------- > > Key: FLINK-20015 > URL: https://issues.apache.org/jira/browse/FLINK-20015 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.11.1 > Reporter: Satyam Shekhar > Priority: Major > > Hello, > > I have a table T0 with the following schema - > root > |-- amount: BIGINT > |-- timestamp: TIMESTAMP(3) > > The following two queries fail execution on the above table when executed in > streaming mode using the 
Blink planner. > > WITH A AS ( > SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm > FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE)) > select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm > > WITH A AS ( > SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) as > tm > FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE)) > select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm > > The two queries are very similar and only differ in their use of tumble_end > and tumble_rowtime operator. Both queries use the timestamp column as their > rowtime attribute. Casting "tm" column to timestamp makes both queries work - > > WITH A AS ( > SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) > as TIMESTAMP(3)) as tm > FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE)) > select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm > > This workaround, however, loses the rowtime attribute from the output > resultset for the second query. 
> > The first query fails with the following exception - > > java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class > java.lang.Long (java.sql.Timestamp is in module java.sql of loader > 'platform'; java.lang.Long is in module java.base of loader 'bootstrap') > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at SinkConversion$166.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at StreamExecCalc$163.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) > at > org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) > at > org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206) > at > org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be > cast to class 
java.lang.Long (java.sql.Timestamp is in module java.sql of > loader 'platform'; java.lang.Long is in module java.base of loader > 'bootstrap') > at > org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117) > at > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > > The second one fails with - > > java.lang.RuntimeException: Error while applying rule > StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args > [rel#606:FlinkLogicalJoin.LOGICAL.any.None: > 0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, > $3),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636) > at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) > at > 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213) > at > io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353) > .... > Caused by: java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72) > at > org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122) > at > org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217) -- This message was sent by Atlassian Jira (v8.3.4#803005)