By the way, I do not think the approach below is correct. As Fabian said, the
BucketingSink closes files once they reach a certain size (BatchSize) or have
not been written to for a certain amount of time (InactiveBucketThreshold).
> If we can close the file during a checkpoint, then the result is accurate.
Please also take a look at the BucketingSink code. It says that there are closed
files that we are not currently writing to, but which were not yet confirmed by
a checkpoint:
/**
* The suffix for {@code pending} part files. These are closed files that we are
* not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
private static final String DEFAULT_PENDING_SUFFIX = ".pending";
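To make the part-file lifecycle from that javadoc concrete, here is a
simplified, hypothetical state model. It is not Flink code, and the class and
method names are made up for illustration. A file is in-progress while being
written, becomes pending when it is closed (batch size reached or bucket
inactive), and only gets its final name once a checkpoint that recorded it as
pending completes. In this model a checkpoint leaves an in-progress file
untouched, which is the behaviour Jeffrey is asking to change.

```java
/**
 * Hypothetical, simplified model of a BucketingSink part file's lifecycle.
 * Not Flink's implementation; it only mirrors the javadoc quoted above.
 */
public class PartFileLifecycle {

    public enum State { IN_PROGRESS, PENDING, FINISHED }

    private State state = State.IN_PROGRESS;
    // Id of the checkpoint that recorded this file as pending (-1 = none yet).
    private long pendingCheckpointId = -1;

    public State state() {
        return state;
    }

    /** Rollover: batch size reached or bucket inactive for too long. */
    public void close() {
        if (state == State.IN_PROGRESS) {
            state = State.PENDING;
        }
    }

    /** A checkpoint is taken; only pending files are recorded under its id. */
    public void snapshot(long checkpointId) {
        // An IN_PROGRESS file is deliberately left as it is here.
        if (state == State.PENDING && pendingCheckpointId < 0) {
            pendingCheckpointId = checkpointId;
        }
    }

    /** A checkpoint is confirmed; files it (or an earlier one) recorded are finished. */
    public void checkpointComplete(long checkpointId) {
        if (state == State.PENDING
                && pendingCheckpointId >= 0
                && pendingCheckpointId <= checkpointId) {
            state = State.FINISHED;
        }
    }
}
```

In this sketch a file needs two events after it is closed: a checkpoint that
records it as pending, and the confirmation of that checkpoint. Until both have
happened it is not safe for a downstream reader.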
After the checkpoint is confirmed, the file name ends in neither .pending nor
.in-progress. So you can check the file names under every bucket to let the
ETL team know when a bucket is ready for use.
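A minimal sketch of that check, assuming the default BucketingSink suffixes
".pending" and ".in-progress" (adjust them if the sink was configured with
different suffixes) and assuming the bucket directory is reachable through
java.nio.file, for example via a mounted file system. Reading HDFS directly
would need the Hadoop FileSystem API instead. The class name BucketReadiness
is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: is a bucket free of unconfirmed part files? */
public class BucketReadiness {

    // Assumed BucketingSink defaults; change them if the sink was reconfigured.
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    /** True if no file name still carries a pending or in-progress suffix. */
    public static boolean isReady(List<String> fileNames) {
        return fileNames.stream().noneMatch(
                name -> name.endsWith(IN_PROGRESS_SUFFIX) || name.endsWith(PENDING_SUFFIX));
    }

    /** Lists one bucket directory (non-recursively) and applies the check above. */
    public static boolean isReady(Path bucketDir) throws IOException {
        try (Stream<Path> files = Files.list(bucketDir)) {
            List<String> names = files
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toList());
            return isReady(names);
        }
    }
}
```

Note that this only says there are no unconfirmed files right now: if records
for the same bucket keep arriving, the sink can open a new in-progress file
there later, so the ETL side still needs to know when a bucket stops receiving
data (for example, with a time-based bucketer).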
Cheers
Minglei
> On June 29, 2018, at 9:03 AM, XilangYan <[email protected]> wrote:
>
> Hi Fabian,
>
> I finally had time to read the code, and it is brilliant: it does provide an
> exactly-once guarantee.
> However, I still suggest adding a feature that closes a file when a
> checkpoint is made. I noticed that there is an enhancement,
> https://issues.apache.org/jira/browse/FLINK-9138, which can close a file on
> a time-based rollover, but it is not very accurate.
> My use case: we read data from a message queue and write it to HDFS, and
> our ETL team then uses the data in HDFS. ETL needs to know accurately
> whether all data is ready to be read, so we use a counter to count how many
> records have been written; if the counter equals the number we received, we
> consider the HDFS file ready. We send the counter message from a custom sink
> so ETL knows how much data has been written. But with the current
> BucketingSink, even though the HDFS file is flushed, ETL may still be unable
> to read the data. If we can close the file during a checkpoint, then the
> result is accurate. As for the HDFS small-file problem, it can be controlled
> by using a bigger checkpoint interval.
>
> I did take the BucketingSink code and adapt it to our case, but if this
> could be done in Flink itself, we would save the time of maintaining our own
> branch.
>
> Thanks!
> Jeffrey