Hi Fabian,

Perhaps I didn’t explain that clearly. What I want is a trigger that fires when a
checkpoint is completed and emits intermediate results that are consistent
with the completed checkpoint.

It works like this: 
1) Once the window operator receives a barrier, it performs the snapshot as 
usual, and also makes a copy of the current aggregates. 
2) When the checkpoint succeeds, the trigger is notified via the checkpoint 
listener and emits the intermediate aggregates that were copied previously.

It’s kind of similar to TwoPhaseCommitSinkFunction, but it’s used in a window 
operator instead of a sink. 
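
To make that concrete, here is a rough sketch of what such hooks could look like 
on the Trigger API (the method names are hypothetical; nothing like them exists 
in Flink today):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Hypothetical extension of Trigger; the two checkpoint hooks below do NOT
// exist in Flink, they only illustrate the proposal.
public abstract class CheckpointAwareTrigger<T, W extends Window> extends Trigger<T, W> {

    // Called while the window operator takes the snapshot for `checkpointId`,
    // so the trigger (or the operator) can copy the current aggregates.
    public abstract void onCheckpointSnapshot(long checkpointId, TriggerContext ctx) throws Exception;

    // Called once `checkpointId` is confirmed complete; returning FIRE here
    // would emit the aggregates copied in onCheckpointSnapshot(), i.e. results
    // consistent with the completed checkpoint.
    public abstract TriggerResult onCheckpointComplete(long checkpointId, TriggerContext ctx) throws Exception;
}

The window operator would call onCheckpointSnapshot() while taking its snapshot 
and onCheckpointComplete() from its CheckpointListener callback, analogous to 
the pre-commit/commit phases of TwoPhaseCommitSinkFunction.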

The original motivation is that we want to keep a MySQL table in sync with the 
window aggregates. This can be done by firing the trigger periodically to get 
the newest intermediate results and using them to update the external table 
(a rough sketch of such a trigger is below). But neither timers nor queryable 
state can provide read-committed isolation, which is intolerable in my case, so 
I suggest adding checkpoint hooks to the triggers to solve this problem.
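
For reference, the periodic-firing workaround we use today looks roughly like 
the trigger below (a minimal sketch; names and the timer bookkeeping are 
simplified). It fires on aligned processing-time intervals and FIRE_AND_PURGEs 
once the watermark reaches the window end, which is why its intermediate 
emissions are only read-uncommitted with respect to checkpoints:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PeriodicEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMs;

    public PeriodicEventTimeTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Final, event-time firing when the watermark reaches the window end.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Periodic intermediate firing. Registered per element because there is
        // no open(TriggerContext)-style hook; aligning the timestamp de-duplicates
        // registrations within the same interval.
        ctx.registerProcessingTimeTimer(align(ctx.getCurrentProcessingTime()));
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(align(time));
        return TriggerResult.FIRE; // emit intermediate aggregates (read-uncommitted)
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // A production version should also track and delete its processing-time timers.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    private long align(long time) {
        return time - (time % intervalMs) + intervalMs;
    }
}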

I think other cases that need to emit window aggregates periodically could 
leverage this feature as well, since timers and queryable state are too 
heavyweight for such a simple need while providing only a weaker isolation level.

Thanks a lot!

Best,
Paul Lam

> On Oct 15, 2018, at 15:47, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Paul,
> 
> If I got your proposal right, you'd like to fire a Trigger right before a
> checkpoint is taken, correct?
> So, before taking a checkpoint, a Trigger would fire and the operator would
> process and emit some intermediate results.
> 
> This approach would not completely solve the consistency issue because a
> checkpoint might fail.
> A better approach would be to use a transactional sink that is integrated
> with the checkpointing mechanism and emits data only on successful
> checkpoints.
> Flink provides the TwoPhaseCommitSinkFunction (see blog post [1]) and one
> implementation for an exactly-once Kafka sink.
> 
> Best,
> Fabian
> 
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
> 
> Am Mo., 15. Okt. 2018 um 04:52 Uhr schrieb Paul Lam <paullin3...@gmail.com>:
> 
>> Hi,
>> 
>> I’ve come across some scenarios where periodically emitting aggregates is
>> needed in the case of event time windows, and I think it would be good to
>> have a checkpoint hook on triggers.
>> 
>> Suppose we want a daily metric, and the most intuitive way is to define a 1-day
>> event time window to calculate it. By default, the event time trigger fires
>> and emits the final results when the watermark reaches the end of the day, but
>> we hope to also see real-time (or near real-time) intermediate results, so
>> these are the viable approaches I can think of:
>> 
>> 1. Implement a custom trigger (FIRE_AND_PURGE at midnight, and FIRE
>> periodically). We could register a processing time timer via the trigger
>> context to fire the trigger, but it has some drawbacks. First, we can only
>> access the trigger context inside the callback methods, and there is no
>> method like open(TriggerContext) that is called on initialization, so we have
>> to register the timer in the onElement(..) method when it is called for the
>> first time, which is not elegant. Second, emitting results on processing time
>> provides only read-uncommitted consistency, which is not enough in some
>> scenarios.
>> 
>> 2. Use queryable state and let external systems pull state updates. This
>> requires changing the architecture to pull-based, which would be too big a
>> change. What’s more, the queryable state API is not stable yet.
>> 
>> 3. Change to a smaller window (e.g. a 1-minute window) that emits incremental
>> aggregates, and reduce the results in external systems. This essentially falls
>> back to a stateless streaming job, making the architecture more complex and
>> the consistency weaker.
>> 
>> So I suggest adding a checkpoint hook to the window triggers to enable
>> emitting aggregates periodically with awareness of checkpointing, which
>> solves the problems I mentioned in approach 1.
>> 
>> Since this is a very common scenario, there are probably existing practices
>> to get it done that I haven't figured out yet, but I think it still makes
>> sense to add such a method to the triggers for consistency reasons.
>> 
>> Any suggestion is appreciated! Thanks a lot!
>> 
>> Best,
>> Paul Lam
>> 
>> 
>> 
>> 
