Hello everyone!

In our planned setup we have two data centers, each in a different geographic
zone (plus a third that only hosts a ZooKeeper node as a tie-breaker). We use
HA with ZooKeeper, as follows:

Normally, DC1 will run our job:

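+--------------+-----------+-----------+-----------+-----------+
| DC1                      | DC2                   | DC3       |
+--------------+-----------+-----------+-----------+-----------+
| Machine 1    | Machine 2 | Machine 3 | Machine 4 | Machine 5 |
| ZK1          | ZK2       | ZK3       | ZK4       | ZK5       |
| JM1 (leader) | JM2       |           |           |           |
| TM1          | TM2       |           |           |           |
+--------------+-----------+-----------+-----------+-----------+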

But after DC1 crashes, DC2 will take over: it starts the Flink processes and
resumes our job from a checkpoint:

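+-----------+-----------+--------------+-----------+-----------+
| DC1 (down)            | DC2                      | DC3       |
+-----------+-----------+--------------+-----------+-----------+
| Machine 1 | Machine 2 | Machine 3    | Machine 4 | Machine 5 |
|           |           | ZK3          | ZK4       | ZK5       |
|           |           | JM3 (leader) | JM4       |           |
|           |           | TM3          | TM4       |           |
+-----------+-----------+--------------+-----------+-----------+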

For checkpoints, we use a filesystem on a NAS.
We have a NAS in DC1 and a replica of it in DC2. Replication runs in the
background, once a minute.
If DC1 crashes, we recover our job in DC2 from its NAS replica (we do not
require zero data loss).
Our concern is that when the job recovers in DC2, the checkpoint state in its
NAS replica will lag behind ZooKeeper's checkpoint reference. That is,
ZooKeeper might point to checkpoint x while the NAS in DC2 still has only
checkpoint x-1.

We hoped that the setting "state.checkpoints.num-retained: 3" would solve this.
With the 3 latest checkpoints retained (12, 13, and 14), we simulated the
scenario by deleting the files of checkpoint 14 (the latest) while leaving the
reference to checkpoint 14 in ZooKeeper. We expected Flink to fall back to
checkpoint 13, but instead it keeps trying to recover from checkpoint 14 and
fails with the error "No such file or directory", like this:

java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) [flink-dist_2.11-1.4.2.jar:1.4.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.io.FileNotFoundException: /logs/failsafe/checkpoints/3e1de245fb1cf1226aad7351a818be96/chk-14/57d26c1c-df40-4b6f-8046-500eb4c8a0b2 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_161]
        at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_161]
        at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_161]
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:471) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:446) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:282) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
        ... 6 more

BTW, if we delete the reference to checkpoint 14 in ZK, the job recovers
successfully from checkpoint 13. So we could try to automate that: monitor for
a failure to recover from checkpoint x, then delete its reference in ZK so that
x-1 is used instead. Not great.
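
To make that workaround concrete, here is a rough sketch of the detection part
only. It assumes the chk-<n> directory layout visible in the stack trace above,
and it assumes the list of checkpoint ids referenced in ZK has already been
obtained by some other means. The names checkpoint_present, plan_fallback and
demo are made up for illustration, and "directory exists and holds at least one
file" is only a heuristic: it would not catch a partially replicated checkpoint.

```python
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple


def checkpoint_present(job_dir: Path, checkpoint_id: int) -> bool:
    """Heuristic: chk-<id> exists under job_dir and contains at least one file."""
    chk = job_dir / f"chk-{checkpoint_id}"
    return chk.is_dir() and any(p.is_file() for p in chk.rglob("*"))


def plan_fallback(job_dir: Path,
                  referenced_ids: List[int]) -> Tuple[Optional[int], List[int]]:
    """Return (newest usable checkpoint id or None, stale ids newer than it)."""
    stale: List[int] = []
    # Walk from the newest referenced checkpoint down to the oldest.
    for cid in sorted(referenced_ids, reverse=True):
        if checkpoint_present(job_dir, cid):
            return cid, stale
        stale.append(cid)
    return None, stale


def demo() -> Tuple[Optional[int], List[int]]:
    """Recreate our test: 12 and 13 have files, 14 is referenced but empty."""
    with tempfile.TemporaryDirectory() as tmp:
        job_dir = Path(tmp)
        for n in (12, 13):
            (job_dir / f"chk-{n}").mkdir()
            (job_dir / f"chk-{n}" / "state-file").write_bytes(b"x")
        (job_dir / "chk-14").mkdir()  # files not replicated yet
        return plan_fallback(job_dir, [12, 13, 14])


if __name__ == "__main__":
    print(demo())
```

The stale ids it returns are the references we would then have to remove from
ZK before starting Flink in DC2. That removal step is deliberately not shown,
since it depends on how Flink lays out its data in ZK.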

But maybe you have better ideas for how to deal with such a setup, where
checkpoints are replicated between 2 DCs and the ZK cluster used for HA is
stretched over these 2 DCs?

Thanks!
Shay
