The notifyCheckpointComplete() method will be called when all subtasks of a
job completed their checkpoints.
Check the JavaDocs of the CheckpointListener class [1].

Please note that you need to handle the case where multiple tasks try to
create the _SUCCESS file concurrently.
So, there is a good chance of race conditions between file checks and
modifications.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java

2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:

> Hi ,
>
> You've got a point. I saw that method, but how can I make sure that all
> the subtasks checkpoint are finished, because I can only write _SUCCESS
> file at that time.
>
> Best,
> Ben
>
>
> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> In case of a failure, Flink rolls back the job to the last checkpoint and
> reprocesses all data since that checkpoint.
> Also the BucketingSink will truncate a file to the position of the last
> checkpoint if the file system supports truncate. If not, it writes a file
> with the valid length and starts a new file.
>
> Therefore, all files that the BucketingSink finishes must be treated as
> volatile until the next checkpoint is completed.
> Only when a checkpoint is completed a finalized file may be read. The
> files are renamed on checkpoint to signal that they are final and can be
> read. This would also be the right time to generate a _SUCCESS file.
> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>
> Best, Fabian
>
>
>
>
> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>
>> Hi ,
>>
>> I have tested it. There are some small problems. When checkpoint is
>> finished, the name of the file will change, and the success file will be
>> written before checkpoint.
>>
>> Best,
>> Ben
>>
>>
>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientru...@gmail.com> wrote:
>>
>> Hi,
>>
>> I did not actually test this, but I think with Flink 1.4 you can extend
>> BucketingSink and overwrite the invoke method to access the watermark
>>
>> Pseudo code:
>>
>> invoke(IN value, SinkFunction.Context context) {
>>
>>    long currentWatermark = context.watermark()
>>
>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>
>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>
>>        Write _SUCCESS
>>
>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>
>>     }
>>
>>     invoke(value)
>>
>> }
>>
>>
>> Regards,
>> Kien
>>
>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>
>> Hi:
>>
>> I think so too! But I have a question that when should I add this logic in 
>> BucketingSink! And who does this logic, and ensures that the logic is 
>> executed only once, not every parallel instance of the sink that executes 
>> this logic!
>>
>> Best,
>> Ben
>>
>>
>> On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> 
>> <unicorn.bana...@gmail.com> wrote:
>>
>> it depends on how you partition your file. in my case I write file per hour,
>> so I'm sure that file is ready after that hour period, in processing time.
>> Here, read to be ready means this file contains all the data in that hour
>> period.
>>
>> If the downstream runs in a batch way, you may want to ensure the file is
>> ready.
>> In this case, ready to read can mean all the data before watermark as
>> arrived.
>> You could take the BucketingSink and implement this logic there, maybe wait
>> until watermark
>> reaches
>>
>> Best,
>>
>> Sendoh
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>
>

Reply via email to