Hi,

Some time ago I ran into problems writing data to Hadoop with the BucketingSink: on cancel-with-savepoint I was losing data because the flush/sync call was interrupted. I tried changing the Hadoop settings as suggested, but that did not help in the end, so I looked into the Flink code. If I understand the code correctly, it behaves the following way (I have put a rough sketch of the sequence right after the list):

1. Start a Watchdog thread if we have a cancellation timeout set
2. Invoke cancel on the sink/task, but do not wait for it to finish
3. Destroy the buffer pool and release resources
4. Send the initial interrupt to the sink/task
5. Call join on the sink/task and ignore InterruptedException
6. Let the watchdog send more interrupts if needed and throw a fatal error if the timeout is reached
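
Written out as a sketch, this is how I picture that sequence (this is of course not the actual Flink Task code; the class and parameter names are made up and it is only meant to illustrate the ordering):

    /**
     * Simplified illustration of the cancellation sequence as I read it.
     * Not the real Flink code; only the ordering of the steps matters.
     */
    public class CancellationSequenceSketch {

        static void cancelTask(Thread taskThread,
                               Runnable cancelInvokable,
                               Runnable releaseResources,
                               long interruptIntervalMs,
                               long timeoutMs) {

            // 1. Start a watchdog thread only if a cancellation timeout is set.
            if (timeoutMs > 0) {
                Thread watchdog = new Thread(() -> {
                    long deadline = System.currentTimeMillis() + timeoutMs;
                    while (taskThread.isAlive() && System.currentTimeMillis() < deadline) {
                        taskThread.interrupt();                  // 6. further interrupts
                        try {
                            taskThread.join(interruptIntervalMs);
                        } catch (InterruptedException ignored) {
                        }
                    }
                    if (taskThread.isAlive()) {
                        // 6. timeout reached: Flink escalates this to a fatal error
                        System.err.println("Task did not terminate within " + timeoutMs + " ms");
                    }
                }, "cancellation-watchdog");
                watchdog.setDaemon(true);
                watchdog.start();
            }

            cancelInvokable.run();    // 2. invoke cancel, do not wait for it
            releaseResources.run();   // 3. destroy the buffer pool, release resources
            taskThread.interrupt();   // 4. initial interrupt, may cut a flush/sync short
            try {
                taskThread.join();    // 5. join the task thread
            } catch (InterruptedException ignored) {
                // 5. InterruptedException is ignored here
            }
        }
    }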

In my case the BucketingSink does not have enough time to flush everything before the initial interrupt is sent, so some files are not closed properly, which, as far as I understand, is what causes the missing data in Hadoop.
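
The knobs I am aware of on the Flink side are the task cancellation interval and timeout; if I read the code right, they only change how often the watchdog interrupts and when it escalates to a fatal error, but not the fact that the initial interrupt arrives before close() can finish. This is how I would tune them (assuming the ExecutionConfig setters below are the right way and are available in my Flink version):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CancellationTimeoutSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Pause between the repeated interrupts sent by the watchdog (milliseconds).
            env.getConfig().setTaskCancellationInterval(60_000L);

            // Time after which a task that still has not terminated is treated
            // as a fatal error (milliseconds).
            env.getConfig().setTaskCancellationTimeout(300_000L);
        }
    }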

Is my understanding correct, and if so, do you know a way to work around this behavior so that the close function can finish the sync for all files?

Best,
Jürgen
