Jiang Xin created FLINK-30420: --------------------------------- Summary: NPE thrown when using window time in Table API Key: FLINK-30420 URL: https://issues.apache.org/jira/browse/FLINK-30420 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: Jiang Xin
Running the following unit test fails with a NullPointerException. {code:java} import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.junit.Before; import org.junit.Test; import java.sql.Timestamp; import java.time.Duration; import java.util.ArrayList; import java.util.List; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; import static org.apache.flink.table.api.Expressions.currentTimestamp; import static org.apache.flink.table.api.Expressions.lit; public class TestAggWithSourceWatermark { private StreamTableEnvironment tEnv; private StreamExecutionEnvironment env; @Before public void before() { Configuration config = new Configuration(); env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); tEnv = StreamTableEnvironment.create(env); } @Test public void testWindowTime() { DataStream<Integer> stream = env.addSource( new DataGeneratorSource<>( SequenceGenerator.intGenerator(0, 30), 1, 30l)) .returns(Integer.class); 
DataStream<Tuple2<Integer, Long>> streamWithTime = stream.map(x -> Tuple2.of(x, System.currentTimeMillis())) .returns(Types.TUPLE(Types.INT, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<Integer, Long>>forBoundedOutOfOrderness( Duration.ofSeconds(2)) .withTimestampAssigner( (ctx) -> (element, recordTimestamp) -> element.f1)); Schema schema = Schema.newBuilder() .column("f0", DataTypes.INT()) .column("f1", DataTypes.BIGINT()) .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") .watermark("rowtime", "SOURCE_WATERMARK()") .build(); Table table = tEnv.fromDataStream(streamWithTime, schema); table = table.select($("rowtime")); Table windowedTable = table.window(Tumble.over("5.seconds").on("rowtime").as("w")) .groupBy($("w")) .select( call(UDAF.class, $("rowtime")).as("row_times"), $("w").rowtime().as("window_time"), currentTimestamp().as("current_timestamp")); windowedTable = windowedTable .joinLateral(call(SplitFunction.class, $("row_times")).as("rowtime")) .select( $("rowtime").cast(TIMESTAMP(3)).as("rowtime"), $("window_time"), $("current_timestamp")); windowedTable.printSchema(); windowedTable.execute().print(); } public static class SplitFunction extends TableFunction<Timestamp> { public void eval(List<Timestamp> times) { for (int i = 0; i < times.size(); i++) { collect(times.get(i)); } } } public static class UDAF extends AggregateFunction<List<Timestamp>, List<Timestamp>> { public UDAF() {} @Override public List<Timestamp> createAccumulator() { return new ArrayList<>(); } public void accumulate(List<Timestamp> accumulator, Timestamp num) { accumulator.add(num); } @Override public List<Timestamp> getValue(List<Timestamp> accumulator) { return accumulator; } } } {code} Then the following exception occurs {code:java} java.lang.NullPointerException at org.apache.calcite.sql2rel.RelDecorrelator.getNewForOldInputRef(RelDecorrelator.java:1359) at org.apache.calcite.sql2rel.RelDecorrelator.access$400(RelDecorrelator.java:122) at 
org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1638) at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1595) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateExpr(RelDecorrelator.java:348) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:759) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1170) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1153) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:734) at 
org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:391) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525) at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelate(RelDecorrelator.java:276) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:200) at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:165) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:175) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:82) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:75) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:307) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:187) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605) at org.apache.flink.ml.clustering.TestAggWithSourceWatermark.testWindowTime(TestAggWithSourceWatermark.java:109) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)