Thank you, Minglei. I should describe my current flow and my requirement more clearly.

1. Every piece of data we collect has a send-time.
2. When we collect data, we also send a separate counter message saying, for example, that we have collected 45 messages whose send-time is 2018-07-02 10:02:00.
3. The data is sent to Kafka (or another messaging system); Flink receives the data from Kafka and writes it to HDFS.
4. When Flink has finished part of the messages, we send another counter message saying, for example, that we have processed 40 messages whose send-time is 2018-07-02 10:02:00. "Finished" here means the files are neither .pending nor .inprogress: they must be in the finished state so that other systems can read them.
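To make the two kinds of counter messages concrete, here is a minimal sketch of how a counter system might reconcile them. It assumes a send-time counts as complete once the processed total equals the collected total; CounterLedger and its method names are hypothetical and only for illustration, not part of Flink or of our actual system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical counter-system sketch; all names here are illustrative only.
class CounterLedger {
    // send-time -> number of messages reported as collected at the source
    private final Map<String, Long> collected = new HashMap<>();
    // send-time -> number of messages reported as written to finished files
    private final Map<String, Long> processed = new HashMap<>();

    // Called for each "we have collected N messages" counter message.
    void onCollected(String sendTime, long count) {
        collected.merge(sendTime, count, Long::sum);
    }

    // Called for each "we have processed N messages" counter message.
    void onProcessed(String sendTime, long count) {
        processed.merge(sendTime, count, Long::sum);
    }

    // Assumption: a send-time is complete once a collected report exists
    // and the processed total has caught up with the collected total.
    boolean isComplete(String sendTime) {
        Long c = collected.get(sendTime);
        return c != null && c.equals(processed.getOrDefault(sendTime, 0L));
    }
}
```

With the numbers from the example above (45 collected, 40 processed), isComplete stays false until a later counter message reports the remaining 5.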
What I have done in Flink is:

1. I added a config option to BucketingSink; the option name is rolloverOnCheckpoint.
2. I added another sink, CounterSink, which counts messages by send-time.
3. In BucketingSink.snapshotState, if rolloverOnCheckpoint is set to true, I close the current files and move them to the pending state.
4. In CounterSink.snapshotState, I prepare the special counter message to be sent.
5. When the checkpoint completes, BucketingSink.notifyCheckpointComplete moves the pending files to the finished state, and CounterSink.notifyCheckpointComplete sends the special counter message.

So in our counter system, when the processed-message counter is equal to the received-message counter, it means the ETL jobs can continue.

The JIRA you submitted is not exactly what I want. However, it would be great if we could figure out a common solution to this requirement, although I think it is difficult unless, as you said, we add some assumption such as a watermark.

On the other hand, I think the watermark behavior may already be achievable by combining inactiveBucketThreshold and batchRolloverInterval.
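For reference, the CounterSink part of this could be sketched roughly as below. This is plain Java with no Flink dependency so that it runs standalone. The method names mirror Flink's snapshotState and notifyCheckpointComplete callbacks, but the signatures are simplified (the checkpoint id is passed in directly), and CounterSinkSketch and its emitted list are hypothetical stand-ins for the real sink and the external counter system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the CounterSink idea; it does not depend on Flink.
class CounterSinkSketch {
    // Counts for records seen since the last snapshot, keyed by send-time.
    private Map<String, Long> current = new HashMap<>();
    // Counts staged per checkpoint id, waiting for that checkpoint to complete.
    private final TreeMap<Long, Map<String, Long>> pending = new TreeMap<>();
    // Stand-in for the external counter system: "sendTime=count" messages.
    final List<String> emitted = new ArrayList<>();

    // Called once per record; only the record's send-time matters here.
    void invoke(String sendTime) {
        current.merge(sendTime, 1L, Long::sum);
    }

    // Stage the current counts under this checkpoint id and start a new batch.
    // A real sink would also need to store the staged counts in checkpointed state.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new HashMap<>();
    }

    // Emit everything staged for this checkpoint and any earlier ones,
    // in case an earlier completion notification never arrived.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, Map<String, Long>> done = pending.headMap(checkpointId, true);
        for (Map<String, Long> counts : done.values()) {
            counts.forEach((sendTime, n) -> emitted.add(sendTime + "=" + n));
        }
        done.clear(); // headMap is a view, so this removes the entries from pending
    }
}
```

The point of staging in snapshotState and emitting only in notifyCheckpointComplete is that the counter message goes out at the same moment the files move from pending to finished, so the reported count should only cover data that other systems can already read.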