Hello Dominique,

I think the problem is that you set both the pending prefix and the pending suffix to “”.
This makes the “committed” (finished) file paths indistinguishable from 
the pending ones, so they are cleaned up when the sink restores its state.

Could you undo this and set, for example, a suffix such as “pending”,
and let us know whether this works?
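
To illustrate the naming clash, here is a minimal sketch. It assumes the pending name is built as prefix + part file name + suffix; `pendingName` is a hypothetical helper written for this example, not part of the RollingSink API, and the real sink may build its paths differently.

```java
public class PendingNameSketch {

    // Hypothetical helper (an assumption for illustration only):
    // derives the name of a pending file from the part file name.
    static String pendingName(String prefix, String partName, String suffix) {
        return prefix + partName + suffix;
    }

    public static void main(String[] args) {
        String finished = "part-0-0";

        // Empty prefix and suffix: the pending name is the same string as the finished name.
        String ambiguous = pendingName("", finished, "");
        System.out.println(ambiguous.equals(finished)); // true

        // A non-empty suffix keeps pending and finished names apart.
        String distinct = pendingName("", finished, ".pending");
        System.out.println(distinct.equals(finished)); // false
    }
}
```

In your job this would correspond to passing a non-empty marker, for example `.setPendingSuffix(".pending")`, instead of `.setPendingSuffix("")`.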

Thanks,
Kostas

> On Nov 15, 2016, at 2:57 PM, Dominique Rondé <dominique.ro...@allsecur.de> wrote:
> 
> Hi @all!
> 
> I noticed some strange behavior with the rolling HDFS sink. We consume
> events from a Kafka topic and write them into an HDFS filesystem. We use
> the RollingSink implementation in this way:
> 
>    RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory") //
>            .setBucketer(new DateTimeBucketer(YYYY_MM_DD)) //
>            .disableCleanupOnOpen() //
>            .setBatchSize(10485760L) //
>            .setPartPrefix("part") //
>            .setPendingPrefix("") //
>            .setPendingSuffix("");
> 
> Over the last few days we had some network trouble that took one or more
> TaskManagers out of service for some time. As a result, some Flink
> jobs were canceled because there were not enough slots available. After
> the TaskManagers came back, the jobs were restarted. After that, all (!!)
> HDFS directories were absolutely clean. This means that no data file is
> left under the root directory /some/hdfs/directory matching our path and
> file name pattern. The stack trace below was generated and shows that the
> job tries to recover from the last state and expects a data file to exist.
> 
> java.lang.Exception: Could not restore checkpointed state to operators
> and functions
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>    at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Failed to restore state to function:
> Error while restoring RollingSink state.
>    at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:544)
>    ... 3 more
> Caused by: java.lang.RuntimeException: Error while restoring RollingSink
> state.
>    at
> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:680)
>    at
> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:123)
>    at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>    ... 4 more
> Caused by: java.io.FileNotFoundException: File does not exist:
> /some/hdfs/directory/2016-11-07/part-0-0
>    at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>    at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>    at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
>    at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
>    at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
>    at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>    at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
>    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
>    at java.security.AccessController.doPrivileged(Native Method)
>    at javax.security.auth.Subject.doAs(Subject.java:422)
>    at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
>    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
> 
>    at sun.reflect.GeneratedConstructorAccessor133.newInstance(Unknown
> Source)
>    at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>    at
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>    at
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1247)
>    at
> org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:279)
>    at
> org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:275)
>    at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>    at
> org.apache.hadoop.hdfs.DistributedFileSystem.recoverLease(DistributedFileSystem.java:291)
>    at
> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:625)
>    ... 6 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
> File does not exist: /user/flink/kraft/kraft-vorschlag/2016-11-07/part-0-0
>    at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>    at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>    at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
>    at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
>    at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
>    at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>    at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
>    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
>    at java.security.AccessController.doPrivileged(Native Method)
>    at javax.security.auth.Subject.doAs(Subject.java:422)
>    at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
>    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
> 
>    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>    at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>    at com.sun.proxy.$Proxy13.recoverLease(Unknown Source)
>    at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.recoverLease(ClientNamenodeProtocolTranslatorPB.java:603)
>    at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
>    at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>    at java.lang.reflect.Method.invoke(Method.java:497)
>    at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>    at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>    at com.sun.proxy.$Proxy14.recoverLease(Unknown Source)
>    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1245)
>    ... 11 more
> 
> Has anyone out there had a similar experience, or any clue how to stop
> Flink/YARN/HDFS from doing this?
> 
> Greets
> 
> Dominique
> 
> <0x962E5CF3.asc>
