[ https://issues.apache.org/jira/browse/FLINK-30420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiang Xin updated FLINK-30420: ------------------------------ Description: Run the following unit test. {code:java} import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.junit.Before; import org.junit.Test; import java.sql.Timestamp; import java.time.Duration; import java.util.ArrayList; import java.util.List; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; import static org.apache.flink.table.api.Expressions.currentTimestamp; import static org.apache.flink.table.api.Expressions.lit; public class TestWindowTime { private StreamTableEnvironment tEnv; private StreamExecutionEnvironment env; @Before public void before() { Configuration config = new Configuration(); env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); tEnv = StreamTableEnvironment.create(env); } @Test public void testWindowTime() { DataStream<Integer> stream = env.addSource( new DataGeneratorSource<>( SequenceGenerator.intGenerator(0, 30), 1, 30l)) .returns(Integer.class); DataStream<Tuple2<Integer, Long>> streamWithTime = stream.map(x -> Tuple2.of(x, System.currentTimeMillis())) .returns(Types.TUPLE(Types.INT, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<Integer, Long>>forBoundedOutOfOrderness( Duration.ofSeconds(2)) .withTimestampAssigner( (ctx) -> (element, recordTimestamp) -> element.f1)); Schema schema = Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.BIGINT()) .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") .watermark("rowtime", "SOURCE_WATERMARK()") .build(); Table table = tEnv.fromDataStream(streamWithTime, schema); table = table.select($("rowtime")); Table windowedTable = table.window(Tumble.over("5.seconds").on("rowtime").as("w")) .groupBy($("w")) .select( call(UDAF.class, $("rowtime")).as("row_times"), $("w").rowtime().as("window_time"), currentTimestamp().as("current_timestamp")); windowedTable = windowedTable .joinLateral(call(SplitFunction.class, $("row_times")).as("rowtime")) .select( $("rowtime").cast(TIMESTAMP(3)).as("rowtime"), $("window_time"), $("current_timestamp")); windowedTable.printSchema(); windowedTable.execute().print(); } public static class SplitFunction extends TableFunction<Timestamp> { public void eval(List<Timestamp> times) { for (int i = 0; i < times.size(); i++) { collect(times.get(i)); } } } public static class UDAF extends AggregateFunction<List<Timestamp>, List<Timestamp>> { public UDAF() {} @Override public List<Timestamp> createAccumulator() { return new ArrayList<>(); } public void accumulate(List<Timestamp> accumulator, Timestamp num) { accumulator.add(num); } @Override public List<Timestamp> getValue(List<Timestamp> accumulator) { return accumulator; } } } {code} Then the following exception occurs. {code:java} java.lang.NullPointerException at org.apache.calcite.sql2rel.RelDecorrelator.getNewForOldInputRef(RelDecorrelator.java:1359) at org.apache.calcite.sql2rel.RelDecorrelator.access$400(RelDecorrelator.java:122) at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1638) at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1595) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateExpr(RelDecorrelator.java:348) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:759) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1170) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1153) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:734) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:391) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelate(RelDecorrelator.java:276) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:200) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:165) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:175) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:82) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:75) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:307) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:187) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) at org.apache.flink.ml.clustering.TestAggWithSourceWatermark.testWindowTime(TestAggWithSourceWatermark.java:109) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) {code} was: Run the following unit test. {code:java} import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.junit.Before; import org.junit.Test; import java.sql.Timestamp; import java.time.Duration; import java.util.ArrayList; import java.util.List; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; import static org.apache.flink.table.api.Expressions.currentTimestamp; import static org.apache.flink.table.api.Expressions.lit; public class TestAggWithSourceWatermark { private StreamTableEnvironment tEnv; private StreamExecutionEnvironment env; @Before public void before() { Configuration config = new Configuration(); env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); tEnv = StreamTableEnvironment.create(env); } @Test public void testWindowTime() { DataStream<Integer> stream = env.addSource( new DataGeneratorSource<>( SequenceGenerator.intGenerator(0, 30), 1, 30l)) .returns(Integer.class); DataStream<Tuple2<Integer, Long>> streamWithTime = stream.map(x -> Tuple2.of(x, System.currentTimeMillis())) .returns(Types.TUPLE(Types.INT, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<Integer, Long>>forBoundedOutOfOrderness( Duration.ofSeconds(2)) .withTimestampAssigner( (ctx) -> (element, recordTimestamp) -> element.f1)); Schema schema = Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.BIGINT()) .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") .watermark("rowtime", "SOURCE_WATERMARK()") .build(); Table table = tEnv.fromDataStream(streamWithTime, schema); table = table.select($("rowtime")); Table windowedTable = table.window(Tumble.over("5.seconds").on("rowtime").as("w")) .groupBy($("w")) .select( call(UDAF.class, $("rowtime")).as("row_times"), $("w").rowtime().as("window_time"), currentTimestamp().as("current_timestamp")); windowedTable = windowedTable .joinLateral(call(SplitFunction.class, $("row_times")).as("rowtime")) .select( $("rowtime").cast(TIMESTAMP(3)).as("rowtime"), $("window_time"), $("current_timestamp")); windowedTable.printSchema(); windowedTable.execute().print(); } public static class SplitFunction extends TableFunction<Timestamp> { public void eval(List<Timestamp> times) { for (int i = 0; i < times.size(); i++) { collect(times.get(i)); } } } public static class UDAF extends AggregateFunction<List<Timestamp>, List<Timestamp>> { public UDAF() {} @Override public List<Timestamp> createAccumulator() { return new ArrayList<>(); } public void accumulate(List<Timestamp> accumulator, Timestamp num) { accumulator.add(num); } @Override public List<Timestamp> getValue(List<Timestamp> accumulator) { return accumulator; } } } {code} Then the following exception occurs. {code:java} java.lang.NullPointerException at org.apache.calcite.sql2rel.RelDecorrelator.getNewForOldInputRef(RelDecorrelator.java:1359) at org.apache.calcite.sql2rel.RelDecorrelator.access$400(RelDecorrelator.java:122) at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1638) at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1595) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateExpr(RelDecorrelator.java:348) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:759) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1170) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1153) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:734) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:391) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelate(RelDecorrelator.java:276) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:200) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:165) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:175) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:82) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:75) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:307) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:187) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) at org.apache.flink.ml.clustering.TestAggWithSourceWatermark.testWindowTime(TestAggWithSourceWatermark.java:109) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) {code} > NPE thrown when using window time in Table API > ---------------------------------------------- > > Key: FLINK-30420 > URL: https://issues.apache.org/jira/browse/FLINK-30420 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Reporter: Jiang Xin > Priority: Major > > Run the following unit test. > {code:java} > import org.apache.flink.api.common.eventtime.WatermarkStrategy; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; > import > org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator; > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.api.Schema; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Tumble; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.functions.AggregateFunction; > import org.apache.flink.table.functions.TableFunction; > import org.junit.Before; > import org.junit.Test; > import java.sql.Timestamp; > import java.time.Duration; > import java.util.ArrayList; > import java.util.List; > import static org.apache.flink.table.api.DataTypes.TIMESTAMP; > import static org.apache.flink.table.api.Expressions.$; > import static org.apache.flink.table.api.Expressions.call; > import static org.apache.flink.table.api.Expressions.currentTimestamp; > import static org.apache.flink.table.api.Expressions.lit; > public class TestWindowTime { > private StreamTableEnvironment tEnv; > private StreamExecutionEnvironment env; > @Before > public void before() { > Configuration config = new Configuration(); > env = StreamExecutionEnvironment.getExecutionEnvironment(config); > env.setParallelism(1); > tEnv = StreamTableEnvironment.create(env); > } > @Test > public void testWindowTime() { > DataStream<Integer> stream = > env.addSource( > new DataGeneratorSource<>( > SequenceGenerator.intGenerator(0, > 30), 1, 30l)) > .returns(Integer.class); > DataStream<Tuple2<Integer, Long>> streamWithTime = > stream.map(x -> Tuple2.of(x, System.currentTimeMillis())) > .returns(Types.TUPLE(Types.INT, Types.LONG)) > .assignTimestampsAndWatermarks( > WatermarkStrategy.<Tuple2<Integer, > Long>>forBoundedOutOfOrderness( > Duration.ofSeconds(2)) > .withTimestampAssigner( > (ctx) -> (element, > recordTimestamp) -> element.f1)); > Schema schema = > Schema.newBuilder() > .column("f0", DataTypes.INT()) > .column("f1", DataTypes.BIGINT()) > .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") > .watermark("rowtime", "SOURCE_WATERMARK()") > .build(); > Table table = tEnv.fromDataStream(streamWithTime, schema); > table = table.select($("rowtime")); > Table windowedTable = > table.window(Tumble.over("5.seconds").on("rowtime").as("w")) > .groupBy($("w")) > .select( > call(UDAF.class, > $("rowtime")).as("row_times"), > $("w").rowtime().as("window_time"), > currentTimestamp().as("current_timestamp")); > windowedTable = > windowedTable > .joinLateral(call(SplitFunction.class, > $("row_times")).as("rowtime")) > .select( > $("rowtime").cast(TIMESTAMP(3)).as("rowtime"), > $("window_time"), > $("current_timestamp")); > windowedTable.printSchema(); > windowedTable.execute().print(); > } > public static class SplitFunction extends TableFunction<Timestamp> { > public void eval(List<Timestamp> times) { > for (int i = 0; i < times.size(); i++) { > collect(times.get(i)); > } > } > } > public static class UDAF extends AggregateFunction<List<Timestamp>, > List<Timestamp>> { > public UDAF() {} > @Override > public List<Timestamp> createAccumulator() { > return new ArrayList<>(); > } > public void accumulate(List<Timestamp> accumulator, Timestamp num) { > accumulator.add(num); > } > @Override > public List<Timestamp> getValue(List<Timestamp> accumulator) { > return accumulator; > } > } > } {code} > Then the following exception occurs. > {code:java} > java.lang.NullPointerException > at > org.apache.calcite.sql2rel.RelDecorrelator.getNewForOldInputRef(RelDecorrelator.java:1359) > at > org.apache.calcite.sql2rel.RelDecorrelator.access$400(RelDecorrelator.java:122) > at > org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1638) > at > org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1595) > at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) > at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) > at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) > at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateExpr(RelDecorrelator.java:348) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:759) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1170) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1153) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:734) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:391) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelate(RelDecorrelator.java:276) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:200) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:165) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.immutable.Range.foreach(Range.scala:155) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:175) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:82) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:75) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:307) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:187) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) > at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) > at > org.apache.flink.ml.clustering.TestAggWithSourceWatermark.testWindowTime(TestAggWithSourceWatermark.java:109) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)