Hi team,

We have a streaming job running with 1 JobManager (JM) and 4 TaskManagers
(TMs) in our k8s cluster. Recently one of the TMs failed, and the job could
not be recovered from the latest checkpoint. In the log we found something
suspicious:

2022-01-21T13:38:41.296Z | FlinkStreamJob | SPNJP1 | spaas-nj-prod01-r00n23 | namespace=s-art | pod=flinkfileintegratehbaseindexer-taskmanager-0 | INFO | pool-16-thread-23 | o.a.f.r.t.Task | Source: consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4) (5051444b987b69d69df24df5aa3adeaf) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 15931959 for operator Source: consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 15931959 for operator Source: consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
        ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints/00000000000000000000000000000000/chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
        ... 5 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints/00000000000000000000000000000000/chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
        ... 7 common frames omitted
Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints/00000000000000000000000000000000/chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a (inode 34171509797): File does not exist. Holder DFSClient_NONMAPREDUCE_-946314839_1 does not have any open files.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3693)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3781)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3748)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:912)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:549)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

        at org.apache.hadoop.ipc.Client.call(Client.java:1508)
        at org.apache.hadoop.ipc.Client.call(Client.java:1429)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy11.complete(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462)
        at sun.reflect.GeneratedMethodAccessor185.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:216)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:107)
        at com.sun.proxy.$Proxy12.complete(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2291)
        at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2267)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2232)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
        at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
        ... 12 common frames omitted
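
For reference, in a Java stack trace each "Caused by:" section is the
getCause() of the exception printed above it, so the last "Caused by:"
(here the HDFS RemoteException, "No lease on ... File does not exist") is
the innermost exception in the chain. Below is a minimal, stdlib-only sketch
of walking that chain in code. The class name RootCause and the messages are
hypothetical stand-ins, and IOException substitutes for
org.apache.hadoop.ipc.RemoteException so the example runs without Hadoop on
the classpath.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class RootCause {
    // Follows getCause() links until the innermost Throwable is reached.
    // An identity-based "seen" set stops the walk if a cause chain loops back.
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical chain shaped like the trace above (outermost last).
        // IOException stands in for the HDFS RemoteException.
        IOException lease = new IOException("No lease on checkpoint file");
        IOException flush = new IOException("Could not flush and close the file system output stream", lease);
        ExecutionException exec = new ExecutionException(flush);
        Exception materialize = new Exception("Could not materialize checkpoint", exec);

        // prints: No lease on checkpoint file
        System.out.println(rootCause(materialize).getMessage());
    }
}
```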


Is this the root cause, and how can we avoid this issue in the future? Thanks.
