Hi Paul,

I guess whether the transactional sink "needs to cooperate with a trigger that fires right before each checkpoint" depends on your use case. You could use a regular Trigger that fires every minute (in processing time) and set the checkpoint interval to 1 minute. In the worst case, you'll emit data that is 1 minute old: the trigger fires right after a checkpoint, the next checkpoint is triggered one minute later, and the sink writes the data once that checkpoint has completed. You can tweak the fire and checkpoint intervals to achieve different latencies. Maybe this would already be sufficient for your use case.
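Here is a minimal sketch of such a trigger, assuming the Flink 1.6 DataStream API and an event time window such as a 1-day tumbling window. The class `PeriodicFireEventTimeTrigger` and its `nextAlignedFire` helper are hypothetical names, not part of Flink. The trigger FIREs on an aligned processing-time interval to emit intermediate aggregates and FIRE_AND_PURGEs when the watermark passes the end of the window. Like the approach described further down in the thread, it lazily registers the periodic timer on the first element. The transactional sink is not shown.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Hypothetical trigger (not part of Flink): emits intermediate results every
 * intervalMs of processing time and the final result at the end of the
 * event time window.
 */
public class PeriodicFireEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMs;

    // Per-window state: timestamp of the currently registered processing-time timer.
    private final ValueStateDescriptor<Long> nextFireDesc =
            new ValueStateDescriptor<>("next-processing-fire", Long.class);

    public PeriodicFireEventTimeTrigger(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
    }

    /** Next multiple of intervalMs strictly after now (e.g. 125_000 -> 180_000 for 60_000). */
    static long nextAlignedFire(long now, long intervalMs) {
        return now - (now % intervalMs) + intervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Late element (within allowed lateness): fire right away, like EventTimeTrigger.
            return TriggerResult.FIRE;
        }
        // Final firing once the watermark passes the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Register the periodic timer lazily, on the first element of this window.
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDesc);
        if (nextFire.value() == null) {
            long fireAt = nextAlignedFire(ctx.getCurrentProcessingTime(), intervalMs);
            ctx.registerProcessingTimeTimer(fireAt);
            nextFire.update(fireAt);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // Schedule the next periodic firing, then emit the intermediate aggregate
        // while keeping the window contents.
        long fireAt = nextAlignedFire(time, intervalMs);
        ctx.registerProcessingTimeTimer(fireAt);
        ctx.getPartitionedState(nextFireDesc).update(fireAt);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (time != window.maxTimestamp()) {
            return TriggerResult.CONTINUE;
        }
        // End of window: stop the periodic timer, emit the final result, drop the contents.
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDesc);
        Long fireAt = nextFire.value();
        if (fireAt != null) {
            ctx.deleteProcessingTimeTimer(fireAt);
            nextFire.clear();
        }
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Called when the window is cleaned up: remove all timers and trigger state.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDesc);
        Long fireAt = nextFire.value();
        if (fireAt != null) {
            ctx.deleteProcessingTimeTimer(fireAt);
        }
        nextFire.clear();
    }
}
```

It could be wired up roughly as `.window(TumblingEventTimeWindows.of(Time.days(1))).trigger(new PeriodicFireEventTimeTrigger(60_000))`, together with `env.enableCheckpointing(60_000)` for the 1-minute checkpoint interval. Aligning the timers to multiples of the interval makes all keys fire at the same moments. Each intermediate result would then only become visible externally once a transactional sink commits it on the next completed checkpoint.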
A trigger that fires on checkpoints would improve the latency, but it is not available yet. I think there might be value in such a trigger, but I'm not sure how complex the implementation would be. AFAIK, the current design does not allow an operator to emit any data when it needs to checkpoint its state.

Best,
Fabian

On Tue, Oct 16, 2018 at 08:14, Paul Lam <paullin3...@gmail.com> wrote:
> Hi Fabian,
>
> Thanks for your reply!
>
> It took me a while to reconsider the proposal, and I think you’re right. The checkpoint hook would affect a lot of the mechanisms we already have, and it’s not worth it.
>
>> Wouldn't a transactional sink provide exactly the same guarantees?
>
> Yes, it would. But it needs to cooperate with a trigger that fires right before each checkpoint, as you mentioned previously. How can I achieve that with the current API?
>
> Thank you very much!
>
> Best,
> Paul Lam
>
> On Oct 15, 2018, at 19:45, Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi Paul,
>>
>> I think this would be very tricky to implement and would interfere with many parts of the system, like state backends, checkpointing logic, etc.
>> We would need to maintain a copy (or version) of the state at the time of a checkpoint. There might be multiple checkpoints in flight. Checkpoints might fail. We'd need to clean up the copies/versions.
>> Overall, I think this would be very complicated.
>>
>> Wouldn't a transactional sink provide exactly the same guarantees?
>> It would collect all results of the window operator and only apply them when a checkpoint was successful.
>> In case of a failure, an open transaction is aborted and the non-committed results are re-computed.
>>
>> Best, Fabian
>>
>> On Mon, Oct 15, 2018 at 13:29, Paul Lam <paullin3...@gmail.com> wrote:
>>> Hi Fabian,
>>>
>>> Perhaps I didn’t explain that clearly. Actually, I want a trigger to fire when a checkpoint is completed, and to emit intermediate results that are consistent with the completed checkpoint.
>>>
>>> It works like this:
>>> 1) Once the window operator receives a barrier, it performs the snapshot as usual and also makes a copy of the current aggregates.
>>> 2) When the checkpoint succeeds, the trigger gets a notification via a checkpoint listener and emits the intermediate aggregates that were copied previously.
>>>
>>> It’s similar to TwoPhaseCommitSinkFunction, but it’s used in a window operator instead of a sink.
>>>
>>> The original motivation is that we want to keep a MySQL table synchronized with the window aggregates. This can be done by firing the trigger periodically to get the newest intermediate results, which are then used to update the external table. But neither timers nor queryable state can provide read-committed isolation, and anything weaker is unacceptable in my case, so I suggest adding checkpoint hooks to the triggers to solve this problem.
>>>
>>> I think other cases that need to emit window aggregates periodically could also leverage this feature, because timers and queryable state are too heavyweight for a simple need like this and provide only a lower isolation level.
>>>
>>> Thanks a lot!
>>>
>>> Best,
>>> Paul Lam
>>>
>>> On Oct 15, 2018, at 15:47, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Paul,
>>>>
>>>> If I got your proposal right, you'd like to fire a Trigger right before a checkpoint is taken, correct?
>>>> So, before taking a checkpoint, a Trigger would fire and the operator would process and emit some intermediate results.
>>>>
>>>> This approach would not completely solve the consistency issue because a checkpoint might fail.
>>>> A better approach would be to use a transactional sink that is integrated with the checkpointing mechanism and emits data only on successful checkpoints.
>>>> Flink provides the TwoPhaseCommitSinkFunction (see blog post [1]) and one implementation for an exactly-once Kafka sink.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>>
>>>> On Mon, Oct 15, 2018 at 04:52, Paul Lam <paullin3...@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> I’ve come across some scenarios where periodically emitting aggregates is needed for event time windows, and I think it would be good to have a checkpoint hook on triggers.
>>>>>
>>>>> Suppose we want a daily metric. The most intuitive way is to define a 1d event time window to calculate it. By default, the event time trigger fires and emits the final results when the watermark reaches the end of the day, but we also want to see real-time (or near-real-time) intermediate results. I can think of several viable approaches:
>>>>>
>>>>> 1. Implement a custom trigger (FIRE_AND_PURGE at midnight, and FIRE periodically). We could register a processing time timer via the trigger context to fire the trigger periodically, but this has some drawbacks. First, we can only access the trigger context inside the trigger's callback methods, and there is no method like open(TriggerContext) that is called on initialization, so we have to register the timer in the onElement(..) method when it is called for the first time, which is not elegant. Second, emitting results on processing time provides only read-uncommitted consistency, which is not enough in some scenarios.
>>>>>
>>>>> 2. Use queryable state and pull state updates from external systems. This requires changing the architecture to pull-based, which would be too big a change. What’s more, the queryable state API is not stable yet.
>>>>>
>>>>> 3. Change the window to a smaller one (e.g. a 1 min window) that emits incremental aggregates, and reduce the results in the external system.
>>>>> This falls back to a stateless streaming job, making the architecture complex and the consistency weak.
>>>>>
>>>>> So I suggest adding a checkpoint hook to the window triggers to enable emitting aggregates periodically with awareness of checkpointing, which would solve the problems I mentioned in approach 1.
>>>>>
>>>>> Since this is a very common scenario, there are probably established practices for getting it done that I haven't figured out yet, but I think it still makes sense to add such a method to the triggers for consistency reasons.
>>>>>
>>>>> Any suggestion is appreciated! Thanks a lot!
>>>>>
>>>>> Best,
>>>>> Paul Lam