Hi all! I've run into some strange behavior with the rolling HDFS sink. We consume events from a Kafka topic and write them into HDFS, using the RollingSink implementation like this:
RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory")
        .setBucketer(new DateTimeBucketer(YYYY_MM_DD))
        .disableCleanupOnOpen()
        .setBatchSize(10485760L)
        .setPartPrefix("part")
        .setPendingPrefix("")
        .setPendingSuffix("");
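For completeness, the surrounding job is wired roughly like the sketch below. This is a simplified stand-in, not our exact code: the class name, the Kafka consumer version (FlinkKafkaConsumer09), broker address, topic, group id and checkpoint interval are only placeholders.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaToHdfsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is enabled; without it the RollingSink would not snapshot or restore any state.
        env.enableCheckpointing(60000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder broker address
        kafkaProps.setProperty("group.id", "hdfs-writer");          // placeholder group id

        // Read the events as plain strings from Kafka ...
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer09<>("events-topic", new SimpleStringSchema(), kafkaProps));

        // ... and write them to HDFS with the RollingSink configured as shown above
        // (setters omitted here to keep the sketch short).
        RollingSink<String> sink = new RollingSink<>("/some/hdfs/directory");
        events.addSink(sink);

        env.execute("kafka-to-hdfs");
    }
}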
Over the last few days we had some network trouble that took one or more TaskManagers out of service for a while. Because of that, some Flink jobs were cancelled since there were not enough slots available. After the TaskManagers came back, the jobs were restarted. After that, all (!!) HDFS directories were completely clean: no data file was left under the root directory /some/hdfs/directory matching our path and file name pattern. The stack trace below is generated on restart and shows that the job tries to recover from its last checkpointed state and expects a data file to exist:

java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Failed to restore state to function: Error while restoring RollingSink state.
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:544)
    ... 3 more
Caused by: java.lang.RuntimeException: Error while restoring RollingSink state.
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:680)
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:123)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
    ... 4 more
Caused by: java.io.FileNotFoundException: File does not exist: /some/hdfs/directory/2016-11-07/part-0-0
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
    at sun.reflect.GeneratedConstructorAccessor133.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1247)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:279)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:275)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.recoverLease(DistributedFileSystem.java:291)
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:625)
    ... 6 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/flink/kraft/kraft-vorschlag/2016-11-07/part-0-0
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy13.recoverLease(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.recoverLease(ClientNamenodeProtocolTranslatorPB.java:603)
    at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy14.recoverLease(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1245)
    ... 11 more

Has anyone out there had a similar experience, or any clue how to stop Flink/YARN/HDFS from doing this?

Greets
Dominique