Re: Let BucketingSink roll file on each checkpoint

2018-06-28 Thread XilangYan
Hi Fabian, Finally I have time to read the code, and it is brilliant; it does provide an exactly-once guarantee. However, I still suggest adding a function that can close a file when a checkpoint is made. I noticed that there is an enhancement https://issues.apache.org/jira/browse/FLINK-9138 which can clo

Re: Let BucketingSink roll file on each checkpoint

2018-07-01 Thread XilangYan
Thank you, Minglei. I should describe my current flow and my requirements more clearly. 1. Any data we collect has a send-time. 2. When collecting data, we also send another counter message, saying we have collected 45 messages whose send-time is 2018-07-02 10:02:00. 3. Data is sent to Kafka (or other message

Re: Let BucketingSink roll file on each checkpoint

2018-07-04 Thread XilangYan
Hi Fabian, We do need a consistent view of the data: the Counter and the HDFS files must be consistent. For example, when the Counter indicates that 1000 messages were written to HDFS, there must be exactly 1000 messages in HDFS ready to be read. The data we write to HDFS is collected by an Agent (whic
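To make the consistency requirement concrete, here is a minimal Python sketch of the check a reader could run. It assumes counter messages arrive as a mapping from send-time to announced count (as in the "45 messages whose send-time is 2018-07-02 10:02:00" example), that each file is a list of (send_time, payload) records, and that only finished files are passed in. The function names and record layout are hypothetical, not part of any Flink API.

```python
from collections import Counter


def readable_counts(finished_files):
    """Count records per send-time across files a reader can already see.

    Each file is a list of (send_time, payload) tuples. Only finalized files
    (not in-progress or pending ones) should be passed in.
    """
    counts = Counter()
    for records in finished_files:
        for send_time, _payload in records:
            counts[send_time] += 1
    return counts


def inconsistent_send_times(counter_messages, finished_files):
    """Return the send-times whose announced count differs from readable data."""
    seen = readable_counts(finished_files)
    return sorted(
        t for t, expected in counter_messages.items() if seen.get(t, 0) != expected
    )


counters = {"2018-07-02 10:02:00": 2, "2018-07-02 10:03:00": 1}
files = [[("2018-07-02 10:02:00", "a"), ("2018-07-02 10:02:00", "b")]]
print(inconsistent_send_times(counters, files))  # ['2018-07-02 10:03:00']
```

An empty result means every announced count is fully visible in HDFS; any send-time listed is either still in an in-progress/pending file or was lost.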

Re: Let BucketingSink roll file on each checkpoint

2018-07-08 Thread XilangYan
Hi Fabian, With watermarks, I understand it could write only the rows whose timestamps are smaller than the received watermark, but may I ask why it would also need to maintain a write-ahead log of all received rows? When an event is received, it just compares its time with the current watermark and writes it to the correct buck
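A minimal Python sketch of watermark-gated writing may show where the extra state comes from: rows whose event time is not yet below the watermark cannot be written, so they must be held somewhere until a later watermark releases them. In a real job that buffer would have to survive failures (i.e. live in checkpointed state), which is one plausible reading of the "write-ahead log" in question. The class, its methods, and the one-bucket-per-60-time-units rule are hypothetical illustrations, not Flink code.

```python
import heapq
from itertools import count


class WatermarkGatedWriter:
    """Toy operator that writes a row only once the watermark has passed it."""

    def __init__(self):
        self.watermark = float("-inf")
        self._buffer = []      # min-heap of (event_time, seq, row) not yet writable
        self._seq = count()    # tie-breaker so rows themselves are never compared
        self.buckets = {}      # bucket id -> rows written to that bucket

    def on_event(self, event_time, row):
        if event_time < self.watermark:
            # Behind the watermark: write immediately (a late row in Flink terms).
            self._write(event_time, row)
        else:
            # Ahead of the watermark: must be buffered; this buffer is the extra state.
            heapq.heappush(self._buffer, (event_time, next(self._seq), row))

    def on_watermark(self, watermark):
        self.watermark = max(self.watermark, watermark)
        # Release every buffered row that the new watermark has now passed.
        while self._buffer and self._buffer[0][0] < self.watermark:
            event_time, _, row = heapq.heappop(self._buffer)
            self._write(event_time, row)

    def _write(self, event_time, row):
        bucket = event_time // 60  # hypothetical: one bucket per 60 time units
        self.buckets.setdefault(bucket, []).append(row)


w = WatermarkGatedWriter()
w.on_event(30, "a")
w.on_event(90, "b")
w.on_watermark(60)   # releases "a"; "b" stays buffered
w.on_watermark(120)  # releases "b"
print(w.buckets)     # {0: ['a'], 1: ['b']}
```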

Let BucketingSink roll file on each checkpoint

2018-03-19 Thread XilangYan
The behavior of BucketingSink is not exactly what we want. If I understand correctly, when a checkpoint is requested, BucketingSink flushes the writer to make sure no data is lost, but it neither closes the file nor rolls a new file after the checkpoint. In the case of HDFS, if the file length is not updated to the NameNode (throu
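The requested behavior can be sketched as a toy Python model: on every checkpoint the open part file is closed and becomes pending, and pending files are published as finished only once that checkpoint is confirmed complete. The in-progress/pending/finished states follow this thread's terminology; the class, method names, and "part-N" naming are hypothetical and are not Flink's BucketingSink API.

```python
class RollOnCheckpointSink:
    """Toy bucketing sink that rolls (closes) its part file at every checkpoint."""

    def __init__(self):
        self._part_index = 0
        self.in_progress = []  # records in the currently open part file
        self.pending = {}      # checkpoint id -> list of (name, records) closed files
        self.finished = []     # (name, records) files visible to readers

    def invoke(self, record):
        self.in_progress.append(record)

    def snapshot_state(self, checkpoint_id):
        # Close the open file instead of only flushing it, so its length is
        # final at the moment the checkpoint is taken.
        if self.in_progress:
            name = f"part-{self._part_index}"
            self._part_index += 1
            self.pending.setdefault(checkpoint_id, []).append((name, self.in_progress))
            self.in_progress = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Publish pending files of this and any earlier checkpoint.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.finished.extend(self.pending.pop(cid))


s = RollOnCheckpointSink()
s.invoke("a")
s.invoke("b")
s.snapshot_state(1)
s.invoke("c")                  # goes into a fresh in-progress file
s.notify_checkpoint_complete(1)
print(s.finished)              # [('part-0', ['a', 'b'])]
```

The trade-off is one file per bucket per checkpoint, so the checkpoint interval directly controls file size.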

Re: Let BucketingSink roll file on each checkpoint

2018-03-20 Thread XilangYan
Thank you, Fabian! The HDFS small-file problem can be avoided with a large checkpoint interval. Meanwhile, there is a potential data-loss problem in the current BucketingSink. Say we consume data from Kafka: when a checkpoint is requested, the Kafka offset is updated, but the in-progress file in BucketingSink remains. If
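Why an in-progress file does not by itself cause loss can be sketched in Python: the checkpoint captures the source offset and the in-progress file's valid length together, and on restore the file is cut back to that length while the source rewinds to the saved offset. This is a simplified model of the valid-length idea, assuming one partition and one in-progress file; the `Job` and `Snapshot` names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    source_offset: int  # next Kafka offset to read after a restore
    valid_length: int   # records of the in-progress file covered by the checkpoint


@dataclass
class Job:
    log: list                                   # stands in for a Kafka partition
    offset: int = 0
    in_progress: list = field(default_factory=list)

    def process(self, n):
        """Read n records from the log and append them to the in-progress file."""
        for _ in range(n):
            self.in_progress.append(self.log[self.offset])
            self.offset += 1

    def checkpoint(self):
        # Offset and file length are captured for the same point in the stream.
        return Snapshot(self.offset, len(self.in_progress))

    def restore(self, snap):
        # Drop what was written after the checkpoint, then rewind the source,
        # so those records are re-read rather than lost or duplicated.
        del self.in_progress[snap.valid_length:]
        self.offset = snap.source_offset


job = Job(log=list("abcdef"))
job.process(2)
snap = job.checkpoint()
job.process(2)        # failure happens after this point
job.restore(snap)
job.process(4)
print(job.in_progress)  # ['a', 'b', 'c', 'd', 'e', 'f']
```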

Re: Let BucketingSink roll file on each checkpoint

2018-03-22 Thread XilangYan
OK, then maybe I have misunderstood checkpoints. I thought Flink uses checkpoints to store offsets, but when the Kafka connector makes a checkpoint, it doesn't know whether data is in an in-progress file or in a pending file, so the whole offset is saved in the checkpoint. I used to guess that the data in in-p