[ https://issues.apache.org/jira/browse/FLINK-20961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuval Itzchakov updated FLINK-20961: ------------------------------------ Description: Given the following program: {code:java} //import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy } import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{$, AnyWithOperations} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction} import java.time.Instant object BugRepro { def text: String = s""" |{ | "s": "hello", | "i": ${Random.nextInt()} |} |""".stripMargin def main(args: Array[String]): Unit = { val flink = StreamExecutionEnvironment.createLocalEnvironment() val tableEnv = StreamTableEnvironment.create(flink) val dataStream = flink .addSource { new SourceFunction[(Long, String)] { var isRunning = true override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = while (isRunning) { val x = (Instant.now().toEpochMilli, text) ctx.collect(x) ctx.emitWatermark(new Watermark(x._1)) Thread.sleep(300) } override def cancel(): Unit = isRunning = false } } // .assignTimestampsAndWatermarks( // WatermarkStrategy // .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30)) // .withTimestampAssigner { // new SerializableTimestampAssigner[(Long, String)] { // override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = // element._1 // } // } // ) // tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text")) val res = tableEnv.sqlQuery(""" |SELECT json_text |FROM testview |""".stripMargin) val sink = tableEnv.executeSql( """ |CREATE TABLE SINK ( | json_text STRING |) |WITH ( | 'connector' = 'print' |) |""".stripMargin ) 
res.executeInsert("SINK").await() () } res.executeInsert("SINK").await() {code} Flink will throw a NullPointerException at runtime: {code:java} Caused by: java.lang.NullPointerException at SourceConversion$3.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215) {code} This happens because no timestamps were assigned to the records of the DataStream (the source only emits watermarks, and no TimestampAssigner is configured), yet the view declares a rowtime attribute.
This is the generated code: {code:java} public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0; org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public SourceConversion$3( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { this.references = references; converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0])); this.setup(task, config, output); if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) .setProcessingTimeService(processingTimeService); } } @Override public void open() throws Exception { super.open(); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue()); org.apache.flink.table.data.TimestampData result$1; boolean isNull$1; org.apache.flink.table.data.binary.BinaryStringData field$2; boolean isNull$2; isNull$2 = in1.isNullAt(1); field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$2) { field$2 = 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)); } ctx.element = element; result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); } isNull$1 = false; if (isNull$1) { out.setField(0, null); } else { out.setField(0, result$1); } if (isNull$2) { out.setField(1, null); } else { out.setField(1, field$2); } output.collect(outElement.replace(out)); ctx.element = null; } @Override public void close() throws Exception { super.close(); } } {code} The important line is here: {code:java} result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); {code} `ctx.timestamp()` returns a boxed `Long`, which is null when no timestamp was assigned to the record. `TimestampData.fromEpochMillis` expects a primitive `long`, so unboxing the null value throws a NullPointerException before the `result$1 == null` check is reached. That check can never be true anyway, because `fromEpochMillis` never returns null. The null check therefore has to happen before the conversion: {code:java} if (!ctx.hasTimestamp) { throw new RuntimeException("Rowtime timestamp is null. 
Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); } result$1 = TimestampData.fromEpochMillis(ctx.timestamp());{code} was: Given the following program: {code:java} //import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy } import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{$, AnyWithOperations} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction} import java.time.Instant object BugRepro { def text: String = s""" |{ | "s": "hello", | "i": ${Random.nextInt()} |} |""".stripMargin def main(args: Array[String]): Unit = { val flink = StreamExecutionEnvironment.createLocalEnvironment() val tableEnv = StreamTableEnvironment.create(flink) val dataStream = flink .addSource { new SourceFunction[(Long, String)] { var isRunning = true override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = while (isRunning) { val x = (Instant.now().toEpochMilli, text) ctx.collect(x) ctx.emitWatermark(new Watermark(x._1)) Thread.sleep(300) } override def cancel(): Unit = isRunning = false } } // .assignTimestampsAndWatermarks( // WatermarkStrategy // .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30)) // .withTimestampAssigner { // new SerializableTimestampAssigner[(Long, String)] { // override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = // element._1 // } // } // ) // tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text")) val res = tableEnv.sqlQuery(""" |SELECT json_text |FROM testview |""".stripMargin) val sink = 
tableEnv.executeSql( """ |CREATE TABLE SINK ( | json_text STRING |) |WITH ( | 'connector' = 'print' |) |""".stripMargin ) res.executeInsert("SINK").await() () } res.executeInsert("SINK").await() {code} Flink will throw a NullPointerException at runtime: {code:java} Caused by: java.lang.NullPointerExceptionCaused by: java.lang.NullPointerException at SourceConversion$3.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215) {code} This is due to the fact that the DataStream did not assign a timestamp to the underlying source. 
This is the generated code: {code:java} public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0; org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public SourceConversion$3( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { this.references = references; converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0])); this.setup(task, config, output); if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) .setProcessingTimeService(processingTimeService); } } @Override public void open() throws Exception { super.open(); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue()); org.apache.flink.table.data.TimestampData result$1; boolean isNull$1; org.apache.flink.table.data.binary.BinaryStringData field$2; boolean isNull$2; isNull$2 = in1.isNullAt(1); field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$2) { field$2 = 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)); } ctx.element = element; result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); } isNull$1 = false; if (isNull$1) { out.setField(0, null); } else { out.setField(0, result$1); } if (isNull$2) { out.setField(1, null); } else { out.setField(1, field$2); } output.collect(outElement.replace(out)); ctx.element = null; } @Override public void close() throws Exception { super.close(); } } {code} The important line is here: {code:java} result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); {code} `ctx.timestamp` returns null in case no timestamp assigner was created, and `TimestampData.fromEpochMillis` expects a primitive `long`, so a deference fails. The actual check should be: {code:java} if (!ctx.hasTimestamp) { throw new RuntimeException("Rowtime timestamp is null. 
Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic."); } result$1 = TimestampData.fromEpochMillis(ctx.timestamp());{code} > Flink throws NullPointerException for tables created from DataStream with no > assigned timestamps and watermarks > --------------------------------------------------------------------------------------------------------------- > > Key: FLINK-20961 > URL: https://issues.apache.org/jira/browse/FLINK-20961 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.12.0 > Reporter: Yuval Itzchakov > Priority: Minor > > > Given the following program: > {code:java} > //import org.apache.flink.api.common.eventtime.{ > SerializableTimestampAssigner, WatermarkStrategy } > import org.apache.flink.streaming.api.functions.source.SourceFunction > import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} > import org.apache.flink.streaming.api.watermark.Watermark > import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint} > import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment > import org.apache.flink.table.api.{$, AnyWithOperations} > import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction} > import java.time.Instant > object BugRepro { > def text: String = > s""" > |{ > | "s": "hello", > | "i": ${Random.nextInt()} > |} > |""".stripMargin > def main(args: Array[String]): Unit = { > val flink = > StreamExecutionEnvironment.createLocalEnvironment() > val tableEnv = StreamTableEnvironment.create(flink) > val dataStream = flink > .addSource { > new SourceFunction[(Long, String)] { > var isRunning = true > override def run(ctx: SourceFunction.SourceContext[(Long, > String)]): Unit = > while (isRunning) { > val x = (Instant.now().toEpochMilli, text) > ctx.collect(x) > ctx.emitWatermark(new Watermark(x._1)) > Thread.sleep(300) > } > override def cancel(): Unit = > isRunning = false 
> } > } > // .assignTimestampsAndWatermarks( > // WatermarkStrategy > // .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30)) > // .withTimestampAssigner { > // new SerializableTimestampAssigner[(Long, String)] { > // override def extractTimestamp(element: (Long, String), > recordTimestamp: Long): Long = > // element._1 > // } > // } > // ) > // > tableEnv.createTemporaryView("testview", dataStream, > $("event_time").rowtime(), $("json_text")) > val res = tableEnv.sqlQuery(""" > |SELECT json_text > |FROM testview > |""".stripMargin) > val sink = tableEnv.executeSql( > """ > |CREATE TABLE SINK ( > | json_text STRING > |) > |WITH ( > | 'connector' = 'print' > |) > |""".stripMargin > ) res.executeInsert("SINK").await() > () > } > res.executeInsert("SINK").await() > {code} > > Flink will throw a NullPointerException at runtime: > {code:java} > Caused by: java.lang.NullPointerExceptionCaused by: > java.lang.NullPointerException at SourceConversion$3.processElement(Unknown > Source) at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) > at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78) at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215) > {code} > This is due to the fact that the DataStream did not assign a timestamp to the > underlying source. This is the generated code: > {code:java} > public class SourceConversion$3 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator { > private final Object[] references; > private transient > org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter > converter$0; > org.apache.flink.table.data.GenericRowData out = new > org.apache.flink.table.data.GenericRowData(2); > private final > org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new > org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > public SourceConversion$3( > Object[] references, > org.apache.flink.streaming.runtime.tasks.StreamTask task, > org.apache.flink.streaming.api.graph.StreamConfig config, > org.apache.flink.streaming.api.operators.Output output, > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception { > this.references = references; > converter$0 = > (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) > references[0])); > this.setup(task, config, output); > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) { > > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) > .setProcessingTimeService(processingTimeService); > } > } @Override > public void open() throws Exception { > super.open(); > > } @Override > public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception { > org.apache.flink.table.data.RowData in1 = > 
(org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) > converter$0.toInternal((scala.Tuple2) element.getValue()); > > org.apache.flink.table.data.TimestampData result$1; > boolean isNull$1; > org.apache.flink.table.data.binary.BinaryStringData field$2; > boolean isNull$2; > isNull$2 = in1.isNullAt(1); > field$2 = > org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; > if (!isNull$2) { > field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) > in1.getString(1)); > } > > ctx.element = element; > > > > result$1 = > org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); > if (result$1 == null) { > throw new RuntimeException("Rowtime timestamp is null. Please > make sure that a " + > "proper TimestampAssigner is defined and the stream environment > uses the EventTime " + > "time characteristic."); > } > isNull$1 = false; > if (isNull$1) { > out.setField(0, null); > } else { > out.setField(0, result$1); > } > > > > if (isNull$2) { > out.setField(1, null); > } else { > out.setField(1, field$2); > } > > > output.collect(outElement.replace(out)); > ctx.element = null; > > } @Override > public void close() throws Exception { > super.close(); > > } > } > {code} > The important line is here: > {code:java} > result$1 = > org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); > if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is > null. Please make sure that a " + "proper TimestampAssigner is defined and > the stream environment uses the EventTime " + "time characteristic."); > {code} > `ctx.timestamp` returns null in case no timestamp assigner was created, and > `TimestampData.fromEpochMillis` expects a primitive `long`, so a deference > fails. The actual check should be: > {code:java} > if (!ctx.hasTimestamp) { > throw new RuntimeException("Rowtime timestamp is null. 
Please make sure > that a " + "proper TimestampAssigner is defined and the stream environment > uses the EventTime " + "time characteristic."); > } > result$1 = TimestampData.fromEpochMillis(ctx.timestamp());{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)