Hello everyone,

I'm experimenting a bit with FlinkCEP and I'm seeing odd failures when the
window of a within clause closes at the same time as a checkpoint occurs
(synchronous checkpoints, on both the Fs and RocksDB backends, stored in HDFS).

The following is the relevant code:

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // Checkpoint every minute
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

//Pattern
val pattern =
  Pattern
    .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
    .notNext("disappearing").where(_.event.instantValues.altitude >= 37000)
    .within(Time.minutes(1))

// Associate KeyedStream with pattern to be detected
val patternStream  = CEP.pattern(streamById, pattern)

This causes a failure on the second checkpoint, with the following exception
stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more


What is happening here? Am I doing something wrong? Is there some sort of
conflict between within-clause deadlines and checkpoint deadlines?

I found the following similar JIRA issues, but none of them mention
circular references:

https://issues.apache.org/jira/browse/FLINK-6321
https://issues.apache.org/jira/browse/FLINK-7484
https://issues.apache.org/jira/browse/FLINK-7756

Kind Regards,
Federico D'Ambrosio
