The notifyCheckpointComplete() method is called once all subtasks of a job have completed their checkpoints. Check the JavaDocs of the CheckpointListener class [1].
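A minimal sketch of the idea, assuming you only need one marker per bucket directory. So that it compiles with the JDK alone, the class below uses a local stand-in interface (`CheckpointListenerLike`, hypothetical) with the same single method as Flink's `CheckpointListener`, and `java.nio.file` on a local directory in place of the sink's real file system. `SuccessMarkerWriter` and `writeMarker` are illustrative names, not Flink API. Only subtask 0 writes, and `Files.createFile` checks for and creates `_SUCCESS` in one atomic step, so a concurrent or repeated attempt simply finds that the marker already exists.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local stand-in with the same single method as Flink's CheckpointListener (assumption: avoids a Flink dependency). */
interface CheckpointListenerLike {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

/** Hypothetical helper: writes a _SUCCESS marker into one bucket directory after a checkpoint completes. */
class SuccessMarkerWriter implements CheckpointListenerLike {
    private final int subtaskIndex;
    private final Path bucketDir;

    SuccessMarkerWriter(int subtaskIndex, Path bucketDir) {
        this.subtaskIndex = subtaskIndex;
        this.bucketDir = bucketDir;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws IOException {
        // Designate a single subtask so that not every parallel instance tries to write.
        if (subtaskIndex == 0) {
            writeMarker(bucketDir);
        }
    }

    /** Returns true if this call created _SUCCESS, false if it already existed. */
    static boolean writeMarker(Path dir) throws IOException {
        try {
            // createFile checks for existence and creates the file in one atomic step,
            // so two concurrent callers cannot both create it.
            Files.createFile(dir.resolve("_SUCCESS"));
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // another attempt (or an earlier checkpoint) already wrote it
        }
    }
}
```

One caveat: each subtask receives `notifyCheckpointComplete()` independently, so subtask 0 being notified does not by itself guarantee that the other subtasks have already renamed their files. Deciding which bucket is complete is also left to the caller in this sketch.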
Please note that you need to handle the case where multiple tasks try to create the _SUCCESS file concurrently. There is a good chance of race conditions between checking whether the file exists and creating or modifying it.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java

2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:

> Hi,
>
> You've got a point. I saw that method, but how can I make sure that the
> checkpoints of all subtasks are finished? Only at that time can I write
> the _SUCCESS file.
>
> Best,
> Ben
>
>
> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> In case of a failure, Flink rolls back the job to the last checkpoint and
> reprocesses all data since that checkpoint.
> Also, the BucketingSink will truncate a file to the position of the last
> checkpoint if the file system supports truncate. If not, it writes a file
> with the valid length and starts a new file.
>
> Therefore, all files that the BucketingSink finishes must be treated as
> volatile until the next checkpoint is completed.
> Only when a checkpoint is completed may a finalized file be read. The
> files are renamed on checkpoint to signal that they are final and can be
> read. This would also be the right time to generate a _SUCCESS file.
> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>
> Best, Fabian
>
>
> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>
>> Hi,
>>
>> I have tested it. There are some small problems. When a checkpoint is
>> finished, the name of the file changes, and the _SUCCESS file gets
>> written before the checkpoint.
>>
>> Best,
>> Ben
>>
>>
>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientru...@gmail.com> wrote:
>>
>> Hi,
>>
>> I did not actually test this, but I think with Flink 1.4 you can extend
>> BucketingSink and override the invoke method to access the watermark.
>>
>> Pseudo code:
>>
>> invoke(IN value, SinkFunction.Context context) {
>>     long currentWatermark = context.currentWatermark()
>>     int taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>         Write _SUCCESS
>>         lastSuccessWatermark = currentWatermark rounded down to 1 hour
>>     }
>>     invoke(value)
>> }
>>
>> Regards,
>> Kien
>>
>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>
>> Hi:
>>
>> I think so too! But I have a question: where in BucketingSink should I
>> add this logic? And which instance runs it, so that the logic is executed
>> only once rather than by every parallel instance of the sink?
>>
>> Best,
>> Ben
>>
>>
>> On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> wrote:
>>
>> It depends on how you partition your files. In my case I write a file per
>> hour, so I'm sure that the file is ready after that hour period, in
>> processing time. Here, ready to read means the file contains all the data
>> in that hour period.
>>
>> If the downstream runs in a batch way, you may want to ensure the file is
>> ready.
>> In this case, ready to read can mean that all the data before the
>> watermark has arrived.
>> You could take the BucketingSink and implement this logic there, maybe wait
>> until watermark reaches
>>
>> Best,
>>
>> Sendoh
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
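Kien's pseudocode can be turned into a small, testable piece of plain Java by separating the hourly decision from the sink. This is a hedged model, not a `BucketingSink` subclass: `HourlySuccessTrigger` is a hypothetical name, the watermark is passed in directly instead of coming from `SinkFunction.Context`, and the method returns the hour boundary to mark instead of writing a file. It keeps the pseudocode's rule (act when the watermark is more than one hour past the last recorded hour, then round down to the hour) and adds a guard for `Long.MIN_VALUE`, the value an operator's watermark starts at before any watermark arrives, so the subtraction cannot overflow.

```java
/**
 * Hypothetical model of the hourly _SUCCESS decision from the pseudocode,
 * written without Flink classes so it can be tested on its own.
 */
class HourlySuccessTrigger {
    static final long HOUR_MS = 60L * 60L * 1000L;
    private static final long NO_WATERMARK = Long.MIN_VALUE;

    private final int subtaskIndex;
    // Start of the hour (epoch millis) of the watermark that last set the baseline or triggered a marker.
    private long lastSuccessWatermark = NO_WATERMARK;

    HourlySuccessTrigger(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }

    /** Rounds an epoch-millisecond timestamp down to the start of its hour. */
    static long floorToHour(long timestampMs) {
        return Math.floorDiv(timestampMs, HOUR_MS) * HOUR_MS;
    }

    /**
     * Called with the current watermark (for example once per record).
     * Returns an hour boundary such that every hourly bucket ending at or before it
     * is complete, if a _SUCCESS marker should be written now; otherwise returns -1.
     */
    long onWatermark(long currentWatermark) {
        // Only subtask 0 decides, so not every parallel instance writes a marker.
        if (subtaskIndex != 0 || currentWatermark == NO_WATERMARK) {
            return -1;
        }
        // First real watermark: remember its hour as the baseline; nothing to mark yet.
        if (lastSuccessWatermark == NO_WATERMARK) {
            lastSuccessWatermark = floorToHour(currentWatermark);
            return -1;
        }
        // Same rule as the pseudocode: strictly more than one hour past the last recorded hour.
        if (currentWatermark - lastSuccessWatermark > HOUR_MS) {
            lastSuccessWatermark = floorToHour(currentWatermark);
            return lastSuccessWatermark;
        }
        return -1;
    }
}
```

As the later replies in the thread point out, writing the marker as soon as this returns a boundary can happen before the BucketingSink has renamed its files on checkpoint, so in practice the boundary would be remembered and the marker written only after a checkpoint completes.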