Hey Maciek, thanks for reporting this. Files lingering around like that looks like a bug to me.

The idea behind setting the recursive flag to false in the AbstractFileStateHandle.discardState() call is that a FileStateHandle refers to a single file, not a directory. After deleting its own file, the handle also tries to delete the parent checkpoint directory, but that second call can only succeed once all other files in that directory have been deleted as well.
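To make the sequence concrete, here is a minimal sketch of that discard path (simplified code against Flink's FileSystem API, not the actual AbstractFileStateHandle source; the class and method wrapper are just for illustration):

    import java.io.IOException;

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    class DiscardSketch {
        static void discardState(Path filePath) throws IOException {
            FileSystem fs = filePath.getFileSystem();

            // Delete only this handle's file; recursive = false because
            // the handle points at a single file, never a directory.
            fs.delete(filePath, false);

            // Best-effort attempt to remove the parent checkpoint
            // directory. This can only succeed once every sibling state
            // file is gone; until then HDFS rejects the non-recursive
            // delete (that is the PathIsNotEmptyDirectoryException in
            // the namenode log below).
            try {
                fs.delete(filePath.getParent(), false);
            } catch (IOException ignored) {
                // expected while other state files still exist
            }
        }
    }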
I think this last step is what sometimes fails when there are many state handles. With RocksDB there is only a single state handle, which is why it works well there. I will open an issue for this, try to reproduce it reliably, and then fix it.

– Ufuk

On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <m...@touk.pl> wrote:
> Hi,
>
> we have a stream job with quite a large state (a few GB); we're using
> FSStateBackend and we're storing checkpoints in HDFS.
> What we observe is that, very often, old checkpoints are not discarded
> properly. In the Hadoop logs I can see:
>
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates:
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server
> handler 9 on 8020, call
> org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233
> Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
> empty': Directory is not empty
>     at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
>
> While on the Flink side (jobmanager log) we don't see any problems:
>
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 64 @ 1462875682636
>
> I see in the code that delete operations in Flink are done with the
> recursive flag set to false - but I'm not sure why the directory contents
> are not being deleted before that?
> When we were using the RocksDB backend we didn't encounter such a
> situation.
> We're using Flink 1.0.1 and HDFS 2.7.2.
>
> Does anybody have any idea why this could be happening?
>
> thanks,
> maciek