Hi Jürgen,

Is the data missing with respect to what should have been written at the time of the cancel, or with respect to when the last checkpoint (or in this case, the savepoint) was performed? I'm asking because the cancel command is only sent out once the savepoint has been completed, as can be seen at [1]. If the savepoint is complete, this also means that the snapshot method of the BucketingSink must have done its work, i.e. that it also flushed all files, which is done in [2]. There's always the possibility of a bug, however, so we'll have to look into this together.
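For reference, here is a heavily simplified sketch of the pattern I mean: a sink that flushes every open file inside its snapshotState() method, so that by the time the savepoint is acknowledged (and the cancel is sent) the data covered by the savepoint is already flushed. This is not the real BucketingSink; the class name, the bucketing logic and the plain java.nio writers are made up for illustration, while the actual sink writes against Hadoop streams and also tracks in-progress/pending files:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a bucketing-style sink that flushes all open files
// when a checkpoint/savepoint snapshot is taken.
public class FlushingBucketSink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private final String basePath;                       // hypothetical base directory
    private transient Map<String, BufferedWriter> openWriters;

    public FlushingBucketSink(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        openWriters = new HashMap<>();
        // The real sink would also restore pending/in-progress file state here.
    }

    @Override
    public void invoke(String value) throws IOException {
        // Very naive bucketing: one file per first character of the record.
        String bucket = value.isEmpty() ? "empty" : value.substring(0, 1);
        BufferedWriter writer = openWriters.get(bucket);
        if (writer == null) {
            Path file = Paths.get(basePath, "bucket-" + bucket + ".txt");
            writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            openWriters.put(bucket, writer);
        }
        writer.write(value);
        writer.newLine();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws IOException {
        // Flush every open bucket so the data covered by this savepoint is
        // durable before the job can be cancelled. The real BucketingSink
        // flushes/syncs its Hadoop output streams at this point.
        for (BufferedWriter writer : openWriters.values()) {
            writer.flush();
        }
    }

    @Override
    public void close() throws IOException {
        if (openWriters != null) {
            for (BufferedWriter writer : openWriters.values()) {
                writer.close();
            }
            openWriters.clear();
        }
    }
}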
Best,
Aljoscha

[1] https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
[2] https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java

> On 27. Apr 2017, at 10:27, Jürgen Thomann <juergen.thom...@innogames.com> wrote:
>
> Hi,
>
> Some time ago I had problems with writing data to Hadoop with the BucketingSink and losing data in the case of cancel with savepoint, because the flush/sync command was interrupted. I tried changing the Hadoop settings as suggested but had no luck in the end, so I looked into the Flink code. If I understand the code correctly, it behaves the following way:
>
> 1. Start a watchdog thread if we have a cancellation timeout set.
> 2. Invoke cancel on the sink/task, but do not wait for it to finish.
> 3. Destroy the buffer pool and release resources.
> 4. Send the initial interrupt to the sink/task.
> 5. Call join on the sink/task and ignore InterruptedException.
> 6. Let the watchdog send more interrupts if needed and throw a fatal error if the timeout is reached.
>
> In my understanding, the BucketingSink does not have enough time to flush everything before the initial interrupt is sent, and some files are not closed properly, which causes the missing data in Hadoop.
>
> Is my understanding correct, and if yes, do you know a way to work around this behavior to let the close function finish the sync for all files?
>
> Best,
> Jürgen
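To make the cancellation sequence in the quoted mail a bit more concrete, here is a small, self-contained Java sketch of what that ordering can do to a sink whose flush/close work is still running when the interrupt arrives. The class and method names are invented for illustration; this is not Flink's actual task or watchdog code:

import java.util.concurrent.TimeUnit;

// Illustrative only: cancel, then interrupt, with a watchdog that keeps
// interrupting until the task thread exits.
public class CancelInterruptSketch {

    // Stand-in for a sink whose close path needs time to flush files.
    static class SlowClosingSink {
        private volatile boolean running = true;

        void run() {
            try {
                while (running) {
                    Thread.sleep(100);            // "processing records"
                }
                // cancel() was called: try to flush and close all files.
                System.out.println("closing: flushing files ...");
                Thread.sleep(5_000);              // simulated slow flush/sync
                System.out.println("closing: all files flushed");
            } catch (InterruptedException e) {
                // The problematic path: the flush never finishes.
                System.out.println("interrupted, files left unflushed");
            }
        }

        void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        SlowClosingSink sink = new SlowClosingSink();
        Thread taskThread = new Thread(sink::run, "task");
        taskThread.start();
        Thread.sleep(300);                         // let the task run briefly

        // 1. start a watchdog that keeps interrupting until the task exits
        Thread watchdog = new Thread(() -> {
            while (taskThread.isAlive()) {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
                taskThread.interrupt();
            }
        }, "watchdog");
        watchdog.setDaemon(true);
        watchdog.start();

        // 2. invoke cancel on the sink, but do not wait for it to finish
        sink.cancel();
        // 3. (buffer pool / resources would be released here)
        // 4. send the initial interrupt
        taskThread.interrupt();
        // 5. join on the task thread
        taskThread.join();
        System.out.println("task thread finished");
    }
}

Running this, the simulated flush is typically cut short by the interrupt, which is the same shape of problem as the unclosed files in Hadoop described above.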