[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917819#comment-16917819 ]

Gyula Fora commented on FLINK-13874:
------------------------------------

I added some extra logging to figure out what the problem is.
What seems to happen is that the lease revoke timeout is reached before we
move on to the truncate, so the file is never closed before truncating:

[https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L341]

I wonder what might cause it to take so long, and what we can do in these
cases. Should we throw an error, or at least log a warning?
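
For illustration, here is a rough sketch of what surfacing the timeout could
look like, loosely following the shape of the lease-recovery wait in
HadoopRecoverableFsDataOutputStream. The class wrapper, the LEASE_TIMEOUT_MS
constant and the 500 ms poll interval are placeholders of mine, not
necessarily the values Flink actually uses:

{code:java}
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LeaseRecoverySketch {

    private static final Logger LOG = LoggerFactory.getLogger(LeaseRecoverySketch.class);

    // Placeholder timeout; the real value lives in HadoopRecoverableFsDataOutputStream.
    private static final long LEASE_TIMEOUT_MS = 100_000L;

    // Returns true only once the NameNode reports the file as closed, i.e. the
    // lease was actually revoked and it is safe to truncate.
    static boolean waitUntilLeaseIsRevoked(FileSystem fs, Path path) throws IOException {
        if (!(fs instanceof DistributedFileSystem)) {
            throw new IOException("Lease recovery requires a DistributedFileSystem");
        }
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        dfs.recoverLease(path);

        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(LEASE_TIMEOUT_MS);
        boolean isClosed = dfs.isFileClosed(path);
        while (!isClosed && System.nanoTime() < deadline) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for lease recovery", e);
            }
            isClosed = dfs.isFileClosed(path);
        }
        if (!isClosed) {
            // The suggested change: surface the timeout instead of silently moving
            // on to truncate a file that is still under construction.
            LOG.warn("Lease on {} was not revoked within {} ms; the following truncate "
                    + "will likely fail.", path, LEASE_TIMEOUT_MS);
        }
        return isClosed;
    }
}
{code}

If this returned false, the caller would at least leave a trace in the logs
before the truncate fails, instead of failing with the opaque error below.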

> StreamingFileSink fails to recover (truncate) properly
> ------------------------------------------------------
>
>                 Key: FLINK-13874
>                 URL: https://issues.apache.org/jira/browse/FLINK-13874
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.9.0
>            Reporter: Gyula Fora
>            Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>       at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>       at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:89)
>       at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>       at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>       at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>       at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)
>       at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1445)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1355)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>       at com.sun.proxy.$Proxy17.truncate(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:366)
>       at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
>       at com.sun.proxy.$Proxy18.truncate(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1545)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:862)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:859)
>       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:869)
>       at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:197)
>       at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:164){noformat}
> It seems that even though we waited for the lease before starting the
> truncate, it is still not ours.
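
To make the failure mode above easier to diagnose, here is a minimal sketch
(method and variable names are hypothetical, not Flink's actual fields) of
how the truncate call could unwrap the RemoteException so that the
AlreadyBeingCreatedException cause is explicit:

{code:java}
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;

class TruncateDiagnosticsSketch {

    // fs, path and truncateLength stand in for the values the recovery path
    // works with; they are illustrative placeholders.
    static boolean truncateWithDiagnostics(FileSystem fs, Path path, long truncateLength)
            throws IOException {
        try {
            // truncate() returns false when the operation needs block recovery
            // and completes asynchronously on the NameNode.
            return ((DistributedFileSystem) fs).truncate(path, truncateLength);
        } catch (RemoteException e) {
            IOException unwrapped = e.unwrapRemoteException(AlreadyBeingCreatedException.class);
            if (unwrapped instanceof AlreadyBeingCreatedException) {
                // The case from the stack trace above: our own DFSClient still
                // holds the lease, so the earlier lease recovery never finished
                // before the truncate was attempted.
                throw new IOException(
                        "Truncate attempted while the file is still under construction "
                                + "by this client: " + path, unwrapped);
            }
            throw unwrapped;
        }
    }
}
{code}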


