Hi Shammon,
Thank you for opening this FLIP, which is very interesting and such an important feature to add to the Flink ecosystem. I have a couple of suggestions/questions:

- Consistency is a very broad term with different meanings. There are many variations between the two extremes of weak and strong consistency that trade off latency for consistency: https://jepsen.io/consistency It would be great if we could devise an approach that allows the user to choose which consistency level they want to use for a query.

Example: In your figure where you have a DAG, assume a user queries only Table1 for a specific key. Then, a failure happens and the table restores from a checkpoint. The user issues the same query, looking up the same key. What value does she see? With monotonic-reads, the system guarantees that she will only see the same or newer values but not older, hence will not experience time-travel. This is a very useful property for a system to have, albeit at the weaker end of consistency guarantees. But it is a good stepping stone.

Another example: assume the user queries Table1 for key K1 and gets the value V11. Then, she queries Table2, which is derived from Table1, for the same key, K1, and gets the value V21. What is the relationship between V21 and V11? Is V21 derived from V11, or can it be an older value V1 (the previous value of K1)? What if value V21 is not yet in Table2? What should she see when she queries Table1? Should she see the value V11 or not? Should the requirement be that a record is not visible in any of the tables in a DAG unless it is available in all of them?

- It would be good to have a set of examples with consistency anomalies that can happen (like the examples above) and what consistency levels we want the system to offer to prevent them. Moreover, for each such example, it would be good to have a description of how the approach (Timestamp Barriers) will work in practice to prevent such anomalies.

Thank you,
Vicky

On Fri, Jan 27, 2023 at 4:46 PM John Roesler <vvcep...@apache.org> wrote:

> Hello Shammon and all,

> Thanks for this FLIP! I've been working toward this kind of global consistency across large scale data infrastructure for a long time, and it's fantastic to see a high-profile effort like this come into play.

> I have been lurking in the discussion for a while and delaying my response while I collected my thoughts. However, I've realized at some point, delaying more is not as useful as just asking a few questions, so I'm sorry if some of this seems beside the point. I'll number these to not collide with prior discussion points:

> 10. Have you considered proposing a general consistency mechanism instead of restricting it to TableStore+ETL graphs? For example, it seems to me to be possible and valuable to define instead the contract that sources/sinks need to implement in order to participate in globally consistent snapshots.

> 11. It seems like this design is assuming that the "ETL Topology" under the envelope of the consistency model is a well-ordered set of jobs, but I suspect this is not the case for many organizations. It may be aspirational, but I think the gold standard here would be to provide an entire organization with a consistency model spanning a loosely coupled ecosystem of jobs and data flows spanning teams and systems that are organizationally far apart.

> I realize that may be kind of abstract. Here's some examples of what's on my mind here:
> 11a. Engineering may operate one Flink cluster, and some other org, like Finance, may operate another. In most cases, those are separate domains that don't typically get mixed together in jobs, but some people, like the CEO, would still benefit from being able to make a consistent query that spans arbitrary contexts within the business. How well can a feature like this transcend a single Flink infrastructure? Does it make sense to consider a model in which snapshots from different domains can be composable?

> 11b. Some groups may have a relatively stable set of long-running jobs, while others (like data science, skunkworks, etc) may adopt a more experimental, iterative approach with lots of jobs entering and exiting the ecosystem over time. It's still valuable to have them participate in the consistency model, but it seems like the consistency system will have to deal with more chaos than I see in the design. For example, how can this feature tolerate things like zombie jobs (which are registered in the system, but fail to check in for a long time, and then come back later)?

> 12. I didn't see any statements about patterns like cycles in the ETL Topology. I'm aware that there are fundamental constraints on how well cyclic topologies can be supported by a distributed snapshot algorithm. However, there are a range of approaches/compromises that we can apply to cyclic topologies. At the very least, we can state that we will detect cycles and produce a warning, etc.

> 13. I'm not sure how heavily you're weighting the query syntax part of the proposal, so please feel free to defer this point. It looked to me like the proposal assumes people want to query either the latest consistent snapshot or the latest inconsistent state. However, it seems like there's a significant opportunity to maintain a manifest of historical snapshots and allow people to query as of old points in time. That can be valuable for individuals answering data questions, building products, and crucially supporting auditability use cases. To that latter point, it seems nice to provide not only a mechanism to query arbitrary snapshots, but also to define a TTL/GC model that allows users to keep hourly snapshots for N hours, daily snapshots for N days, weekly snapshots for N weeks, and the same for monthly, quarterly, and yearly snapshots.

> Ok, that's all I have for now :) I'd also like to understand some lower-level details, but I wanted to get these high-level questions off my chest.

> Thanks again for the FLIP!
> -John

> On 2023/01/13 11:43:28 Shammon FY wrote:
> > Hi Piotr,

> > I discussed with @jinsong lee about `Timestamp Barrier` and `Aligned Checkpoint` for data consistency in FLIP, and we think there are indeed many defects in using `Aligned Checkpoint` to support data consistency, as you mentioned.

> > According to our historical discussion, I think we have reached an agreement on an important point: we finally need a `Timestamp Barrier Mechanism` to support data consistency. But in our (@jinsong lee's and my) opinion, the total design and implementation based on `Timestamp Barrier` will be too complex, and it's also too big for one FLIP.

> > So we'd like to use FLIP-276[1] as an overview design of data consistency in Flink Streaming and Batch ETL based on `Timestamp Barrier`.
> > @jinsong and I hope that we can reach an agreement on the overall design in FLIP-276 first, and then, on the basis of FLIP-276, we can create other FLIPs with detailed designs according to modules and drive them. Finally, we can support data consistency based on Timestamp in Flink.

> > I have updated FLIP-276, deleted the Checkpoint section, and added the overall design of `Timestamp Barrier`. Here I briefly describe the modules of `Timestamp Barrier` as follows:
> > 1. Generation: JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them.
> > 2. Checkpoint: Store <checkpoint, timestamp barrier> when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for an uncompleted checkpoint.
> > 3. Replay data: Store <timestamp barrier, offset> for the source when it broadcasts a timestamp barrier, so that the source can replay the same data according to the same timestamp barrier.
> > 4. Align data: Align data for stateful operators (aggregation, join, etc.) and temporal operators (window).
> > 5. Computation: Operator computation for a specific timestamp barrier is based on the results of the previous timestamp barrier.
> > 6. Output: An operator outputs or commits results when it collects all the timestamp barriers, including operators with data buffers or async operations.

> > I also list the main work in Flink and Table Store in FLIP-276. Please help to review the FLIP when you're free and feel free to give any comments.

> > Looking forward to your feedback, THX

> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store

> > Best,
> > Shammon

> > On Tue, Dec 20, 2022 at 10:01 AM Shammon FY <zjur...@gmail.com> wrote:
> > > Hi Piotr,

> > > Thanks for syncing up. I will update the FLIP later and keep this discussion open. Looking forward to your feedback, thanks

> > > Best,
> > > Shammon

> > > On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> Hi Shammon,

> > >> I've tried to sync with Timo, David Moravek and Dawid Wysakowicz about this subject. We have only briefly chatted and exchanged some thoughts/ideas, but unfortunately we were not able to finish the discussions before the holiday season/vacations. Can we get back to this topic in January?

> > >> Best,
> > >> Piotrek

> > >> On Fri, Dec 16, 2022 at 10:53 AM Shammon FY <zjur...@gmail.com> wrote:
> > >> > Hi Piotr,

> > >> > I found there are several points in our discussion, which can cause misunderstanding between us when we focus on different ones. I list each point in our discussion as follows:

> > >> > > Point 1: Is "Aligned Checkpoint" the only mechanism to guarantee data consistency in the current Flink implementation, while "Watermark" and "Unaligned Checkpoint" cannot do that?
> > >> > My answer is "Yes", the "Aligned Checkpoint" is the only one due to its "Align Data" ability, and we can do it in the first stage.

> > >> > > Point 2: Can the combination of "Checkpoint Barrier" and "Watermark" support the complete consistency semantics based on "Timestamp" in the current Flink implementation?
> > >> > My answer is "No", we need a new "Timestamp Barrier" mechanism to do that, which may be an upgrade of the current "Watermark" or a completely new mechanism; we can do it in the second or third stage.

> > >> > > Point 3: Are the "Checkpoint" and the new "Timestamp Barrier" completely independent? The "Checkpoint", whether "Aligned", "Unaligned" or "Task Local", supports "Exactly-Once" between ETLs, and the "Timestamp Barrier" mechanism guarantees data consistency between tables according to timestamp for queries.
> > >> > My answer is "Yes", I totally agree with you. Let "Checkpoint" be responsible for fault tolerance and "Timestamp Barrier" for consistency, independently.

> > >> > @Piotr, What do you think? If I am missing or misunderstanding anything, please correct me, thanks

> > >> > Best,
> > >> > Shammon

> > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> > > Hi Shammon,

> > >> > > > I don't think we can combine watermarks and checkpoint barriers together to guarantee data consistency. There will be a "Timestamp Barrier" in our system to "commit data", "single etl failover", "low latency between ETLs" and "strong data consistency with completed semantics" in the end.

> > >> > > Why do you think so? I've described to you above an alternative where we could be using watermarks for data consistency, regardless of what checkpointing/fault tolerance mechanism Flink would be using. Can you explain what's wrong with that approach? Let me rephrase it:

> > >> > > 1. There is an independent mechanism that provides exactly-once guarantees, committing records/watermarks/events and taking care of the failover. It might be aligned, unaligned or task local checkpointing - this doesn't matter. Let's just assume we have such a mechanism.
> > >> > > 2. There is a watermarking mechanism (it can be some kind of system versioning re-using the watermarks code path if a user didn't configure watermarks) that takes care of the data consistency.

> > >> > > Because watermarks from 2. are also subject to the exactly-once guarantees from 1., once they are committed, downstream systems (Flink jobs or other 3rd party systems) can easily work with the committed watermarks to provide a consistent view/snapshot of the tables. Any downstream system could always check what the committed watermarks are, select a watermark value (for example the min across all used tables), and ask every table: please give me all of the data up until the selected watermark. Or: give me all tables in the version for the selected watermark.

> > >> > > Am I missing something? To me it seems like this way we can fully decouple the fault tolerance mechanism from the subject of the data consistency.
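To make the scheme above concrete: a downstream reader would pick the minimum committed watermark across all tables it uses, and then ask each table for its data up to that watermark. A minimal sketch in Java - the ConsistencyService and TableSnapshot interfaces are hypothetical stand-ins, not existing Flink or Table Store APIs:

    import java.util.List;

    // Sketch: resolve one consistent watermark across several tables and
    // read each of them "as of" that watermark.
    public final class ConsistentReader {

        /** Hypothetical service tracking the committed watermark per table. */
        interface ConsistencyService {
            long committedWatermark(String table);
            TableSnapshot snapshotAsOf(String table, long watermark);
        }

        /** Hypothetical handle to an immutable table version. */
        interface TableSnapshot {
            Iterable<Object[]> scan();
        }

        // "Select the watermark value (for example min across all used
        // tables), and ask every table: please give me all of the data up
        // until the selected watermark."
        static long resolveReadWatermark(ConsistencyService service, List<String> tables) {
            long watermark = Long.MAX_VALUE;
            for (String table : tables) {
                watermark = Math.min(watermark, service.committedWatermark(table));
            }
            return watermark;
        }

        static void consistentScan(ConsistencyService service, List<String> tables) {
            long watermark = resolveReadWatermark(service, tables);
            for (String table : tables) {
                // Each snapshot is immutable and exactly-once committed, so all
                // scans observe one consistent version across the tables.
                service.snapshotAsOf(table, watermark).scan().forEach(row -> {
                    // feed the rows into the query
                });
            }
        }

        private ConsistentReader() {}
    }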
> > >> > > Best,
> > >> > > Piotrek

> > >> > > On Thu, Dec 15, 2022 at 1:01 PM Shammon FY <zjur...@gmail.com> wrote:
> > >> > > > Hi Piotr,

> > >> > > > It's kind of funny about the image; it's a simple example and I have put it in a document https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink :)

> > >> > > > > Does it have to be combining watermarks and checkpoint barriers together?

> > >> > > > It's an interesting question. As we discussed above, what we need from "Checkpoint" is the "Align Data Ability", and from "Watermark" the "Consistency Semantics":

> > >> > > > 1) Only "Align Data" can reach data consistency when performing queries on upstream and downstream tables. I gave an example of "Global Count Tables" in our previous discussion. We need an "Align Event" in the streaming processing; it's the most basic requirement.

> > >> > > > 2) Only "Timestamp" can provide complete consistency semantics. You gave some good examples about "Window" and other such operators.

> > >> > > > I don't think we can combine watermarks and checkpoint barriers together to guarantee data consistency. There will be a "Timestamp Barrier" in our system to "commit data", "single etl failover", "low latency between ETLs" and "strong data consistency with completed semantics" in the end.

> > >> > > > At the beginning I think we can do the simplest thing first: guarantee the basic data consistency with a "Barrier Mechanism". In the current Flink there's the "Aligned Checkpoint" only; that's why we chose "Checkpoint" in our FLIP.

> > >> > > > > I don't see an actual connection in the implementation steps between the checkpoint barriers approach and the watermark-like approach

> > >> > > > As I mentioned above, we choose "Checkpoint" to guarantee the basic data consistency. But as we discussed, the most ideal solution is the "Timestamp Barrier". After the first stage is completed based on the "Checkpoint", we need to evolve it to our ideal solution, the "Timestamp Barrier" (watermark-like approach), in the second or third stage. This does not mean upgrading the "Checkpoint Mechanism" in Flink. It means that after we implement a new "Timestamp Barrier" or upgrade "Watermark" to support it, we can use it instead of the current "Checkpoint Mechanism" directly in our "MetaService" and "Table Store".

> > >> > > > In the discussion between @David and me, I summarized the work of upgrading "Watermark" to support the "Timestamp Barrier". It looks like a big job and you can find the details in our discussion. I think we don't need to do that in our first stage.
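As a rough illustration of what such a "Timestamp Barrier" would ask of a stateful operator: buffer records per barrier timestamp, and only fold a timestamp's records into the state and emit once that barrier has arrived on every input channel. A toy Java sketch under the assumption of FIFO channels (not a real Flink operator API; the class and its callbacks are made up for illustration):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.BiConsumer;

    // Toy "align by timestamp" count operator: records tagged with a barrier
    // timestamp are buffered until that barrier has been received from every
    // input channel, then counted, and the result for exactly that timestamp
    // is emitted. With FIFO channels, barriers complete in timestamp order,
    // so results are emitted in timestamp order too.
    final class TimestampAligningCounter {
        private final int inputChannels;
        private final Map<Long, List<String>> buffered = new TreeMap<>();
        private final Map<Long, Integer> barriersSeen = new HashMap<>();
        private final Map<String, Long> counts = new HashMap<>();
        private final BiConsumer<Long, Map<String, Long>> emit;

        TimestampAligningCounter(int inputChannels, BiConsumer<Long, Map<String, Long>> emit) {
            this.inputChannels = inputChannels;
            this.emit = emit;
        }

        void onRecord(String key, long barrierTs) {
            buffered.computeIfAbsent(barrierTs, ts -> new ArrayList<>()).add(key);
        }

        void onBarrier(long barrierTs) {
            int seen = barriersSeen.merge(barrierTs, 1, Integer::sum);
            if (seen < inputChannels) {
                return; // not aligned yet: wait for this barrier on all channels
            }
            // Aligned: fold this timestamp's records into the state.
            for (String key : buffered.getOrDefault(barrierTs, List.of())) {
                counts.merge(key, 1L, Long::sum);
            }
            buffered.remove(barrierTs);
            barriersSeen.remove(barrierTs);
            emit.accept(barrierTs, Map.copyOf(counts));
        }
    }

Unlike checkpoint-style alignment, nothing here blocks an input channel; the waiting happens in operator memory, which is exactly the trade-off against backpressure that comes up again later in this thread.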
> > >> > > > In that discussion (my reply to @David), I also briefly summarized the work that needs to be done to use the new mechanism (Timestamp Barrier) after we implement the basic function on "Checkpoint". It seems that the work is not too big on my side, and it is feasible on the whole.

> > >> > > > Based on the above points, I think we can support basic data consistency on "Checkpoint" in the first stage, which is described in the FLIP, and continue to evolve it to the "Timestamp Barrier" to support low latency between ETLs and completed semantics in the second or third stage later. What do you think?

> > >> > > > Best,
> > >> > > > Shammon

> > >> > > > On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> > > > > Hi Shammon,

> > >> > > > > > The following is a simple example. Data is transferred between ETL1, ETL2 and ETL3 in Intermediate Table by Timestamp.
> > >> > > > > > [image: simple_example.jpg]

> > >> > > > > This time it's your image that doesn't want to load :)

> > >> > > > > > Timestamp Barrier

> > >> > > > > Does it have to be combining watermarks and checkpoint barriers together? Can we not achieve the same result with two independent processes: checkpointing (regardless if this is a global aligned/unaligned checkpoint, or a task local checkpoint) plus watermarking? Checkpointing would provide exactly-once guarantees and actually commit the results, and it would also be committing the last emitted watermark? From the perspective of the sink/table, it shouldn't really matter how the exactly-once is achieved, and whether the job has performed an unaligned checkpoint or something completely different. It seems to me that the sink/table could/should be able to understand/work with only the basic information: here are records and watermarks (with an order already fixed at that point of time), they are committed and will never change.

> > >> > > > > > However, from the perspective of implementation complexity, I personally think using Checkpoint in the first phase makes sense, what do you think?

> > >> > > > > Maybe I'm missing something, but I don't see an actual connection in the implementation steps between the checkpoint barriers approach and the watermark-like approach. They seem to me (from the perspective of the Flink runtime at least) like two completely different mechanisms. Not one leading to the other.
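A minimal version of the sink/table contract described above might look like this (a hypothetical interface written down only to pin the idea, not an existing Flink API):

    // Contract between the runtime and a sink/table: the sink only ever
    // learns "here are records, and here is a committed watermark". How
    // exactly-once is achieved (aligned, unaligned or task-local
    // checkpointing) stays invisible behind commit().
    interface ConsistentSinkTable<RowT> {

        /** Stage a record; staged data is not yet visible to readers. */
        void write(RowT record);

        /**
         * Atomically publish everything staged up to the given watermark.
         * After this returns, the data for the watermark is immutable and
         * readable, and it will never change, even across failover.
         */
        void commit(long watermark);

        /** The latest watermark whose data is published and immutable. */
        long committedWatermark();
    }

Any fault-tolerance mechanism that calls commit() exactly once per watermark satisfies this contract, which is what would decouple the consistency model from the choice of checkpointing.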
> > >> > > > > Best,
> > >> > > > > Piotrek

> > >> > > > > On Wed, Dec 14, 2022 at 3:19 PM Shammon FY <zjur...@gmail.com> wrote:
> > >> > > > > > Hi Piotr,

> > >> > > > > > Thanks for your valuable input, which makes me consider the core point of data consistency in depth. I'd like to define the data consistency on the whole streaming & batch processing as follows, and I hope that we can have an agreement on it:

> > >> > > > > > BOutput = Fn(BInput), where BInput is a bounded input which is split from the unbounded stream, Fn is the computation of a node or ETL, and BOutput is the bounded output of BInput. All the data in BInput and BOutput are unordered, and BInput and BOutput are data consistent.

> > >> > > > > > The key points above include 1) the segment semantics of BInput; 2) the computation semantics of Fn.

> > >> > > > > > 1. The segment semantics of BInput
> > >> > > > > > a) Transactionality of data. It is necessary to ensure the semantic transaction of the bounded data set when it is split from the unbounded stream. For example, we cannot split multiple records in one transaction into different bounded data sets.
> > >> > > > > > b) Timeliness of data. Some data is related to time, such as boundary data for a window. It is necessary to consider whether the bounded data set needs to include a watermark which can trigger the window result.
> > >> > > > > > c) Constraints of data. The Timestamp Barrier should perform some specific operations after computation in operators, for example, force flushing data.

> > >> > > > > > The Checkpoint Barrier misses all the semantics above, and we should later support users defining the Timestamp for data on Event Time or System Time according to the job and computation.

> > >> > > > > > 2. The computation semantics of Fn
> > >> > > > > > a) Deterministic computation
> > >> > > > > > Most computations are deterministic, such as map, filter, count, sum, etc. They generate the same unordered result from the same unordered input every time, and we can easily define data consistency on the input and output for them.

> > >> > > > > > b) Non-deterministic computation
> > >> > > > > > Some computations are non-deterministic. They will produce different results from the same input every time. I try to divide them into the following types:
> > >> > > > > > 1) Non-deterministic computation semantics, such as the rank operator.
> > >> > > > > > When it computes multiple times (for example, on failover), the first or the last output results can both be the final result, which will cause different failover handling for downstream jobs. I will expand on this later.
> > >> > > > > > 2) Non-deterministic computation optimization, such as async io. It is necessary to sync these operations when the barrier of the input arrives.
> > >> > > > > > 3) Deviation caused by data segmentation and computation semantics, such as Window. This requires that the users customize the data segmentation according to their needs correctly.

> > >> > > > > > Checkpoint Barrier matches a), and Timestamp Barrier can match both a) and b).

> > >> > > > > > We define data consistency of BInput and BOutput based on all of the above. The BOutput of an upstream ETL will be the BInput of the next ETL, and multiple ETL jobs form a complex "ETL Topology".

> > >> > > > > > Based on the above definitions, I'd like to give a general proposal with the "Timestamp Barrier" in my mind. It's not very detailed; please help to review it and feel free to comment @David, @Piotr.

> > >> > > > > > 1. Data segment with Timestamp
> > >> > > > > > a) Users can define the Timestamp Barrier with System Time or Event Time.
> > >> > > > > > b) Source nodes generate the same Timestamp Barrier after reading data from the RootTable.
> > >> > > > > > c) Each record carries the Timestamp of its Timestamp Barrier, such as (a, T), (b, T), (c, T), (T, barrier).

> > >> > > > > > 2. Computation with Timestamp
> > >> > > > > > a) Records are unordered within the same Timestamp. Stateless operators such as map/flatmap/filter can process data without aligning the Timestamp Barrier, which is different from the Checkpoint Barrier.
> > >> > > > > > b) Records between Timestamps are ordered. Stateful operators must align data and compute by each Timestamp, then compute in Timestamp sequence.
> > >> > > > > > c) Stateful operators will output the results of a specific Timestamp after computation.
> > >> > > > > > d) The Sink operator "commits records" with a specific Timestamp and reports the status to the JobManager.

> > >> > > > > > 3. Read data with Timestamp
> > >> > > > > > a) A downstream ETL reads data according to a Timestamp after the upstream ETL "commits" it.
> > >> > > > > > b) Stateful operators interact with state when computing the data of a Timestamp, but they won't trigger a checkpoint for every Timestamp.
> > >> > > > > > Therefore the source ETL job can generate a Timestamp every few seconds or even every few hundred milliseconds.
> > >> > > > > > c) Based on Timestamp, the delay between ETL jobs will be very small, and in the best case the E2E latency may be only tens of seconds.

> > >> > > > > > 4. Failover and Recovery
> > >> > > > > > ETL jobs are cascaded through the Intermediate Table. After a single ETL job fails, it needs to replay the input data and recompute the results. As you mentioned, whether the cascaded ETL jobs are restarted depends on the determinacy of the intermediate data between them.
> > >> > > > > > a) An ETL job will roll back and reread data from the upstream ETL by a specific Timestamp according to the Checkpoint.
> > >> > > > > > b) According to the management of Checkpoint and Timestamp, the ETL can replay all Timestamps and data after failover, which means BInput is the same before and after failover.

> > >> > > > > > c) A deterministic Fn generates the same BOutput from the same BInput:
> > >> > > > > > 1) If there's no data for the specific Timestamp in the sink table, the ETL just "commits" it as normal.
> > >> > > > > > 2) If data for the Timestamp exists in the sink table, the ETL can just discard the new data.

> > >> > > > > > d) A non-deterministic Fn generates different BOutput from the same BInput before and after failover. For example, BOutput1 before failover and BOutput2 after failover, where the state in the ETL is consistent with BOutput2. There are two cases according to users' requirements:
> > >> > > > > > 1) Users can accept BOutput1 as the final output, and downstream ETLs don't need to restart. The Sink in the ETL can discard BOutput2 directly if the Timestamp exists in the sink table.
> > >> > > > > > 2) Users only accept BOutput2 as the final output; then all the downstream ETLs and Intermediate Tables should roll back to the specific Timestamp, and the downstream ETLs should be restarted too. (A small sketch of the idempotent "commit by Timestamp" follows below.)

> > >> > > > > > The following is a simple example. Data is transferred between ETL1, ETL2 and ETL3 in the Intermediate Table by Timestamp.
> > >> > > > > > [image: simple_example.jpg]

> > >> > > > > > Besides Timestamp, there's a big challenge in the Intermediate Table. It should support an efficient implementation of "commit Timestamp snapshot" with high throughput, which requires the Table Store to enhance its streaming capabilities to match systems like Pulsar or Kafka.

> > >> > > > > > In this FLIP, we plan to implement the proposal with Checkpoint; the above Timestamp can be replaced by Checkpoint. Of course, Checkpoint has some problems.
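A sketch of the idempotent "commit by Timestamp" from 4.c) and 4.d.1) above, for the deterministic case (the SinkTable interface is hypothetical; a real sink would need an atomic check-and-commit):

    // Failover-safe committing: the result set for a Timestamp is written at
    // most once; on replay after failover, already-committed Timestamps are
    // discarded. This is only safe for a deterministic Fn, where the
    // replayed BOutput for a Timestamp equals the committed one.
    final class TimestampCommitter<RowT> {

        interface SinkTable<RowT> {
            boolean containsTimestamp(long timestamp);        // hypothetical
            void commit(long timestamp, Iterable<RowT> rows); // atomic, hypothetical
        }

        private final SinkTable<RowT> sink;

        TimestampCommitter(SinkTable<RowT> sink) {
            this.sink = sink;
        }

        void commitIfAbsent(long timestamp, Iterable<RowT> bOutput) {
            if (sink.containsTimestamp(timestamp)) {
                // 4.c.2 / 4.d.1: data for this Timestamp already exists in the
                // sink table, so the replayed output is discarded.
                return;
            }
            // 4.c.1: no data for this Timestamp yet, commit as normal.
            sink.commit(timestamp, bOutput);
        }
    }

For the non-deterministic case 4.d.2), this check is not enough: the sink table and all downstream ETLs have to roll back to the Timestamp and restart.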
> > >> > > > > > I think we have reached some consensus in the discussion about the Checkpoint problems, including the data segment semantics, flushing data in some operators, and the increase of the E2E delay. However, from the perspective of implementation complexity, I personally think using Checkpoint in the first phase makes sense, what do you think?

> > >> > > > > > Finally, I think I misunderstood the "Rolling Checkpoint" and "All at once Checkpoint" in my last explanation, which you and @David mentioned. I thought their differences were mainly about selecting different table versions for queries. According to your reply, I think it is whether there are multiple "rolling checkpoints" in each ETL job, right? If I understand correctly, the "Rolling Checkpoint" is a good idea, and we can guarantee "Strong Data Consistency" between multiple tables in MetaService for queries. Thanks.

> > >> > > > > > Best,
> > >> > > > > > Shammon

> > >> > > > > > On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >> > > > > >> Hi Shammon,

> > >> > > > > >> Thanks for the explanations, I think I understand the problem better now. I have a couple of follow up questions, but first:

> > >> > > > > >> >> 3. I'm pretty sure there are counter examples, where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
> > >> > > > > >> >> a) For example what if one of your "ETL" jobs has the following DAG:
> > >> > > > > >> >>
> > >> > > > > >> >> Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from Src1/Window1 from event time T1 and from Src2/Window2 from a later event time T2.
> > >> > > > > >> >> b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or to two different sink tables (that are both later used in the same downstream job).

> > >> > > > > >> > Thank you for your feedback. I cannot see the DAG in 3.a in your reply,

> > >> > > > > >> I've attached the image directly. I hope you can see it now.

> > >> > > > > >> Basically what I meant is that if you have a topology like (from the attached image):

> > >> > > > > >> window1 = src1.keyBy(...).window(...)
> > >> > > > > >> window2 = src2.keyBy(...).window(...)
> > >> > > > > >> window1.join(window2, ...).addSink(sink)

> > >> > > > > >> or with an even simpler one (note no keyBy between `src` and `process`):

> > >> > > > > >> src.process(some_function_that_buffers_data).addSink(sink)

> > >> > > > > >> you will have the same problem. Generally speaking, if there is an operator buffering some data, and if the data is not flushed on every checkpoint (any windowed or temporal operator, AsyncWaitOperator, CEP, ...), you can design a graph that will produce "inconsistent" data as part of a checkpoint.

> > >> > > > > >> Apart from that, a couple of other questions/issues.

> > >> > > > > >> > 1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether

> > >> > > > > >> Do we need to support the "altogether" one? Rolling checkpoints, being more independent, I could see scaling much better and avoiding a lot of the problems that I mentioned before.

> > >> > > > > >> > 1) Checkpoint VS Watermark
> > >> > > > > >> > 1. Stateful Computation is aligned according to Timestamp Barrier

> > >> > > > > >> Indeed the biggest obstacle I see here is that we would most likely have:

> > >> > > > > >> > b) Similar to the window operator, align data in memory according to Timestamp.

> > >> > > > > >> for every operator.

> > >> > > > > >> > 4. Failover supports Timestamp fine-grained data recovery
> > >> > > > > >> >
> > >> > > > > >> > As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology".

> > >> > > > > >> I don't understand this point. Regardless if we are using rolling checkpoints, all at once checkpoints or watermarks, I see the same problems with non-determinism, if we want to preserve the requirement to not fail over the whole topology at once.

> > >> > > > > >> Both Watermarks and the "rolling checkpoint" I think have the same issue: they either require deterministic logic, or global failover, or downstream jobs can only work on the records already committed by the upstream. But working with only "committed records" would either break consistency between different jobs, or would cause a huge delay in checkpointing and e2e latency, as:
> > >> > > > > >> 1. upstream job produces some data; the downstream can not process this data yet
> > >> > > > > >> 2. checkpoint 42 is triggered on the upstream job
> > >> > > > > >> 3. checkpoint 42 is completed on the upstream job, data processed since the last checkpoint has been committed
> > >> > > > > >> 4. upstream job can continue producing more data
> > >> > > > > >> 5. only now can the downstream start processing the data produced in 1., but it can not read the not-yet-committed data from 4.
> > >> > > > > >> 6. once the downstream finishes processing the data from 1., it can trigger checkpoint 42

> > >> > > > > >> The "all at once checkpoint" I can see working only with a global failover of everything.

> > >> > > > > >> This is assuming exactly-once mode. At-least-once would be much easier.

> > >> > > > > >> Best,
> > >> > > > > >> Piotrek

> > >> > > > > >> On Tue, Dec 13, 2022 at 8:57 AM Shammon FY <zjur...@gmail.com> wrote:
> > >> > > > > >>> Hi David,

> > >> > > > > >>> Thanks for the comments from you and @Piotr. I'd like to explain the details of the FLIP first.

> > >> > > > > >>> 1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether

> > >> > > > > >>> This mainly depends on the needs of users. Users can decide the data version of tables in their queries according to their different requirements for data consistency and freshness. Since we manage multiple versions for each table, this will not bring too much complexity to the system. We only need to support different strategies when calculating table versions for queries. So we give this decision to users, who can use "consistency.type" to set a different consistency in the "Catalog". We can continue to refine this later; for example, dynamic parameters could support different consistency requirements for each query.

> > >> > > > > >>> 2) MetaService module

> > >> > > > > >>> Many Flink streaming jobs use application mode, and they are independent of each other. So we currently assume that MetaService is an independent node. In the first phase, it will be started in standalone mode, and HA will be supported later. This node will reuse many Flink modules, including REST, Gateway-RpcServer, etc. We hope that the core functions of MetaService can be developed as a component. When Flink subsequently uses a large session cluster to support various computations, it can be integrated into the "ResourceManager" as a plug-in component.

> > >> > > > > >>> Besides the above, I'd like to describe the Checkpoint and Watermark mechanisms in detail as follows.
> > >> > > > > >>> 1) Checkpoint VS Watermark

> > >> > > > > >>> As you mentioned, I think it's very correct that what we want from the Checkpoint is to align the streaming computation and data according to certain semantics, and Timestamp is a very ideal solution. To achieve this goal, we can think of the following functions that need to be supported in the Watermark mechanism:

> > >> > > > > >>> 1. Stateful Computation is aligned according to the Timestamp Barrier

> > >> > > > > >>> As in the "three tables example" we discussed above, we need to align the stateful operator computation according to the barrier to ensure the consistency of the result data. In order to align the computation, there are two ways in my mind:

> > >> > > > > >>> a) Similar to the Aligned Checkpoint Barrier, the Timestamp Barrier aligns data per channel, which will lead to backpressure just like the aligned checkpoint. That doesn't seem like a good idea.

> > >> > > > > >>> b) Similar to the window operator, align data in memory according to the Timestamp. Two steps need to be supported here: first, data is aligned by timestamp for stateful operators; second, since Timestamps are strictly sequential, global aggregation operators need to perform the aggregation in timestamp order and output the final results.

> > >> > > > > >>> 2. Coordinate multiple source nodes to assign unified Timestamp Barriers

> > >> > > > > >>> Since the stateful operators need to be aligned according to the Timestamp Barrier, the source subtasks of multiple jobs should generate the same Timestamp Barrier. ETL jobs consuming the RootTable should interact with the "MetaService" to generate the same Timestamps T1, T2, T3 and so on.

> > >> > > > > >>> 3. JobManager needs to manage the completed Timestamp Barriers

> > >> > > > > >>> When a Timestamp Barrier of the ETL job has been completed, it means that the data of the specified Timestamp can be queried by users. The JobManager needs to summarize its Timestamp processing and report the completed Timestamp and data snapshots to the MetaService.

> > >> > > > > >>> 4. Failover supports Timestamp fine-grained data recovery

> > >> > > > > >>> As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology".
> > >> > > > > >>> This requires that the result data of a Timestamp generated by the upstream ETL be deterministic.

> > >> > > > > >>> a) The determinacy of the Timestamp: that is, before and after an ETL job failover, the same Timestamp sequence must be generated. Each Checkpoint needs to record the included Timestamp list, especially in the source node of the RootTable. After failover, it needs to regenerate the Timestamps according to the Timestamp list.

> > >> > > > > >>> b) The determinacy of the Timestamp data: that is, the same data must be replayed for the same Timestamp before and after failover, generating the same results in the Sink Table. Each Timestamp must save the start and end offsets (or snapshot id) of the RootTable. After failover, the source nodes need to replay the data according to the offsets to ensure that the data of each Timestamp is consistent before and after failover.

> > >> > > > > >>> For the specific requirements and complexity, please help to review when you are free @David @Piotr, thanks :)

> > >> > > > > >>> 2) Evolution from the Checkpoint to the Timestamp Mechanism

> > >> > > > > >>> You raise a very important question in your reply which I missed before: if the Aligned Checkpoint is used in the first stage, how complex is the evolution from Checkpoint to Timestamp later? I made a general comparison here, which may not be very detailed. There are three roles in the whole system: MetaService, Flink ETL Job and Table Store.

> > >> > > > > >>> a) MetaService

> > >> > > > > >>> It manages the data consistency among multiple ETL jobs, including coordinating the Barrier for the Source ETL nodes, setting the starting Barrier for ETL job startup, and calculating the Table version for queries according to different strategies. In fact it has little to do with Checkpoint; we just need to pay attention to it when designing the API and implementing the functions.

> > >> > > > > >>> b) Flink ETL Job

> > >> > > > > >>> At present, the workload is relatively small; we need to trigger checkpoints in the CheckpointCoordinator manually via the SplitEnumerator.

> > >> > > > > >>> c) Table Store

> > >> > > > > >>> The Table Store mainly provides the ability to write and read data.

> > >> > > > > >>> c.1) Write data. At present, the Table Store generates snapshots according to two phases in Flink.
> > >> > > > > >>> When using Checkpoint for consistency management, we need to write the checkpoint information to snapshots. After moving to the Timestamp Barrier, the snapshots in the Table Store may be divided more finely, and we need to write Timestamp information to the data files. A "checkpointed snapshot" may contain multiple "Timestamp snapshots".

> > >> > > > > >>> c.2) Read data. The SplitEnumerator that reads data from the Table Store will manage multiple splits according to the version number. After the specified splits are completed, it sends a Barrier command to trigger a checkpoint in the ETL job. The source node will broadcast the checkpoint barrier downstream after receiving it. When using the Timestamp Barrier, the overall process is similar, but the SplitEnumerator does not need to trigger a checkpoint in the Flink ETL, and the Source node needs to support broadcasting the Timestamp Barrier downstream at that time.

> > >> > > > > >>> Overall, the evolution complexity from Checkpoint to Timestamp seems controllable, but the specific implementation needs careful design, and the concepts and features of Checkpoint should not be introduced too much into the relevant interfaces and functions.

> > >> > > > > >>> What do you think of it? Looking forward to your feedback, thanks

> > >> > > > > >>> Best,
> > >> > > > > >>> Shammon

> > >> > > > > >>> On Mon, Dec 12, 2022 at 11:46 PM David Morávek <d...@apache.org> wrote:
> > >> > > > > >>> > Hi Shammon,

> > >> > > > > >>> > I'm starting to see what you're trying to achieve, and it's really exciting. I share Piotr's concerns about e2e latency and the inability to use unaligned checkpoints.

> > >> > > > > >>> > I have a couple of questions that are not clear to me from going over the FLIP:

> > >> > > > > >>> > 1) Global Checkpoint Commit

> > >> > > > > >>> > Are you planning on committing the checkpoints in a) a "rolling fashion" - one pipeline after another, or b) altogether - once the data have been processed by all pipelines?

> > >> > > > > >>> > Option a) would be eventually consistent (for batch queries, you'd need to use the last checkpoint produced by the most downstream table), whereas b) would be strongly consistent at the cost of increasing the e2e latency even more.
> > >> > > > > >>> > I feel that option a) is what this should be headed for.

> > >> > > > > >>> > 2) MetaService

> > >> > > > > >>> > Should this be a new general Flink component or one specific to the Flink Table Store?

> > >> > > > > >>> > 3) Follow-ups

> > >> > > > > >>> > From the above discussion, there is a consensus that, in the ideal case, watermarks would be the way to go, but there is some underlying mechanism missing. It would be great to discuss this option in more detail to compare the solutions in terms of implementation cost; maybe it would not be as complex.

> > >> > > > > >>> > All in all, I don't feel that checkpoints are suitable for providing consistent table versioning between multiple pipelines. The main reason is that they are designed to be a fault tolerance mechanism. Somewhere between the lines, you've already noted that the primitive you're looking for is cross-pipeline barrier alignment, which is the mechanism that a subset of the currently supported checkpointing implementations happen to be using. Is that correct?

> > >> > > > > >>> > My biggest concern is that tying this to a "side-effect" of the checkpointing mechanism could block us from evolving it further.

> > >> > > > > >>> > Best,
> > >> > > > > >>> > D.

> > >> > > > > >>> > On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <zjur...@gmail.com> wrote:
> > >> > > > > >>> > > Hi Piotr,

> > >> > > > > >>> > > Thank you for your feedback. I cannot see the DAG in 3.a in your reply, but I'd like to answer some questions first.

> > >> > > > > >>> > > Your understanding is very correct. We want to align the data versions of all intermediate tables through the checkpoint mechanism in Flink. I'm sorry that I omitted some default constraints in the FLIP, including: only aligned checkpoints are supported; one table can only be written by one ETL job. I will add these later.

> > >> > > > > >>> > > Why can't the watermark mechanism achieve the data consistency we want? For example, there are 3 tables: Table1 is a word table, Table2 is a word->cnt table and Table3 is a cnt1->cnt2 table.
> > >> > > > > >>> > > 1. ETL1 from Table1 to Table2: INSERT INTO Table2 SELECT word, count(*) FROM Table1 GROUP BY word

> > >> > > > > >>> > > 2. ETL2 from Table2 to Table3: INSERT INTO Table3 SELECT cnt, count(*) FROM Table2 GROUP BY cnt

> > >> > > > > >>> > > ETL1 has 2 subtasks reading multiple buckets from Table1, where subtask1 reads streaming data as [a, b, c, a, d, a, b, c, d ...] and subtask2 reads streaming data as [a, c, d, q, a, v, c, d ...].

> > >> > > > > >>> > > 1. Unbounded streaming data is divided into multiple sets according to some semantic requirements. In the most extreme case there may be one set for each record. Assume that the sets of subtask1 and subtask2 separated by the same semantics are [a, b, c, a, d] and [a, c, d, q], respectively.

> > >> > > > > >>> > > 2. After the above two sets are computed by ETL1, the result data generated in Table2 is [(a, 3), (b, 1), (c, 2), (d, 2), (q, 1)].

> > >> > > > > >>> > > 3. The result data generated in Table3 after the data in Table2 is computed by ETL2 is [(1, 2), (2, 2), (3, 1)].

> > >> > > > > >>> > > We want to align the data of Table1, Table2 and Table3 and manage the data versions. When users execute OLAP/Batch join queries on these tables, they can see the following consistent data:

> > >> > > > > >>> > > 1. Table1: [a, b, c, a, d] and [a, c, d, q]

> > >> > > > > >>> > > 2. Table2: [a, 3], [b, 1], [c, 2], [d, 2], [q, 1]

> > >> > > > > >>> > > 3. Table3: [1, 2], [2, 2], [3, 1]

> > >> > > > > >>> > > Users can perform the query: SELECT t1.word, t2.cnt, t3.cnt2 from Table1 t1 JOIN Table2 t2 JOIN Table3 t3 on t1.word=t2.word and t2.cnt=t3.cnt1;

> > >> > > > > >>> > > In the view of users, the data is consistent on a unified "version" across Table1, Table2 and Table3.

> > >> > > > > >>> > > In the current Flink implementation, the aligned checkpoint can achieve the above capabilities (let's ignore the segmentation semantics of the checkpoint first; a toy simulation of the alignment issue follows below).
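The toy simulation below replays the word counts from this example in plain Java (nothing Flink-specific) and shows why alignment matters: with barrier-style alignment the T1 result contains the counts of all T1 records from both subtasks, while without alignment T2 records leak into the state before subtask2's T1 records are counted - which is the anomaly walked through in the next paragraphs:

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class AlignmentDemo {
        public static void main(String[] args) {
            List<String> subtask1T1 = List.of("a", "b", "c", "a", "d");
            List<String> subtask1T2 = List.of("a", "b");
            List<String> subtask2T1 = List.of("a", "c", "d", "q");

            // Aligned: both subtasks' T1 sets are combined before emitting T1.
            Map<String, Integer> aligned = new TreeMap<>();
            count(aligned, subtask1T1);
            count(aligned, subtask2T1);
            System.out.println("T1 version (aligned): " + aligned);
            // -> {a=3, b=1, c=2, d=2, q=1}

            // Unaligned: subtask1's T2 records arrive before subtask2's T1
            // records, so no state ever corresponds to "all of T1, only T1".
            Map<String, Integer> unaligned = new TreeMap<>();
            count(unaligned, subtask1T1);
            count(unaligned, subtask1T2); // T2 data leaks into the "T1" state
            count(unaligned, subtask2T1);
            System.out.println("state after mixing (unaligned): " + unaligned);
            // -> {a=4, b=2, c=2, d=2, q=1}; a T1 state with (a, 3) never existed
        }

        static void count(Map<String, Integer> state, List<String> words) {
            for (String word : words) {
                state.merge(word, 1, Integer::sum);
            }
        }
    }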
> > > >
> > > > In the current Flink implementation, aligned checkpoints can achieve the above capabilities (let's ignore the segmentation semantics of checkpoints for now). Because the checkpoint barrier aligns the data when performing the global count aggregation, we can associate each snapshot with a checkpoint in the Table Store, query the corresponding snapshots of Table1/Table2/Table3 through the checkpoint, and meet the consistency requirement of the unified "version" above.
> > > >
> > > > The current watermark mechanism in Flink cannot achieve the above consistency. For example, suppose we use watermarks to divide the data into multiple sets in subtask1 and subtask2 as follows:
> > > >
> > > > 1. subtask1: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2), (c, T2), (d, T2)], T2
> > > >
> > > > 2. subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, ...
> > > >
> > > > As Flink watermarks do not have barriers and cannot align data, the ETL1 count operator may first compute the data of subtask1: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2)], and then compute the data of subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1 — an interleaving that would not be possible with aligned checkpoints.
> > > >
> > > > In this order, the result output to Table2 after the count aggregation will be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a, 2, T1), (d, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1), which can be simplified to: [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1)]
> > > >
> > > > There is no (a, 3, T1), so we are unable to query consistent data results on Table1 and Table2 according to T1. Table3 has the same problem.
> > > >
> > > > Besides using the checkpoint barrier, the other implementation that supports the watermark approach above is to convert the global count aggregation into a windowed count. After the global count is converted into a window operator, it needs to support cross-window data computation. Similar to the data relationship between the previous and the current checkpoint, this is equivalent to introducing a "watermark barrier", which requires adjustments to the current Flink watermark mechanism.
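> > > >
> > > > Coming back to the checkpoint-based mapping: once the MetaService can map a global checkpoint to one snapshot per table, a consistent query could look roughly like this (a sketch; metaService and snapshotFor(...) are hypothetical, and 'scan.snapshot-id' is an illustrative Table Store scan option):
> > > >
> > > >     // Sketch: query the Table Store snapshots that belong to one global
> > > >     // checkpoint. The MetaService lookup is hypothetical; the OPTIONS hint
> > > >     // is standard Flink SQL (it may require enabling dynamic table options
> > > >     // depending on the Flink version). tEnv as in the earlier sketch.
> > > >     long s1 = metaService.snapshotFor("Table1", checkpointId);
> > > >     long s2 = metaService.snapshotFor("Table2", checkpointId);
> > > >     tEnv.executeSql(
> > > >             "SELECT t1.word, t2.cnt"
> > > >                     + " FROM Table1 /*+ OPTIONS('scan.snapshot-id'='" + s1 + "') */ t1"
> > > >                     + " JOIN Table2 /*+ OPTIONS('scan.snapshot-id'='" + s2 + "') */ t2"
> > > >                     + " ON t1.word = t2.word");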
> > > >
> > > > Besides the above global aggregation, there are window operators in Flink. I don't know if my understanding is correct (I cannot see the DAG in your example), so please correct me if it's wrong. I think you raise a very important and interesting question: how to define data consistency across different window computations that generate different timestamps for the same data. This situation also occurs when using event time to align data. At present, what I can think of is to store this information in the Table Store so that users can filter or join on it. This FLIP is our first phase, and the specific implementation of this will be designed and considered in the next phase and FLIP.
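> > > >
> > > > As a rough illustration of storing that information (just a sketch, not a designed schema; the table options are illustrative):
> > > >
> > > >     // Sketch: keep the timestamp produced by a window computation as an
> > > >     // extra column in the sink table, so queries can filter or join on it.
> > > >     // Schema and connector option are illustrative only; tEnv as above.
> > > >     tEnv.executeSql(
> > > >             "CREATE TABLE word_cnt ("
> > > >                     + " word STRING,"
> > > >                     + " cnt BIGINT,"
> > > >                     + " window_ts TIMESTAMP(3)" // assigned by the window
> > > >                     + ") WITH ('connector' = 'table-store')");
> > > >     // Queries can then pin themselves to one timestamp across tables.
> > > >     tEnv.executeSql(
> > > >             "SELECT word, cnt FROM word_cnt"
> > > >                     + " WHERE window_ts <= TIMESTAMP '2022-12-12 00:00:00'");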
> > > >
> > > > Although the checkpoint barrier can achieve the most basic consistency, as you mentioned, using the checkpoint mechanism will cause many problems, including longer checkpoint times for multiple cascaded jobs, increased E2E data freshness latency (several minutes or even dozens of minutes), and increased overall system complexity. At the same time, the semantics of checkpoint-based data segmentation are unclear.
> > > >
> > > > The current FLIP is the first phase of our whole proposal, and you can find the follow-up plan in our future work. In the first stage, we do not want to modify the Flink mechanism. We'd like to realize the basic system functions based on existing mechanisms in Flink, including the relationship management of ETL jobs and tables and the basic data consistency, so we chose the global checkpoint in our FLIP.
> > > >
> > > > We agree with you very much that event time is more suitable for data consistency management. We'd like to consider this matter in the second or third stage after the current FLIP. We hope to improve the watermark mechanism in Flink to support barriers. As you mentioned in your reply, we could then achieve data consistency based on timestamps while maintaining E2E data freshness of seconds or even milliseconds for 10+ cascaded jobs.
> > > >
> > > > What do you think? Thanks
> > > >
> > > > Best,
> > > > Shammon
> > > >
> > > > On Fri, Dec 9, 2022 at 6:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > > Do I understand it correctly that you effectively want to expand the checkpoint alignment mechanism across many different jobs and hand over checkpoint barriers from upstream to downstream jobs using the intermediate tables?
> > > > >
> > > > > Re the watermarks in the "Rejected Alternatives": I don't understand why this has been rejected. Could you elaborate on this point? Here are a couple of my thoughts on this matter, but please correct me if I'm wrong, as I haven't dived deeper into this topic.
> > > > >
> > > > > > As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> > > > > > The StreamTask reads data in order: V11, V12, V21, T1(channel1), V13, T1(channel2).
> > > > > > At this time, StreamTask will confirm that watermark T1 is completed, but the data beyond T1 has been processed (V13) and the results are written to the sink table.
> > > > >
> > > > > 1. I see the same "problem" with unaligned checkpoints in your current proposal.
> > > > >
> > > > > 2. I don't understand why this is a problem. Just store in the "sink table" what the watermark is (T1), and downstream jobs should process the data with that "watermark" anyway. Record "V13" should be treated as "early" data. Downstream jobs:
> > > > > a) if they are streaming jobs, they should for example aggregate it in windowed/temporal state, but they shouldn't produce a result that contains it, as the watermark T2 has not yet been processed. Or they would just pass that record on as "early" data.
> > > > > b) if they are batch jobs, it looks to me like they shouldn't take "all available data", but only consider "all the data up to some watermark", for example the latest available one: T1.
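> > > > >
> > > > > To sketch what I mean for the batch case (how the committed watermark would be exposed is an open question; the lookup below is purely hypothetical):
> > > > >
> > > > >     // Sketch: a batch job that only considers data up to the committed
> > > > >     // watermark T1. lookupCommittedWatermark(...) is a hypothetical
> > > > >     // helper, and event_time is assumed to be stored as epoch millis.
> > > > >     // Imports as in the earlier sketch.
> > > > >     TableEnvironment tEnv =
> > > > >             TableEnvironment.create(EnvironmentSettings.inBatchMode());
> > > > >     long t1 = lookupCommittedWatermark("sink_table"); // hypothetical
> > > > >     tEnv.executeSql(
> > > > >             "SELECT * FROM sink_table WHERE event_time <= " + t1);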
> > > > >
> > > > > 3. I'm pretty sure there are counterexamples where your proposed mechanism of using checkpoints (even aligned ones!) will produce inconsistent data from the perspective of event time.
> > > > > a) For example, what if one of your "ETL" jobs has the following DAG:
> > > > > [image: flip276.jpg]
> > > > > Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from Src1/Window1 from event time T1 and from Src2/Window2 from a later event time T2.
> > > > > b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or to two different sink tables (that are both later used in the same downstream job).
> > > > >
> > > > > 4a) I'm not sure if I like the idea of centralising the whole system in this way. If you have 10 jobs, the likelihood of checkpoint failure will be 10 times higher, and/or the duration of the checkpoint can be much, much longer (especially under backpressure). This is actually already a limitation of Apache Flink (global checkpoints are more prone to fail the larger the scale), so I would be anxious about making it a potentially even larger issue.
> > > > > 4b) I'm also worried about the increased complexity of the system after adding the global checkpoint, and about an additional (single?) point of failure.
> > > > > 5. Such a design would also not work if we ever wanted to have task-local checkpoints.
> > > > >
> > > > > All in all, it seems to me that watermarks and event time are actually the better concepts in this context and should have been used for synchronisation and data consistency across the whole system.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > > On Thu, Dec 1, 2022 at 11:50 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > >
> > > > > > Hi @Martijn
> > > > > >
> > > > > > Thanks for your comments, and I'd like to reply to them.
> > > > > >
> > > > > > 1. It sounds good to me. I'll update the content structure in the FLIP later and state the problems first.
> > > > > >
> > > > > > 2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?
> > > > > >
> > > > > > Yes. To support data consistency in the FLIP, we need to connect checkpoints in Flink with snapshots in the store, which requires a close combination of the Flink and store implementations. In the first stage we plan to implement it based on Flink and Table Store only; snapshots written to external storage don't support consistency.
> > > > > >
> > > > > > 3. If you introduce a MetaService, it becomes the single point of failure because it coordinates everything. But I can't find anything in the FLIP on making the MetaService highly available or how to deal with failovers there.
> > > > > >
> > > > > > I think you raise a very important problem, and I missed it in the FLIP. The MetaService is a single point and should support failover; we will work on that in the future. In the first stage we only support standalone mode. Thanks!
> > > > > >
> > > > > > 4. The FLIP states under Rejected Alternatives "Currently watermark in Flink cannot align data."
> > > > > > which is not true, given that there is FLIP-182:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > > > >
> > > > > > Watermark alignment in FLIP-182 is different from the "watermark aligns data" requirement in our FLIP. FLIP-182 aims to fix watermark generation across different sources for "slight imbalance or data skew", which means in some cases a source must generate a watermark even if it should not. When an operator collects watermarks, the data processing is as described in our FLIP, and the data cannot be aligned through a barrier as with checkpoints.
> > > > > >
> > > > > > 5. Given the MetaService role, it feels like this is introducing a tight dependency between Flink and the Table Store. How pluggable is this solution, given the changes that need to be made to Flink in order to support this?
> > > > > >
> > > > > > This is a good question, and I will try to expand on it. Most of the work will be completed in the Table Store, such as the new SplitEnumerator and Source implementations. The changes in Flink are as follows:
> > > > > > 1) A Flink job should put its job id into the context when creating a source/sink, to help the MetaService create the relationship between source and sink tables; this change is tiny.
> > > > > > 2) Notify a listener when a job is terminated in Flink; the listener implementation in Table Store will send a "delete event" to the MetaService.
> > > > > > 3) The changes related to Flink checkpointing include:
> > > > > > a) Support triggering a checkpoint with a given checkpoint id from the SplitEnumerator.
> > > > > > b) Create the SplitEnumerator in Table Store with a strategy to perform the specific checkpoint when all SplitEnumerators in the job manager trigger it.
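> > > > > >
> > > > > > For change 2), the notification could look roughly like this (a sketch only; all names are illustrative, not a final Flink API):
> > > > > >
> > > > > >     // Sketch of the job-termination notification in change 2) above.
> > > > > >     // All names here are illustrative.
> > > > > >     interface JobTerminationListener {
> > > > > >         void onJobTerminated(String jobId);
> > > > > >     }
> > > > > >
> > > > > >     // Hypothetical client for talking to the MetaService.
> > > > > >     interface MetaServiceClient {
> > > > > >         void sendDeleteEvent(String jobId);
> > > > > >     }
> > > > > >
> > > > > >     // Table Store implementation: forward the event so the MetaService
> > > > > >     // can drop the terminated job's source/sink table relationships.
> > > > > >     class MetaServiceJobListener implements JobTerminationListener {
> > > > > >         private final MetaServiceClient client;
> > > > > >
> > > > > >         MetaServiceJobListener(MetaServiceClient client) {
> > > > > >             this.client = client;
> > > > > >         }
> > > > > >
> > > > > >         @Override
> > > > > >         public void onJobTerminated(String jobId) {
> > > > > >             client.sendDeleteEvent(jobId); // the "delete event" above
> > > > > >         }
> > > > > >     }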
> > > > > >
> > > > > > Best,
> > > > > > Shammon
> > > > > >
> > > > > > On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > A couple of first comments on this:
> > > > > > > 1. I'm missing the problem statement in the overall introduction. It immediately goes into proposal mode; I would like to first read what the actual problem is before diving into solutions.
> > > > > > > 2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?
> > > > > > > 3. If you introduce a MetaService, it becomes the single point of failure because it coordinates everything. But I can't find anything in the FLIP on making the MetaService highly available or how to deal with failovers there.
> > > > > > > 4. The FLIP states under Rejected Alternatives "Currently watermark in Flink cannot align data." which is not true, given that there is FLIP-182:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > > > > >
> > > > > > > 5. Given the MetaService role, it feels like this is introducing a tight dependency between Flink and the Table Store. How pluggable is this solution, given the changes that need to be made to Flink in order to support this?
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi devs:
> > > > > > > >
> > > > > > > > I'd like to start a discussion about FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store [1]. In data stream processing as a whole, there are consistency problems such as how to manage the dependencies of multiple jobs and tables, how to define and handle E2E delays, and how to ensure the data consistency of queries on flowing data. This FLIP aims to support data consistency and answer these questions.
> > > > > > > >
> > > > > > > > I've discussed the details of this FLIP with @Jingsong Lee and @libenchao offline several times. We hope to support data consistency of queries on tables, manage the relationships between Flink jobs and tables, and revise tables on streaming in Flink and Table Store to improve the whole data stream processing.
> > > > > > > >
> > > > > > > > Looking forward to your feedback.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon