By the way, I do not think the approach below is correct. As @Fabian said, the 
BucketingSink closes files once they reach a certain size (batchSize) or have 
not been written to for a certain amount of time (inactiveBucketThreshold). 
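For reference, both thresholds are plain setters on the sink. A minimal sketch (the HDFS path and the concrete values are assumptions for illustration, not from this thread):

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class SinkConfig {

    // Sketch only: shows where batchSize and inactiveBucketThreshold
    // are configured on BucketingSink (flink-connector-filesystem).
    static BucketingSink<String> configuredSink() {
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/events"); // path assumed
        sink.setBatchSize(1024L * 1024 * 128);        // batchSize: roll a part file after ~128 MB
        sink.setInactiveBucketThreshold(60_000L);     // close buckets idle for 60 s
        sink.setInactiveBucketCheckInterval(60_000L); // how often inactivity is checked
        return sink;
    }
}
```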

> If we can close
> file during checkpoint, then the result is accurate. 

Please take a look at the BucketingSink code. It says there are closed files 
that we are not currently writing to, but which were not yet confirmed by a 
checkpoint:

/**
 * The suffix for {@code pending} part files. These are closed files that we are
 * not currently writing to (inactive or reached {@link #batchSize}), but which
 * were not yet confirmed by a checkpoint.
 */
private static final String DEFAULT_PENDING_SUFFIX = ".pending";
After a checkpoint, the file name carries neither the .pending nor the 
.in-progress suffix. So you can check the file names under every bucket to let 
the ETL team know when a bucket is ready for use.
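That check can be sketched as follows. This is a minimal illustration against a local directory using java.nio; against HDFS you would list the bucket with Hadoop's FileSystem API instead. The suffixes assume the BucketingSink defaults (they can be changed via setPendingSuffix/setInProgressSuffix):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class BucketReadiness {

    // A bucket is ready once no file in it still carries the
    // .pending or .in-progress suffix (the BucketingSink defaults).
    public static boolean isBucketReady(Path bucketDir) throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(bucketDir)) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                if (name.endsWith(".pending") || name.endsWith(".in-progress")) {
                    return false;
                }
            }
        }
        return true;
    }
}
```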

Cheers
Minglei



> On Jun 29, 2018, at 9:03 AM, XilangYan <xilang....@gmail.com> wrote:
> 
> Hi Fabian,
> 
> Finally I have time to read the code, and it is brilliant: it does provide
> an exactly-once guarantee.
> However, I still suggest adding a function that can close a file when a
> checkpoint is made. I noticed that there is an enhancement,
> https://issues.apache.org/jira/browse/FLINK-9138, which can close files on a
> time-based rollover, but it is not very accurate.
> My use case is: we read data from a message queue, write it to HDFS, and our
> ETL team uses the data in HDFS. The ETL team needs to know exactly when all
> the data is ready to be read, so we use a counter to count how much data has
> been written; if the counter equals the number we received, we consider the
> HDFS file ready. We send the counter message in a custom sink so ETL can know
> how much data has been written, but with the current BucketingSink, even
> though the HDFS file is flushed, ETL may still not be able to read the data.
> If we could close the file during a checkpoint, then the result would be
> accurate. As for the HDFS small-file problem, it can be controlled by using a
> bigger checkpoint interval.
> 
> I did take the BucketingSink code and adapted it to our case, but if this can
> be done in Flink, we can save the time of maintaining our own branch.
> 
> Thanks!
> Jeffrey
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
