Hi team,

We have a streaming job running with 1 JM + 4 TMs in our k8s cluster. Recently one of the TMs failed, and the job could not be recovered from the latest checkpoint state (an illustrative sketch of the checkpoint configuration is included after the log below). In the logs we found something suspicious:
2022-01-21T13:38:41.296Z | FlinkStreamJob | SPNJP1 | spaas-nj-prod01-r00n23 | namespace=s-art | pod=flinkfileintegratehbaseindexer-taskmanager-0 | INFO | pool-16-thread-23 | o.a.f.r.t.Task | Source: consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4) (5051444b987b69d69df24df5aa3adeaf) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 15931959 for operator Source: consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 15931959 for operator Source: consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints/00000000000000000000000000000000/chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints/00000000000000000000000000000000/chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 common frames omitted
Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints/00000000000000000000000000000000/chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a (inode 34171509797): File does not exist. Holder DFSClient_NONMAPREDUCE_-946314839_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3693)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3781)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3748)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:912)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:549)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
    at org.apache.hadoop.ipc.Client.call(Client.java:1508)
    at org.apache.hadoop.ipc.Client.call(Client.java:1429)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462)
    at sun.reflect.GeneratedMethodAccessor185.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:216)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:107)
    at com.sun.proxy.$Proxy12.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2291)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2267)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2232)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
    ... 12 common frames omitted

Is this HDFS "No lease / File does not exist" error the root cause, and how can we avoid this issue in the future?
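For context, checkpointing in this job is set up along the lines of the sketch below. This is only an illustrative sketch: the FsStateBackend, the HDFS path (taken from the checkpoint path in the log), and the interval/timeout values are stand-ins rather than our exact production settings.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative values only -- not our exact production settings.
        // Checkpoints go to the HDFS directory seen in the log above.
        env.setStateBackend(new FsStateBackend(
                "hdfs:///projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints"));

        // Checkpoint every 60s with exactly-once semantics.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Give each checkpoint up to 10 minutes before it is declared expired.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);

        // At most one checkpoint in flight, with a pause between checkpoints.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);

        // Placeholder pipeline so the sketch is self-contained; the real job consumes
        // the kf-ec-alerting-post-matcher-derived-alerts-ny topic and indexes into HBase.
        env.fromElements("sketch").print();

        env.execute("FlinkStreamJob");
    }
}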
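One mitigation we were wondering about (please correct us if this is the wrong direction) is letting the job tolerate an isolated checkpoint failure instead of failing the task when the async part of a checkpoint cannot be materialized. A rough sketch of what we mean, again with illustrative values:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerateCheckpointFailureSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // On older Flink versions: do not fail the task when a checkpoint cannot be
        // materialized; the checkpoint is declined and the job keeps running.
        checkpointConfig.setFailOnCheckpointingErrors(false);

        // On Flink 1.9+ the non-deprecated equivalent would be something like:
        // checkpointConfig.setTolerableCheckpointFailureNumber(3);

        // Placeholder pipeline so the sketch compiles and runs on its own.
        env.fromElements("sketch").print();
        env.execute("FlinkStreamJob");
    }
}

Would something like this be a reasonable way to avoid the task failure, or is there a better fix? Thanks.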