According to the JavaDocs of BucketingSink, in-progress files are still
being written to.
I don't know what would cause a file to remain in that state.

Another thing to mention, you might want to ensure that only the last task
that renames the file generates the _SUCCESS file.
Otherwise, the data might be consumed too early. You'd have to figure out
how to do that in a race-condition free way.

Best, Fabian

2018-02-06 11:03 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:

> Hi,
>
> I think it can be judged at the BucketingSink.notifyCheckpointComplete()
> method  in this way, as shown below:
>
> if (!bucketState.isWriterOpen &&
>         bucketState.pendingFiles.isEmpty() &&
>         bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>     boolean flag = true;
>     RemoteIterator<LocatedFileStatus> files = fs.listFiles(new 
> Path(directory), false);
>     while (files.hasNext()) {
>         LocatedFileStatus file = files.next();
>         String fileName = file.getPath().getName();
>         if (fileName.lastIndexOf(".") != -1) {
>             flag = false;
>             break;
>         }
>     }
>     Path path = new Path(directory + "/_SUCCESS");
>     if (flag && !fs.exists(path)){
>         FSDataOutputStream outputStream = fs.create(path);
>         outputStream.flush();
>         outputStream.close();
>     }
>     // We've dealt with all the pending files and the writer for this bucket 
> is not currently open.
>     // Therefore this bucket is currently inactive and we can remove it from 
> our state.
>     bucketStatesIt.remove();
> }
>
>
> But we find this problem: occasionally, a file is always in the
> in-progress state, and the amount of data processed by each sub task is
> almost the same, and there is no data skew. There are no exceptions in the
> program.
>
> Best,
> Ben
>
>
>
> On 6 Feb 2018, at 5:50 PM, xiaobin yan <yan.xiao.bin.m...@gmail.com>
> wrote:
>
> Hi,
>
> Thanks for your reply! See here :https://github.com/apache/
> flink/blob/master/flink-connectors/flink-connector-
> filesystem/src/main/java/org/apache/flink/streaming/
> connectors/fs/bucketing/BucketingSink.java#L652  After calling this
> method, the file is renamed,and we can't determine which subtask is
> finished at last.
>
> Best,
> Ben
>
> On 6 Feb 2018, at 5:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> The notifyCheckpointComplete() method will be called when all subtasks of
> a job completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
>
> Please note that you need to handle the case where multiple tasks try to
> create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and
> modifications.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/blob/master/flink-
> runtime/src/main/java/org/apache/flink/runtime/state/
> CheckpointListener.java
>
> 2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>
>> Hi ,
>>
>> You've got a point. I saw that method, but how can I make sure that all
>> the subtasks checkpoint are finished, because I can only write _SUCCESS
>> file at that time.
>>
>> Best,
>> Ben
>>
>>
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> In case of a failure, Flink rolls back the job to the last checkpoint and
>> reprocesses all data since that checkpoint.
>> Also the BucketingSink will truncate a file to the position of the last
>> checkpoint if the file system supports truncate. If not, it writes a file
>> with the valid length and starts a new file.
>>
>> Therefore, all files that the BucketingSink finishes must be treated as
>> volatile until the next checkpoint is completed.
>> Only when a checkpoint is completed a finalized file may be read. The
>> files are renamed on checkpoint to signal that they are final and can be
>> read. This would also be the right time to generate a _SUCCESS file.
>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>>
>> Best, Fabian
>>
>>
>>
>>
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>>
>>> Hi ,
>>>
>>> I have tested it. There are some small problems. When checkpoint is
>>> finished, the name of the file will change, and the success file will be
>>> written before checkpoint.
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientru...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I did not actually test this, but I think with Flink 1.4 you can extend
>>> BucketingSink and overwrite the invoke method to access the watermark
>>>
>>> Pseudo code:
>>>
>>> invoke(IN value, SinkFunction.Context context) {
>>>
>>>    long currentWatermark = context.watermark()
>>>
>>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>
>>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) 
>>> {
>>>
>>>        Write _SUCCESS
>>>
>>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>>
>>>     }
>>>
>>>     invoke(value)
>>>
>>> }
>>>
>>>
>>> Regards,
>>> Kien
>>>
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>
>>> Hi:
>>>
>>> I think so too! But I have a question that when should I add this logic in 
>>> BucketingSink! And who does this logic, and ensures that the logic is 
>>> executed only once, not every parallel instance of the sink that executes 
>>> this logic!
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> 
>>> <unicorn.bana...@gmail.com> wrote:
>>>
>>> it depends on how you partition your file. in my case I write file per hour,
>>> so I'm sure that file is ready after that hour period, in processing time.
>>> Here, read to be ready means this file contains all the data in that hour
>>> period.
>>>
>>> If the downstream runs in a batch way, you may want to ensure the file is
>>> ready.
>>> In this case, ready to read can mean all the data before watermark as
>>> arrived.
>>> You could take the BucketingSink and implement this logic there, maybe wait
>>> until watermark
>>> reaches
>>>
>>> Best,
>>>
>>> Sendoh
>>>
>>>
>>>
>>> --
>>> Sent from: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>>
>>>
>>
>>
>
>
>

Reply via email to