Hi Ufuk,
It seems I messed it up a bit :)
I cannot comment on JIRA, since it's temporarily locked...
1. org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
empty': Directory is not empty - this seems to be expected behaviour, given
what AbstractFileStateHandle.discardState() does:
    // send a call to delete the checkpoint directory containing the file.
    // This will fail (and be ignored) when some files still exist
    try {
        getFileSystem().delete(filePath.getParent(), false);
    } catch (IOException ignored) {}
- so this is working as expected, although it causes a lot of garbage in
the HDFS logs...
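
The non-recursive parent delete described above can be illustrated with a small local sketch. It uses java.nio.file rather than the Hadoop FileSystem API, so it is only an analogy under that assumption: the class name, method name, and file names are hypothetical, and this is not Flink's actual code.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;

public class NonRecursiveDeleteSketch {

    /**
     * Deletes one state file, then tries to remove its parent directory
     * without recursion. Returns true only if the directory was removed.
     */
    public static boolean discardFile(Path file) throws IOException {
        Files.delete(file);
        Path parent = file.getParent();
        try {
            // Analogous to delete(path, false): a non-empty directory is refused.
            Files.delete(parent);
            return true;
        } catch (DirectoryNotEmptyException stillHasFiles) {
            // Expected while sibling files exist; a later discard can clean up.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for a chk-N directory holding two state files.
        Path dir = Files.createTempDirectory("chk-sketch");
        Path a = Files.createFile(dir.resolve("state-a"));
        Path b = Files.createFile(dir.resolve("state-b"));
        System.out.println(discardFile(a)); // directory still holds state-b
        System.out.println(discardFile(b)); // directory is empty at this point
        System.out.println(Files.exists(dir));
    }
}
```

Every discard except the last one hits the "directory not empty" case, which would be consistent with the many PathIsNotEmptyDirectoryException entries on the namenode side.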
2. The problem with checkpoints not being discarded seems to be related to
periods when we don't have any traffic (during the night).
At that point many checkpoints "expire before completing":
2016-05-13 00:00:10,585 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 199 @ 1463090410585
2016-05-13 00:10:10,585 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 199 expired before completing.
2016-05-13 00:25:14,650 [flink-akka.actor.default-dispatcher-280300] WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 199
When checkpoints do manage to complete, they take very long to do so:
2016-05-13 00:25:19,071 [flink-akka.actor.default-dispatcher-280176] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 201 (in 308472 ms)
- this is happening when no new messages arrive (we have a simple pipeline:
Kafka -> keyBy -> custom state aggregation -> Kafka, with the EventTime time
characteristic)
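
As a sanity check on the log lines above, the intervals can be computed directly from the timestamps. This is a minimal sketch with a hypothetical class name; it only does arithmetic on values copied from the logs. The trigger-to-expiry gap for checkpoint 199 comes out at exactly 10 minutes, which looks like a configured checkpoint timeout, though that is an assumption the thread does not confirm.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CheckpointTimingSketch {

    // Timestamp layout used in the jobmanager log lines quoted above.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Elapsed time between two timestamps in the log format. */
    public static Duration between(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TIME),
                LocalDateTime.parse(to, LOG_TIME));
    }

    public static void main(String[] args) {
        String triggered = "2016-05-13 00:00:10,585";
        String expired = "2016-05-13 00:10:10,585";
        String lateAck = "2016-05-13 00:25:14,650";
        System.out.println("trigger -> expiry:   " + between(triggered, expired));
        System.out.println("trigger -> late ack: " + between(triggered, lateAck));
        // Duration reported by the coordinator for checkpoint 201.
        System.out.println("checkpoint 201 took: " + Duration.ofMillis(308472));
    }
}
```

The late acknowledgement for checkpoint 199 arrives roughly 25 minutes after the trigger, so the slow part is on the task side rather than in the coordinator's timer.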
I think I messed something up with event time & generating watermarks - I'll
have to check it.
With RocksDB I made checkpoints at much larger intervals, so that's probably
why I didn't notice the disk was getting full.
OTOH - shouldn't expired checkpoints be cleaned up automatically?
Sorry for the confusion and thanks for the help
thanks,
maciek
On 12/05/2016 21:28, Maciek Próchniak wrote:
thanks,
I'll try to reproduce it in some test by myself...
maciek
On 12/05/2016 18:39, Ufuk Celebi wrote:
The issue is here: https://issues.apache.org/jira/browse/FLINK-3902
(My "explanation" before doesn't actually make sense and I don't see a
reason why this should be related to having many state handles.)
On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi <u...@apache.org> wrote:
Hey Maciek,
thanks for reporting this. Having files linger around looks like a
bug to me.
The idea behind having the recursive flag set to false in the
AbstractFileStateHandle.discardState() call is that the
FileStateHandle is actually just a single file and not a directory.
The second call trying to delete the parent directory only succeeds
when all other files in that directory have been deleted as well. I
think this is what sometimes fails with many state handles. For
RocksDB there is only a single state handle, which works well.
I will open an issue for this and try to reproduce it reliably and
then fix it.
– Ufuk
On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <m...@touk.pl> wrote:
Hi,
we have a streaming job with quite large state (a few GB), we're using
FsStateBackend and we're storing checkpoints in HDFS.
What we observe is that very often old checkpoints are not discarded
properly.
In the Hadoop logs I can see:
2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non empty': Directory is not empty
    at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
While on the Flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 64 @ 1462875682636
I see in the code that delete operations in Flink are done with the
recursive flag set to false - but I'm not sure why the contents are not
being deleted beforehand?
When we were using the RocksDB backend we didn't encounter such a situation.
We're using Flink 1.0.1 and HDFS 2.7.2.
Does anybody have any idea why this could be happening?
thanks,
maciek