[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721969#comment-17721969 ]
Shengkai Fang commented on FLINK-31967:
---------------------------------------

[~padavan] Sure. There will be a PR to fix this soon.

> SQL with LAG function NullPointerException
> ------------------------------------------
>
>                 Key: FLINK-31967
>                 URL: https://issues.apache.org/jira/browse/FLINK-31967
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: padavan
>            Priority: Major
>         Attachments: image-2023-04-28-14-46-19-736.png,
>                      image-2023-04-28-15-06-48-184.png,
>                      image-2023-04-28-15-14-58-788.png,
>                      image-2023-04-28-15-17-49-144.png,
>                      image-2023-04-28-17-06-20-737.png,
>                      simpleFlinkKafkaLag.zip
>
>
> I want to run a query with the LAG function, but the job fails with an
> exception and no explanation.
>
> *Code:*
> {code:java}
> private static void t1_LeadLag(DataStream<UserModel> ds, StreamExecutionEnvironment env) {
>     StreamTableEnvironment te = StreamTableEnvironment.create(env);
>     Table t = te.fromDataStream(ds,
>             Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
>     te.createTemporaryView("users", t);
>     Table res = te.sqlQuery("SELECT userId, `count`,\n" +
>             " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
>             " FROM users");
>     te.toChangelogStream(res).print();
> }{code}
>
> *Input:*
> {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
>
> *Exception:* I removed the generic JobExecutionException part and kept what
> I think is the important part.
> {code:java}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
>     at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
>     at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
>     at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
>     at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
>     at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Thread.java:829){code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)