Hi Shammon,

Do I understand it correctly that you effectively want to expand the checkpoint alignment mechanism across many different jobs and hand over checkpoint barriers from upstream to downstream jobs using the intermediate tables?
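In other words, my mental model of the proposal is roughly the following (only a sketch of how I read the FLIP; none of these types exist in Flink or Table Store, the names are made up):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of my reading of the proposal, not actual Flink/Table Store code.
public class CheckpointHandoverSketch {

    // A snapshot committed by the upstream ETL job to the intermediate table,
    // tagged with the checkpoint that produced it.
    static class TableSnapshot {
        final long upstreamCheckpointId;
        final List<String> dataFiles;
        TableSnapshot(long upstreamCheckpointId, List<String> dataFiles) {
            this.upstreamCheckpointId = upstreamCheckpointId;
            this.dataFiles = dataFiles;
        }
    }

    // The downstream job's source only reads snapshots whose checkpoint id has
    // been globally confirmed, so the snapshot boundary effectively acts as a
    // checkpoint barrier handed over from the upstream job.
    static List<String> readableFiles(List<TableSnapshot> snapshots, long globallyConfirmedCheckpointId) {
        List<String> readable = new ArrayList<>();
        for (TableSnapshot snapshot : snapshots) {
            if (snapshot.upstreamCheckpointId <= globallyConfirmedCheckpointId) {
                readable.addAll(snapshot.dataFiles);
            }
        }
        return readable;
    }
}
```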
Re the watermarks in the "Rejected Alternatives" section: I don't understand why this has been rejected. Could you elaborate on this point? Here are a couple of my thoughts on the matter, but please correct me if I'm wrong, as I haven't dived deeper into this topic.

> As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> The StreamTask reads data in order: V11,V12,V21,T1(channel1),V13,T1(channel2).
> At this time, StreamTask will confirm that watermark T1 is completed, but the data beyond
> T1 has been processed(V13) and the results are written to the sink table.

1. I see the same "problem" with unaligned checkpoints in your current proposal.

2. I don't understand why this is a problem. Just store in the "sink table" what the watermark is (T1), and downstream jobs should process the data with that "watermark" anyway. Record "V13" should be treated as "early" data. Downstream jobs would then handle it as follows (a rough sketch of what I mean is at the very bottom of this mail, below the quoted text):
a) if they are streaming jobs, they should for example aggregate it in windowed/temporal state, but they shouldn't produce a result that contains it, as the watermark T2 has not been processed yet. Or they would just pass that record along as "early" data.
b) if they are batch jobs, it looks to me like batch jobs shouldn't take "all available data", but only consider "all the data up to some watermark", for example the latest available one: T1.

3. I'm pretty sure there are counterexamples where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
a) For example, what if one of your "ETL" jobs has the following DAG:
[image: flip276.jpg]
Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from Src1/Window1 from the event time T1 and from Src2/Window2 from a later event time T2.
b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or to two different sink tables (that are both later used in the same downstream job).

4a) I'm not sure if I like the idea of centralising the whole system in this way. If you have 10 jobs, the likelihood of a checkpoint failure will be 10 times higher, and/or the duration of the checkpoint can be much, much longer (especially under backpressure). This is actually already a limitation of Apache Flink (global checkpoints are more prone to fail the larger the scale), so I would be anxious about making it a potentially even larger issue.

4b) I'm also worried about the increased complexity of the system after adding the global checkpoint, and about the additional (single?) point of failure.

5. Such a design would also not work if we ever wanted to have task-local checkpoints.

All in all, it seems to me that the watermarks and event time are the better concepts in this context, and should have been used for synchronisation and data consistency across the whole system.

Best,
Piotrek

On Thu, Dec 1, 2022 at 11:50 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi @Martijn
>
> Thanks for your comments, and I'd like to reply to them.
>
> 1. It sounds good to me, I'll update the content structure in the FLIP later and give the problems first.
>
> 2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?
>
> Yes.
> To support the data consistency in the FLIP, we need to get through checkpoints in Flink and snapshots in the store, which requires a close combination of the Flink and store implementations. In the first stage we plan to implement it based on Flink and Table Store only; snapshots written to external storage don't support consistency.
>
> 3. If you introduce a MetaService, it becomes the single point of failure because it coordinates everything. But I can't find anything in the FLIP on making the MetaService highly available or how to deal with failovers there.
>
> I think you raise a very important problem and I missed it in the FLIP. The MetaService is a single point and should support failover; we will do that in the future. In the first stage we only support standalone mode, THX.
>
> 4. The FLIP states under Rejected Alternatives "Currently watermark in Flink cannot align data." which is not true, given that there is FLIP-182
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>
> Watermark alignment in FLIP-182 is different from the "watermark align data" requirement in our FLIP. FLIP-182 aims to fix watermark generation in different sources for "slight imbalance or data skew", which means that in some cases a source must generate a watermark even if it should not. When the operator collects watermarks, the data processing is as described in our FLIP, and the data cannot be aligned through a barrier like Checkpoint.
>
> 5. Given the MetaService role, it feels like this is introducing a tight dependency between Flink and the Table Store. How pluggable is this solution, given the changes that need to be made to Flink in order to support this?
>
> This is a good question, and I will try to expand on it. Most of the work will be completed in the Table Store, such as the new SplitEnumerator and Source implementations. The changes in Flink are as follows:
> 1) A Flink job should put its job id in the context when creating a source/sink to help MetaService create the relationship between source and sink tables; it's tiny.
> 2) Notify a listener when a job is terminated in Flink, and the listener implementation in Table Store will send a "delete event" to MetaService.
> 3) The changes related to Flink Checkpoint include:
> a) Support triggering a checkpoint with a checkpoint id by the SplitEnumerator.
> b) Create the SplitEnumerator in Table Store with a strategy to perform the specific checkpoint when all "SplitEnumerator"s in the job manager trigger it.
>
> Best,
> Shammon
>
> On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser <martijnvis...@apache.org> wrote:
>
> > Hi all,
> >
> > A couple of first comments on this:
> > 1. I'm missing the problem statement in the overall introduction. It immediately goes into proposal mode; I would like to first read what the actual problem is before diving into solutions.
> > 2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?
> > 3. If you introduce a MetaService, it becomes the single point of failure because it coordinates everything. But I can't find anything in the FLIP on making the MetaService highly available or how to deal with failovers there.
> > 4. The FLIP states under Rejected Alternatives "Currently watermark in Flink cannot align data."
> > which is not true, given that there is FLIP-182
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> >
> > 5. Given the MetaService role, it feels like this is introducing a tight dependency between Flink and the Table Store. How pluggable is this solution, given the changes that need to be made to Flink in order to support this?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi devs:
> > >
> > > I'd like to start a discussion about FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store [1]. In the whole data stream processing, there are consistency problems such as how to manage the dependencies of multiple jobs and tables, how to define and handle E2E delays, and how to ensure the data consistency of queries on flowing data. This FLIP aims to support data consistency and answer these questions.
> > >
> > > I've discussed the details of this FLIP with @Jingsong Lee and @libenchao offline several times. We hope to support data consistency of queries on tables, manage relationships between Flink jobs and tables, and revise tables on streaming in Flink and Table Store to improve the whole data stream processing.
> > >
> > > Looking forward to your feedback.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > >
> > > Best,
> > > Shammon
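P.S. Below is the rough sketch I referred to in point 2 above. It only illustrates the "early data" handling I have in mind for consumers of the sink table; the classes and method names are made up and are not actual Flink or Table Store APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only -- these classes are not Flink or Table Store APIs.
public class EarlyDataSketch {

    static class Record {
        final long eventTime;
        final String value;
        Record(long eventTime, String value) {
            this.eventTime = eventTime;
            this.value = value;
        }
    }

    // 2a) A streaming consumer of the sink table: records beyond the committed
    // watermark (e.g. V13 while only T1 is committed) are kept back as "early"
    // data and only released once a later watermark (e.g. T2) is committed.
    static class StreamingConsumer {
        private final List<Record> earlyBuffer = new ArrayList<>();

        List<Record> onRecord(Record record, long committedWatermark) {
            if (record.eventTime <= committedWatermark) {
                return List.of(record);
            }
            earlyBuffer.add(record);
            return List.of();
        }

        List<Record> onCommittedWatermark(long committedWatermark) {
            List<Record> released = new ArrayList<>();
            earlyBuffer.removeIf(r -> {
                if (r.eventTime <= committedWatermark) {
                    released.add(r);
                    return true;
                }
                return false;
            });
            return released;
        }
    }

    // 2b) A batch consumer of the sink table: instead of "all available data",
    // only scan "all the data up to the latest committed watermark" (e.g. T1).
    static List<Record> planBatchScan(List<Record> allAvailableData, long latestCommittedWatermark) {
        List<Record> scan = new ArrayList<>();
        for (Record r : allAvailableData) {
            if (r.eventTime <= latestCommittedWatermark) {
                scan.add(r);
            }
        }
        return scan;
    }
}
```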