Thank you Minglei,

Let me describe my current flow and my requirement more clearly.
1. Every piece of data we collect has a send-time.
2. When we collect data, we also send a separate counter message saying, for
example, that we have collected 45 messages whose send-time is
2018-07-02 10:02:00.
3. The data is sent to Kafka (or another message system), and Flink reads it
from Kafka and writes it to HDFS.
4. When Flink has finished part of the messages (meaning the files are neither
.pending nor .in-progress: "finished" must be a final state that other systems
can read), we send another counter message saying, for example, that we have
processed 40 messages whose send-time is 2018-07-02 10:02:00.
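The two counter messages in steps 2 and 4 can be modelled roughly as below.
This is only a sketch under my own assumptions: the CounterMessage class, its
Kind enum and the fromSendTimes helper are hypothetical names, and send-time
is kept as a plain string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the counter messages from steps 2 and 4.
final class CounterMessage {
    enum Kind { COLLECTED, PROCESSED }

    final Kind kind;
    final String sendTime; // e.g. "2018-07-02 10:02:00" (assumed format)
    final long count;

    CounterMessage(Kind kind, String sendTime, long count) {
        this.kind = kind;
        this.sendTime = sendTime;
        this.count = count;
    }

    // Groups the send-times of a batch of data messages and builds one
    // counter message per distinct send-time, ordered by send-time.
    static List<CounterMessage> fromSendTimes(Kind kind, List<String> sendTimes) {
        Map<String, Long> counts = new TreeMap<>();
        for (String t : sendTimes) {
            counts.merge(t, 1L, Long::sum);
        }
        List<CounterMessage> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            out.add(new CounterMessage(kind, e.getKey(), e.getValue()));
        }
        return out;
    }

    @Override
    public String toString() {
        return kind + " " + count + " @ " + sendTime;
    }
}
```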

What I have done in Flink:
1. I added a config option named rolloverOnCheckpoint to BucketingSink.
2. I added another sink, CounterSink, which counts messages by send-time.
3. In BucketingSink.snapshotState, if rolloverOnCheckpoint is true, I close
the current files and move them to the pending state.
4. In CounterSink.snapshotState, I prepare the special counter message to be
sent.
5. When the checkpoint completes, BucketingSink.notifyCheckpointComplete moves
the pending files to the finished state, and
CounterSink.notifyCheckpointComplete sends the special counter message.
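The CounterSink bookkeeping in steps 2, 4 and 5 can be sketched without any
Flink dependency. The method names mirror Flink's snapshotState and
notifyCheckpointComplete hooks, but the signatures are simplified (a bare
checkpoint id instead of a snapshot context), and CounterSinkSketch is a
hypothetical name. In a real sink the pending map would also have to be kept
in operator state so it survives a restore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Flink-free sketch of the CounterSink bookkeeping (hypothetical class).
final class CounterSinkSketch {
    // Counts for records seen since the last snapshot, keyed by send-time.
    private Map<String, Long> current = new HashMap<>();
    // Counts frozen at each snapshot, waiting for that checkpoint to complete.
    private final NavigableMap<Long, Map<String, Long>> pending = new TreeMap<>();

    // Called once per incoming record.
    void invoke(String sendTime) {
        current.merge(sendTime, 1L, Long::sum);
    }

    // Freezes the counts accumulated so far under this checkpoint id.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new HashMap<>();
    }

    // Returns the "processed" counts that may now be announced. Releases
    // every pending entry up to and including checkpointId, so an earlier
    // checkpoint whose notification never arrived is still covered.
    Map<String, Long> notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> toSend = new TreeMap<>();
        NavigableMap<Long, Map<String, Long>> done = pending.headMap(checkpointId, true);
        for (Map<String, Long> counts : done.values()) {
            counts.forEach((t, n) -> toSend.merge(t, n, Long::sum));
        }
        done.clear(); // clearing the view removes the entries from pending
        return toSend;
    }
}
```

Releasing everything up to the completed id, rather than only that exact id,
follows the same idea as moving all pending files up to that checkpoint, since
Flink does not promise a completion notification for every single checkpoint.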

So in our counter system, when the processed-message counter equals the
received-message counter for a send-time, the ETL jobs can continue.
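On the counter-system side, the check could look like the sketch below. It
assumes the processed counters for one send-time arrive in several pieces
(for example 40 and then 5) and are summed; CounterLedger and its methods are
hypothetical names, not part of an existing system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical ledger in the counter system: sums the "collected" and
// "processed" counter messages per send-time and reports when they match.
final class CounterLedger {
    private final Map<String, Long> received = new HashMap<>();
    private final Map<String, Long> processed = new HashMap<>();

    void onCollected(String sendTime, long count) {
        received.merge(sendTime, count, Long::sum);
    }

    void onProcessed(String sendTime, long count) {
        processed.merge(sendTime, count, Long::sum);
    }

    // ETL may proceed for this send-time once every collected message
    // has been reported as processed.
    boolean isComplete(String sendTime) {
        Long r = received.get(sendTime);
        return r != null && r.equals(processed.getOrDefault(sendTime, 0L));
    }
}
```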

The JIRA issue you submitted is not exactly what I want, but it would be great
if we could figure out a common solution to this requirement. I think that is
difficult unless, as you said, we add some assumption such as a watermark.
On the other hand, I think watermark-like behaviour may already be achievable
by combining inactiveBucketThreshold and batchRolloverInterval.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
