Hai Zhou created FLINK-9233:
-------------------------------

             Summary: Merging state may cause runtime exception when windows trigger onMerge
                 Key: FLINK-9233
                 URL: https://issues.apache.org/jira/browse/FLINK-9233
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.4.0
            Reporter: Hai Zhou
The main logic of my Flink job is as follows:

{code:java}
clickStream.coGroup(exposureStream)
    .where(...)
    .equalTo(...)
    .window(EventTimeSessionWindows.withGap(...))
    .trigger(new SessionMatchTrigger())
    .evictor(...)
    .apply(...);
{code}

{code:java}
public class SessionMatchTrigger extends Trigger<..., TimeWindow> {
    ReducingStateDescriptor<...> stateDesc = new ReducingStateDescriptor<>(...);
    ...
    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Merge this trigger's reducing state from the merged session windows
        // into the resulting window, then re-register its end-of-window timer.
        ctx.mergePartitionedState(this.stateDesc);
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }
    ...
}
{code}

{panel:title=Detailed error logs}
java.lang.RuntimeException: Error while merging state.
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:895)
	at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:56)
	at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:14)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onMerge(WindowOperator.java:939)
	at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:141)
	at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:120)
	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
	at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:119)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Error while merging state in RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:186)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:887)
	... 12 more
Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
	at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
	at org.rocksdb.Status.<init>(Status.java:30)
	at org.rocksdb.RocksDB.delete(Native Method)
	at org.rocksdb.RocksDB.delete(RocksDB.java:1110)
	at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:143)
	... 13 more
{panel}

I found the cause of this error: RocksDB's Java {{Status.SubCode}} enum ({{org.rocksdb.Status$SubCode}} in the trace above) was out of sync with the native {{SubCode}} enum in {{include/rocksdb/status.h}}. When running out of disk space, this led to an {{IllegalArgumentException}} because of an invalid status subcode, rather than simply returning the corresponding status code without an exception.

More details: [https://github.com/facebook/rocksdb/pull/3050]
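As a rough illustration of this failure mode, the following is a simplified, hypothetical model and not the RocksDB JNI source: {{JavaSubCode}} and {{SubCodeDemo}} are invented names, the Java enum knows only subcodes 0 to 3, and the native library is assumed to report a newer subcode whose value (4) is chosen purely for illustration. The strict lookup mirrors the behaviour seen in the trace and throws {{IllegalArgumentException}}, hiding the real error; the tolerant lookup is one possible alternative, not necessarily what the upstream fix does.

{code:java}
import java.util.Optional;

/** Hypothetical Java enum that mirrors a native C++ enum by numeric value. */
enum JavaSubCode {
    NONE((byte) 0),
    MUTEX_TIMEOUT((byte) 1),
    LOCK_TIMEOUT((byte) 2),
    LOCK_LIMIT((byte) 3);
    // A newer native subcode (assumed here to be 4) is missing: the enums are out of sync.

    private final byte value;

    JavaSubCode(byte value) {
        this.value = value;
    }

    /** Strict lookup: throws for any value the Java side does not know about. */
    static JavaSubCode strictLookup(byte value) {
        for (JavaSubCode code : values()) {
            if (code.value == value) {
                return code;
            }
        }
        throw new IllegalArgumentException("Illegal value provided for SubCode.");
    }

    /** Tolerant lookup: returns empty for unknown values instead of throwing. */
    static Optional<JavaSubCode> tolerantLookup(byte value) {
        for (JavaSubCode code : values()) {
            if (code.value == value) {
                return Optional.of(code);
            }
        }
        return Optional.empty();
    }
}

final class SubCodeDemo {
    /** Describes a native subcode as a caller would see it with the strict lookup. */
    static String describeStrict(byte nativeSubCode) {
        try {
            return "subcode=" + JavaSubCode.strictLookup(nativeSubCode);
        } catch (IllegalArgumentException e) {
            // The underlying native error (e.g. out of disk space) is lost here.
            return "lookup failed: " + e.getMessage();
        }
    }

    /** Same, but unknown subcodes are reported by their raw numeric value. */
    static String describeTolerant(byte nativeSubCode) {
        return JavaSubCode.tolerantLookup(nativeSubCode)
                .map(code -> "subcode=" + code)
                .orElse("unknown subcode " + nativeSubCode);
    }
}
{code}

With the strict lookup, {{describeStrict((byte) 4)}} reports only the lookup failure, whereas {{describeTolerant((byte) 4)}} still reports the raw subcode, so the original error is not masked.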
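To see why a trigger's {{mergePartitionedState}} call ends up in {{RocksDB.delete}}, consider the sketch below. It is a simplified, hypothetical in-memory model: the class {{InMemoryReducingState}} and its methods are invented for illustration and are not Flink's API. Merging a reducing state folds the values stored under the source window namespaces into the target namespace and deletes each source entry. In the RocksDB backend that delete is a native call, which is where the trace above fails.

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Hypothetical in-memory model of a reducing state keyed by namespace (e.g. a window).
 * The map stands in for the state backend.
 */
final class InMemoryReducingState<N, V> {
    private final Map<N, V> store = new HashMap<>();
    private final BinaryOperator<V> reducer;

    InMemoryReducingState(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    /** Reduces a new value into whatever is already stored for the namespace. */
    void add(N namespace, V value) {
        store.merge(namespace, value, reducer);
    }

    V get(N namespace) {
        return store.get(namespace);
    }

    /** Folds every source namespace into the target, removing the sources as it goes. */
    void mergeNamespaces(N target, Collection<N> sources) {
        V merged = null;
        for (N source : sources) {
            // In a disk-backed store this removal is a real delete; in the reported
            // trace, the RocksDB delete fails while constructing its status object.
            V value = store.remove(source);
            if (value != null) {
                merged = (merged == null) ? value : reducer.apply(merged, value);
            }
        }
        if (merged != null) {
            // Combine the folded source values with the target's existing value.
            store.merge(target, merged, reducer);
        }
    }
}
{code}

For example, with {{Integer::sum}} as the reducer and values 2, 3 and 5 stored under {{w1}}, {{w2}} and {{w3}}, calling {{mergeNamespaces("w3", Arrays.asList("w1", "w2"))}} leaves 10 under {{w3}} and removes {{w1}} and {{w2}}.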