By the way, I do not think the approach below is correct. As @Fabian said, the BucketingSink closes files once they have reached a certain size (batchSize) or have not been written to for a certain amount of time (inactiveBucketThreshold).
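For reference, those two thresholds are configured roughly like this (just a sketch with a made-up path and sizes, not code from this thread; adjust them to your job):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/events");
    sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
    sink.setBatchSize(1024L * 1024L * 128L);          // roll a part file once it reaches ~128 MB ...
    sink.setInactiveBucketThreshold(60L * 1000L);     // ... or once the bucket has been idle for 60 s
    sink.setInactiveBucketCheckInterval(60L * 1000L); // how often inactivity is checked

    stream.addSink(sink);   // stream is your DataStream<String>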
> If we can close the file during a checkpoint, then the result is accurate.

Please also take a look at the BucketingSink code. It says there are closed files that we are not currently writing to, but which were not yet confirmed by a checkpoint:

    /**
     * The suffix for {@code pending} part files. These are closed files that we are
     * not currently writing to (inactive or reached {@link #batchSize}), but which
     * were not yet confirmed by a checkpoint.
     */
    private static final String DEFAULT_PENDING_SUFFIX = ".pending";

After a checkpoint, the file name carries neither the .pending nor the .in-progress suffix. So you can check the file names under every bucket to let the ETL team know when a bucket is ready for use (see the sketch at the end of this mail).

Cheers,
Minglei

> On 29 Jun 2018, at 9:03 AM, XilangYan <xilang....@gmail.com> wrote:
>
> Hi Fabian,
>
> Finally I have time to read the code, and it is brilliant: it does provide
> an exactly-once guarantee.
> However, I still suggest adding a function that can close a file when a
> checkpoint is made. I noticed that there is an enhancement,
> https://issues.apache.org/jira/browse/FLINK-9138, which can close a file on a
> time-based rollover, but it is not very accurate.
> Our use case is that we read data from a message queue, write it to HDFS, and our ETL
> team uses the data in HDFS. The ETL team needs to know accurately when all the data
> is ready to be read, so we use a counter to count how many records have
> been written; if the counter equals the number of records we received, we consider the
> HDFS files ready. We send the counter message in a custom sink so ETL knows
> how many records have been written, but with the current BucketingSink, even though
> the HDFS file is flushed, ETL may still not be able to read the data. If we can close
> the file during a checkpoint, then the result is accurate. And as for the HDFS
> small-file problem, it can be controlled by using a bigger checkpoint interval.
>
> I did take the BucketingSink code and adapt it to our case, but if this can be done in
> Flink, we can save the time of maintaining our own branch.
>
> Thanks!
> Jeffrey
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
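PS: here is a rough sketch of the file-name check I mean above (my own illustration, not code from this thread). It assumes the default .pending / .in-progress suffixes and a made-up bucket path; adjust both if you configured different prefixes or suffixes on the sink:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BucketReadyCheck {

        // A bucket is "ready" once no part file under it still carries a
        // .pending or .in-progress suffix, i.e. everything in it has been
        // confirmed by a completed checkpoint.
        public static boolean bucketIsReady(FileSystem fs, Path bucketDir) throws Exception {
            for (FileStatus status : fs.listStatus(bucketDir)) {
                String name = status.getPath().getName();
                if (name.endsWith(".pending") || name.endsWith(".in-progress")) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // hypothetical bucket directory produced by a DateTimeBucketer("yyyy-MM-dd--HH")
            System.out.println(bucketIsReady(fs, new Path("hdfs:///data/events/2018-06-29--09")));
        }
    }

The ETL side can poll a check like this per bucket instead of counting records; once it returns true for a bucket, every part file in it has been confirmed by a checkpoint and is safe to read.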