Hi, 

Thanks for your reply! See here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652

After this method is called, the file is renamed, and we can't determine which 
subtask finishes last.
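
For the concurrent-creation part, a minimal sketch of one possible approach: every subtask tries to create the marker, and an atomic create-if-absent call decides which one actually writes it. This only addresses the race between the existence check and the creation; it does not tell us which subtask renames its files last. The class name SuccessMarker and the method tryCreate are hypothetical, and the sketch uses java.nio.file on a local file system purely for illustration. On HDFS the analogous call would presumably be Hadoop's FileSystem.create(path, false), which I have not tested here.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink or of BucketingSink.
final class SuccessMarker {

    private SuccessMarker() {
    }

    /**
     * Tries to create an empty _SUCCESS file in the given bucket directory.
     *
     * @return true if this caller created the marker, false if it already existed
     */
    static boolean tryCreate(Path bucketDir) throws IOException {
        Path marker = bucketDir.resolve("_SUCCESS");
        try {
            // Files.createFile checks for existence and creates the file
            // as a single atomic operation, so only one caller can succeed.
            Files.createFile(marker);
            return true;
        } catch (FileAlreadyExistsException e) {
            // Another subtask (or an earlier attempt) already wrote the marker.
            return false;
        }
    }
}
```

Each parallel instance could call SuccessMarker.tryCreate(bucketDir) from its notifyCheckpointComplete() handling; the instances that get false simply skip the write.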

Best,
Ben

> On 6 Feb 2018, at 5:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> The notifyCheckpointComplete() method is called once all subtasks of a 
> job have completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
> 
> Please note that you need to handle the case where multiple tasks try to 
> create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and 
> modifications.
> 
> Best, Fabian
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
> 
> 2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
> Hi,
> 
> You've got a point. I saw that method, but how can I make sure that the 
> checkpoints of all subtasks have finished? I can only write the _SUCCESS 
> file at that point.
> 
> Best,
> Ben
> 
> 
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> In case of a failure, Flink rolls back the job to the last checkpoint and 
>> reprocesses all data since that checkpoint. 
>> Also the BucketingSink will truncate a file to the position of the last 
>> checkpoint if the file system supports truncate. If not, it writes a file 
>> with the valid length and starts a new file.
>> 
>> Therefore, all files that the BucketingSink finishes must be treated as 
>> volatile until the next checkpoint is completed. 
>> A finalized file may be read only after a checkpoint has completed. The files 
>> are renamed on checkpoint to signal that they are final and can be read. 
>> This would also be the right time to generate a _SUCCESS file.
>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>> 
>> Best, Fabian
>> 
>> 
>> 
>> 
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>> Hi,
>> 
>> I have tested it. There are some small problems: the file name changes when 
>> the checkpoint finishes, and the _SUCCESS file is written before the 
>> checkpoint.
>> 
>> Best,
>> Ben
>> 
>> 
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientru...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I have not actually tested this, but I think with Flink 1.4 you can extend 
>>> BucketingSink and override the invoke method to access the watermark.
>>> Pseudo code:
>>> // lastSuccessWatermark is a long field of the subclass
>>> invoke(IN value, SinkFunction.Context context) {
>>>     long currentWatermark = context.currentWatermark()
>>>     long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>     // only subtask 0 writes the marker, at most once per hour of event time
>>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>>         Write _SUCCESS
>>>         lastSuccessWatermark = currentWatermark round down to 1 hour
>>>     }
>>>     invoke(value)  // delegate to the one-argument BucketingSink.invoke
>>> }
>>> 
>>> Regards,
>>> Kien
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>> Hi,
>>>> 
>>>> I think so too! But I have a question: where in BucketingSink should I add 
>>>> this logic? And which instance runs it, so that the logic is executed only 
>>>> once rather than by every parallel instance of the sink?
>>>> 
>>>> Best,
>>>> Ben
>>>> 
>>>>> On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> wrote:
>>>>> 
>>>>> It depends on how you partition your files. In my case I write one file 
>>>>> per hour, so I'm sure that a file is ready after that hour has passed, in 
>>>>> processing time. Here, "ready to read" means the file contains all the 
>>>>> data for that hour.
>>>>> 
>>>>> If the downstream job runs in batch mode, you may want to ensure the file 
>>>>> is ready. In this case, "ready to read" can mean that all the data before 
>>>>> the watermark has arrived. You could take the BucketingSink and implement 
>>>>> this logic there, maybe wait until watermark reaches
>>>>> 
>>>>> Best,
>>>>> 
>>>>> Sendoh
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Sent from: 
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>> 
>> 
> 
> 
