According to the JavaDocs of BucketingSink, in-progress files are still being
written to. I don't know what would cause a file to remain in that state.

Another thing to mention: you might want to ensure that only the last task to
rename its file generates the _SUCCESS file. Otherwise, the data might be
consumed too early. You'd have to figure out how to do that in a
race-condition-free way.

Best, Fabian
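A minimal sketch of one race-condition-free approach, assuming the sink writes
to HDFS: rely on the atomic semantics of FileSystem.create(path, overwrite =
false), let every subtask attempt to create the marker, and treat "already
exists" as a benign loss of the race. The helper name below is illustrative,
not part of BucketingSink:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileAlreadyExistsException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Tries to create the _SUCCESS marker; returns true for the one caller that wins. */
    static boolean tryWriteSuccessMarker(FileSystem fs, Path bucketDir) throws IOException {
        Path marker = new Path(bucketDir, "_SUCCESS");
        try {
            // With overwrite = false, HDFS creates the file atomically:
            // exactly one concurrent caller succeeds.
            FSDataOutputStream out = fs.create(marker, false);
            out.close();
            return true;
        } catch (FileAlreadyExistsException e) {
            // Another subtask created the marker first; nothing left to do.
            // (Some FileSystem implementations throw a plain IOException instead.)
            return false;
        }
    }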
2018-02-06 11:03 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:

> Hi,
>
> I think this can be checked in the BucketingSink.notifyCheckpointComplete()
> method, as shown below:
>
> if (!bucketState.isWriterOpen &&
>         bucketState.pendingFiles.isEmpty() &&
>         bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>     // Scan the bucket directory: any file whose name still carries a
>     // suffix (".in-progress", ".pending", ...) means the bucket is not
>     // complete yet.
>     boolean allFinal = true;
>     RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(directory), false);
>     while (files.hasNext()) {
>         LocatedFileStatus file = files.next();
>         String fileName = file.getPath().getName();
>         if (fileName.lastIndexOf(".") != -1) {
>             allFinal = false;
>             break;
>         }
>     }
>     Path path = new Path(directory + "/_SUCCESS");
>     if (allFinal && !fs.exists(path)) {
>         FSDataOutputStream outputStream = fs.create(path);
>         outputStream.flush();
>         outputStream.close();
>     }
>     // We've dealt with all the pending files and the writer for this bucket
>     // is not currently open. Therefore this bucket is currently inactive
>     // and we can remove it from our state.
>     bucketStatesIt.remove();
> }
>
> But we have found this problem: occasionally, a file stays in the
> in-progress state forever, even though each subtask processes almost the
> same amount of data and there is no data skew. There are no exceptions in
> the program.
>
> Best,
> Ben
>
>
> On 6 Feb 2018, at 5:50 PM, xiaobin yan <yan.xiao.bin.m...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your reply! See here:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652
> After this method is called, the file is renamed, and we can't determine
> which subtask finishes last.
>
> Best,
> Ben
>
> On 6 Feb 2018, at 5:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> The notifyCheckpointComplete() method will be called when all subtasks of
> a job have completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
>
> Please note that you need to handle the case where multiple tasks try to
> create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and
> modifications.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
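For reference, CheckpointListener is a small interface that a custom sink opts
into alongside its base class. A minimal sketch of what that looks like (the
class name is illustrative; this is not BucketingSink's own code):

    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class MarkerAwareSink<IN> extends RichSinkFunction<IN> implements CheckpointListener {

        @Override
        public void invoke(IN value) throws Exception {
            // write the record to the file system ...
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // Called once checkpoint `checkpointId` has been acknowledged by
            // all subtasks of the job -- but still called on every parallel
            // instance, so writing _SUCCESS here needs the kind of
            // race-condition handling Fabian describes above.
        }
    }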
> 2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>
>> Hi,
>>
>> You've got a point. I saw that method, but how can I make sure that all
>> the subtasks' checkpoints have finished? Only at that point can I write
>> the _SUCCESS file.
>>
>> Best,
>> Ben
>>
>>
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> In case of a failure, Flink rolls the job back to the last checkpoint and
>> reprocesses all data since that checkpoint.
>> The BucketingSink will also truncate a file to its position at the last
>> checkpoint if the file system supports truncate. If not, it writes a file
>> with the valid length and starts a new file.
>>
>> Therefore, all files that the BucketingSink finishes must be treated as
>> volatile until the next checkpoint is completed.
>> Only when a checkpoint has completed may a finalized file be read. The
>> files are renamed on checkpoint completion to signal that they are final
>> and can be read. This would also be the right time to generate a _SUCCESS
>> file. Have a look at the BucketingSink.notifyCheckpointComplete() method.
>>
>> Best, Fabian
>>
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.m...@gmail.com>:
>>
>>> Hi,
>>>
>>> I have tested it, and there are some small problems: when a checkpoint
>>> finishes, the files are renamed, but the _SUCCESS file is written before
>>> the checkpoint completes.
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientru...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I did not actually test this, but I think with Flink 1.4 you can extend
>>> BucketingSink and override the invoke method to access the watermark.
>>>
>>> Pseudo code (a fleshed-out sketch follows at the end of the thread):
>>>
>>> invoke(IN value, SinkFunction.Context context) {
>>>     long currentWatermark = context.watermark()
>>>     long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>>         Write _SUCCESS
>>>         lastSuccessWatermark = currentWatermark round down to 1 hour
>>>     }
>>>     invoke(value)
>>> }
>>>
>>> Regards,
>>> Kien
>>>
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>
>>> Hi:
>>>
>>> I think so too! But I have a question: when should I add this logic to
>>> BucketingSink? And which component executes it and ensures that it runs
>>> only once, rather than in every parallel instance of the sink?
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> wrote:
>>>
>>> It depends on how you partition your files. In my case I write a file per
>>> hour, so I'm sure that file is ready after that hour period, in processing
>>> time. Here, ready to read means the file contains all the data of that
>>> hour period.
>>>
>>> If the downstream runs in a batch way, you may want to ensure the file is
>>> ready. In this case, ready to read can mean that all the data before the
>>> watermark has arrived.
>>> You could take the BucketingSink and implement this logic there, maybe
>>> waiting until the watermark reaches the end of the bucket's time range.
>>>
>>> Best,
>>>
>>> Sendoh
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
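For completeness, here is Kien's pseudocode fleshed out against the Flink 1.4
API as an untested sketch: the watermark accessor on SinkFunction.Context is
currentWatermark() (not watermark()), and names such as
SuccessMarkingBucketingSink, ONE_HOUR, and writeSuccessMarker() are
illustrative, not from the thread:

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    public class SuccessMarkingBucketingSink<T> extends BucketingSink<T> {

        private static final long ONE_HOUR = 60 * 60 * 1000L;

        // Watermark at which a marker was last written; MIN_VALUE = never.
        private long lastSuccessWatermark = Long.MIN_VALUE;

        public SuccessMarkingBucketingSink(String basePath) {
            super(basePath);
        }

        @Override
        public void invoke(T value, SinkFunction.Context context) throws Exception {
            long currentWatermark = context.currentWatermark();
            int taskIndex = getRuntimeContext().getIndexOfThisSubtask();

            // Only subtask 0 writes the marker, and only once the watermark
            // has advanced at least an hour beyond the last marker.
            if (taskIndex == 0
                    && currentWatermark > Long.MIN_VALUE
                    && currentWatermark >= lastSuccessWatermark + ONE_HOUR) {
                writeSuccessMarker();
                // Round down so markers align with hour boundaries.
                lastSuccessWatermark = currentWatermark - (currentWatermark % ONE_HOUR);
            }

            super.invoke(value);
        }

        private void writeSuccessMarker() {
            // e.g. the atomic fs.create(marker, false) approach sketched
            // earlier in the thread
        }
    }

Note that this still writes _SUCCESS before the enclosing checkpoint
completes, which is exactly the small problem Ben reported above; combining
the watermark condition with notifyCheckpointComplete() would avoid that.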