[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494851#comment-16494851 ]
Stefan Richter commented on FLINK-9268: --------------------------------------- Please notice that this is our attempt to "fix" (= for now, a better error message) the problem on the RocksDB side: https://github.com/facebook/rocksdb/issues/3849. > RockDB errors from WindowOperator > --------------------------------- > > Key: FLINK-9268 > URL: https://issues.apache.org/jira/browse/FLINK-9268 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing > Affects Versions: 1.4.2 > Reporter: Narayanan Arunachalam > Priority: Major > > The job has no sinks, one Kafka source, does a windowing based on session and > uses processing time. The job fails with the error given below after running > for few hours. The only way to recover from this error is to cancel the job > and start a new one. > Using S3 backend for externalized checkpoints. > A representative job DAG: > val streams = sEnv > .addSource(makeKafkaSource(config)) > .map(makeEvent) > .keyBy(_.get(EVENT_GROUP_ID)) > .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60))) > .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create())) > .apply(makeEventsList) > .addSink(makeNoOpSink) > A representative config: > state.backend=rocksDB > checkpoint.enabled=true > external.checkpoint.enabled=true > checkpoint.mode=AT_LEAST_ONCE > checkpoint.interval=900000 > checkpoint.timeout=300000 > Error: > TimerException\{java.lang.NegativeArraySizeException} > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NegativeArraySizeException > at org.rocksdb.RocksDB.get(Native Method) > at org.rocksdb.RocksDB.get(RocksDB.java:810) > at > org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86) > at > org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249) -- This message was sent by Atlassian JIRA (v7.6.3#76005)