Hi Yan,
I think it is a bug in ProcTimeBoundedRangeOver: it tries to access a list of elements that was already cleared and does not check against null (see the rough sketch below the quoted mail). Could you please file a JIRA for that?

Best,
Dawid

On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get a warning that the CodeCache is full around that time. It is
> printed by the JVM and has no timestamp, but I suspect it is caused by the
> many failure recoveries from checkpoint, which make the SQL queries get
> dynamically compiled too many times.
>
> Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
> Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
> CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
> bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]
> total_blobs=54308 nmethods=53551 adapters=617
> compilation: disabled (not enough contiguous free space left)
>
> ------------------------------------------------------------------------
> From: Yan Zhou [FDS Science] <yz...@coupang.com>
> Sent: Tuesday, May 29, 2018 10:52:18 PM
> To: user@flink.apache.org
> Subject: NPE in flink sql over-window
>
> Hi,
>
> I am using Flink SQL 1.5.0. My application throws an NPE, and after it
> recovers from a checkpoint automatically, it immediately throws the NPE
> again from the same line of code.
>
> My application reads messages from Kafka, converts the DataStream into a
> table, issues an over-window aggregation and writes the result into a
> sink. The NPE is thrown from the class ProcTimeBoundedRangeOver; please
> see the exception log at the bottom.
>
> The exceptions always happen after the application has been running for
> maxIdleStateRetentionTime. What could be the possible causes?
>
> Best,
> Yan
>
> 2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task - over:
> (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW,
> select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select:
> (id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter
> -> Sink: Unnamed (3/15) (327efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
> TimerException{java.lang.NullPointerException}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
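
For illustration only, here is a minimal sketch of the kind of guard I mean in the onTimer callback. It is not the actual ProcTimeBoundedRangeOver code: the function, state name and types below are made up, assuming rows are buffered per timestamp in a MapState roughly the way the over-window aggregate does.

import java.util.{List => JList}

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical function (meant to run on a keyed stream), only to illustrate
// the missing null check: the idle-state clean-up can remove the buffered rows
// before the processing-time timer that references them fires.
class GuardedOverWindowFunction extends ProcessFunction[Row, Row] {

  private var rowBuffer: MapState[java.lang.Long, JList[Row]] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new MapStateDescriptor[java.lang.Long, JList[Row]](
      "rowBuffer",
      BasicTypeInfo.LONG_TYPE_INFO,
      TypeInformation.of(classOf[JList[Row]]))
    rowBuffer = getRuntimeContext.getMapState(descriptor)
  }

  override def processElement(
      value: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {
    // buffering of rows and timer registration omitted in this sketch
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[Row, Row]#OnTimerContext,
      out: Collector[Row]): Unit = {
    val rows = rowBuffer.get(timestamp)
    // This is the guard that is missing: if the state was already cleared,
    // get() returns null and iterating over it throws the NPE.
    if (rows == null) {
      return
    }
    val it = rows.iterator()
    while (it.hasNext) {
      out.collect(it.next())
    }
  }
}

The null access lines up with ProcTimeBoundedRangeOver.scala:181 in your stack trace, so please include that line in the JIRA issue.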