Hi, Leor

It seems the checkpoint file
`/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc`
did not exist when the task tried to close it. You can trace the life cycle
of this file in the HDFS audit log to find out why it was missing. One
possibility is that the checkpoint directory was removed because checkpoint
10631 had already been subsumed [1][2].

[1] https://issues.apache.org/jira/browse/FLINK-10930
[2] https://issues.apache.org/jira/browse/FLINK-10724
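
If it helps, here is a rough sketch of how the audit log could be filtered for
that checkpoint directory. It is only an illustration in Python: it assumes each
audit entry starts with a "yyyy-MM-dd HH:mm:ss,SSS" timestamp and carries
key=value fields such as cmd= and src=, which may differ on your cluster. The
names audit_events, CHK_DIR and SAMPLE are hypothetical, and the sample entries
are made up, not taken from your NameNode.

```python
import re

# key=value pairs as they appear in an HDFS audit entry (assumed layout).
FIELD = re.compile(r"(\w+)=(\S+)")

def audit_events(lines, path_prefix):
    """Return (timestamp, cmd, src) for every audit entry whose src is under path_prefix."""
    events = []
    for line in lines:
        fields = dict(FIELD.findall(line))
        src = fields.get("src", "")
        if src.startswith(path_prefix):
            # Assumes the line starts with a "yyyy-MM-dd HH:mm:ss,SSS" timestamp (23 chars).
            events.append((line[:23], fields.get("cmd"), src))
    return events

CHK_DIR = "/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631"

# Made-up sample entries for illustration only; they are not from the real cluster.
SAMPLE = [
    "2019-01-13 00:00:01,000 INFO FSNamesystem.audit: allowed=true\tugi=flink (auth:SIMPLE)\t"
    "ip=/10.0.0.1\tcmd=create\tsrc=" + CHK_DIR + "/6e86cce6-9ac9-4856-b0a3-42c81fcafadc\tdst=null",
    "2019-01-13 00:00:02,000 INFO FSNamesystem.audit: allowed=true\tugi=flink (auth:SIMPLE)\t"
    "ip=/10.0.0.2\tcmd=delete\tsrc=" + CHK_DIR + "\tdst=null",
    "2019-01-13 00:00:03,000 INFO FSNamesystem.audit: allowed=true\tugi=flink (auth:SIMPLE)\t"
    "ip=/10.0.0.1\tcmd=create\tsrc=/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10632/x\tdst=null",
]

for ts, cmd, src in audit_events(SAMPLE, CHK_DIR):
    print(ts, cmd, src)
```

On a real log you would pass the lines of the NameNode audit file instead of
SAMPLE. A delete on the chk-10631 directory that appears before the task
finished writing would be consistent with the checkpoint having been subsumed.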

Leor Li <brighter.leo...@gmail.com> wrote on Sun, Jan 13, 2019 at 6:59 PM:

> Hey,
>
> Recently I used Flink 1.6.3 to run a task and the following exception
> occurred. Can someone tell me how to solve this problem?
>
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 10631 for operator Source: Custom Source -> Flat Map -> (Map ->
> Timestamps/Watermarks -> from: (dtEventTime, bkdata_par_offset, N_Route,
> iEventId, vVersion, vFromIp, vLocalIp, iRequestType, vAppid, vOpenid,
> vCmdid, vL5_ip_port, iPermission, iResult, dProcessTime, vErrorMsg, vUrl,
> vDeviceInfo, rowtime) -> where: (OR(=(iResult, -308), =(iResult, -309))),
> select: (rowtime, vErrorMsg) -> time attribute: (rowtime), Map) (8/20).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 10631 for
> operator Source: Custom Source -> Flat Map -> (Map -> Timestamps/Watermarks
> -> from: (dtEventTime, bkdata_par_offset, N_Route, iEventId, vVersion,
> vFromIp, vLocalIp, iRequestType, vAppid, vOpenid, vCmdid, vL5_ip_port,
> iPermission, iResult, dProcessTime, vErrorMsg, vUrl, vDeviceInfo, rowtime)
> -> where: (OR(=(iResult, -308), =(iResult, -309))), select: (rowtime,
> vErrorMsg) -> time attribute: (rowtime), Map) (8/20).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
> hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
> in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
> in order to obtain the stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
> at
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on
> /app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
> (inode 7450914863): File does not exist. Holder
> DFSClient_NONMAPREDUCE_-1995220833_1 does not have any open files.
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3690)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3660)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:729)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:243)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:527)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1468)
> at org.apache.hadoop.ipc.Client.call(Client.java:1399)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy9.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:443)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2283)
> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2267)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
> ... 12 more
>


-- 
Best,
Congxian
