I'm sorry, I realized that the stack trace was poorly formatted; here it is
with better formatting:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint
2 for operator KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1 (1/1).}
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed
keyed state future.
                at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
                at
java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for
entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
                at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at
org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at
org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at
org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find
id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint
2 for operator KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1 (1/1).}
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed
keyed state future.
                at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
                at
java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for
entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
                at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at
org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at
org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at
org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find
id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)]
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at
java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at
java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at
java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at
java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at
java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at
java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

2017-11-03 15:12 GMT+01:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hello everyone,
>
> I'm experimenting a bit with FlinkCEP and I'm noticing weird failures when
> the window of a within clause closes at the same time as a checkpoint
> (synchronous, with both the Fs and RocksDB backends, stored in HDFS) occurs.
>
> The following is the relevant code:
>
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(60000) // Checkpoints every minute
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))
>
> // Pattern
> val pattern =
>   Pattern
>     .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
>     .notNext("disappearing").where(_.event.instantValues.altitude >= 37000)
>     .within(Time.minutes(1))
>
> // Associate KeyedStream with pattern to be detected
> val patternStream  = CEP.pattern(streamById, pattern)
>
> which causes failure on the second checkpoint with the following exception
> stack trace:
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
>         ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>         ... 5 more
>         Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>                 at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>                 ... 5 more
>         Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>                 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>                 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>                 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>                 at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>                 at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>                 ... 7 more
>         Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>                 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>                 at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
>                 at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
>                 at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
>                 at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
>                 at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
>                 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
>                 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
>                 at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>                 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
>                 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
>                 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>                 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>                 ... 1 more
>         [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
>
> 11/03/2017 13:46:46     Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
>         ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>         ... 5 more
>         Suppressed: java.lang.Exception: Could not properly cancel managed
> keyed                                                        state future.
>                 at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResu
> lt.cancel(OperatorSnapshotResult.java:90)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncChec
> kpointRunnable.cleanup(StreamTask.java:1023)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncChec
> kpointRunnable.run(StreamTask.java:961)
>                 ... 5 more
>         Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>                 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>                 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>                 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>                 at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>                 at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>                 ... 7 more
>         Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>                 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>                 at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
>                 at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
>                 at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
>                 at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
>                 at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
>                 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
>                 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
>                 at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>                 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
>                 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
>                 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>                 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>                 ... 1 more
>         [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
> 11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
> 11/03/2017 13:46:46     Process(1/1) switched to CANCELING
> 11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
> 11/03/2017 13:46:46     Process(1/1) switched to CANCELED
> 11/03/2017 13:46:46     Job execution switched to status RESTARTING.
> 11/03/2017 13:46:56     Job execution switched to status CREATED.
> 11/03/2017 13:46:56     Job execution switched to status RUNNING.
> 11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
> 11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
> 11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
> 11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
> 11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
> 11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
> 11/03/2017 13:46:56     Process(1/1) switched to RUNNING
> 11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
> 11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
> 11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>         at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>         at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>         at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>         at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>         at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>         ... 6 more
>
> 11/03/2017 13:46:57     Job execution switched to status FAILING.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>         at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>         at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>         at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>         at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>         at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>         ... 6 more
>
>
> What is happening here? Am I doing something wrong? Is there some sort of
> conflict between the deadlines of the within clauses and the checkpoint
> deadlines?
>
> I found the following similar JIRA issues, but none of them mention
> circular references:
> https://issues.apache.org/jira/browse/FLINK-6321
> https://issues.apache.org/jira/browse/FLINK-7484
> https://issues.apache.org/jira/browse/FLINK-7756
>
> Kind Regards,
> Federico D'Ambrosio
>
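
A side note on the restore failure quoted above: "java.io.StreamCorruptedException: invalid type code: 00" is the message ObjectInputStream produces when it expects a type marker byte and finds 0x00 instead. Below is a minimal sketch in plain Java, with no Flink involved, that reproduces the same message by deliberately overwriting one byte of a serialized object. The class and method names (CorruptStreamDemo, serialize, tryRead) are made up for illustration, and the code path is simpler than the one in the trace. It only shows what the message means; whether something comparable happened to the serialized NFA conditions in the checkpoint is an assumption, not a confirmed diagnosis.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;

// Hypothetical demo class, not part of Flink.
public class CorruptStreamDemo {

    /** Serializes a value with plain Java serialization. */
    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Reads the bytes back; returns the error message, or null if reading succeeded. */
    public static String tryRead(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize("some condition");
        System.out.println("intact stream:    " + tryRead(data));

        // Bytes 0..3 hold the stream header (magic number and version).
        // Byte 4 is the type marker of the first object; zero it out.
        data[4] = 0x00;
        System.out.println("corrupted stream: " + tryRead(data));
    }
}
```

If the restored bytes do not line up with what the reader expects, for example because the snapshot of this operator failed part-way, this is the kind of error one would expect to see on restore.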



-- 
Federico D'Ambrosio
