I'm sorry, I realized that the stack trace was poorly formatted; here it is with better formatting:
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1). ... 6 more Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) ... 5 more Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future. at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ... 
5 more Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) ... 7 more Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971) at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838) at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928) at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852) at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347) at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543) at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228) at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) ... 
1 more [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)] 11/03/2017 13:46:46 Job execution switched to status FAILING. AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1). ... 
6 more Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) ... 5 more Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future. at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ... 
5 more Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) ... 7 more Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971) at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838) at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928) at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852) at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347) at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543) at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228) at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) ... 
1 more [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)] 11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELING 11/03/2017 13:46:46 Process(1/1) switched to CANCELING 11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELED 11/03/2017 13:46:46 Process(1/1) switched to CANCELED 11/03/2017 13:46:46 Job execution switched to status RESTARTING. 11/03/2017 13:46:56 Job execution switched to status CREATED. 11/03/2017 13:46:56 Job execution switched to status RUNNING. 11/03/2017 13:46:56 Source: json-parser(1/1) switched to SCHEDULED 11/03/2017 13:46:56 Process(1/1) switched to SCHEDULED 11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED 11/03/2017 13:46:56 Source: json-parser(1/1) switched to DEPLOYING 11/03/2017 13:46:56 Process(1/1) switched to DEPLOYING 11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING 11/03/2017 13:46:56 Process(1/1) switched to RUNNING 11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING 11/03/2017 13:46:56 Source: json-parser(1/1) switched to RUNNING 11/03/2017 13:46:57 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED java.lang.IllegalStateException: Could not initialize keyed state backend. 
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519) at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553) at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455) at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852) at 
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311) ... 6 more 11/03/2017 13:46:57 Job execution switched to status FAILING. java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519) at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553) at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455) at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852) at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311) ... 6 more 2017-11-03 15:12 GMT+01:00 Federico D'Ambrosio < federico.dambro...@smartlab.ws>: > Hello everyone, > > I'm a bit experimenting with FlinkCEP and I'm noticing weird failures when > it comes to checkpoints and within clauses windows closing at the same time > a (synchronous, both on Fs and RocksDB, stored in hdfs) checkpoint occurs. > > The following is the relevant code: > > val env : StreamExecutionEnvironment = StreamExecutionEnvironment. 
> getExecutionEnvironment > env.enableCheckpointing(60000) //Checkpoints every minute > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir")) > > //Pattern > val pattern = > Pattern > > .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude > >= 37000) > .notNext("disappearing").where(_.event.instantValues.altitude >= > 37000).within(Time.minutes(1)) > > // Associate KeyedStream with pattern to be detected > val patternStream = CEP.pattern(streamById, pattern) > > which causes failure on the second checkpoint with the following exception > stack trace: > > AsynchronousException{java.lang.Exception: Could not materialize > checkpoint 2 fo r > operator KeyedCEPPatternOperator -> alert-select -> Sink: > notification-sink-1 > (1/1).} > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncCheckpointRu > nnable.run(StreamTask.java:970) > at java.util.concurrent.Executors$RunnableAdapter. > call(Executors.java:51 > 1) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor. > java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor > .java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator > KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1). > ... 
6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateExcept > ion: Could not find id for entry: SharedBufferEntry( > ValueTimeWrapper({"origin":" > YUL","destination":"YWG","flight":"AC8593","aircraft":" > CRJ7","registration":"C-G > OJZ","callsign":"JZA593","speed":370,"altitude":38000," > course":287,"time":"2017- > 11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, > 150971668500 0, 0), > [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet( > FutureUtil.java:4 3) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncCheckpointRu > nnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at org.apache.flink.streaming.api.operators. > OperatorSnapshotResu > lt.cancel(OperatorSnapshotResult.java:90) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncChec > kpointRunnable.cleanup(StreamTask.java:1023) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncChec > kpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalSta > teException: Could not find id for entry: SharedBufferEntry( > ValueTimeWrapper({"o > rigin":"YUL","destination":"YWG","flight":"AC8593"," > aircraft":"CRJ7","registrati > on":"C-GOJZ","callsign":"JZA593","speed":370,"altitude" > :38000,"course":287,"time > ":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, > 1509 716685000, 0), > [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) > at java.util.concurrent.FutureTask.report(FutureTask. > java:122) > at java.util.concurrent.FutureTask.get(FutureTask. 
> java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet( > FutureUti l.java:43) > at org.apache.flink.runtime.state.StateUtil. > discardStateFuture(S > tateUtil.java:85) > at org.apache.flink.streaming.api.operators. > OperatorSnapshotResu > lt.cancel(OperatorSnapshotResult.java:88) > ... 7 more > Caused by: java.lang.IllegalStateException: Could not find id for > entry: > SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG"," > flight" > :"AC8593","aircraft":"CRJ7","registration":"C-GOJZ"," > callsign":"JZA593","speed": > 370,"altitude":38000,"course":287,"time":"2017-11-03 > 13:44:45.000","lat":47.9129 > ,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), > [SharedBufferEdge(null, 5) > , SharedBufferEdge(null, 6)], 1) > at org.apache.flink.util.Preconditions.checkState( > Preconditions. > java:195) > at org.apache.flink.cep.nfa.SharedBuffer$ > SharedBufferSerializer. > serialize(SharedBuffer.java:971) > at org.apache.flink.cep.nfa.SharedBuffer$ > SharedBufferSerializer. > serialize(SharedBuffer.java:838) > at org.apache.flink.cep.nfa.NFA$ > NFASerializer.serialize(NFA.java > :928) > at org.apache.flink.cep.nfa.NFA$ > NFASerializer.serialize(NFA.java > :852) > at org.apache.flink.runtime.state.heap. > NestedMapsStateTable$Nest > edMapsStateTableSnapshot.writeMappingsInKeyGroup( > NestedMapsStateTable.java:355) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend$1.p > erformOperation(HeapKeyedStateBackend.java:347) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend$1.p > erformOperation(HeapKeyedStateBackend.java:329) > at org.apache.flink.runtime.io. > async.AbstractAsyncIOCallable.cal > l(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask. > java:266) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend.sna > pshot(HeapKeyedStateBackend.java:372) > at org.apache.flink.streaming.api.operators. 
> AbstractStreamOperat > or.snapshotState(AbstractStreamOperator.java:397) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > Checkpoin > tingOperation.checkpointStreamOperator(StreamTask.java:1162) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > Checkpoin > tingOperation.executeCheckpointing(StreamTask.java:1094) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > checkpoin > tState(StreamTask.java:654) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > performCh > eckpoint(StreamTask.java:590) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > triggerCh > eckpointOnBarrier(StreamTask.java:543) > at org.apache.flink.streaming.runtime.io.BarrierBuffer. > notifyChe > ckpoint(BarrierBuffer.java:378) > at org.apache.flink.streaming.runtime.io.BarrierBuffer. > processBa > rrier(BarrierBuffer.java:228) > at org.apache.flink.streaming.runtime.io.BarrierBuffer. > getNextNo > nBlocked(BarrierBuffer.java:183) > at org.apache.flink.streaming.runtime.io. > StreamInputProcessor.pr > ocessInput(StreamInputProcessor.java:213) > at org.apache.flink.streaming.runtime.tasks. > OneInputStreamTask.r > un(OneInputStreamTask.java:69) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(St > reamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task. > java:702) > ... 1 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not > find id f or entry: > SharedBufferEntry(ValueTimeWrapper({"origin":" > YUL","destination":"YWG" > ,"flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ" > ,"callsign":"JZA593" > ,"speed":370,"altitude":38000,"course":287,"time":"2017-11-03 > 13:44:45.000","lat > ":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), > [SharedBufferEdge > (null, 5), SharedBufferEdge(null, 6)], 1)] > > 11/03/2017 13:46:46 Job execution switched to status FAILING. 
> AsynchronousException{java.lang.Exception: Could not materialize > checkpoint 2 fo r > operator KeyedCEPPatternOperator -> alert-select -> Sink: > notification-sink-1 > (1/1).} > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncCheckpointRu > nnable.run(StreamTask.java:970) > at java.util.concurrent.Executors$RunnableAdapter. > call(Executors.java:51 > 1) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor. > java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor > .java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator > KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateExcept > ion: Could not find id for entry: SharedBufferEntry( > ValueTimeWrapper({"origin":" > YUL","destination":"YWG","flight":"AC8593","aircraft":" > CRJ7","registration":"C-G > OJZ","callsign":"JZA593","speed":370,"altitude":38000," > course":287,"time":"2017- > 11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, > 150971668500 0, 0), > [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet( > FutureUtil.java:4 3) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncCheckpointRu > nnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at org.apache.flink.streaming.api.operators. 
> OperatorSnapshotResu > lt.cancel(OperatorSnapshotResult.java:90) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncChec > kpointRunnable.cleanup(StreamTask.java:1023) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > AsyncChec > kpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException: > Could not find id for entry: SharedBufferEntry( > ValueTimeWrapper({"o > rigin":"YUL","destination":"YWG","flight":"AC8593"," > aircraft":"CRJ7","registrati > on":"C-GOJZ","callsign":"JZA593","speed":370,"altitude" > :38000,"course":287,"time > ":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, > 1509 716685000, 0), > [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1) > at java.util.concurrent.FutureTask.report(FutureTask. > java:122) > at java.util.concurrent.FutureTask.get(FutureTask. > java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet( > FutureUti l.java:43) > at org.apache.flink.runtime.state.StateUtil. > discardStateFuture(S > tateUtil.java:85) > at org.apache.flink.streaming.api.operators. > OperatorSnapshotResu > lt.cancel(OperatorSnapshotResult.java:88) > ... 7 more > Caused by: java.lang.IllegalStateException: Could not find id for > entry: > SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG"," > flight" > :"AC8593","aircraft":"CRJ7","registration":"C-GOJZ"," > callsign":"JZA593","speed": > 370,"altitude":38000,"course":287,"time":"2017-11-03 > 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, > 1509716685000, 0), [SharedBufferEdge(null, 5) > , SharedBufferEdge(null, 6)], 1) > at org.apache.flink.util.Preconditions.checkState( > Preconditions. > java:195) > at org.apache.flink.cep.nfa.SharedBuffer$ > SharedBufferSerializer. > serialize(SharedBuffer.java:971) > at org.apache.flink.cep.nfa.SharedBuffer$ > SharedBufferSerializer. 
> serialize(SharedBuffer.java:838) > at org.apache.flink.cep.nfa.NFA$ > NFASerializer.serialize(NFA.java > :928) > at org.apache.flink.cep.nfa.NFA$ > NFASerializer.serialize(NFA.java > :852) > at org.apache.flink.runtime.state.heap. > NestedMapsStateTable$Nest > edMapsStateTableSnapshot.writeMappingsInKeyGroup( > NestedMapsStateTable.java:355) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend$1.p > erformOperation(HeapKeyedStateBackend.java:347) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend$1.p > erformOperation(HeapKeyedStateBackend.java:329) > at org.apache.flink.runtime.io. > async.AbstractAsyncIOCallable.cal > l(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask. > java:266) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend.sna > pshot(HeapKeyedStateBackend.java:372) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperat > or.snapshotState(AbstractStreamOperator.java:397) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > Checkpoin > tingOperation.checkpointStreamOperator(StreamTask.java:1162) > at org.apache.flink.streaming.runtime.tasks.StreamTask$ > Checkpoin > tingOperation.executeCheckpointing(StreamTask.java:1094) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > checkpoin > tState(StreamTask.java:654) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > performCh > eckpoint(StreamTask.java:590) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > triggerCh > eckpointOnBarrier(StreamTask.java:543) > at org.apache.flink.streaming.runtime.io.BarrierBuffer. > notifyChe > ckpoint(BarrierBuffer.java:378) > at org.apache.flink.streaming.runtime.io.BarrierBuffer. > processBa > rrier(BarrierBuffer.java:228) > at org.apache.flink.streaming.runtime.io.BarrierBuffer. > getNextNo > nBlocked(BarrierBuffer.java:183) > at org.apache.flink.streaming.runtime.io. 
> StreamInputProcessor.pr > ocessInput(StreamInputProcessor.java:213) > at org.apache.flink.streaming.runtime.tasks. > OneInputStreamTask.r > un(OneInputStreamTask.java:69) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(St > reamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task. > java:702) > ... 1 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not > find id for entry: > SharedBufferEntry(ValueTimeWrapper({"origin":" > YUL","destination":"YWG" > ,"flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ" > ,"callsign":"JZA593" > ,"speed":370,"altitude":38000,"course":287,"time":"2017-11-03 > 13:44:45.000","lat > ":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), > [SharedBufferEdge > (null, 5), SharedBufferEdge(null, 6)], 1)] > 11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELING > 11/03/2017 13:46:46 Process(1/1) switched to CANCELING > 11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELED > 11/03/2017 13:46:46 Process(1/1) switched to CANCELED > 11/03/2017 13:46:46 Job execution switched to status RESTARTING. > 11/03/2017 13:46:56 Job execution switched to status CREATED. > 11/03/2017 13:46:56 Job execution switched to status RUNNING. 
> 11/03/2017 13:46:56 Source: json-parser(1/1) switched to SCHEDULED > 11/03/2017 13:46:56 Process(1/1) switched to SCHEDULED > 11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: > notific > ation-sink-1(1/1) switched to SCHEDULED > 11/03/2017 13:46:56 Source: json-parser(1/1) switched to DEPLOYING > 11/03/2017 13:46:56 Process(1/1) switched to DEPLOYING > 11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: > notific > ation-sink-1(1/1) switched to DEPLOYING > 11/03/2017 13:46:56 Process(1/1) switched to RUNNING > 11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: > notific > ation-sink-1(1/1) switched to RUNNING > 11/03/2017 13:46:56 Source: json-parser(1/1) switched to RUNNING > 11/03/2017 13:46:57 KeyedCEPPatternOperator -> alert-select -> Sink: > notific > ation-sink-1(1/1) switched to FAILED > java.lang.IllegalStateException: Could not initialize keyed state backend. > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator.initK > eyedState(AbstractStreamOperator.java:321) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator.initi > alizeState(AbstractStreamOperator.java:217) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > initializeOperato > rs(StreamTask.java:676) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > initializeState(S > treamTask.java:663) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask > .java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.StreamCorruptedException: invalid type code: 00 > at java.io.ObjectInputStream$BlockDataInputStream. > readBlockHeader(Object > InputStream.java:2519) > at java.io.ObjectInputStream$BlockDataInputStream.refill( > ObjectInputStre > am.java:2553) > at java.io.ObjectInputStream$BlockDataInputStream. 
> skipBlockData(ObjectIn > putStream.java:2455) > at java.io.ObjectInputStream.skipCustomData( > ObjectInputStream.java:1951) > at java.io.ObjectInputStream.readNonProxyDesc( > ObjectInputStream.java:162 > 1) > at java.io.ObjectInputStream.readClassDesc( > ObjectInputStream.java:1518) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:1 > 774) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1351) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:20 > 00) > at java.io.ObjectInputStream.readSerialData( > ObjectInputStream.java:1924) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:1 > 801) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream. > java:371) > at org.apache.flink.cep.nfa.NFA$NFASerializer. > deserializeCondition(NFA.j > ava:1211) > at org.apache.flink.cep.nfa.NFA$NFASerializer. > deserializeStates(NFA.java > :1169) > at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA. > java:957) > at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA. > java:852) > at org.apache.flink.runtime.state.heap. > StateTableByKeyGroupReaders$State > TableByKeyGroupReaderV2V3.readMappingsInKeyGroup( > StateTableByKeyGroupReaders.jav > a:132) > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend. > restorePart > itionedState(HeapKeyedStateBackend.java:518) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend.restore(Hea > pKeyedStateBackend.java:397) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > createKeyedStateB > ackend(StreamTask.java:772) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator.initK > eyedState(AbstractStreamOperator.java:311) > ... 6 more > > 11/03/2017 13:46:57 Job execution switched to status FAILING. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at org.apache.flink.streaming.api.operators. > AbstractStreamOperator.initK > eyedState(AbstractStreamOperator.java:321) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator.initi > alizeState(AbstractStreamOperator.java:217) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > initializeOperato > rs(StreamTask.java:676) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > initializeState(S > treamTask.java:663) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask > .java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.StreamCorruptedException: invalid type code: 00 > at java.io.ObjectInputStream$BlockDataInputStream. > readBlockHeader(Object > InputStream.java:2519) > at java.io.ObjectInputStream$BlockDataInputStream.refill( > ObjectInputStre > am.java:2553) > at java.io.ObjectInputStream$BlockDataInputStream. > skipBlockData(ObjectIn > putStream.java:2455) > at java.io.ObjectInputStream.skipCustomData( > ObjectInputStream.java:1951) > at java.io.ObjectInputStream.readNonProxyDesc( > ObjectInputStream.java:162 > 1) > at java.io.ObjectInputStream.readClassDesc( > ObjectInputStream.java:1518) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:1 > 774) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1351) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:20 > 00) > at java.io.ObjectInputStream.readSerialData( > ObjectInputStream.java:1924) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:1 > 801) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream. > java:371) > at org.apache.flink.cep.nfa.NFA$NFASerializer. > deserializeCondition(NFA.j > ava:1211) > at org.apache.flink.cep.nfa.NFA$NFASerializer. 
> deserializeStates(NFA.java > :1169) > at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA. > java:957) > at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA. > java:852) > at org.apache.flink.runtime.state.heap. > StateTableByKeyGroupReaders$State > TableByKeyGroupReaderV2V3.readMappingsInKeyGroup( > StateTableByKeyGroupReaders.jav > a:132) > at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend. > restorePart > itionedState(HeapKeyedStateBackend.java:518) > at org.apache.flink.runtime.state.heap. > HeapKeyedStateBackend.restore(Hea > pKeyedStateBackend.java:397) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > createKeyedStateB > ackend(StreamTask.java:772) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator.initK > eyedState(AbstractStreamOperator.java:311) > ... 6 more > > > What is happening here? Am I doing something wrong? Is there some sort of > conflict between the deadlines of the `within` clauses and the checkpoint deadlines? > > I found the following similar JIRA issues, but none of them mention > circular references: https://issues.apache.org/jira/browse/FLINK-6321 > https://issues.apache.org/jira/browse/FLINK-7484 > https://issues.apache.org/jira/browse/FLINK-7756 > > Kind Regards, > Federico D'Ambrosio > -- Federico D'Ambrosio