[ 
https://issues.apache.org/jira/browse/FLINK-20015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satyam Shekhar updated FLINK-20015:
-----------------------------------
    Description: 
Hello,
  
 I have a table T0 with the following schema -

{{root}}
{{       |-- amount: BIGINT}}
{{       |-- timestamp: TIMESTAMP(3)}}
{{  }}
{{ The following two queries fail when executed against the above table in 
streaming mode using the Blink planner.}}
{{  }}
{{ WITH A AS (}}
{{   SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm}}
{{     FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}}
{{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}}
{{  }}
{{ WITH A AS (}}
{{   SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) as 
tm}}
{{     FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}}
{{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}}
{{  }}
{{ The two queries are very similar and differ only in their use of the 
tumble_end and tumble_rowtime functions. Both queries use the timestamp column as 
their rowtime attribute. Casting the "tm" column to TIMESTAMP(3) makes both queries work -}}
{{  }}
{{ WITH A AS (}}
{{   SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) 
as TIMESTAMP(3)) as tm}}
{{     FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}}
{{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}}
{{  }}
{{ This workaround, however, loses the rowtime attribute from the output 
result set of the second query.}}
{{  }}
{{ The first query fails with the following exception -}}
{{  }}
{{ java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class 
java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; 
java.lang.Long is in module java.base of loader 'bootstrap')}}
{{ at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)}}
{{ at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)}}
{{ at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)}}
{{ at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}}
{{ at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}}
{{ at SinkConversion$166.processElement(Unknown Source)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)}}
{{ at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}}
{{ at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}}
{{ at StreamExecCalc$163.processElement(Unknown Source)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)}}
{{ at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}}
{{ at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}}
{{ at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)}}
{{ at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)}}
{{ at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)}}
{{ at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)}}
{{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)}}
{{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)}}
{{ at java.base/java.lang.Thread.run(Thread.java:834)}}
{{ Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be 
cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of 
loader 'platform'; java.lang.Long is in module java.base of loader 
'bootstrap')}}
{{ at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)}}
{{ at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192)}}
{{ at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58)}}
{{ at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)}}
{{ at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)}}
{{ at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)}}
{{ at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)}}
{{ at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)}}
{{ at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)}}
{{ at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)}}
{{ at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)}}
{{ at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)}}
{{  }}
{{ The second one fails with -}}
{{  }}
{{ java.lang.RuntimeException: Error while applying rule 
StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
[rel#606:FlinkLogicalJoin.LOGICAL.any.None: 
0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, 
$3),joinType=left)]}}
{{ at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)}}
{{ at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)}}
{{ at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)}}
{{ at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)}}
{{ at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)}}
{{ at scala.collection.Iterator$class.foreach(Iterator.scala:891)}}
{{ at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)}}
{{ at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)}}
{{ at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
{{ at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)}}
{{ at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)}}
{{ at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)}}
{{ at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)}}
{{ at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)}}
{{ at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)}}
{{ at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)}}
{{ at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)}}
{{ at 
io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353)}}
{{ ....}}
{{ Caused by: java.util.NoSuchElementException: None.get}}
{{ at scala.None$.get(Option.scala:347)}}
{{ at scala.None$.get(Option.scala:345)}}
{{ at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72)}}
{{ at 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122)}}
{{ at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144)}}
{{ at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)}}

  was:
Hello,
 
I have a table T0 with the following schema -

root
      |-- amount: BIGINT
      |-- timestamp: TIMESTAMP(3)
 
The following two queries fail execution on the above table when executed in 
streaming mode using the Blink planner.
 
WITH A AS (
  SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
 
WITH A AS (
  SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) as tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
 
The two queries are very similar and only differ in their use of tumble_end and 
tumble_rowtime operator. Both queries use timestamp column as their rowtime 
attribute. Casting "tm" column to timestamp makes both queries work -
 
WITH A AS (
  SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) as 
TIMESTAMP(3)) as tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
 
This workaround, however, loses the rowtime attribute from the output resultset 
for the second query.
 
The first query fails with the following exception -
 
java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class 
java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; 
java.lang.Long is in module java.base of loader 'bootstrap')
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at SinkConversion$166.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at StreamExecCalc$163.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)
 at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)
 at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
 at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
 at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
 at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be 
cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of 
loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
 at 
org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
 at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 
The second one fails with -
 
java.lang.RuntimeException: Error while applying rule 
StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
[rel#606:FlinkLogicalJoin.LOGICAL.any.None: 
0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, 
$3),joinType=left)]
 at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
 at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
 at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
 at 
io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353)
....
Caused by: java.util.NoSuchElementException: None.get
 at scala.None$.get(Option.scala:347)
 at scala.None$.get(Option.scala:345)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72)
 at 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122)
 at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144)
 at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)


> Failure to execute streaming query
> ----------------------------------
>
>                 Key: FLINK-20015
>                 URL: https://issues.apache.org/jira/browse/FLINK-20015
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.11.1
>            Reporter: Satyam Shekhar
>            Priority: Major
>
> Hello,
>   
>  I have a table T0 with the following schema -
> {{root}}
> {{       |-- amount: BIGINT}}
> {{       |-- timestamp: TIMESTAMP(3)}}
> {{  }}
> {{ The following two queries fail when executed against the above table 
> in streaming mode using the Blink planner.}}
> {{  }}
> {{ WITH A AS (}}
> {{   SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as 
> tm}}
> {{     FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}}
> {{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}}
> {{  }}
> {{ WITH A AS (}}
> {{   SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) 
> as tm}}
> {{     FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}}
> {{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}}
> {{  }}
> {{ The two queries are very similar and differ only in their use of the 
> tumble_end and tumble_rowtime functions. Both queries use the timestamp column 
> as their rowtime attribute. Casting the "tm" column to TIMESTAMP(3) makes both 
> queries work -}}
> {{  }}
> {{ WITH A AS (}}
> {{   SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) 
> as TIMESTAMP(3)) as tm}}
> {{     FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))}}
> {{ select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm}}
> {{  }}
> {{ This workaround, however, loses the rowtime attribute from the output 
> result set of the second query.}}
> {{  }}
> {{ The first query fails with the following exception -}}
> {{  }}
> {{ java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to 
> class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 
> 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')}}
> {{ at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)}}
> {{ at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}}
> {{ at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}}
> {{ at SinkConversion$166.processElement(Unknown Source)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)}}
> {{ at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}}
> {{ at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}}
> {{ at StreamExecCalc$163.processElement(Unknown Source)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)}}
> {{ at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)}}
> {{ at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)}}
> {{ at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)}}
> {{ at 
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)}}
> {{ at 
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)}}
> {{ at 
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)}}
> {{ at java.base/java.lang.Thread.run(Thread.java:834)}}
> {{ Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot 
> be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of 
> loader 'platform'; java.lang.Long is in module java.base of loader 
> 'bootstrap')}}
> {{ at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)}}
> {{ at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192)}}
> {{ at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58)}}
> {{ at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)}}
> {{ at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)}}
> {{ at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)}}
> {{ at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)}}
> {{ at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)}}
> {{ at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)}}
> {{ at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)}}
> {{ at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)}}
> {{ at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)}}
> {{  }}
> {{ The second one fails with -}}
> {{  }}
> {{ java.lang.RuntimeException: Error while applying rule 
> StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
> [rel#606:FlinkLogicalJoin.LOGICAL.any.None: 
> 0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, 
> $3),joinType=left)]}}
> {{ at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)}}
> {{ at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)}}
> {{ at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)}}
> {{ at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)}}
> {{ at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)}}
> {{ at scala.collection.Iterator$class.foreach(Iterator.scala:891)}}
> {{ at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)}}
> {{ at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)}}
> {{ at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
> {{ at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)}}
> {{ at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)}}
> {{ at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)}}
> {{ at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)}}
> {{ at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)}}
> {{ at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)}}
> {{ at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)}}
> {{ at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)}}
> {{ at 
> io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353)}}
> {{ ....}}
> {{ Caused by: java.util.NoSuchElementException: None.get}}
> {{ at scala.None$.get(Option.scala:347)}}
> {{ at scala.None$.get(Option.scala:345)}}
> {{ at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72)}}
> {{ at 
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122)}}
> {{ at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144)}}
> {{ at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to