Hi Aljoscha,
In my case the valid-length file that gets created contains a value which
says, for example, that 100 MB are valid to read for exactly-once, but the
file with the data is only 95 MB large. As I understand it, the
valid-length file contains the length of the file at the time of the
checkpoint. This also does not happen for all files (3 HDFS sinks, each
with a parallelism of 2); for some part files the file size and the value
in the valid-length file match exactly.
After looking over the checkpoint code in BucketingSink, I looked
into the hsync behavior again and found the following page:
http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
After that I downloaded the file with the hdfs dfs tool, and the file is
actually even larger than the value in the valid-length file. I checked
this against the things I did before (a SELECT COUNT query in Impala and
Hive, and downloading the files via Hue and running wc -l): these three
ways all give the same number of lines, but hdfs dfs -cat <filename> |
wc -l gives a much larger value.
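This is roughly how I compared the different views on a part file (a
minimal standalone sketch; the part-file path and the value from the
valid-length file are passed in as arguments, so nothing here depends on
the exact format of the valid-length file):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CompareLengths {

    public static void main(String[] args) throws Exception {
        // args[0]: HDFS path of the part file
        // args[1]: the value found in the corresponding valid-length file
        Path partFile = new Path(args[0]);
        long validLength = Long.parseLong(args[1]);

        FileSystem fs = FileSystem.get(URI.create(args[0]), new Configuration());

        // Length as the NameNode currently reports it
        long nameNodeLength = fs.getFileStatus(partFile).getLen();

        // Bytes actually readable from the DataNodes, similar to
        // `hdfs dfs -cat <file> | wc -c`; for a file that was never closed
        // this can be more than the length reported by the NameNode
        long readableBytes = 0;
        byte[] buffer = new byte[64 * 1024];
        try (FSDataInputStream in = fs.open(partFile)) {
            int read;
            while ((read = in.read(buffer)) > 0) {
                readableBytes += read;
            }
        }

        System.out.println("valid-length file value:    " + validLength);
        System.out.println("NameNode reported length:   " + nameNodeLength);
        System.out.println("bytes readable from stream: " + readableBytes);
    }
}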
So my conclusion would be that the data is written and not actually lost
as I thought, but in my use case it is not visible because the files are
not properly closed during cancel and the NameNode is not aware of the
flushed data. I could imagine two ways out of this: 1. implement hsync as
described on the Stack Overflow page (see the sketch below), or 2. ensure
that files are properly closed during cancel.
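For option 1, the Stack Overflow answer boils down to calling hsync with
the UPDATE_LENGTH flag so that the NameNode also learns the new file
length. A minimal sketch with plain HDFS client code (not a patch against
BucketingSink; the path and data are placeholders, and it assumes
fs.defaultFS points at HDFS):

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class HsyncExample {

    public static void main(String[] args) throws Exception {
        Path path = new Path("/tmp/hsync-example"); // placeholder path
        FileSystem fs = FileSystem.get(new Configuration());

        try (FSDataOutputStream out = fs.create(path, true)) {
            out.writeBytes("some data\n");

            if (out instanceof HdfsDataOutputStream) {
                // hsync with UPDATE_LENGTH also updates the length at the
                // NameNode, so the flushed data becomes visible to
                // getFileStatus() and tools that rely on it
                ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
            } else {
                // Plain hsync: data is persisted on the DataNodes, but the
                // NameNode may still report the old length until close()
                out.hsync();
            }
        }
    }
}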
Best,
Jürgen
On 28.04.2017 17:38, Aljoscha Krettek wrote:
Hi Jürgen,
Is the data missing with respect to what should have been written at the
time of the cancel, or with respect to the time of the last checkpoint
(or in this case, the savepoint)? I'm asking because the cancel command is
only sent out once the savepoint has been completed, as can be seen at
[1]. If the savepoint is complete, this also means that the snapshot
method of the BucketingSink must have done its work, i.e. that it also
flushed all files, which is done in [2]. There's always the possibility of
a bug, however, so we'll have to look into this together.
Best,
Aljoscha
[1]
https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
[2]
https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
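To illustrate what I mean by flushing on snapshot, here is a heavily
simplified sketch of a sink implementing CheckpointedFunction. This is not
the actual BucketingSink code from [2]: bucketing, the operator state, and
the valid-length bookkeeping are all left out, and the path handling is
just a placeholder.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FlushingHdfsSink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private final String basePath; // e.g. "hdfs:///tmp/flush-sketch"
    private transient FSDataOutputStream currentPartFile;

    public FlushingHdfsSink(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Path partFile = new Path(basePath,
                "part-" + getRuntimeContext().getIndexOfThisSubtask());
        FileSystem fs = FileSystem.get(partFile.toUri(),
                new org.apache.hadoop.conf.Configuration());
        currentPartFile = fs.create(partFile, true);
    }

    @Override
    public void invoke(String value) throws Exception {
        currentPartFile.writeBytes(value + "\n");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // On every checkpoint/savepoint, flush the open part file and remember
        // the flushed position as the "valid length" of that file
        currentPartFile.hflush();
        long validLength = currentPartFile.getPos();
        // ... a real sink would store validLength in operator state here ...
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // ... a real sink would restore state and record/truncate the valid
        // lengths of part files left over from the previous attempt ...
    }

    @Override
    public void close() throws Exception {
        if (currentPartFile != null) {
            currentPartFile.close();
        }
    }
}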
On 27. Apr 2017, at 10:27, Jürgen Thomann
<juergen.thom...@innogames.com> wrote:
Hi,
Some time ago I had problems with writing data to Hadoop with the
BucketingSink and losing data in the case of cancel with savepoint,
because the flush/sync command was interrupted. I tried changing Hadoop
settings as suggested but had no luck in the end, so I looked into the
Flink code. If I understand the code correctly, it behaves the following
way (a simplified sketch of this pattern follows the list):
1. Start a watchdog thread if a cancellation timeout is set
2. Invoke cancel on the sink/task, but do not wait for it to finish
3. Destroy the buffer pool and release resources
4. Send an initial interrupt to the sink/task
5. Call join on the sink/task and ignore InterruptedException
6. Let the watchdog send more interrupts if needed and throw a fatal
error if the timeout is reached
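Not the actual Flink code, but a minimal standalone sketch of the
interrupt-and-watchdog pattern I mean (the cancel action, timeouts and
error handling are placeholders):

public class CancelWithWatchdog {

    /** Cancels taskThread roughly following steps 1-6 above. */
    static void cancelTask(Runnable cancelAction, Thread taskThread,
                           long interruptIntervalMillis, long cancellationTimeoutMillis) {

        long deadline = System.currentTimeMillis() + cancellationTimeoutMillis;

        // 1. + 6. watchdog: keeps interrupting the task thread and gives up
        // fatally once the cancellation timeout is reached
        Thread watchdog = new Thread(() -> {
            while (taskThread.isAlive()) {
                try {
                    Thread.sleep(interruptIntervalMillis);
                } catch (InterruptedException e) {
                    return; // task finished, watchdog no longer needed
                }
                if (!taskThread.isAlive()) {
                    return;
                }
                if (System.currentTimeMillis() > deadline) {
                    // Flink escalates this to a fatal error instead
                    throw new RuntimeException(
                            "Task did not terminate within the cancellation timeout");
                }
                taskThread.interrupt();
            }
        }, "cancellation-watchdog");
        watchdog.start();

        cancelAction.run();        // 2. invoke cancel (just sets a flag), don't wait for the task
        // 3. (buffer pool / resource release would happen here)
        taskThread.interrupt();    // 4. initial interrupt

        // 5. join and ignore interrupts until the task thread is gone
        while (taskThread.isAlive()) {
            try {
                taskThread.join();
            } catch (InterruptedException ignored) {
                // keep waiting
            }
        }
        watchdog.interrupt();
    }
}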
In my case the BucketingSink does not have enough time to flush
everything before the initial interrupt is sent, and some files are not
closed properly, which in my understanding causes the missing data in
Hadoop.
Is my understanding correct, and if so, do you know a way to get around
this behavior so that the close function can finish the sync for all
files?
Best,
Jürgen