[ https://issues.apache.org/jira/browse/FLINK-11419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753965#comment-16753965 ]
Edward Rojas commented on FLINK-11419:
--------------------------------------

Hi [~kkl0u], I performed several tests, and even after waiting several minutes, the error keeps appearing when trying to restart the job from the latest successful checkpoint (in order not to lose state). After waiting several minutes the error is slightly different: instead of an "AlreadyBeingCreatedException" I get a "RecoveryInProgressException".

{code:java}
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to TRUNCATE_FILE /path/to/hdfs/2019-01-27/.part-0-0.inprogress.c1cab2ae-fd85-48b9-8441-a36af02225ca for DFSClient_NONMAPREDUCE_187299180_62 on xx.x.x.x because lease recovery is in progress. Try again later.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2437)
	at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:111)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2056)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1043)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:629)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:868)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:814)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2603)
{code}

The cluster is configured with a "Fixed Delay" restart strategy, and the next time the job is restarted I get the error with the "AlreadyBeingCreatedException" again. The error keeps appearing even after waiting an hour before restarting; and even if that did resolve it, we cannot afford to wait that long to restart the job.

I verified that no other job is writing to the same bucket and that no taskmanager from a previous execution is still running.

I'm also able to reproduce the issue running everything locally: a Flink cluster with one jobmanager and two taskmanagers, a local Kafka and a local HDFS. I send events to Kafka and the job starts writing to HDFS; then I kill one taskmanager, and when the other one takes over, the error occurs.

When using the BucketingSink I don't have any issue: the other taskmanager takes over and continues without any problem.
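For completeness, the local reproduction job is roughly the following (a minimal sketch: the topic name, addresses, output path, and checkpoint/restart values are placeholders, not the exact values from my setup):

{code:java}
// Minimal sketch of the reproduction job; topic, addresses, path and
// checkpoint/restart values are placeholders, not the exact ones I use.
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StreamingFileSinkRepro {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Checkpointing must be enabled for the StreamingFileSink to commit files.
		env.enableCheckpointing(10_000L);
		// Same kind of "Fixed Delay" restart strategy as on the cluster.
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
		kafkaProps.setProperty("group.id", "streaming-file-sink-repro");

		DataStream<String> events = env.addSource(
			new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps));

		// Row-format sink writing to the local HDFS; killing one taskmanager
		// while this is writing triggers the truncate failure on recovery.
		StreamingFileSink<String> sink = StreamingFileSink
			.forRowFormat(new Path("hdfs://localhost:8020/path/to/hdfs"),
				new SimpleStringEncoder<String>("UTF-8"))
			.build();

		events.addSink(sink);
		env.execute("StreamingFileSink recovery repro");
	}
}
{code}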
> StreamingFileSink fails to recover after taskmanager failure
> ------------------------------------------------------------
>
>                 Key: FLINK-11419
>                 URL: https://issues.apache.org/jira/browse/FLINK-11419
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.7.1
>            Reporter: Edward Rojas
>            Priority: Major
>
> If a job with a StreamingFileSink sending data to HDFS is running in a cluster with multiple taskmanagers and the taskmanager executing the job goes down (for some reason), then when the other taskmanager starts executing the job, it fails saying that there is some "missing data in tmp file", because it is not able to perform a truncate on the file.
>
> Here is the full stack trace:
> {code:java}
> java.io.IOException: Missing data in tmp file: hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:93)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
> 	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> 	at com.sun.proxy.$Proxy49.truncate(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:314)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> 	at com.sun.proxy.$Proxy50.truncate(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1632)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:777)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:774)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:774)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:179)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:91)
> 	... 17 more
> {code}
>
> I noticed there is already some code intended to handle this kind of situation. I also compared with the BucketingSink (as there is no issue with the BucketingSink), and I noticed that in the StreamingFileSink the "truncate" is done before waiting for the file lease to be released, whereas in the BucketingSink the "truncate" is done after waiting for the lease to be released.
>
> For reference:
> BucketingSink: [https://github.com/apache/flink/blob/release-1.7.1/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L830-L853]
> StreamingFileSink: [https://github.com/apache/flink/blob/release-1.7.1/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L89-L96]
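> To make the difference concrete, below is a minimal sketch (assuming Hadoop 2.7+, where truncate is available) of the wait-then-truncate pattern the BucketingSink follows. The helper names, timeout, and polling interval here are illustrative only; the actual implementations are at the links above.
>
> {code:java}
> // Minimal sketch, assuming Hadoop 2.7+, of recovering the lease and
> // waiting for it to be released *before* truncating. Helper names,
> // timeout and polling interval are illustrative.
> import java.io.IOException;
>
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hdfs.DistributedFileSystem;
>
> public final class LeaseRecoveryHelper {
>
> 	private static final long LEASE_TIMEOUT_MS = 100_000L; // illustrative
>
> 	// Ask the namenode to start lease recovery, then poll until the file
> 	// is closed (i.e. the previous writer's lease has been released).
> 	public static boolean waitUntilLeaseIsRevoked(DistributedFileSystem dfs, Path path)
> 			throws IOException {
> 		boolean recovered = dfs.recoverLease(path);
> 		long deadline = System.currentTimeMillis() + LEASE_TIMEOUT_MS;
> 		while (!recovered && System.currentTimeMillis() < deadline) {
> 			try {
> 				Thread.sleep(500L); // back off between polls
> 			} catch (InterruptedException e) {
> 				Thread.currentThread().interrupt();
> 				break;
> 			}
> 			recovered = dfs.isFileClosed(path); // true once the lease is gone
> 		}
> 		return recovered;
> 	}
>
> 	// Truncating only after the lease is released avoids the
> 	// AlreadyBeingCreatedException / RecoveryInProgressException above.
> 	public static void truncateAfterLeaseRecovery(DistributedFileSystem dfs, Path path, long length)
> 			throws IOException {
> 		if (!waitUntilLeaseIsRevoked(dfs, path)) {
> 			throw new IOException("Lease on " + path + " was not released in time");
> 		}
> 		dfs.truncate(path, length);
> 	}
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)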