Hi Dominique,

I totally agree with you on opening a JIRA. I would suggest adding these "collision" checks for the other file types as well, for example the in-progress and pending ones.
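To make the idea a bit more concrete, here is a rough, stand-alone sketch of what such a check could look like. The class and method names are made up for illustration only; this is not the actual RollingSink code, and a real check would of course live inside the sink itself (e.g. in its setters or its open/restore path):

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinkConfigSanityCheck {

    private static final Logger LOG = LoggerFactory.getLogger(SinkConfigSanityCheck.class);

    /**
     * Warns if two file states (e.g. pending vs. finished, or pending vs. in-progress)
     * end up with the same prefix/suffix pair, which makes their paths indistinguishable.
     */
    static void warnOnCollision(String stateA, String prefixA, String suffixA,
                                String stateB, String prefixB, String suffixB) {
        if (Objects.equals(prefixA, prefixB) && Objects.equals(suffixA, suffixB)) {
            LOG.warn("{} and {} files share the same prefix/suffix ('{}' / '{}'); "
                    + "their paths are indistinguishable and data may be deleted on restore.",
                    stateA, stateB, prefixA, suffixA);
        }
    }

    public static void main(String[] args) {
        // The configuration from this thread: pending prefix and suffix both set to "",
        // which makes pending files look exactly like finished part files -> warning.
        warnOnCollision("finished", "", "", "pending", "", "");

        // A non-colliding configuration -> no warning.
        warnOnCollision("pending", "_", ".pending", "in-progress", "_", ".in-progress");
    }
}
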
Thanks for reporting it,
Kostas

> On Nov 15, 2016, at 10:34 PM, Dominique Rondé <dominique.ro...@allsecur.de> wrote:
>
> Thanks Kostas for this fast and helpful response! I was able to reproduce it, and changing the prefix and suffix really solves my problem.
>
> I would like to suggest checking both values and writing a warning into the logs. If there are no objections, I would like to open a JIRA and add this message.
>
> Greets
> Dominique
>
> Sent from my Samsung device.
>
> -------- Original message --------
> From: Kostas Kloudas <k.klou...@data-artisans.com>
> Date: 15.11.16 15:51 (GMT+01:00)
> To: user@flink.apache.org
> Subject: Re: Data Loss in HDFS after Job failure
>
> Hello Dominique,
>
> I think the problem is that you set both the pending prefix and suffix to "". Doing this makes the "committed" or "finished" file paths indistinguishable from the pending ones. Thus they are cleaned up upon restoring.
>
> Could you undo this and put, for example, a suffix like "pending" or something similar, and let us know if this works? (An example configuration is sketched below, after the quoted thread.)
>
> Thanks,
> Kostas
>
> > On Nov 15, 2016, at 2:57 PM, Dominique Rondé <dominique.ro...@allsecur.de> wrote:
> >
> > Hi @all!
> >
> > I noticed some strange behavior with the rolling HDFS sink. We consume events from a Kafka topic and write them into an HDFS file system, using the RollingSink implementation in this way:
> >
> > RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory") //
> >     .setBucketer(new DateTimeBucketer(YYYY_MM_DD)) //
> >     .disableCleanupOnOpen() //
> >     .setBatchSize(10485760L) //
> >     .setPartPrefix("part") //
> >     .setPendingPrefix("") //
> >     .setPendingSuffix("");
> >
> > Over the last few days we had some network trouble that took one or more TaskManagers out of service for a while. As a result, some Flink jobs were canceled because there were not enough slots available. After the TaskManagers came back, the jobs were restarted. After that, all (!!) HDFS directories were absolutely clean: no data file matching our path and file name pattern was left under the root directory /some/hdfs/directory. The stack trace below shows that the job tries to recover from the last state and expects a data file to exist.
> >
> > java.lang.Exception: Could not restore checkpointed state to operators and functions
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.Exception: Failed to restore state to function: Error while restoring RollingSink state.
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:544)
> >     ... 3 more
> > Caused by: java.lang.RuntimeException: Error while restoring RollingSink state.
> >     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:680)
> >     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:123)
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
> >     ... 4 more
> > Caused by: java.io.FileNotFoundException: File does not exist: /some/hdfs/directory/2016-11-07/part-0-0
> >     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
> >     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
> >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
> >     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
> >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
> >     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
> >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:422)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
> >     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
> >
> >     at sun.reflect.GeneratedConstructorAccessor133.newInstance(Unknown Source)
> >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> >     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> >     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> >     at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1247)
> >     at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:279)
> >     at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:275)
> >     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> >     at org.apache.hadoop.hdfs.DistributedFileSystem.recoverLease(DistributedFileSystem.java:291)
> >     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:625)
> >     ... 6 more
> > Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/flink/kraft/kraft-vorschlag/2016-11-07/part-0-0
> >     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
> >     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
> >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
> >     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
> >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
> >     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
> >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:422)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
> >     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
> >
> >     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> >     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> >     at com.sun.proxy.$Proxy13.recoverLease(Unknown Source)
> >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.recoverLease(ClientNamenodeProtocolTranslatorPB.java:603)
> >     at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:497)
> >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >     at com.sun.proxy.$Proxy14.recoverLease(Unknown Source)
> >     at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1245)
> >     ... 11 more
> >
> > Has anyone out there had a similar experience, or any clue how to stop Flink/YARN/HDFS from doing this?
> >
> > Greets
> >
> > Dominique
> >
> > <0x962E5CF3.asc>
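
For completeness, here is a minimal sketch of the sink configuration with the workaround applied, i.e. a non-empty pending prefix and suffix so that pending files stay distinguishable from finished part files. The class wrapper, the assumed value of YYYY_MM_DD, and the concrete "_" / ".pending" values are only examples, not a prescribed setup:

import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

public class HdfsSinkConfig {

    // Assumed value of the YYYY_MM_DD constant from the snippet above
    // (the bucket directories in the stack trace look like "2016-11-07").
    private static final String YYYY_MM_DD = "yyyy-MM-dd";

    public static RollingSink<String> buildSink() {
        return new RollingSink<String>("/some/hdfs/directory")
                .setBucketer(new DateTimeBucketer(YYYY_MM_DD))
                .disableCleanupOnOpen()
                .setBatchSize(10485760L)        // roll to a new part file after ~10 MiB
                .setPartPrefix("part")
                .setPendingPrefix("_")          // non-empty: pending files are clearly marked ...
                .setPendingSuffix(".pending");  // ... so only they are touched on restore
    }
}

With a distinct pending decoration, the restore logic only cleans up files it positively knows to be pending or in-progress, and finished data in the bucket directories is left alone.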