Thanks for looking into this, Jürgen! I opened a Jira issue: https://issues.apache.org/jira/browse/FLINK-6427
> On 29. Apr 2017, at 09:15, Jürgen Thomann <juergen.thom...@innogames.com> wrote:
>
> Hi Aljoscha,
>
> In my case the valid-length file that was created contains a value which e.g. says 100 MB are valid to read for exactly-once, but the file with the data is only 95 MB large. As I understand it, the valid-length file contains the length of the file at the checkpoint. This also does not happen for all files (3 HDFS sinks, each with a parallelism of 2); for some parts the file size and the value in the valid-length file match exactly.
>
> After looking over the checkpoint code in BucketingSink I looked into the hsync behavior again and found the following page:
> http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
>
> After this I downloaded the file with the hdfs dfs tool, and the file is actually even larger than the valid-length file. I checked this against the things I did before (Impala and Hive select count queries, and a Hue download of the files plus wc -l): these 3 ways give the same number of lines, but hdfs dfs -cat <filename> | wc -l gives a much larger value.
>
> So my conclusion would be that the data is written and not exactly lost as I thought, but for my use case it is not visible, because the files are not properly closed during cancel and the namenode is not aware of the flushed data. So I could imagine 2 ways out of this: 1. implement the hsync as described on the Stack Overflow page, or 2. ensure that files are properly closed during cancel.
>
> Best,
> Jürgen
>
> On 28.04.2017 17:38, Aljoscha Krettek wrote:
>> Hi Jürgen,
>> Is the data missing with respect to what should have been written at the time of the cancel, or at the time of the last checkpoint (or in this case, the savepoint)? I'm asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete, this also means that the snapshot method of the BucketingSink must have done its work, i.e. that it also flushed all files, which is done in [2]. There's always the possibility of a bug, however, so we'll have to look into this together.
>>
>> Best,
>> Aljoscha
>>
>> [1] https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
>>
>> [2] https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
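For reference while we look at the Jira issue: the hsync variant from the Stack Overflow link Jürgen mentions would look roughly like the sketch below. This is only a minimal standalone example, not the actual BucketingSink code; the path, class and main method are made up, and in the sink this would have to happen in the flush path. The relevant part is the UPDATE_LENGTH flag, which also updates the file length on the NameNode so that the visible length matches what was actually synced.

import java.nio.charset.StandardCharsets;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class HsyncUpdateLengthExample {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/tmp/hsync-test/part-0-0"); // example path only

        try (FSDataOutputStream out = fs.create(path)) {
            out.write("some records\n".getBytes(StandardCharsets.UTF_8));

            if (out instanceof HdfsDataOutputStream) {
                // UPDATE_LENGTH makes the NameNode aware of the flushed bytes,
                // so the length other tools see matches what was synced.
                ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
            } else {
                // Non-HDFS streams: a plain hsync/hflush is the best we can do.
                out.hsync();
            }
        }
    }
}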
>>> On 27. Apr 2017, at 10:27, Jürgen Thomann <juergen.thom...@innogames.com> wrote:
>>>
>>> Hi,
>>>
>>> I had problems some time ago with writing data to Hadoop with the BucketingSink and losing data in the case of a cancel with savepoint, because the flush/sync command was interrupted. I tried changing the Hadoop settings as suggested, but had no luck in the end and looked into the Flink code.
>>> If I understand the code correctly, it behaves the following way:
>>>
>>> 1. Start a watchdog thread if a cancellation timeout is set
>>> 2. Invoke cancel on the sink/task, but do not wait for it to finish
>>> 3. Destroy the buffer pool and release resources
>>> 4. Send the initial interrupt to the sink/task
>>> 5. Call join on the sink/task and ignore InterruptedException
>>> 6. Let the watchdog send more interrupts if needed, and throw a fatal error if the timeout is reached
>>>
>>> In my case the BucketingSink does not have enough time to flush everything before the initial interrupt is sent, and some files are not closed properly, which causes the missing data in Hadoop, as far as I understand it.
>>>
>>> Is my understanding correct, and if yes, do you know a way around this behavior to let the close function finish the sync for all files?
>>>
>>> Best,
>>> Jürgen
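Until the sink itself is fixed, downstream jobs have to honour the valid-length file and only consume the part file up to the recorded offset. Below is a rough sketch of such a reader; the file names, and the assumption that the sink wrote the offset with writeUTF, are mine, so please double-check them against the BucketingSink version and configuration you actually run.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ValidLengthReaderSketch {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Example names; the valid-length file sits next to the part file and
        // its prefix/suffix depend on the BucketingSink configuration.
        Path partFile = new Path("/data/out/part-0-0");
        Path validLengthFile = new Path("/data/out/_part-0-0.valid-length");

        long validLength;
        if (fs.exists(validLengthFile)) {
            // Assumption: the sink wrote the offset via writeUTF(Long.toString(len)),
            // so we read it back with readUTF; adjust if your version differs.
            try (FSDataInputStream in = fs.open(validLengthFile)) {
                validLength = Long.parseLong(in.readUTF());
            }
        } else {
            // No truncation marker: the whole file is valid.
            validLength = fs.getFileStatus(partFile).getLen();
        }

        // Only the first validLength bytes belong to the last successful
        // checkpoint; anything beyond that must be ignored for exactly-once.
        byte[] valid = new byte[(int) validLength];
        try (FSDataInputStream in = fs.open(partFile)) {
            in.readFully(valid);
        }
        System.out.println("Read " + valid.length + " valid bytes from " + partFile);
    }
}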