Hi all,

Thanks for all the feedback. If there are no more questions related to this FLIP, we'd like to start the follow-up work. After discussing with @JingsongLee, we will create separate FLIPs with detailed designs for `MetaService` and the `Timestamp Barrier Mechanism`. Please feel free to comment in this thread if you have any questions later. THX
Best,
Shammon

On Tue, Feb 7, 2023 at 11:53 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi Piotr
>
> Thanks for your feedback.
>
> > - stateless operators, could completely ignore the issue and process the records normally, as they are doing right now
> > - stateful operators, should either:
> >   - if the business doesn't require ordering, they could process the records immediately
> >   - or buffer the records internally, like currently windowed/temporal operators are doing. Non windowed joins/aggregations could also work in a similar manner, like pre-aggregate data per each "epoch" (as demarcated by timestamp barriers).
> > - sink implementations would have to match what external systems support:
> >   - if the external system requires ordered writes (something like a Kafka topic?), the sinks would have to buffer the writes until a "timestamp barrier" arrives
> >   - some sinks might support writing the data simultaneously to different "epochs". For example writing files bucketed by each epoch. Each bucket/epoch could be committed independently
>
> It sounds good to me and I totally agree with the proposal. We need to give users more choices to meet different business needs and storage support. I have updated the key points in the FLIP section[1]
>
> > Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't say that the timestamp barrier has been committed, but that all records for a given "epoch" have been processed/written, but not yet committed, so they can still be rolled back?
>
> Nice! According to your suggestion, I have updated the FLIP for "epoch" as:
> 1. It is PROCESSED when records are written to a table
> 2. It is WRITTEN when the records are in a snapshot
> 3. It is PRECOMMIT when all tables are PROCESSED but not WRITTEN
> 4. It is COMMIT when all tables are WRITTEN
> Records not WRITTEN in a table will be rolled back due to job failure.
>
> > Why do we need to do that? Only to disallow this? To forbid writing from two jobs into a single table? If so, can we not push this responsibility down to the connector? Like sink/source operator coordinators should negotiate with respective external systems if the given read/write is allowed? So if there is a need for such meta service, Flink doesn't need to know about it?
>
> As I mentioned, MetaService will do some atomic operations to check and disallow some operations when jobs are submitted concurrently. But I'm sorry that I may not have explained the relationship between it and source/sink clearly. Generally speaking, the interaction between Flink and MetaService is as follows:
> 1. When the Client submits a Flink job (streaming & batch), it interacts with MetaService through the Catalog in CatalogManager, including getting the table version and registering the source/sink table relationship for the ETL.
> 2. When the Flink job is running, the JobManager collects data processing progress (Timestamp Barrier and Checkpoint) from source/sink subtasks and reports it to MetaService.
> We can implement the above functions in a MetaService node. Of course, it can also be based on an atomic system (such as ZooKeeper), with the Client and JobManager doing their own work.
>
> Of course, sources and sinks also need some special work, such as reading, collecting and writing timestamp barriers. But source/sink subtasks will not interact with MetaService directly.
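> To make the interaction described above concrete, here is a minimal sketch of what a client/JobManager-facing contract could look like (all names are illustrative only, not an existing API; the real interface will be defined in the upcoming MetaService FLIP):
>
> import java.util.List;
> import org.apache.flink.api.common.JobID;
> import org.apache.flink.table.catalog.ObjectPath;
>
> /** Hypothetical MetaService contract; every name here is illustrative. */
> public interface MetaService {
>     /** Called by the Client through the Catalog on job submission: atomically
>      * registers the source/sink table relationship of the ETL and rejects the
>      * job if another job already writes to one of the sink tables. */
>     void registerJob(JobID jobId, List<ObjectPath> sourceTables, List<ObjectPath> sinkTables);
>
>     /** Called by the Client through the Catalog to get the table version to read. */
>     long getTableVersion(ObjectPath table);
>
>     /** Called by the JobManager to report data processing progress (timestamp
>      * barrier and checkpoint) collected from source/sink subtasks. */
>     void reportProgress(JobID jobId, long timestampBarrier, long checkpointId);
> }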
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-GlobalTimestampBarrierMechanism
>
> Best,
> Shammon
>
> On Tue, Feb 7, 2023 at 1:26 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> Thanks for the answers.
>>
>> >> Are you proposing that all of the inputs to stateful operators would have to be sorted?
>>
>> > Records in stream don't need to be sorted, but it should be managed by `Timestamp Barrier`, which means
>> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
>> > 2. Computations in different timestamp barriers are ordered. For the above example, each stateful subtask can start computation for T2 only after it finishes computation for T1. Subtasks are independent of each other.
>>
>> Wouldn't that add significant latency to processing the records? You would basically introduce a batch processing concept in Flink?
>>
>> Have you considered some alternative solutions? Like for example letting each operator/function/sink take care of the data disorder? For example:
>> - stateless operators, could completely ignore the issue and process the records normally, as they are doing right now
>> - stateful operators, should either:
>>   - if the business doesn't require ordering, they could process the records immediately
>>   - or buffer the records internally, like currently windowed/temporal operators are doing. Non windowed joins/aggregations could also work in a similar manner, like pre-aggregate data per each "epoch" (as demarcated by timestamp barriers).
>> - sink implementations would have to match what external systems support:
>>   - if the external system requires ordered writes (something like a Kafka topic?), the sinks would have to buffer the writes until a "timestamp barrier" arrives
>>   - some sinks might support writing the data simultaneously to different "epochs". For example writing files bucketed by each epoch. Each bucket/epoch could be committed independently
>>
>> This way, latency would behave very much like it currently does in Flink. For example if we have the following streaming SQL:
>>
>> INSERT INTO alerts_with_user SELECT * FROM alerts a, users u WHERE a.user_id = u.id
>>
>> If there is some lag in the users table, alerts would still be generated. Downstream applications could process and react to newly generated `alerts_with_user`, while at the same time, we could have a consistent view across those three tables (users, alerts, alerts_with_user) if needed.
>>
>> > I call the data of the timestamp barrier "committed" if the data is written to a table according to the barrier without a snapshot, and the data may be "rolled back" due to job failure. (sorry that the "committed" here may not be appropriate)
>>
>> Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't say that the timestamp barrier has been committed, but that all records for a given "epoch" have been processed/written, but not yet committed, so they can still be rolled back?
>>
>> > For example, when multiple jobs start at the same time and register themselves in `MetaService`, it needs to serially check whether they write to the same table
>>
>> Why do we need to do that? Only to disallow this? To forbid writing from two jobs into a single table?
>> If so, can we not push this responsibility down to the connector? Like sink/source operator coordinators should negotiate with respective external systems if the given read/write is allowed? So if there is a need for such a meta service, Flink doesn't need to know about it?
>>
>> Best,
>> Piotrek
>>
>> pon., 6 lut 2023 o 10:44 Shammon FY <zjur...@gmail.com> napisał(a):
>>
>> > Hi Piotr,
>> >
>> > Thanks for your feedback. In general, I think `Timestamp Barrier` is a special `Watermark` such that all sources send watermarks with the same timestamp as the `Timestamp Barrier`, and aggregation operators will align data by it. For example, all source subtasks are assigned two unified watermarks T1 and T2, T1 < T2. All records with timestamp <= T1 will be aligned by T1, and records with timestamp (T1, T2] will be aligned by T2.
>> >
>> > > Are you proposing that all of the inputs to stateful operators would have to be sorted?
>> >
>> > Records in stream don't need to be sorted, but it should be managed by `Timestamp Barrier`, which means
>> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
>> > 2. Computations in different timestamp barriers are ordered. For the above example, each stateful subtask can start computation for T2 only after it finishes computation for T1. Subtasks are independent of each other.
>> >
>> > > Can you explain why do you need those 3 states? Why can committed records be rolled back?
>> >
>> > Here I try to define the states of data in tables according to Timestamp Barrier and Snapshot, and I found that the 3 states are incomplete. For example, there is a timestamp barrier T associated with checkpoint P, and the sink operator will create snapshot S for P in tables. The data states in tables are as follows
>> > 1. Sink finishes writing data of timestamp barrier T to a table, but snapshot S is not created in the table and T is not finished in all tables.
>> > 2. Sink finishes writing data of timestamp barrier T to a table and creates snapshot S according to checkpoint P, but T is not finished in all tables.
>> > 3. Timestamp barrier T is finished in all tables, but snapshot S is not created in all tables.
>> > 4. Timestamp barrier T is finished in all tables, and snapshot S is created in all tables too.
>> >
>> > Currently users can only get data from snapshots in Table Store and other storages such as Iceberg. Users can get different "versioned" data from tables according to their data freshness and consistency requirements. I think we should support getting data with a timestamp barrier even before the sink operator finishes creating the snapshot in the future. In this situation, I call the data of the timestamp barrier "committed" if the data is written to a table according to the barrier without a snapshot, and the data may be "rolled back" due to job failure. (sorry that the "committed" here may not be appropriate)
>> >
>> > > I'm not sure if I follow. Generally speaking, why do we need MetaService at all? Why can we only support writes to and reads from TableStore, and not any source/sink that implements some specific interface?
>> >
>> > It's a good point. I added a `MetaService` node in the FLIP mainly to perform some atomic operations.
For example, when multiple jobs start at the >> same >> > time and register themselves in `MetaService`, it needs to serially >> check >> > whether they write to the same table. If we do not use an >> > independent `MetaService Node`, we may need to introduce some other >> "atomic >> > dependency" such as ZooKeeper. But removing `MetaService Node` can make >> the >> > system more flexible, I think it's also valuable. Maybe we can carefully >> > design MetaService API and support different deployment modes in the >> next >> > FLIP? WDYT? >> > >> > >> > Best, >> > Shammon >> > >> > >> > On Fri, Feb 3, 2023 at 10:43 PM Piotr Nowojski <pnowoj...@apache.org> >> > wrote: >> > >> > > Hi Shammon, >> > > >> > > Thanks for pushing the topic further. I'm not sure how this new >> proposal >> > is >> > > supposed to be working? How should timestamp barrier interplay with >> event >> > > time and watermarks? Or is timestamp barrier supposed to completely >> > replace >> > > watermarks? >> > > >> > > > stateful and temporal operators should align them (records) >> according >> > to >> > > their timestamp field. >> > > >> > > Are you proposing that all of the inputs to stateful operators would >> have >> > > to be sorted? >> > > >> > > > There're three states in a table for specific transaction : >> PreCommit, >> > > Commit and Snapshot >> > > >> > > Can you explain why do you need those 3 states? Why can committed >> records >> > > be rolled back? >> > > >> > > >> 10. Have you considered proposing a general consistency mechanism >> > > instead >> > > >> of restricting it to TableStore+ETL graphs? For example, it seems >> to >> > me >> > > to >> > > >> be possible and valuable to define instead the contract that >> > > sources/sinks >> > > >> need to implement in order to participate in globally consistent >> > > snapshots. >> > > > >> > > > A general consistency mechanism is cool! In my mind, the overall >> > > > `consistency system` consists of three components: Streaming & Batch >> > ETL, >> > > > Streaming & Batch Storage and MetaService. MetaService is decoupled >> > from >> > > > Storage Layer, but it stores consistency information in persistent >> > > storage. >> > > > It can be started as an independent node or a component in a large >> > Flink >> > > > cluster. In the FLIP we use TableStore as the Storage Layer. As you >> > > > mentioned, we plan to implement specific source and sink on the >> > > TableStore >> > > > in the first phase, and may consider other storage in the future >> > > >> > > I'm not sure if I follow. Generally speaking, why do we need >> MetaService >> > at >> > > all? Why can we only support writes to and reads from TableStore, and >> not >> > > any source/sink that implements some specific interface? >> > > >> > > Best, >> > > Piotrek >> > > >> > > niedz., 29 sty 2023 o 12:11 Shammon FY <zjur...@gmail.com> >> napisał(a): >> > > >> > > > Hi @Vicky >> > > > >> > > > Thank you for your suggestions about consistency and they're very >> nice >> > to >> > > > me! >> > > > >> > > > I have updated the examples and consistency types[1] in FLIP. In >> > > general, I >> > > > regard the Timestamp Barrier processing as a transaction and divide >> the >> > > > data consistency supported in FLIP into three types >> > > > >> > > > 1. Read Uncommitted: Read data from tables even when a transaction >> is >> > not >> > > > committed. >> > > > 2. Read Committed: Read data from tables according to the committed >> > > > transaction. >> > > > 3. 
>> > > > Repeatable Read: Read data from tables according to the committed transaction in snapshots.
>> > > >
>> > > > You can get more information from the updated FLIP. Looking forward to your feedback, THX
>> > > >
>> > > > [1]
>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-DataConsistencyType
>> > > >
>> > > > Best,
>> > > > Shammon
>> > > >
>> > > > On Sat, Jan 28, 2023 at 4:42 AM Vasiliki Papavasileiou <vpapavasile...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hi Shammon,
>> > > > >
>> > > > > Thank you for opening this FLIP which is very interesting and such an important feature to add to the Flink ecosystem. I have a couple of suggestions/questions:
>> > > > >
>> > > > > - Consistency is a very broad term with different meanings. There are many variations between the two extremes of weak and strong consistency that trade off latency for consistency. https://jepsen.io/consistency It would be great if we could devise an approach that allows the user to choose which consistency level they want to use for a query.
>> > > > >
>> > > > > Example: In your figure where you have a DAG, assume a user queries only Table1 for a specific key. Then, a failure happens and the table restores from a checkpoint. The user issues the same query, looking up the same key. What value does she see? With monotonic reads, the system guarantees that she will only see the same or newer values but not older ones, hence will not experience time-travel. This is a very useful property for a system to have, albeit at the weaker end of consistency guarantees. But it is a good stepping stone.
>> > > > >
>> > > > > Another example: assume the user queries Table1 for key K1 and gets the value V11. Then, she queries Table2, which is derived from Table1, for the same key, K1, and it returns value V21. What is the relationship between V21 and V11? Is V21 derived from V11 or can it be an older value V1 (the previous value of K1)? What if value V21 is not yet in table Table2? What should she see when she queries Table1? Should she see the value V11 or not? Should the requirement be that a record is not visible in any of the tables in a DAG unless it is available in all of them?
>> > > > >
>> > > > > - It would be good to have a set of examples with consistency anomalies that can happen (like the examples above) and what consistency levels we want the system to offer to prevent them. Moreover, for each such example, it would be good to have a description of how the approach (Timestamp Barriers) will work in practice to prevent such anomalies.
>> > > > > >> > > > > >> > > > > Thank you, >> > > > > Vicky >> > > > > >> > > > > >> > > > > On Fri, Jan 27, 2023 at 4:46 PM John Roesler <vvcep...@apache.org >> > >> > > > wrote: >> > > > > >> > > > > > Hello Shammon and all, >> > > > > > >> > > > > > Thanks for this FLIP! I've been working toward this kind of >> global >> > > > > > consistency across large scale data infrastructure for a long >> time, >> > > and >> > > > > > it's fantastic to see a high-profile effort like this come into >> > play. >> > > > > > >> > > > > > I have been lurking in the discussion for a while and delaying >> my >> > > > > response >> > > > > > while I collected my thoughts. However, I've realized at some >> > point, >> > > > > > delaying more is not as useful as just asking a few questions, >> so >> > I'm >> > > > > sorry >> > > > > > if some of this seems beside the point. I'll number these to not >> > > > collide >> > > > > > with prior discussion points: >> > > > > > >> > > > > > 10. Have you considered proposing a general consistency >> mechanism >> > > > instead >> > > > > > of restricting it to TableStore+ETL graphs? For example, it >> seems >> > to >> > > me >> > > > > to >> > > > > > be possible and valuable to define instead the contract that >> > > > > sources/sinks >> > > > > > need to implement in order to participate in globally consistent >> > > > > snapshots. >> > > > > > >> > > > > > 11. It seems like this design is assuming that the "ETL >> Topology" >> > > under >> > > > > > the envelope of the consistency model is a well-ordered set of >> > jobs, >> > > > but >> > > > > I >> > > > > > suspect this is not the case for many organizations. It may be >> > > > > > aspirational, but I think the gold-standard here would be to >> > provide >> > > an >> > > > > > entire organization with a consistency model spanning a loosely >> > > coupled >> > > > > > ecosystem of jobs and data flows spanning teams and systems that >> > are >> > > > > > organizationally far apart. >> > > > > > >> > > > > > I realize that may be kind of abstract. Here's some examples of >> > > what's >> > > > on >> > > > > > my mind here: >> > > > > > >> > > > > > 11a. Engineering may operate one Flink cluster, and some other >> org, >> > > > like >> > > > > > Finance may operate another. In most cases, those are separate >> > > domains >> > > > > that >> > > > > > don't typically get mixed together in jobs, but some people, >> like >> > the >> > > > > CEO, >> > > > > > would still benefit from being able to make a consistent query >> that >> > > > spans >> > > > > > arbitrary contexts within the business. How well can a feature >> like >> > > > this >> > > > > > transcend a single Flink infrastructure? Does it make sense to >> > > > consider a >> > > > > > model in which snapshots from different domains can be >> composable? >> > > > > > >> > > > > > 11b. Some groups may have a relatively stable set of >> long-running >> > > jobs, >> > > > > > while others (like data science, skunkworks, etc) may adopt a >> more >> > > > > > experimental, iterative approach with lots of jobs entering and >> > > exiting >> > > > > the >> > > > > > ecosystem over time. It's still valuable to have them >> participate >> > in >> > > > the >> > > > > > consistency model, but it seems like the consistency system will >> > have >> > > > to >> > > > > > deal with more chaos than I see in the design. 
>> > > > > > For example, how can this feature tolerate things like zombie jobs (which are registered in the system, but fail to check in for a long time, and then come back later)?
>> > > > > >
>> > > > > > 12. I didn't see any statements about patterns like cycles in the ETL Topology. I'm aware that there are fundamental constraints on how well cyclic topologies can be supported by a distributed snapshot algorithm. However, there are a range of approaches/compromises that we can apply to cyclic topologies. At the very least, we can state that we will detect cycles and produce a warning, etc.
>> > > > > >
>> > > > > > 13. I'm not sure how heavily you're weighting the query syntax part of the proposal, so please feel free to defer this point. It looked to me like the proposal assumes people want to query either the latest consistent snapshot or the latest inconsistent state. However, it seems like there's a significant opportunity to maintain a manifest of historical snapshots and allow people to query as of old points in time. That can be valuable for individuals answering data questions, building products, and crucially supporting auditability use cases. To that latter point, it seems nice to provide not only a mechanism to query arbitrary snapshots, but also to define a TTL/GC model that allows users to keep hourly snapshots for N hours, daily snapshots for N days, weekly snapshots for N weeks, and the same for monthly, quarterly, and yearly snapshots.
>> > > > > >
>> > > > > > Ok, that's all I have for now :) I'd also like to understand some lower-level details, but I wanted to get these high-level questions off my chest.
>> > > > > >
>> > > > > > Thanks again for the FLIP!
>> > > > > > -John
>> > > > > >
>> > > > > > On 2023/01/13 11:43:28 Shammon FY wrote:
>> > > > > > > Hi Piotr,
>> > > > > > >
>> > > > > > > I discussed with @jinsong lee about `Timestamp Barrier` and `Aligned Checkpoint` for data consistency in the FLIP, and we think there are indeed many defects in using `Aligned Checkpoint` to support data consistency, as you mentioned.
>> > > > > > >
>> > > > > > > According to our historical discussion, I think we have reached an agreement on an important point: we finally need the `Timestamp Barrier Mechanism` to support data consistency. But in our (@jinsong lee's and my) opinion, the total design and implementation based on 'Timestamp Barrier' will be too complex, and it's also too big for one FLIP.
>> > > > > > >
>> > > > > > > So we'd like to use FLIP-276[1] as an overview design of data consistency in Flink Streaming and Batch ETL based on `Timestamp Barrier`.
>> > > > > > > @jinsong and I hope that we can reach an agreement on the overall design in FLIP-276 first, and then on the basis of FLIP-276 we can create other FLIPs with detailed designs according to modules and drive them. Finally, we can support data consistency based on Timestamp in Flink.
>> > > > > > >
>> > > > > > > I have updated FLIP-276, deleted the Checkpoint section, and added the overall design of `Timestamp Barrier`. Here I briefly describe the modules of `Timestamp Barrier` as follows
>> > > > > > > 1. Generation: JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them
>> > > > > > > 2. Checkpoint: Store <checkpoint, timestamp barrier> when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for the uncompleted checkpoint.
>> > > > > > > 3. Replay data: Store <timestamp barrier, offset> for the source when it broadcasts a timestamp barrier, so that the source can replay the same data according to the same timestamp barrier.
>> > > > > > > 4. Align data: Align data for stateful operators (aggregation, join, etc.) and temporal operators (window)
>> > > > > > > 5. Computation: Operator computation for a specific timestamp barrier based on the results of the previous timestamp barrier.
>> > > > > > > 6. Output: Operator outputs or commits results when it collects all the timestamp barriers, including operators with data buffers or async operations.
>> > > > > > >
>> > > > > > > I also list the main work in Flink and Table Store in FLIP-276. Please help to review the FLIP when you're free and feel free to give any comments.
>> > > > > > >
>> > > > > > > Looking forward to your feedback, THX
>> > > > > > >
>> > > > > > > [1]
>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Shammon
>> > > > > > >
>> > > > > > > On Tue, Dec 20, 2022 at 10:01 AM Shammon FY <zjur...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi Piotr,
>> > > > > > > >
>> > > > > > > > Thanks for syncing. I will update the FLIP later and keep this discussion open. Looking forward to your feedback, thanks
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Shammon
>> > > > > > > >
>> > > > > > > > On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > > > > > > >
>> > > > > > > >> Hi Shammon,
>> > > > > > > >>
>> > > > > > > >> I've tried to sync with Timo, David Moravek and Dawid Wysakowicz about this subject. We have only briefly chatted and exchanged some thoughts/ideas, but unfortunately we were not able to finish the discussions before the holiday season/vacations. Can we get back to this topic in
Can we get back to this topic in >> > > > January? >> > > > > > > >> >> > > > > > > >> Best, >> > > > > > > >> Piotrek >> > > > > > > >> >> > > > > > > >> pt., 16 gru 2022 o 10:53 Shammon FY <zjur...@gmail.com> >> > > > napisał(a): >> > > > > > > >> >> > > > > > > >> > Hi Piotr, >> > > > > > > >> > >> > > > > > > >> > I found there may be several points in our discussion, it >> > will >> > > > > cause >> > > > > > > >> > misunderstanding between us when we focus on different >> one. >> > I >> > > > list >> > > > > > each >> > > > > > > >> > point in our discussion as follows >> > > > > > > >> > >> > > > > > > >> > > Point 1: Is "Aligned Checkpoint" the only mechanism to >> > > > guarantee >> > > > > > data >> > > > > > > >> > consistency in the current Flink implementation, and >> > > "Watermark" >> > > > > and >> > > > > > > >> > "Aligned Checkpoint cannot do that? >> > > > > > > >> > My answer is "Yes", the "Aligned Checkpoint" is the only >> one >> > > due >> > > > > to >> > > > > > its >> > > > > > > >> > "Align Data" ability, we can do it in the first stage. >> > > > > > > >> > >> > > > > > > >> > > Point2: Can the combination of "Checkpoint Barrier" and >> > > > > > "Watermark" >> > > > > > > >> > support the complete consistency semantics based on >> > > "Timestamp" >> > > > in >> > > > > > the >> > > > > > > >> > current Flink implementation? >> > > > > > > >> > My answer is "No", we need a new "Timestamp Barrier" >> > mechanism >> > > > to >> > > > > do >> > > > > > > >> that >> > > > > > > >> > which may be upgraded from current "Watermark" or a new >> > > > mechanism, >> > > > > > we >> > > > > > > >> can >> > > > > > > >> > do it in the next second or third stage. >> > > > > > > >> > >> > > > > > > >> > > Point3: Are the "Checkpoint" and the new "Timestamp >> > Barrier" >> > > > > > > >> completely >> > > > > > > >> > independent? The "Checkpoint" whatever "Aligned" or >> > > "Unaligned" >> > > > or >> > > > > > "Task >> > > > > > > >> > Local" supports the "Exactly-Once" between ETLs, and the >> > > > > "Timestamp >> > > > > > > >> > Barrier" mechanism guarantees data consistency between >> > tables >> > > > > > according >> > > > > > > >> to >> > > > > > > >> > timestamp for queries. >> > > > > > > >> > My answer is "Yes", I totally agree with you. Let >> > "Checkpoint" >> > > > be >> > > > > > > >> > responsible for fault tolerance and "Timestamp Barrier" >> for >> > > > > > consistency >> > > > > > > >> > independently. >> > > > > > > >> > >> > > > > > > >> > @Piotr, What do you think? If I am missing or >> > misunderstanding >> > > > > > anything, >> > > > > > > >> > please correct me, thanks >> > > > > > > >> > >> > > > > > > >> > Best, >> > > > > > > >> > Shammon >> > > > > > > >> > >> > > > > > > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski < >> > > > > > pnowoj...@apache.org> >> > > > > > > >> > wrote: >> > > > > > > >> > >> > > > > > > >> > > Hi Shammon, >> > > > > > > >> > > >> > > > > > > >> > > > I don't think we can combine watermarks and >> checkpoint >> > > > > barriers >> > > > > > > >> > together >> > > > > > > >> > > to >> > > > > > > >> > > > guarantee data consistency. There will be a >> "Timestamp >> > > > > Barrier" >> > > > > > in >> > > > > > > >> our >> > > > > > > >> > > > system to "commit data", "single etl failover", "low >> > > latency >> > > > > > between >> > > > > > > >> > > ETLs" >> > > > > > > >> > > > and "strong data consistency with completed >> semantics" >> > in >> > > > the >> > > > > > end. 
>> > > > > > > >> > > >> > > > > > > >> > > Why do you think so? I've described to you above an >> > > > alternative >> > > > > > where >> > > > > > > >> we >> > > > > > > >> > > could be using watermarks for data consistency, >> regardless >> > > of >> > > > > what >> > > > > > > >> > > checkpointing/fault tolerance mechanism Flink would be >> > > using. >> > > > > Can >> > > > > > you >> > > > > > > >> > > explain what's wrong with that approach? Let me >> rephrase >> > it: >> > > > > > > >> > > >> > > > > > > >> > > 1. There is an independent mechanism that provides >> > > > exactly-once >> > > > > > > >> > guarantees, >> > > > > > > >> > > committing records/watermarks/events and taking care of >> > the >> > > > > > failover. >> > > > > > > >> It >> > > > > > > >> > > might be aligned, unaligned or task local >> checkpointing - >> > > this >> > > > > > doesn't >> > > > > > > >> > > matter. Let's just assume we have such a mechanism. >> > > > > > > >> > > 2. There is a watermarking mechanism (it can be some >> kind >> > of >> > > > > > system >> > > > > > > >> > > versioning re-using watermarks code path if a user >> didn't >> > > > > > configure >> > > > > > > >> > > watermarks), that takes care of the data consistency. >> > > > > > > >> > > >> > > > > > > >> > > Because watermarks from 2. are also subject to the >> > > > exactly-once >> > > > > > > >> > guarantees >> > > > > > > >> > > from the 1., once they are committed downstream systems >> > > (Flink >> > > > > > jobs or >> > > > > > > >> > > other 3rd party systems) could just easily work with >> the >> > > > > committed >> > > > > > > >> > > watermarks to provide consistent view/snapshot of the >> > > tables. >> > > > > Any >> > > > > > > >> > > downstream system could always check what are the >> > committed >> > > > > > > >> watermarks, >> > > > > > > >> > > select the watermark value (for example min across all >> > used >> > > > > > tables), >> > > > > > > >> and >> > > > > > > >> > > ask every table: please give me all of the data up >> until >> > the >> > > > > > selected >> > > > > > > >> > > watermark. Or give me all tables in the version for the >> > > > selected >> > > > > > > >> > watermark. >> > > > > > > >> > > >> > > > > > > >> > > Am I missing something? To me it seems like this way we >> > can >> > > > > fully >> > > > > > > >> > decouple >> > > > > > > >> > > the fault tolerance mechanism from the subject of the >> data >> > > > > > > >> consistency. >> > > > > > > >> > > >> > > > > > > >> > > Best, >> > > > > > > >> > > Piotrek >> > > > > > > >> > > >> > > > > > > >> > > czw., 15 gru 2022 o 13:01 Shammon FY < >> zjur...@gmail.com> >> > > > > > napisał(a): >> > > > > > > >> > > >> > > > > > > >> > > > Hi Piotr, >> > > > > > > >> > > > >> > > > > > > >> > > > It's kind of amazing about the image, it's a simple >> > > example >> > > > > and >> > > > > > I >> > > > > > > >> have >> > > > > > > >> > to >> > > > > > > >> > > > put it in a document >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > >> > > > > > > >> > >> > > > > > > >> >> > > > > > >> > > > > >> > > > >> > > >> > >> https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink >> > > > > > > >> > > > :) >> > > > > > > >> > > > >> > > > > > > >> > > > > Does it have to be combining watermarks and >> checkpoint >> > > > > > barriers >> > > > > > > >> > > together? >> > > > > > > >> > > > >> > > > > > > >> > > > It's an interesting question. 
>> > > > As we discussed above, what we need from "Checkpoint" is the "Align Data" ability, and from "Watermark" it is the "Consistency Semantics":
>> > > >
>> > > > 1) Only "Align Data" can reach data consistency when performing queries on upstream and downstream tables. I gave an example of "Global Count Tables" in our previous discussion. We need an "Align Event" in the streaming processing; it's the most basic (see the sketch below).
>> > > >
>> > > > 2) Only "Timestamp" can provide complete consistency semantics. You gave some good examples about "Window" and other such operators.
>> > > >
>> > > > I don't think we can combine watermarks and checkpoint barriers together to guarantee data consistency. There will be a "Timestamp Barrier" in our system to "commit data", "single etl failover", "low latency between ETLs" and "strong data consistency with completed semantics" in the end.
>> > > >
>> > > > At the beginning I think we can do the simplest thing first: guarantee the basic data consistency with a "Barrier Mechanism". In the current Flink there's "Aligned Checkpoint" only, that's why we choose "Checkpoint" in our FLIP.
>> > > >
>> > > > > I don't see an actual connection in the implementation steps between the checkpoint barriers approach and the watermark-like approach
>> > > >
>> > > > As I mentioned above, we choose "Checkpoint" to guarantee the basic data consistency. But as we discussed, the most ideal solution is "Timestamp Barrier". After the first stage is completed based on the "Checkpoint", we need to evolve it to our ideal solution "Timestamp Barrier" (watermark-like approach) in the next second or third stage. This does not mean upgrading the "Checkpoint Mechanism" in Flink. It means that after we implement a new "Timestamp Barrier" or upgrade "Watermark" to support it, we can use it instead of the current "Checkpoint Mechanism" directly in our "MetaService" and "Table Store".
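>> > > > To illustrate what aligning by "Timestamp Barrier" could look like inside a stateful operator, here is a minimal, self-contained sketch (plain Java, purely illustrative; all names are made up and this is not the FLIP's implementation): records are buffered per barrier timestamp, and each epoch is computed strictly in barrier order.
>> > > >
>> > > > import java.util.ArrayList;
>> > > > import java.util.Iterator;
>> > > > import java.util.List;
>> > > > import java.util.Map;
>> > > > import java.util.TreeMap;
>> > > > import java.util.function.BiConsumer;
>> > > >
>> > > > // Hypothetical sketch of a stateful operator aligning records by timestamp barrier.
>> > > > class TimestampAlignedOperator<R> {
>> > > >     // records buffered per epoch, keyed by the barrier timestamp
>> > > >     private final TreeMap<Long, List<R>> buffers = new TreeMap<>();
>> > > >     private final BiConsumer<Long, List<R>> compute; // per-epoch computation
>> > > >
>> > > >     TimestampAlignedOperator(BiConsumer<Long, List<R>> compute) { this.compute = compute; }
>> > > >
>> > > >     // Records within one epoch arrive in arbitrary order; just buffer them.
>> > > >     void processRecord(long barrierTimestamp, R record) {
>> > > >         buffers.computeIfAbsent(barrierTimestamp, t -> new ArrayList<>()).add(record);
>> > > >     }
>> > > >
>> > > >     // Called once barrier T has been received from all input channels:
>> > > >     // epochs <= T are computed strictly in barrier order (T2 only after T1).
>> > > >     void onBarrierAligned(long barrierTimestamp) {
>> > > >         Iterator<Map.Entry<Long, List<R>>> it = buffers.entrySet().iterator();
>> > > >         while (it.hasNext()) {
>> > > >             Map.Entry<Long, List<R>> e = it.next();
>> > > >             if (e.getKey() > barrierTimestamp) break;
>> > > >             compute.accept(e.getKey(), e.getValue()); // emit results for this epoch
>> > > >             it.remove();
>> > > >         }
>> > > >     }
>> > > > }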
>> > > > > > > >> > > > >> > > > > > > >> > > > In the discussion between @David and me, I summarized >> > the >> > > > work >> > > > > > of >> > > > > > > >> > > upgrading >> > > > > > > >> > > > "Watermark" to support "Timestamp Barrier". It looks >> > like >> > > a >> > > > > big >> > > > > > job >> > > > > > > >> and >> > > > > > > >> > > you >> > > > > > > >> > > > can find the details in our discussion. I think we >> don't >> > > > need >> > > > > > to do >> > > > > > > >> > that >> > > > > > > >> > > in >> > > > > > > >> > > > our first stage. >> > > > > > > >> > > > >> > > > > > > >> > > > Also in that discussion (my reply to @David) too, I >> > > briefly >> > > > > > > >> summarized >> > > > > > > >> > > the >> > > > > > > >> > > > work that needs to be done to use the new mechanism >> > > > (Timestamp >> > > > > > > >> Barrier) >> > > > > > > >> > > > after we implement the basic function on >> "Checkpoint". >> > It >> > > > > seems >> > > > > > that >> > > > > > > >> > the >> > > > > > > >> > > > work is not too big on my side, and it is feasible on >> > the >> > > > > whole. >> > > > > > > >> > > > >> > > > > > > >> > > > Based on the above points, I think we can support >> basic >> > > data >> > > > > > > >> > consistency >> > > > > > > >> > > on >> > > > > > > >> > > > "Checkpoint" in the first stage which is described in >> > > FLIP, >> > > > > and >> > > > > > > >> > continue >> > > > > > > >> > > to >> > > > > > > >> > > > evolve it to "Timestamp Barrier" to support low >> latency >> > > > > between >> > > > > > ETLs >> > > > > > > >> > and >> > > > > > > >> > > > completed semantics in the second or third stage >> later. >> > > > What >> > > > > > do you >> > > > > > > >> > > think? >> > > > > > > >> > > > >> > > > > > > >> > > > Best, >> > > > > > > >> > > > Shammon >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski < >> > > > > > > >> pnowoj...@apache.org> >> > > > > > > >> > > > wrote: >> > > > > > > >> > > > >> > > > > > > >> > > > > Hi Shammon, >> > > > > > > >> > > > > >> > > > > > > >> > > > > > The following is a simple example. Data is >> > transferred >> > > > > > between >> > > > > > > >> > ETL1, >> > > > > > > >> > > > ETL2 >> > > > > > > >> > > > > and ETL3 in Intermediate Table by Timestamp. >> > > > > > > >> > > > > > [image: simple_example.jpg] >> > > > > > > >> > > > > >> > > > > > > >> > > > > This time it's your image that doesn't want to >> load :) >> > > > > > > >> > > > > >> > > > > > > >> > > > > > Timestamp Barrier >> > > > > > > >> > > > > >> > > > > > > >> > > > > Does it have to be combining watermarks and >> checkpoint >> > > > > > barriers >> > > > > > > >> > > together? >> > > > > > > >> > > > > Can we not achieve the same result with two >> > independent >> > > > > > processes >> > > > > > > >> > > > > checkpointing (regardless if this is a global >> > > > > > aligned/unaligned >> > > > > > > >> > > > checkpoint, >> > > > > > > >> > > > > or a task local checkpoint) plus watermarking? >> > > > Checkpointing >> > > > > > would >> > > > > > > >> > > > provide >> > > > > > > >> > > > > exactly-once guarantees, and actually committing >> the >> > > > > results, >> > > > > > and >> > > > > > > >> it >> > > > > > > >> > > > would >> > > > > > > >> > > > > be actually committing the last emitted watermark? 
>> > > > > From the perspective of the sink/table, it shouldn't really matter how the exactly-once is achieved, and whether the job has performed an unaligned checkpoint or something completely different. It seems to me that the sink/table could/should be able to understand/work with only the basic information: here are records and watermarks (with, at that point of time, an already fixed order), they are committed and will never change.
>> > > > >
>> > > > > > However, from the perspective of implementation complexity, I personally think using Checkpoint in the first phase makes sense, what do you think?
>> > > > >
>> > > > > Maybe I'm missing something, but I don't see an actual connection in the implementation steps between the checkpoint barriers approach and the watermark-like approach. They seem to me (from the perspective of the Flink runtime at least) like two completely different mechanisms. Not one leading to the other.
>> > > > >
>> > > > > Best,
>> > > > > Piotrek
>> > > > >
>> > > > > śr., 14 gru 2022 o 15:19 Shammon FY <zjur...@gmail.com> napisał(a):
>> > > > >
>> > > > > > Hi Piotr,
>> > > > > >
>> > > > > > Thanks for your valuable input, which makes me consider the core point of data consistency in depth. I'd like to define the data consistency of the whole streaming & batch processing as follows, and I hope that we can reach an agreement on it:
>> > > > > >
>> > > > > > BOutput = Fn(BInput), where BInput is a bounded input which is split from the unbounded streaming, Fn is the computation of a node or ETL, and BOutput is the bounded output of BInput. All the data in BInput and BOutput are unordered, and BInput and BOutput are data consistent.
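>> > > > > > (To make this definition concrete with a made-up example: if Fn is a per-key count and the BInput split for one barrier is the unordered set {(a), (b), (a)}, then BOutput is {(a, 2), (b, 1)} no matter in which order the records arrive, and replaying the same BInput after a failover yields exactly the same BOutput for a deterministic Fn.)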
>> > > > > > The key points above include 1) the segment semantics of BInput; 2) the computation semantics of Fn
>> > > > > >
>> > > > > > 1. The segment semantics of BInput
>> > > > > > a) Transactionality of data. It is necessary to ensure the transactional semantics of the bounded data set when it is split from the unbounded streaming. For example, we cannot split multiple records in one transaction into different bounded data sets.
>> > > > > > b) Timeliness of data. Some data is related to time, such as boundary data for a window. It is necessary to consider whether the bounded data set needs to include a watermark which can trigger the window result.
>> > > > > > c) Constraints of data. The Timestamp Barrier should perform some specific operations after computation in operators, for example, force flushing data.
>> > > > > > Checkpoint Barrier misses all the semantics above, and we should later support users defining the Timestamp for data on Event Time or System Time according to the job and computation.
>> > > > > >
>> > > > > > 2. The computation semantics of Fn
>> > > > > > a) Deterministic computation
>> > > > > > Most computations are deterministic, such as map, filter, count, sum, etc. They generate the same unordered result from the same unordered input every time, and we can easily define data consistency on the input and output for them.
>> > > > > > b) Non-deterministic computation
>> > > > > > Some computations are non-deterministic. They will produce different results from the same input every time. I try to divide them into the following types:
>> > > > > > 1) Non-deterministic computation semantics, such as the rank operator.
>> > > > > > When it computes multiple times (for example, after a failover), either the first or the last output results can be the final result, which will cause different failover handling for downstream jobs. I will expand on it later.
>> > > > > > 2) Non-deterministic computation optimization, such as async io. It is necessary to sync these operations when the barrier of the input arrives.
>> > > > > > 3) Deviation caused by data segmentation and computation semantics, such as Window. This requires that the users customize the data segmentation according to their needs correctly.
>> > > > > >
>> > > > > > Checkpoint Barrier matches a) and Timestamp Barrier can match all of a) and b).
>> > > > > >
>> > > > > > We define data consistency of BInput and BOutput based on all of the above. The BOutput of an upstream ETL will be the BInput of the next ETL, and multiple ETL jobs form a complex "ETL Topology".
>> > > > > >
>> > > > > > Based on the above definitions, I'd like to give a general proposal with "Timestamp Barrier" in my mind. It's not very detailed; please help to review it and feel free to comment @David, @Piotr
>> > > > > >
>> > > > > > 1. Data segment with Timestamp
>> > > > > > a) Users can define the Timestamp Barrier with System Time or Event Time.
>> > > > > > b) Source nodes generate the same Timestamp Barrier after reading data from the RootTable
>> > > > > > c) There is the same Timestamp in each record according to the Timestamp Barrier, such as (a, T), (b, T), (c, T), (T, barrier)
>> > > > > >
>> > > > > > 2. Computation with Timestamp
>> > > > > > a) Records are unordered within the same Timestamp. Stateless operators such as map/flatmap/filter can process data without aligning the Timestamp Barrier, which is different from the Checkpoint Barrier.
>> > > > > > b) Records between Timestamps are ordered.
>> > > > > > Stateful operators must align data and compute by each Timestamp, then compute in Timestamp sequence.
>> > > > > > c) Stateful operators will output results of a specific Timestamp after computation.
>> > > > > > d) Sink operators "commit records" with a specific Timestamp and report the status to the JobManager
>> > > > > >
>> > > > > > 3. Read data with Timestamp
>> > > > > > a) Downstream ETL reads data according to Timestamp after the upstream ETL "commits" it.
>> > > > > > b) Stateful operators interact with state when computing data of a Timestamp, but they won't trigger a checkpoint for every Timestamp. Therefore the source ETL job can generate a Timestamp every few seconds or even every few hundred milliseconds
>> > > > > > c) Based on Timestamp the delay between ETL jobs will be very small, and in the best case the E2E latency may be only tens of seconds.
>> > > > > >
>> > > > > > 4. Failover and Recovery
>> > > > > > ETL jobs are cascaded through the Intermediate Table. After a single ETL job fails, it needs to replay the input data and recompute the results. As you mentioned, whether the cascaded ETL jobs are restarted depends on the determinism of the intermediate data between them.
>> > > > > > a) An ETL job will roll back and reread data from the upstream ETL by a specific Timestamp according to the Checkpoint.
>> > > > > > b) According to the management of Checkpoint and Timestamp, ETL can replay all Timestamps and data after failover, which means BInput is the same before and after failover.
>> > > > > > c) For a deterministic Fn, it generates the same BOutput from the same BInput
>> > > > > > 1) If there's no data of the specific Timestamp in the sink table, ETL just "commits" it as normal.
>> > > > > > 2) If the Timestamp data exists in the sink table, ETL can just discard the new data.
d) For a non-deterministic Fn, it generates different BOutput from the same BInput before and after failover. For example, BOutput1 before failover and BOutput2 after failover. The state in the ETL is consistent with BOutput2. There are two cases according to users' requirements:
1) Users can accept BOutput1 as the final output, and downstream ETLs don't need to restart. The sink in the ETL can discard BOutput2 directly if the Timestamp exists in the sink table.
2) Users only accept BOutput2 as the final output; then all the downstream ETLs and Intermediate Tables should roll back to the specific Timestamp, and the downstream ETLs should be restarted too.

The following is a simple example. Data is transferred between ETL1, ETL2 and ETL3 in the Intermediate Table by Timestamp.
[image: simple_example.jpg]

Besides Timestamp, there's a big challenge in the Intermediate Table: it should support committing a "Timestamp snapshot" with high throughput, which requires the Table Store to enhance its streaming capabilities to the level of Pulsar or Kafka.

In this FLIP, we plan to implement the proposal with Checkpoint; the above Timestamp can be replaced by Checkpoint. Of course, Checkpoint has some problems. I think we have reached some consensus in the discussion about the Checkpoint problems, including the data segment semantics, flushing the data of some operators, and the increase of E2E delay. However, from the perspective of implementation complexity, I personally think using Checkpoint in the first phase makes sense. What do you think?

Finally, I think I misunderstood the "Rolling Checkpoint" and "All at once Checkpoint" in my last explanation which you and @David mentioned. I thought their differences were mainly about selecting different table versions for queries. According to your reply, I think it is whether there are multiple "rolling checkpoints" in each ETL job, right? If I understand correctly, the "Rolling Checkpoint" is a good idea, and we can guarantee "Strong Data Consistency" between multiple tables in MetaService for queries. Thanks.

Best,
Shammon


On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Shammon,

Thanks for the explanations, I think I understand the problem better now. I have a couple of follow-up questions, but first:

>> 3. I'm pretty sure there are counter examples, where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
>> a) For example what if one of your "ETL" jobs has the following DAG:
>>
>> Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from Src1/Window1 from the event time T1 and from Src2/Window2 from a later event time T2.
>> b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or to two different sink tables (that are both later used in the same downstream job).
>
> Thank you for your feedback. I cannot see the DAG in 3.a in your reply,

I've attached the image directly. I hope you can see it now.
Basically what I meant is that if you have a topology like (from the attached image):

window1 = src1.keyBy(...).window(...)
window2 = src2.keyBy(...).window(...)
window1.join(window2, ...).addSink(sink)

or an even simpler one (note no keyBy between `src` and `process`):

src.process(some_function_that_buffers_data).addSink(sink)

you will have the same problem. Generally speaking, if there is an operator buffering some data, and if the data are not flushed on every checkpoint (any windowed or temporal operator, AsyncWaitOperator, CEP, ...), you can design a graph that will produce "inconsistent" data as part of a checkpoint.

Apart from that, a couple of other questions/issues.

> 1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether

Do we need to support the "altogether" one? The rolling checkpoint, as it's more independent, I could see scale much better and avoid a lot of the problems that I mentioned before.

> 1) Checkpoint VS Watermark
> 1. Stateful Computation is aligned according to Timestamp Barrier

Indeed the biggest obstacle I see here is that we would most likely have:

> b) Similar to the window operator, align data in memory according to Timestamp.

for every operator.

> 4. Failover supports Timestamp fine-grained data recovery
>
> As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology".

I don't understand this point. Regardless of whether we are using rolling checkpoints, all-at-once checkpoints or watermarks, I see the same problems with non-determinism if we want to preserve the requirement to not fail over the whole topology at once.

Both watermarks and the "rolling checkpoint" have, I think, the same issue: they either require deterministic logic, or global failover, or downstream jobs can only work on the records already committed by the upstream. But working with only "committed records" would either break consistency between different jobs, or would cause a huge delay in checkpointing and e2e latency, as:
1. the upstream job produces some data, but the downstream can not process this data yet
2. checkpoint 42 is triggered on the upstream job
3. checkpoint 42 is completed on the upstream job, and the data processed since the last checkpoint has been committed
4. the upstream job can continue producing more data
5. only now can the downstream start processing the data produced in 1., but it can not read the not-yet-committed data from 4.
6. once the downstream finishes processing the data from 1., it can trigger its checkpoint 42

The "all at once checkpoint" I can see only working with global failover of everything.

This is assuming exactly-once mode. At-least-once would be much easier.

Best,
Piotrek

wt., 13 gru 2022 o 08:57 Shammon FY <zjur...@gmail.com> napisał(a):

Hi David,

Thanks for the comments from you and @Piotr. I'd like to explain the details of the FLIP first.
1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether

This mainly depends on the needs of users. Users can decide the data version of the tables in their queries according to their different requirements for data consistency and freshness. Since we manage multiple versions for each table, this will not bring too much complexity to the system. We only need to support different strategies when calculating table versions for a query. So we give this decision to users, who can use "consistency.type" to set a different consistency in the "Catalog". We can continue to refine this later; for example, dynamic parameters could support different consistency requirements for each query.

2) MetaService module

Many Flink streaming jobs use application mode, and they are independent of each other. So we currently assume that MetaService is an independent node. In the first phase it will be started in standalone mode, and HA will be supported later. This node will reuse many Flink modules, including REST, Gateway-RpcServer, etc. We hope that the core functions of MetaService can be developed as a component. When Flink subsequently uses a large session cluster to support various computations, it can be integrated into the "ResourceManager" as a plug-in component.

Besides the above, I'd like to describe the Checkpoint and Watermark mechanisms in detail as follows.

1) Checkpoint VS Watermark

As you mentioned, I think it's very correct that what we want from the Checkpoint is to align streaming computation and data according to certain semantics, and Timestamp is a very ideal solution. To achieve this goal, the following functions need to be supported in the Watermark mechanism:

1. Stateful computation is aligned according to the Timestamp Barrier

As in the "three tables example" we discussed above, we need to align the stateful operator computation according to the barrier to ensure the consistency of the result data. In order to align the computation, there are two ways in my mind:

a) Similar to the Aligned Checkpoint Barrier: the Timestamp Barrier aligns data per channel, which will lead to backpressure just like the aligned checkpoint. It seems not a good idea.

b) Similar to the window operator: align data in memory according to Timestamp. Two steps need to be supported here: first, data is aligned by timestamp for stateful operators; secondly, since Timestamps are strictly sequential, global aggregation operators need to perform the aggregation in timestamp order and output the final results.

2. Coordinate multiple source nodes to assign unified Timestamp Barriers

Since the stateful operators need to be aligned according to the Timestamp Barrier, the source subtasks of multiple jobs should generate the same Timestamp Barrier. ETL jobs consuming the RootTable should interact with "MetaService" to generate the same Timestamps T1, T2, T3 ... and so on.

3. The JobManager needs to manage the completed Timestamp Barriers

When a Timestamp Barrier of the ETL job has been completed, it means that the data of the specified Timestamp can be queried by users. The JobManager needs to summarize its Timestamp processing and report the completed Timestamps and data snapshots to the MetaService.

4. Failover supports Timestamp fine-grained data recovery

As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology". This requires that the result data of a Timestamp generated by the upstream ETL be deterministic.

a) The determinacy of Timestamps: before and after an ETL job failover, the same Timestamp sequence must be generated. Each Checkpoint needs to record the list of Timestamps it includes, especially in the source node of the RootTable. After failover, the Timestamps need to be regenerated according to that list.

b) The determinacy of Timestamp data: the same Timestamp needs to replay the same data before and after failover, and generate the same results in the Sink Table. Each Timestamp must save the start and end offsets (or snapshot id) of the RootTable. After failover, the source nodes need to replay the data according to the offsets to ensure that the data of each Timestamp is consistent before and after failover.
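Roughly, the per-checkpoint metadata I have in mind for a) and b) could look like the following sketch (all class and field names here are made up for illustration, not an existing API):

    // recorded as part of each checkpoint in the source of the RootTable
    class CheckpointMeta {
        long checkpointId;
        // Timestamp barriers covered by this checkpoint, in generation order;
        // after failover the source re-emits exactly this sequence (4 a)
        List<Long> timestamps;
        // start/end offset (or snapshot id) of the RootTable per Timestamp;
        // after failover the source replays exactly this range per Timestamp,
        // so each Timestamp sees the same BInput before and after failure (4 b)
        Map<Long, OffsetRange> offsetsPerTimestamp;
    }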
For the specific requirements and complexity, please help to review when you are free @David @Piotr, thanks :)

2) Evolution from the Checkpoint to the Timestamp mechanism

You raised a very important question in your reply which I missed before: if the Aligned Checkpoint is used in the first stage, how complex is the evolution from Checkpoint to Timestamp later? I made a general comparison here, which may not be very detailed. There are three roles in the whole system: MetaService, the Flink ETL jobs and the Table Store.

a) MetaService

It manages the data consistency among multiple ETL jobs, including coordinating the Barrier for the source ETL nodes, setting the starting Barrier for ETL job startup, and calculating the table versions for queries according to different strategies. In fact it has little to do with Checkpoint; we can pay attention to this when designing the API and implementing the functions.

b) Flink ETL Job

At present the workload here is relatively small: we need to let the SplitEnumerator manually trigger checkpoints in the CheckpointCoordinator.

c) Table Store

The Table Store mainly provides the ability to write and read data.

c.1) Write data. At present, the Table Store generates snapshots according to the two phases in Flink. When using Checkpoint for consistency management, we need to write checkpoint information into the snapshots. After moving to the Timestamp Barrier, the snapshots in the Table Store may be split more finely, and we need to write Timestamp information into the data files. A "checkpointed snapshot" may contain multiple "Timestamp snapshots".

c.2) Read data. The SplitEnumerator that reads data from the Table Store will manage multiple splits according to the version number. After the specified splits are completed, it sends a Barrier command to trigger a checkpoint in the ETL job. The source node will broadcast the checkpoint barrier downstream after receiving it. When using the Timestamp Barrier, the overall process is similar, but the SplitEnumerator does not need to trigger a checkpoint in the Flink ETL, and the source node needs to support broadcasting the Timestamp Barrier downstream instead.

From the above, the complexity of the evolution from Checkpoint to Timestamp seems controllable, but the specific implementation needs careful design, and the concepts and features of Checkpoint should not be introduced too much into the relevant interfaces and functions.

What do you think of it? Looking forward to your feedback, thanks

Best,
Shammon


On Mon, Dec 12, 2022 at 11:46 PM David Morávek <d...@apache.org> wrote:

Hi Shammon,

I'm starting to see what you're trying to achieve, and it's really exciting. I share Piotr's concerns about e2e latency and the inability to use unaligned checkpoints.
I have a couple of questions that are not clear to me from going over the FLIP:

1) Global Checkpoint Commit

Are you planning on committing the checkpoints a) in a "rolling fashion" - one pipeline after another - or b) altogether - once the data have been processed by all pipelines?

Option a) would be eventually consistent (for batch queries, you'd need to use the last checkpoint produced by the most downstream table), whereas b) would be strongly consistent at the cost of increasing the e2e latency even more.

I feel that option a) is what this should be headed for.

2) MetaService

Should this be a new general Flink component or one specific to the Flink Table Store?

3) Follow-ups

From the above discussion, there is a consensus that, in the ideal case, watermarks would be the way to go, but there is some underlying mechanism missing. It would be great to discuss this option in more detail to compare the solutions in terms of implementation cost; maybe it would not be as complex.

All in all, I don't feel that checkpoints are suitable for providing consistent table versioning between multiple pipelines. The main reason is that they are designed to be a fault tolerance mechanism. Somewhere between the lines, you've already noted that the primitive you're looking for is cross-pipeline barrier alignment, which is the mechanism a subset of the currently supported checkpointing implementations happen to be using. Is that correct?

My biggest concern is that tying this to a "side-effect" of the checkpointing mechanism could block us from evolving it further.

Best,
D.

On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <zjur...@gmail.com> wrote:

Hi Piotr,

Thank you for your feedback. I cannot see the DAG in 3.a in your reply, but I'd like to answer some questions first.

Your understanding is very correct. We want to align the data versions of all intermediate tables through the checkpoint mechanism in Flink. I'm sorry that I have omitted some default constraints in the FLIP, including that we only support aligned checkpoints, and that one table can only be written by one ETL job. I will add these later.

Why can't the watermark mechanism achieve the data consistency we want? For example, suppose there are 3 tables: Table1 is a word table, Table2 is a word->cnt table and Table3 is a cnt1->cnt2 table.

1. ETL1 from Table1 to Table2: INSERT INTO Table2 SELECT word, count(*) FROM Table1 GROUP BY word

2. ETL2 from Table2 to Table3: INSERT INTO Table3 SELECT cnt, count(*) FROM Table2 GROUP BY cnt

ETL1 has 2 subtasks that read multiple buckets from Table1, where subtask1 reads streaming data as [a, b, c, a, d, a, b, c, d ...] and subtask2 reads streaming data as [a, c, d, q, a, v, c, d ...].

1. The unbounded streaming data is divided into multiple sets according to some semantic requirements. In the most extreme case there may be one set per record. Assume that the sets of subtask1 and subtask2 separated by the same semantics are [a, b, c, a, d] and [a, c, d, q], respectively.

2. After the above two sets are computed by ETL1, the result data generated in Table2 is [(a, 3), (b, 1), (c, 1), (d, 2), (q, 1)].

3. The result data generated in Table3 after the data in Table2 is computed by ETL2 is [(1, 3), (2, 1), (3, 1)].

We want to align the data of Table1, Table2 and Table3 and manage the data versions. When users execute OLAP/batch queries joining these tables, the following consistent data can be found:

1. Table1: [a, b, c, a, d] and [a, c, d, q]

2. Table2: [(a, 3), (b, 1), (c, 1), (d, 2), (q, 1)]

3. Table3: [(1, 3), (2, 1), (3, 1)]

Users can perform the query: SELECT t1.word, t2.cnt, t3.cnt2 FROM Table1 t1 JOIN Table2 t2 JOIN Table3 t3 ON t1.word=t2.word AND t2.cnt=t3.cnt1;

In the view of users, the data is consistent on a unified "version" across Table1, Table2 and Table3.

In the current Flink implementation, the aligned checkpoint can achieve the above capabilities (let's ignore the segmentation semantics of the checkpoint first). Because the Checkpoint Barrier aligns the data when performing the global Count aggregation, we can associate the snapshot with the checkpoint in the Table Store, query the specified snapshot of Table1/Table2/Table3 through the checkpoint, and achieve the consistency requirements of the above unified "version".

The current watermark mechanism in Flink cannot achieve the above consistency. For example, suppose we use watermarks to divide the data into multiple sets in subtask1 and subtask2 as follows:

1. subtask1: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2), (c, T2), (d, T2)], T2

2. subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, ....

As Flink watermarks do not have barriers and cannot align data, the ETL1 Count operator may compute the data of subtask1 first: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2)], and then compute the data of subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, which is not possible with an aligned checkpoint.

In this order, the result output to Table2 after the Count aggregation will be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a, 2, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 1, T1), (q, 1, T1), which can be simplified as: [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 1, T1), (q, 1, T1)].

There's no (a, 3, T1), so we are unable to query consistent data results on Table1 and Table2 according to T1. Table3 has the same problem.

Besides using the Checkpoint Barrier, the other implementation that supports the watermark approach above is to convert the Count aggregation into a windowed Count. After the global Count is converted into a window operator, it needs to support cross-window data computation. Similar to the data relationship between the previous and the current Checkpoint, this is equivalent to introducing a Watermark Barrier, which requires adjustments to the current Flink Watermark mechanism.
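To make this concrete, here is a rough sketch of such a timestamp-aligned Count. It only illustrates the idea of pre-aggregating data per timestamp "epoch" in memory and emitting epochs in strict timestamp order; the class, the barrier callback and the emit method are invented for this sketch and are not a concrete interface proposal:

    import java.util.*;

    class TimestampAlignedCount {
        // running word counts, i.e. the operator state
        Map<String, Long> globalCounts = new HashMap<>();
        // partial counts pre-aggregated per pending timestamp epoch
        TreeMap<Long, Map<String, Long>> pendingByTimestamp = new TreeMap<>();

        void processElement(String word, long timestamp) {
            // pre-aggregate per epoch instead of blocking input channels,
            // so there is no checkpoint-style backpressure
            pendingByTimestamp
                .computeIfAbsent(timestamp, t -> new HashMap<>())
                .merge(word, 1L, Long::sum);
        }

        void onTimestampBarrierAligned(long barrierTimestamp) {
            // fold the finished epochs into the global state in strict
            // timestamp order and emit the result of each epoch
            while (!pendingByTimestamp.isEmpty()
                    && pendingByTimestamp.firstKey() <= barrierTimestamp) {
                Map.Entry<Long, Map<String, Long>> epoch = pendingByTimestamp.pollFirstEntry();
                epoch.getValue().forEach((word, cnt) -> globalCounts.merge(word, cnt, Long::sum));
                emit(epoch.getKey(), globalCounts);
            }
        }

        void emit(long timestamp, Map<String, Long> counts) {
            // write (word, cnt, timestamp) records to the sink table
        }
    }

With this kind of alignment, the (a, 3, T1) result from the example above is always produced before any T2 result, which is exactly what the aligned checkpoint gives us today.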
Besides the above global aggregation, there are window operators in Flink. I don't know if my understanding is correct (I cannot see the DAG in your example), so please correct me if it's wrong. I think you raise a very important and interesting question: how to define data consistency across different window computations, which will generate different timestamps for the same data. This situation also occurs when using event time to align data. At present, what I can think of is to store this information in the Table Store, so that users can perform filters or joins on the data with it. This FLIP is our first phase, and the specific implementation of this will be designed and considered in the next phase and FLIP.

Although the Checkpoint Barrier can achieve the most basic consistency, as you mentioned, using the Checkpoint mechanism will cause many problems, including the increase of checkpoint time for multiple cascaded jobs, the increase of E2E data freshness time (several minutes or even dozens of minutes), and the increase of the overall system complexity. At the same time, the semantics of the Checkpoint data segmentation are unclear.

The current FLIP is the first phase of our whole proposal, and you can find the follow-up plan in our future work. In the first stage, we do not want to modify the Flink mechanism. We'd like to realize the basic system functions based on the existing mechanisms in Flink, including the relationship management of ETLs and tables, and the basic data consistency, so we chose the Global Checkpoint in our FLIP.

We agree with you very much that event time is more suitable for data consistency management. We'd like to consider this matter in the second or third stage after the current FLIP. We hope to improve the watermark mechanism in Flink to support barriers. As you mentioned in your reply, we could then achieve data consistency based on timestamps, while maintaining E2E data freshness of seconds or even milliseconds for 10+ cascaded jobs.

What do you think? Thanks

Best,
Shammon


On Fri, Dec 9, 2022 at 6:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Shammon,

Do I understand it correctly that you effectively want to expand the checkpoint alignment mechanism across many different jobs and hand over checkpoint barriers from upstream to downstream jobs using the intermediate tables?

Re the watermarks in the "Rejected Alternatives": I don't understand why this has been rejected. Could you elaborate on this point? Here are a couple of my thoughts on this matter, but please correct me if I'm wrong, as I haven't dived deeper into this topic.

> As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> The StreamTask reads data in order: V11,V12,V21,T1(channel1),V13,T1(channel2).
> At this time, StreamTask will confirm that watermark T1 is completed,
> but the data beyond T1 has been processed (V13) and the results are
> written to the sink table.

1. I see the same "problem" with unaligned checkpoints in your current proposal.

2. I don't understand why this is a problem. Just store in the "sink table" what the watermark is (T1), and downstream jobs should process the data with that "watermark" anyway. Record "V13" should be treated as "early" data. Downstream jobs:
a) if they are streaming jobs, they should for example aggregate it in windowed/temporal state, but they shouldn't produce a result that contains it, as the watermark T2 was not yet processed. Or they would just pass that record along as "early" data.
b) if they are batch jobs, it looks to me like batch jobs shouldn't take "all available data", but only consider "all the data until some watermark", for example the latest available: T1.
3. I'm pretty sure there are counter examples where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
a) For example, what if one of your "ETL" jobs has the following DAG:
[image: flip276.jpg]
Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from Src1/Window1 from the event time T1 and from Src2/Window2 from a later event time T2.
b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or to two different sink tables (that are both later used in the same downstream job).

4a) I'm not sure if I like the idea of centralising the whole system in this way. If you have 10 jobs, the likelihood of a checkpoint failure will be 10 times higher, and/or the duration of the checkpoint can be much, much longer (especially under backpressure). And this is actually already a limitation of Apache Flink (global checkpoints are more prone to fail the larger the scale), so I would be anxious about making it a potentially even larger issue.
4b) I'm also worried about the increased complexity of the system after adding the global checkpoint, and the additional (single?) point of failure.
5. Such a design would also not work if we ever wanted to have task-local checkpoints.

All in all, it seems to me like the watermarks and event time are actually the better concepts in this context, and they should have been used for synchronisation and data consistency across the whole system.

Best,
Piotrek

czw., 1 gru 2022 o 11:50 Shammon FY <zjur...@gmail.com> napisał(a):

Hi @Martijn,

Thanks for your comments, and I'd like to reply to them.

1. It sounds good to me, I'll update the content structure in the FLIP later and present the problems first.

2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?

Yes. To support the data consistency in the FLIP, we need to connect the checkpoints in Flink with the snapshots in the store, which requires a close combination of the Flink and store implementations; a rough sketch of what I mean is below. In the first stage we plan to implement it based on Flink and the Table Store only; snapshots written to external storage don't support consistency.
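The sketch (the class and field names are invented for illustration; this is not the actual Table Store snapshot format):

    // a Table Store snapshot tagged with the Flink checkpoint that committed it
    class Snapshot {
        long snapshotId;
        long checkpointId; // written at commit time; lets MetaService resolve a
                           // global checkpoint to one snapshot per table, which
                           // is the unified "version" used for consistent queries
    }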
> 3. If you introduce a MetaService, it becomes the single point of
> failure because it coordinates everything. But I can't find anything in
> the FLIP on making the MetaService highly available or how to deal with
> failovers there.

I think you raise a very important problem, and I missed it in the FLIP.
The MetaService is a single point and should support failover; we will do
that in the future. In the first stage we only support standalone mode,
THX.

> 4. The FLIP states under Rejected Alternatives "Currently watermark in
> Flink cannot align data." which is not true, given that there is
> FLIP-182
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Watermark alignment in FLIP-182 is different from the "watermark align
data" requirement in our FLIP. FLIP-182 aims to fix watermark generation
in different sources for "slight imbalance or data skew", which means in
some cases a source must generate a watermark even if it should not. When
the operator collects watermarks, the data processing is as described in
our FLIP, and the data cannot be aligned through a barrier the way it is
with Checkpoint. The sketch below shows the existing FLIP-182 API to make
the distinction concrete.
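For reference, the FLIP-182 alignment that does exist (since Flink 1.15)
is configured roughly like this; MyEvent and the group name are made up.
It throttles sources whose watermark runs too far ahead of the group, but
it does not demarcate consistent "epochs" across jobs the way a timestamp
barrier would:

    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    final class AlignmentExample {
        // Hypothetical event type for illustration only.
        static final class MyEvent {
            long eventTime;
        }

        static WatermarkStrategy<MyEvent> alignedStrategy() {
            // Sources sharing the "etl-sources" group pause reading once
            // their watermark drifts more than 20s ahead of the group's
            // minimum watermark.
            return WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.eventTime)
                .withWatermarkAlignment("etl-sources", Duration.ofSeconds(20));
        }
    }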
> 5. Given the MetaService role, it feels like this is introducing a tight
> dependency between Flink and the Table Store. How pluggable is this
> solution, given the changes that need to be made to Flink in order to
> support this?

This is a good question, and I will try to expand on it. Most of the work
will be completed in the Table Store, such as the new SplitEnumerator and
Source implementations. The changes in Flink are as follows:
1) A Flink job should put its job id in the context when creating a
source/sink, to help MetaService create the relationship between source
and sink tables; it's tiny.
2) Notify a listener when a job is terminated in Flink; the listener
implementation in Table Store will send a "delete event" to MetaService
(see the sketch after this list).
3) The changes related to Flink Checkpoint include:
a) Support triggering a checkpoint with a given checkpoint id from the
SplitEnumerator.
b) Create the SplitEnumerator in Table Store with a strategy to perform
the specific checkpoint when all "SplitEnumerator"s in the job manager
trigger it.
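A minimal sketch of change 2), purely to show the shape of the hook;
JobTerminationListener and MetaServiceClient are made-up names, not
existing Flink or Table Store interfaces:

    // Assumed RPC client to MetaService; not a real API.
    interface MetaServiceClient {
        void sendDeleteEvent(String jobId);
    }

    // The hook Flink would call when a job reaches a terminal state.
    interface JobTerminationListener {
        void onJobTerminated(String jobId);
    }

    // The Table Store implementation forwards a "delete event" so that
    // MetaService can drop the source/sink relationships registered for
    // this job id.
    final class TableStoreTerminationListener implements JobTerminationListener {
        private final MetaServiceClient client;

        TableStoreTerminationListener(MetaServiceClient client) {
            this.client = client;
        }

        @Override
        public void onJobTerminated(String jobId) {
            client.sendDeleteEvent(jobId);
        }
    }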
Best,
Shammon

On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser <martijnvis...@apache.org>
wrote:

Hi all,

A couple of first comments on this:
1. I'm missing the problem statement in the overall introduction. It
immediately goes into proposal mode; I would like to first read what the
actual problem is, before diving into solutions.
2. "Each ETL job creates snapshots with checkpoint info on sink tables in
Table Store" -> That reads like you're proposing that snapshots need to
be written to Table Store?
3. If you introduce a MetaService, it becomes the single point of failure
because it coordinates everything. But I can't find anything in the FLIP
on making the MetaService highly available or how to deal with failovers
there.
4. The FLIP states under Rejected Alternatives "Currently watermark in
Flink cannot align data." which is not true, given that there is FLIP-182
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
5. Given the MetaService role, it feels like this is introducing a tight
dependency between Flink and the Table Store. How pluggable is this
solution, given the changes that need to be made to Flink in order to
support this?

Best regards,

Martijn

On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:

Hi devs:

I'd like to start a discussion about FLIP-276: Data Consistency of
Streaming and Batch ETL in Flink and Table Store[1]. In the whole data
stream processing, there are consistency problems such as how to manage
the dependencies of multiple jobs and tables, how to define and handle
E2E delays, and how to ensure the data consistency of queries on flowing
data. This FLIP aims to support data consistency and answer these
questions.

I've discussed the details of this FLIP with @Jingsong Lee and @libenchao
offline several times.
We hope to support data consistency of queries on tables, manage the
relationships between Flink jobs and tables, and revise tables on
streaming in Flink and Table Store to improve the whole data stream
processing.

Looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store

Best,
Shammon