[ https://issues.apache.org/jira/browse/FLINK-11419 ]

ASF GitHub Bot updated FLINK-11419:
-----------------------------------
    Labels: pull-request-available  (was: )

> StreamingFileSink fails to recover after taskmanager failure
> -------------------------------------------------------------
>
>                 Key: FLINK-11419
>                 URL: https://issues.apache.org/jira/browse/FLINK-11419
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.7.1
>            Reporter: Edward Rojas
>            Assignee: Edward Rojas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.2
>
>
> If a job with a StreamingFileSink writing data to HDFS is running in a cluster with multiple taskmanagers and the taskmanager executing the job goes down (for any reason), then when another taskmanager starts executing the job, it fails with "Missing data in tmp file" because it is not able to truncate the file.
> Here is the full stack trace:
> {code:java}
> java.io.IOException: Missing data in tmp file: hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:93)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
> 	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> 	at com.sun.proxy.$Proxy49.truncate(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:314)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> 	at com.sun.proxy.$Proxy50.truncate(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:1632)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:777)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:774)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:774)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:179)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:91)
> 	... 17 more
> {code}
>
> I noticed there is already some code to handle this kind of situation. I also compared with the BucketingSink (which does not have this issue) and noticed that the StreamingFileSink performs the "truncate" before waiting for the file lease to be released, whereas the BucketingSink performs the "truncate" only after the lease is free (see the sketches below).
>
> For reference:
> BucketingSink: [https://github.com/apache/flink/blob/release-1.7.1/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L830-L853]
> StreamingFileSink: [https://github.com/apache/flink/blob/release-1.7.1/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L89-L96]
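>
> For illustration, a minimal job matching the failure scenario above: a StreamingFileSink writing row-encoded strings to HDFS with checkpointing enabled (the recovery path, and therefore the failing truncate, is only exercised when state is restored). The HDFS URI, source, and checkpoint interval are placeholders, not from the original report:
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> public class StreamingFileSinkRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpointing is required: in-progress files are recovered (and truncated)
>         // only when state is restored after a failure.
>         env.enableCheckpointing(10_000);
>
>         StreamingFileSink<String> sink = StreamingFileSink
>                 .forRowFormat(new Path("hdfs://namenode:8020/path/to/hdfs"), // placeholder URI
>                               new SimpleStringEncoder<String>("UTF-8"))
>                 .build();
>
>         env.socketTextStream("localhost", 9999) // any continuous source works
>            .addSink(sink);
>
>         env.execute("StreamingFileSink HDFS recovery repro");
>     }
> }
> {code}
> Killing the taskmanager that runs this job forces the restore on another taskmanager, which is where the truncate fails.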
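>
> And a minimal sketch of the BucketingSink-style ordering the fix should follow: recover the lease, wait until the file is closed, and only then truncate. The class name, method name, and timeout here are illustrative, not the actual patch; the HDFS calls used (recoverLease, isFileClosed, truncate) are real DistributedFileSystem/FileSystem APIs, with truncate requiring Hadoop 2.7+:
> {code:java}
> import java.io.IOException;
>
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hdfs.DistributedFileSystem;
>
> public class LeaseAwareTruncate {
>
>     private static final long LEASE_TIMEOUT_MS = 60_000; // illustrative timeout
>
>     public static void truncateAfterLeaseRecovery(
>             FileSystem fs, Path file, long validLength) throws IOException {
>
>         if (fs instanceof DistributedFileSystem) {
>             DistributedFileSystem dfs = (DistributedFileSystem) fs;
>
>             // Ask the NameNode to revoke the previous writer's lease on the file.
>             dfs.recoverLease(file);
>
>             // Poll until the file is closed, i.e. the lease is actually free.
>             long deadline = System.currentTimeMillis() + LEASE_TIMEOUT_MS;
>             boolean closed = dfs.isFileClosed(file);
>             while (!closed && System.currentTimeMillis() < deadline) {
>                 try {
>                     Thread.sleep(500);
>                 } catch (InterruptedException e) {
>                     Thread.currentThread().interrupt();
>                     throw new IOException("Interrupted while waiting for lease on " + file, e);
>                 }
>                 closed = dfs.isFileClosed(file);
>             }
>             if (!closed) {
>                 throw new IOException("Lease on " + file + " was not released in time");
>             }
>         }
>
>         // Only now is it safe to truncate back to the last checkpointed length.
>         fs.truncate(file, validLength);
>     }
> }
> {code}
> This mirrors the lease-wait logic in the BucketingSink at the lines linked above; the StreamingFileSink's HadoopRecoverableFsDataOutputStream currently truncates before any such wait, which is why the AlreadyBeingCreatedException above surfaces.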