Hi Shammon,

> I don't think we can combine watermarks and checkpoint barriers together to
> guarantee data consistency. There will be a "Timestamp Barrier" in our
> system to "commit data", "single etl failover", "low latency between ETLs"
> and "strong data consistency with completed semantics" in the end.

Why do you think so? I've described to you above an alternative where we could be using watermarks for data consistency, regardless of what checkpointing/fault tolerance mechanism Flink would be using. Can you explain what's wrong with that approach? Let me rephrase it:

1. There is an independent mechanism that provides exactly-once guarantees, committing records/watermarks/events and taking care of the failover. It might be aligned, unaligned or task local checkpointing - this doesn't matter. Let's just assume we have such a mechanism.
2. There is a watermarking mechanism (it can be some kind of system versioning re-using the watermarks code path if a user didn't configure watermarks) that takes care of the data consistency.

Because watermarks from 2. are also subject to the exactly-once guarantees from 1., once they are committed, downstream systems (Flink jobs or other 3rd party systems) could just easily work with the committed watermarks to provide a consistent view/snapshot of the tables. Any downstream system could always check what the committed watermarks are, select the watermark value (for example min across all used tables), and ask every table: please give me all of the data up until the selected watermark. Or give me all tables in the version for the selected watermark.

Am I missing something? To me it seems like this way we can fully decouple the fault tolerance mechanism from the subject of the data consistency.

Best,
Piotrek

On Thu, 15 Dec 2022 at 13:01, Shammon FY <zjur...@gmail.com> wrote:

> Hi Piotr,
>
> It's kind of amazing about the image, it's a simple example and I have to
> put it in a document
> https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink
> :)
>
> > Does it have to be combining watermarks and checkpoint barriers together?
>
> It's an interesting question. 
As we discussed above, what we need from > "Checkpoint" is the "Align Data Ability", and from "Watermark" is the > "Consistency Semantics", > > 1) Only "Align Data" can reach data consistency when performing queries on > upstream and downstream tables. I gave an example of "Global Count Tables" > in our previous discussion. We need a "Align Event" in the streaming > processing, it's the most basic. > > 2) Only "Timestamp" can provide complete consistency semantics. You gave > some good examples about "Window" and ect operators. > > I don't think we can combine watermarks and checkpoint barriers together to > guarantee data consistency. There will be a "Timestamp Barrier" in our > system to "commit data", "single etl failover", "low latency between ETLs" > and "strong data consistency with completed semantics" in the end. > > At the beginning I think we can do the simplest thing first: guarantee the > basic data consistency with a "Barrier Mechanism". In the current Flink > there's "Aligned Checkpoint" only, that's why we choose "Checkpoint" in our > FLIP. > > > I don't see an actual connection in the the implementation steps between > the checkpoint barriers approach and the watermark-like approach > > As I mentioned above, we choose "Checkpoint" to guarantee the basic data > consistency. But as we discussed, the most ideal solution is "Timestamp > Barrier". After the first stage is completed based on the "Checkpoint", we > need to evolve it to our ideal solution "Timestamp Barrier" (watermark-like > approach) in the next second or third stage. This does not mean upgrading > "Checkpoint Mechanism" in Flink. It means that after we implement a new > "Timestamp Barrier" or upgrade "Watermark" to support it, we can use it > instead of the current "Checkpoint Mechanism" directly in our "MetaService" > and "Table Store". > > In the discussion between @David and me, I summarized the work of upgrading > "Watermark" to support "Timestamp Barrier". 
It looks like a big job and you > can find the details in our discussion. I think we don't need to do that in > our first stage. > > Also in that discussion (my reply to @David) too, I briefly summarized the > work that needs to be done to use the new mechanism (Timestamp Barrier) > after we implement the basic function on "Checkpoint". It seems that the > work is not too big on my side, and it is feasible on the whole. > > Based on the above points, I think we can support basic data consistency on > "Checkpoint" in the first stage which is described in FLIP, and continue to > evolve it to "Timestamp Barrier" to support low latency between ETLs and > completed semantics in the second or third stage later. What do you think? > > Best, > Shammon > > > On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski <pnowoj...@apache.org> > wrote: > > > Hi Shammon, > > > > > The following is a simple example. Data is transferred between ETL1, > ETL2 > > and ETL3 in Intermediate Table by Timestamp. > > > [image: simple_example.jpg] > > > > This time it's your image that doesn't want to load :) > > > > > Timestamp Barrier > > > > Does it have to be combining watermarks and checkpoint barriers together? > > Can we not achieve the same result with two independent processes > > checkpointing (regardless if this is a global aligned/unaligned > checkpoint, > > or a task local checkpoint) plus watermarking? Checkpointing would > provide > > exactly-once guarantees, and actually committing the results, and it > would > > be actually committing the last emitted watermark? From the perspective > of > > the sink/table, it shouldn't really matter how the exactly-once is > > achieved, and whether the job has performed an unaligned checkpoint or > > something completely different. 
It seems to me that the sink/table > > could/should be able to understand/work with only the basic information: > > here are records and watermarks (with at that point of time already fixed > > order), they are committed and will never change. > > > > > However, from the perspective of implementation complexity, I > personally > > think using Checkpoint in the first phase makes sense, what do you think? > > > > Maybe I'm missing something, but I don't see an actual connection in the > > implementation steps between the checkpoint barriers approach and the > > watermark-like approach. They seem to me (from the perspective of Flink > > runtime at least) like two completely different mechanisms. Not one > leading > > to the other. > > > > Best, > > Piotrek > > > > > > śr., 14 gru 2022 o 15:19 Shammon FY <zjur...@gmail.com> napisał(a): > > > > > Hi Piotr, > > > > > > Thanks for your valuable input which makes me consider the core point > of > > > data consistency in deep. I'd like to define the data consistency on > the > > > whole streaming & batch processing as follows and I hope that we can > have > > > an agreement on it: > > > > > > BOutput = Fn(BInput), BInput is a bounded input which is splitted from > > > unbounded streaming, Fn is the computation of a node or ETL, BOutput is > > the > > > bounded output of BInput. All the data in BInput and BOutput are > > unordered, > > > and BInput and BOutput are data consistent. > > > > > > The key points above include 1) the segment semantics of BInput; 2) the > > > computation semantics of Fn > > > > > > 1. The segment semantics of BInput > > > a) Transactionality of data. It is necessary to ensure the semantic > > > transaction of the bounded data set when it is splitted from the > > unbounded > > > streaming. For example, we cannot split multiple records in one > > transaction > > > to different bounded data sets. > > > b) Timeliness of data. Some data is related with time, such as boundary > > > data for a window. 
It is necessary to consider whether the bounded data > > set > > > needs to include a watermark which can trigger the window result. > > > c) Constraints of data. The Timestamp Barrier should perform some > > specific > > > operations after computation in operators, for example, force flush > data. > > > > > > Checkpoint Barrier misses all the semantics above, and we should > support > > > user to define Timestamp for data on Event Time or System Time > according > > to > > > the job and computation later. > > > > > > 2. The computation semantics of Fn > > > a) Deterministic computation > > > Most computations are deterministic such as map, filter, count, sum and > > > ect. They generate the same unordered result from the same unordered > > input > > > every time, and we can easily define data consistency on the input and > > > output for them. > > > > > > b) Non-deterministic computation > > > Some computations are non-deterministic. They will produce different > > > results from the same input every time. I try to divide them into the > > > following types: > > > 1) Non-deterministic computation semantics, such as rank operator. When > > it > > > computes multiple times (for example, failover), the first or last > output > > > results can both be the final result which will cause different > failover > > > handlers for downstream jobs. I will expand it later. > > > 2) Non-deterministic computation optimization, such as async io. It is > > > necessary to sync these operations when the barrier of input arrives. > > > 3) Deviation caused by data segmentat and computation semantics, such > as > > > Window. This requires that the users should customize the data > > segmentation > > > according to their needs correctly. > > > > > > Checkpoint Barrier matches a) and Timestamp Barrier can match all a) > and > > > b). > > > > > > We define data consistency of BInput and BOutput based all above. 
The > > > BOutput of upstream ETL will be the BInput of the next ETL, and > multiple > > > ETL jobs form a complex "ETL Topology". > > > > > > Based on the above definitions, I'd like to give a general proposal > with > > > "Timetamp Barrier" in my mind, it's not very detailed and please help > to > > > review it and feel free to comment @David, @Piotr > > > > > > 1. Data segment with Timestamp > > > a) Users can define the Timestamp Barrier with System Time, Event Time. > > > b) Source nodes generate the same Timestamp Barrier after reading data > > > from RootTable > > > c) There is a same Timetamp data in each record according to Timestamp > > > Barrier, such as (a, T), (b, T), (c, T), (T, barrier) > > > > > > 2. Computation with Timestamp > > > a) Records are unordered with the same Timestamp. Stateless operators > > such > > > as map/flatmap/filter can process data without aligning Timestamp > > Barrier, > > > which is different from Checkpoint Barrier. > > > b) Records between Timestamp are ordered. Stateful operators must align > > > data and compute by each Timestamp, then compute by Timetamp sequence. > > > c) Stateful operators will output results of specific Timestamp after > > > computation. > > > d) Sink operator "commit records" with specific Timestamp and report > the > > > status to JobManager > > > > > > 3. Read data with Timestamp > > > a) Downstream ETL reads data according to Timestamp after upstream ETL > > > "commit" it. > > > b) Stateful operators interact with state when computing data of > > > Timestamp, but they won't trigger checkpoint for every Timestamp. > > Therefore > > > source ETL job can generate Timestamp every few seconds or even > hundreds > > of > > > milliseconds > > > c) Based on Timestamp the delay between ETL jobs will be very small, > and > > > in the best case the E2E latency maybe only tens of seconds. > > > > > > 4. Failover and Recovery > > > ETL jobs are cascaded through the Intermediate Table. 
After a single > ETL > > > job fails, it needs to replay the input data and recompute the results. > > As > > > you mentioned, whether the cascaded ETL jobs are restarted depends on > the > > > determinacy of the intermediate data between them. > > > a) An ETL job will rollback and reread data from upstream ETL by > specific > > > Timestamp according to the Checkpoint. > > > b) According to the management of Checkpoint and Timestamp, ETL can > > replay > > > all Timestamp and data after failover, which means BInput is the same > > > before and after failover. > > > > > > c) For deterministic Fn, it generates the same BOutput from the same > > BInput > > > 1) If there's no data of the specific Timestamp in the sink table, ETL > > > just "commit" it as normal. > > > 2) If the Timestamp data exists in the sink table, ETL can just discard > > > the new data. > > > > > > d) For non-deterministic Fn, it generates different BOutput from the > same > > > BInput before and after failover. For example, BOutput1 before failover > > and > > > BOutput2 after failover. The state in ETL is consistent with BOutput2. > > > There are two cases according to users' requirements > > > 1) Users can accept BOutput1 as the final output and downstream ETLs > > don't > > > need to restart. Sink in ETL can discard BOutput2 directly if the > > Timestamp > > > exists in the sink table. > > > 2) Users only accept BOutput2 as the final output, then all the > > downstream > > > ETLs and Intermediate Table should rollback to specific Timestamp, the > > > downstream ETLs should be restarted too. > > > > > > The following is a simple example. Data is transferred between ETL1, > ETL2 > > > and ETL3 in Intermediate Table by Timestamp. > > > [image: simple_example.jpg] > > > > > > Besides Timestamp, there's a big challenge in Intermediate Table. 
It > > > should support a highly implemented "commit Timestamp snapshot" with > high > > > throughput, which requires the Table Store to enhance streaming > > > capabilities like pulsar or kafka. > > > > > > In this FLIP, we plan to implement the proposal with Checkpoint, the > > above > > > Timestamp can be replaced by Checkpoint. Of course, Checkpoint has some > > > problems. I think we have reached some consensus in the discussion > about > > > the Checkpoint problems, including data segment semantics, flush data > of > > > some operators, and the increase of E2E delay. However, from the > > > perspective of implementation complexity, I personally think using > > > Checkpoint in the first phase makes sense, what do you think? > > > > > > Finally, I think I misunderstood the "Rolling Checkpoint" and "All at > > once > > > Checkpoint" in my last explanation which you and @David mentioned. I > > > thought their differences were mainly to select different table > versions > > > for queries. According to your reply, I think it is whether there are > > > multiple "rolling checkpoints" in each ETL job, right? If I understand > > > correctly, the "Rolling Checkpoint" is a good idea, and we can > guarantee > > > "Strong Data Consistency" between multiple tables in MetaService for > > > queries. Thanks. > > > > > > Best, > > > Shammon > > > > > > > > > On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <pnowoj...@apache.org> > > > wrote: > > > > > >> Hi Shammon, > > >> > > >> Thanks for the explanations, I think I understand the problem better > > now. > > >> I have a couple of follow up questions, but first: > > >> > > >> >> 3. I'm pretty sure there are counter examples, where your proposed > > >> mechanism of using checkpoints (even aligned!) will produce > > >> inconsistent data from the perspective of the event time. 
> > >> >> a) For example what if one of your "ETL" jobs has the following DAG:
> > >> >>
> > >> >> Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from the Src1/Window1 from the event time T1 and Src2/Window2 from later event time T2.
> > >> >> b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or to two different sink tables (that are both later used in the same downstream job).
> > >> >
> > >> > Thank you for your feedback. I cannot see the DAG in 3.a in your reply,
> > >>
> > >> I've attached the image directly. I hope you can see it now.
> > >>
> > >> Basically what I meant is that if you have a topology like (from the attached image):
> > >>
> > >> window1 = src1.keyBy(...).window(...)
> > >> window2 = src2.keyBy(...).window(...)
> > >> window1.join(window2, ...).addSink(sink)
> > >>
> > >> or an even simpler one (note no keyBy between `src` and `process`):
> > >>
> > >> src.process(some_function_that_buffers_data).addSink(sink)
> > >>
> > >> you will have the same problem. Generally speaking if there is an operator buffering some data, and if the data are not flushed on every checkpoint (any windowed or temporal operator, AsyncWaitOperator, CEP, ...), you can design a graph that will produce "inconsistent" data as part of a checkpoint.
> > >>
> > >> Apart from that a couple of other questions/issues.
> > >>
> > >> > 1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether
> > >>
> > >> Do we need to support the "altogether" one? Rolling checkpoint, as it's more independent, I could see it scale much better, and avoid a lot of problems that I mentioned before.
> > >>
> > >> > 1) Checkpoint VS Watermark
> > >> >
> > >> > 1. Stateful Computation is aligned according to Timestamp Barrier
> > >>
> > >> Indeed the biggest obstacle I see here is that we would most likely have:
> > >>
> > >> > b) Similar to the window operator, align data in memory according to Timestamp.
> > >>
> > >> for every operator.
> > >>
> > >> > 4. Failover supports Timestamp fine-grained data recovery
> > >> >
> > >> > As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology".
> > >>
> > >> I don't understand this point. Regardless if we are using rolling checkpoints, all at once checkpoints or watermarks, I see the same problems with non determinism, if we want to preserve the requirement to not fail over the whole topology at once.
> > >>
> > >> Both Watermarks and "rolling checkpoint" I think have the same issue: they either require deterministic logic, or global failover, or downstream jobs can only work on the records already committed by the upstream. But working with only "committed records" would either break consistency between different jobs, or would cause huge delay in checkpointing and e2e latency, as:
> > >> 1. upstream job has to produce some data, downstream can not process this data yet
> > >> 2. checkpoint 42 is triggered on the upstream job
> > >> 3. checkpoint 42 is completed on the upstream job, data processed since last checkpoint has been committed
> > >> 4. upstream job can continue producing more data
> > >> 5. only now downstream can start processing the data produced in 1., but it can not read the not-yet-committed data from 4.
> > >> 6. once downstream finishes processing data from 1., it can trigger checkpoint 42
> > >>
> > >> The "all at once checkpoint", I can see only working with global failover of everything.
> > >>
> > >> This is assuming exactly-once mode. at-least-once would be much easier.
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > >> On Tue, 13 Dec 2022 at 08:57, Shammon FY <zjur...@gmail.com> wrote:
> > >>
> > >>> Hi David,
> > >>>
> > >>> Thanks for the comments from you and @Piotr. I'd like to explain the details about the FLIP first.
> > >>>
> > >>> 1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether
> > >>>
> > >>> This mainly depends on the needs of users. Users can decide the data version of tables in their queries according to different requirements for data consistency and freshness. Since we manage multiple versions for each table, this will not bring too much complexity to the system. We only need to support different strategies when calculating table versions for query. So we give this decision to users, who can use "consistency.type" to set different consistency in "Catalog". We can continue to refine this later. For example, dynamic parameters support different consistency requirements for each query
> > >>>
> > >>> 2) MetaService module
> > >>>
> > >>> Many Flink streaming jobs use application mode, and they are independent of each other. So we currently assume that MetaService is an independent node. In the first phase, it will be started in standalone, and HA will be supported later. This node will reuse many Flink modules, including REST, Gateway-RpcServer, etc. We hope that the core functions of MetaService can be developed as a component. 
When Flink subsequently uses a large > > session > > >>> cluster to support various computations, it can be integrated into > the > > >>> "ResourceManager" as a plug-in component. > > >>> > > >>> Besides above, I'd like to describe the Checkpoint and Watermark > > >>> mechanisms > > >>> in detail as follows. > > >>> > > >>> 1) Checkpoint VS Watermark > > >>> > > >>> As you mentioned, I think it's very correct that what we want in the > > >>> Checkpoint is to align streaming computation and data according to > > >>> certain > > >>> semantics. Timestamp is a very ideal solution. To achieve this goal, > we > > >>> can > > >>> think of the following functions that need to be supported in the > > >>> Watermark > > >>> mechanism: > > >>> > > >>> 1. Stateful Computation is aligned according to Timestamp Barrier > > >>> > > >>> As the "three tables example" we discussed above, we need to align > the > > >>> stateful operator computation according to the barrier to ensure the > > >>> consistency of the result data. In order to align the computation, > > there > > >>> are two ways in my mind > > >>> > > >>> a) Similar to the Aligned Checkpoint Barrier. Timestamp Barrier > aligns > > >>> data > > >>> according to the channel, which will lead to backpressure just like > the > > >>> aligned checkpoint. It seems not a good idea. > > >>> > > >>> b) Similar to the window operator, align data in memory according to > > >>> Timestamp. Two steps need to be supported here: first, data is > aligned > > by > > >>> timestamp for state operators; secondly, Timestamp is strictly > > >>> sequential, > > >>> global aggregation operators need to perform aggregation in timestamp > > >>> order > > >>> and output the final results. > > >>> > > >>> 2. 
Coordinate multiple source nodes to assign unified Timestamp > > Barriers > > >>> > > >>> Since the stateful operator needs to be aligned according to the > > >>> Timestamp > > >>> Barrier, source subtasks of multiple jobs should generate the same > > >>> Timestamp Barrier. ETL jobs consuming RootTable should interact with > > >>> "MetaService" to generate the same Timestamp T1, T2, T3 ... and so > on. > > >>> > > >>> 3. JobManager needs to manage the completed Timestamp Barrier > > >>> > > >>> When the Timestamp Barrier of the ETL job has been completed, it > means > > >>> that > > >>> the data of the specified Timestamp can be queried by users. > JobManager > > >>> needs to summarize its Timestamp processing and report the completed > > >>> Timestamp and data snapshots to the MetaServer. > > >>> > > >>> 4. Failover supports Timestamp fine-grained data recovery > > >>> > > >>> As we mentioned in the FLIP, each ETL is a complex single node. A > > single > > >>> ETL job failover should not cause the failure of the entire "ETL > > >>> Topology". > > >>> This requires that the result data of Timestamp generated by upstream > > ETL > > >>> should be deterministic. > > >>> > > >>> a) The determinacy of Timestamp, that is, before and after ETL job > > >>> failover, the same Timestamp sequence must be generated. Each > > Checkpoint > > >>> needs to record the included Timestamp list, especially the source > node > > >>> of > > >>> the RootTable. After Failover, it needs to regenerate Timestamp > > according > > >>> to the Timestamp list. > > >>> > > >>> b) The determinacy of Timestamp data, that is, the same Timestamp > needs > > >>> to > > >>> replay the same data before and after Failover, and generate the same > > >>> results in Sink Table. Each Timestamp must save start and end offsets > > (or > > >>> snapshot id) of RootTable. 
After failover, the source nodes need to > > >>> replay > > >>> the data according to the offset to ensure that the data of each > > >>> Timestamp > > >>> is consistent before and after Failover. > > >>> > > >>> For the specific requirements and complexity, please help to review > > when > > >>> you are free @David @Piotr, thanks :) > > >>> > > >>> 2) Evolution from Checkpoint to Timestamp Mechanism > > >>> > > >>> You give a very important question in your reply which I missed > before: > > >>> if > > >>> Aligned Checkpoint is used in the first stage, how complex is the > > >>> evolution > > >>> from Checkpoint to Timestamp later? I made a general comparison here, > > >>> which > > >>> may not be very detailed. There are three roles in the whole system: > > >>> MetaService, Flink ETL Job and Table Store. > > >>> > > >>> a) MetaService > > >>> > > >>> It manages the data consistency among multiple ETL jobs, including > > >>> coordinating the Barrier for the Source ETL nodes, setting the > starting > > >>> Barrier for ETL job startup, and calculating the Table version for > > >>> queries > > >>> according to different strategies. It has little to do with > Checkpoint > > in > > >>> fact, we can pay attention to it when designing the API and > > implementing > > >>> the functions. > > >>> > > >>> b) Flink ETL Job > > >>> > > >>> At present, the workload is relatively small and we need to trigger > > >>> checkpoints in CheckpointCoordinator manually by SplitEnumerator. > > >>> > > >>> c) Table Store > > >>> > > >>> Table Store mainly provides the ability to write and read data. > > >>> > > >>> c.1) Write data. At present, Table Store generates snapshots > according > > to > > >>> two phases in Flink. When using Checkpoint as consistency management, > > we > > >>> need to write checkpoint information to snapshots. 
After using > > Timestamp > > >>> Barrier, the snapshot in Table Store may be disassembled more finely, > > and > > >>> we need to write Timestamp information to the data file. A > > "checkpointed > > >>> snapshot" may contain multiple "Timestamp snapshots". > > >>> > > >>> c.2) Read data. The SplitEnumerator that reads data from the Table > > Store > > >>> will manage multiple splits according to the version number. After > the > > >>> specified splits are completed, it sends a Barrier command to > trigger a > > >>> checkpoint in the ETL job. The source node will broadcast the > > checkpoint > > >>> barrier downstream after receiving it. When using Timestamp Barrier, > > the > > >>> overall process is similar, but the SplitEnumerator does not need to > > >>> trigger a checkpoint to the Flink ETL, and the Source node needs to > > >>> support > > >>> broadcasting Timestamp Barrier to the downstream at that time. > > >>> > > >>> From the above overall, the evolution complexity from Checkpoint to > > >>> Timestamp seems controllable, but the specific implementation needs > > >>> careful > > >>> design, and the concept and features of Checkpoint should not be > > >>> introduced > > >>> too much into relevant interfaces and functions. > > >>> > > >>> What do you think of it? Looking forward to your feedback, thanks > > >>> > > >>> Best, > > >>> Shammon > > >>> > > >>> > > >>> > > >>> On Mon, Dec 12, 2022 at 11:46 PM David Morávek <d...@apache.org> > > wrote: > > >>> > > >>> > Hi Shammon, > > >>> > > > >>> > I'm starting to see what you're trying to achieve, and it's really > > >>> > exciting. I share Piotr's concerns about e2e latency and disability > > to > > >>> use > > >>> > unaligned checkpoints. 
> > >>> > > > >>> > I have a couple of questions that are not clear to me from going > over > > >>> the > > >>> > FLIP: > > >>> > > > >>> > 1) Global Checkpoint Commit > > >>> > > > >>> > Are you planning on committing the checkpoints in a) a "rolling > > >>> fashion" - > > >>> > one pipeline after another, or b) altogether - once the data have > > been > > >>> > processed by all pipelines? > > >>> > > > >>> > Option a) would be eventually consistent (for batch queries, you'd > > >>> need to > > >>> > use the last checkpoint produced by the most downstream table), > > >>> whereas b) > > >>> > would be strongly consistent at the cost of increasing the e2e > > latency > > >>> even > > >>> > more. > > >>> > > > >>> > I feel that option a) is what this should be headed for. > > >>> > > > >>> > 2) MetaService > > >>> > > > >>> > Should this be a new general Flink component or one specific to the > > >>> Flink > > >>> > Table Store? > > >>> > > > >>> > 3) Follow-ups > > >>> > > > >>> > From the above discussion, there is a consensus that, in the ideal > > >>> case, > > >>> > watermarks would be a way to go, but there is some underlying > > mechanism > > >>> > missing. It would be great to discuss this option in more detail to > > >>> compare > > >>> > the solutions in terms of implementation cost, maybe it could not > be > > as > > >>> > complex. > > >>> > > > >>> > > > >>> > All in all, I don't feel that checkpoints are suitable for > providing > > >>> > consistent table versioning between multiple pipelines. The main > > >>> reason is > > >>> > that they are designed to be a fault tolerance mechanism. Somewhere > > >>> between > > >>> > the lines, you've already noted that the primitive you're looking > for > > >>> is > > >>> > cross-pipeline barrier alignment, which is the mechanism a subset > of > > >>> > currently supported checkpointing implementations happen to be > using. > > >>> Is > > >>> > that correct? 
> > >>> > > > >>> > My biggest concern is that tying this with a "side-effect" of the > > >>> > checkpointing mechanism could block us from evolving it further. > > >>> > > > >>> > Best, > > >>> > D. > > >>> > > > >>> > On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <zjur...@gmail.com> > > wrote: > > >>> > > > >>> > > Hi Piotr, > > >>> > > > > >>> > > Thank you for your feedback. I cannot see the DAG in 3.a in your > > >>> reply, > > >>> > but > > >>> > > I'd like to answer some questions first. > > >>> > > > > >>> > > Your understanding is very correct. We want to align the data > > >>> versions of > > >>> > > all intermediate tables through checkpoint mechanism in Flink. > I'm > > >>> sorry > > >>> > > that I have omitted some default constraints in FLIP, including > > only > > >>> > > supporting aligned checkpoints; one table can only be written by > > one > > >>> ETL > > >>> > > job. I will add these later. > > >>> > > > > >>> > > Why can't the watermark mechanism achieve the data consistency we > > >>> wanted? > > >>> > > For example, there are 3 tables, Table1 is word table, Table2 is > > >>> > word->cnt > > >>> > > table and Table3 is cnt1->cnt2 table. > > >>> > > > > >>> > > 1. ETL1 from Table1 to Table2: INSERT INTO Table2 SELECT word, > > >>> count(*) > > >>> > > FROM Table1 GROUP BY word > > >>> > > > > >>> > > 2. ETL2 from Table2 to Table3: INSERT INTO Table3 SELECT cnt, > > >>> count(*) > > >>> > FROM > > >>> > > Table2 GROUP BY cnt > > >>> > > > > >>> > > ETL1 has 2 subtasks to read multiple buckets from Table1, where > > >>> subtask1 > > >>> > > reads streaming data as [a, b, c, a, d, a, b, c, d ...] and > > subtask2 > > >>> > reads > > >>> > > streaming data as [a, c, d, q, a, v, c, d ...]. > > >>> > > > > >>> > > 1. Unbounded streaming data is divided into multiple sets > according > > >>> to > > >>> > some > > >>> > > semantic requirements. The most extreme may be one set for each > > data. 
> > >>> > > Assume that the sets of subtask1 and subtask2 separated by the same semantics are [a, b, c, a, d] and [a, c, d, q], respectively.
> > >>> > >
> > >>> > > 2. After the above two sets are computed by ETL1, the result data generated in Table2 is [(a, 3), (b, 1), (c, 2), (d, 2), (q, 1)].
> > >>> > >
> > >>> > > 3. The result data generated in Table3 after the data in Table2 is computed by ETL2 is [(1, 2), (2, 2), (3, 1)].
> > >>> > >
> > >>> > > We want to align the data of Table1, Table2 and Table3 and manage the data versions. When users execute OLAP/batch join queries on these tables, the following consistent data can be found:
> > >>> > >
> > >>> > > 1. Table1: [a, b, c, a, d] and [a, c, d, q]
> > >>> > >
> > >>> > > 2. Table2: [a, 3], [b, 1], [c, 2], [d, 2], [q, 1]
> > >>> > >
> > >>> > > 3. Table3: [1, 2], [2, 2], [3, 1]
> > >>> > >
> > >>> > > Users can perform the query: SELECT t1.word, t2.cnt, t3.cnt2 from Table1 t1 JOIN Table2 t2 JOIN Table3 t3 on t1.word=t2.word and t2.cnt=t3.cnt1;
> > >>> > >
> > >>> > > From the users' point of view, the data is consistent on a unified "version" between Table1, Table2 and Table3.
> > >>> > >
> > >>> > > In the current Flink implementation, the aligned checkpoint can achieve the above capabilities (let's ignore the segmentation semantics of checkpoints for now). Because the Checkpoint Barrier aligns the data when performing the global Count aggregation, we can associate the snapshot with the checkpoint in the Table Store, query the specified snapshot of Table1/Table2/Table3 through the checkpoint, and achieve the consistency requirements of the above unified "version".
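
The two aggregations in this example can be checked with a few lines of plain Python. This is only a sketch of the GROUP BY semantics of ETL1 and ETL2 applied to the two example sets; the function names `etl1` and `etl2` are made up for illustration and nothing here uses Flink or Table Store.

```python
from collections import Counter


def etl1(*word_sets):
    """ETL1: SELECT word, count(*) FROM Table1 GROUP BY word."""
    counts = Counter()
    for words in word_sets:
        counts.update(words)
    return dict(counts)


def etl2(table2):
    """ETL2: SELECT cnt, count(*) FROM Table2 GROUP BY cnt."""
    return dict(Counter(table2.values()))


# The two sets from the example, one per ETL1 subtask.
subtask1_set = ["a", "b", "c", "a", "d"]
subtask2_set = ["a", "c", "d", "q"]

table2 = etl1(subtask1_set, subtask2_set)
table3 = etl2(table2)

print(sorted(table2.items()))
print(sorted(table3.items()))
```

For these inputs the sketch yields a=3, b=1, c=2, d=2, q=1 for Table2 (`c` occurs once in each set) and 1->2, 2->2, 3->1 for Table3.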
> > >>> > >
> > >>> > > The current watermark mechanism in Flink cannot achieve the above consistency. For example, we use watermarks to divide data into multiple sets in subtask1 and subtask2 as follows:
> > >>> > >
> > >>> > > 1. subtask1: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2), (c, T2), (d, T2)], T2
> > >>> > >
> > >>> > > 2. subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, ....
> > >>> > >
> > >>> > > As Flink watermarks do not have barriers and cannot align data, the ETL1 Count operator may compute the data of subtask1 first: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2)], then compute the data of subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, which is not possible with aligned checkpoints.
> > >>> > >
> > >>> > > In this order, the result output to Table2 after the Count aggregation will be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a, 2, T1), (d, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1), which can be simplified as: [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1)]
> > >>> > >
> > >>> > > There's no (a, 3, T1), so we cannot query consistent results on Table1 and Table2 according to T1. Table3 has the same problem.
> > >>> > >
> > >>> > > Besides using the Checkpoint Barrier, the other implementation that supports the watermark above is to convert the Count aggregation into a Window Count. After the global Count is converted into a window operator, it needs to support cross-window data computation.
> > >>> > > Similar to the data relationship between the previous and the current Checkpoint, it is equivalent to introducing a Watermark Barrier, which requires adjustments to the current Flink watermark mechanism.
> > >>> > >
> > >>> > > Besides the above global aggregation, there are window operators in Flink. I don't know if my understanding is correct (I cannot see the DAG in your example), so please correct me if it's wrong. I think you raise a very important and interesting question: how to define data consistency in different window computations which will generate different timestamps of the same data. This situation also occurs when using event time to align data. At present, what I can think of is to store this information in Table Store, so that users can filter or join on data with it. This FLIP is our first phase, and the specific implementation of this will be designed and considered in the next phase and FLIP.
> > >>> > >
> > >>> > > Although the Checkpoint Barrier can achieve the most basic consistency, as you mentioned, using the Checkpoint mechanism will cause many problems, including the increase of checkpoint time for multiple cascaded jobs, the increase of E2E data freshness time (several minutes or even dozens of minutes), and the increase of the overall system complexity. At the same time, the semantics of Checkpoint data segmentation is unclear.
> > >>> > >
> > >>> > > The current FLIP is the first phase of our whole proposal, and you can find the follow-up plan in our future work.
> > >>> > > In the first stage, we do not want to modify the Flink mechanism. We'd like to realize basic system functions based on existing mechanisms in Flink, including the relationship management of ETL and tables, and the basic data consistency, so we choose Global Checkpoint in our FLIP.
> > >>> > >
> > >>> > > We agree with you very much that event time is more suitable for data consistency management. We'd like to consider this matter in the second or third stage after the current FLIP. We hope to improve the watermark mechanism in Flink to support barriers. As you mentioned in your reply, we can achieve data consistency based on timestamp, while maintaining E2E data freshness of seconds or even milliseconds for 10+ cascaded jobs.
> > >>> > >
> > >>> > > What do you think? Thanks
> > >>> > >
> > >>> > > Best,
> > >>> > > Shammon
> > >>> > >
> > >>> > > On Fri, Dec 9, 2022 at 6:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >>> > >
> > >>> > > > Hi Shammon,
> > >>> > > >
> > >>> > > > Do I understand it correctly, that you effectively want to expand the checkpoint alignment mechanism across many different jobs and hand over checkpoint barriers from upstream to downstream jobs using the intermediate tables?
> > >>> > > >
> > >>> > > > Re the watermarks for the "Rejected Alternatives". I don't understand why this has been rejected. Could you elaborate on this point? Here are a couple of my thoughts on this matter, but please correct me if I'm wrong, as I haven't dived deeper into this topic.
> > >>> > > >
> > >>> > > > > As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> > >>> > > > > The StreamTask reads data in order: V11,V12,V21,T1(channel1),V13,T1(channel2).
> > >>> > > > > At this time, StreamTask will confirm that watermark T1 is completed, but the data beyond
> > >>> > > > > T1 has been processed(V13) and the results are written to the sink table.
> > >>> > > >
> > >>> > > > 1. I see the same "problem" with unaligned checkpoints in your current proposal.
> > >>> > > > 2. I don't understand why this is a problem? Just store in the "sink table" what's the watermark (T1), and downstream jobs should process the data with that "watermark" anyway. Record "V13" should be treated as "early" data. Downstream jobs if:
> > >>> > > >  a) they are streaming jobs, for example they should aggregate it in windowed/temporal state, but they shouldn't produce the result that contains it, as the watermark T2 was not yet processed. Or they would just pass that record as "early" data.
> > >>> > > >  b) they are batch jobs, it looks to me like batch jobs shouldn't take "all available data", but only consider "all the data until some watermark", for example the latest available: T1
> > >>> > > >
> > >>> > > > 3. I'm pretty sure there are counter examples, where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
> > >>> > > >  a) For example what if one of your "ETL" jobs, has the following DAG:
> > >>> > > > [image: flip276.jpg]
> > >>> > > >  Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from the Src1/Window1 from the event time T1 and Src2/Window2 from later event time T2.
> > >>> > > >  b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or two to different sink tables (that are both later used in the same downstream job).
> > >>> > > >
> > >>> > > > 4a) I'm not sure if I like the idea of centralising the whole system in this way. If you have 10 jobs, the likelihood of the checkpoint failure will be 10 times higher, and/or the duration of the checkpoint can be much much longer (especially under backpressure). And this is actually already a limitation of Apache Flink (global checkpoints are more prone to fail the larger the scale), so I would be anxious about making it potentially even a larger issue.
> > >>> > > > 4b) I'm also worried about increased complexity of the system after adding the global checkpoint, and additional (single?) point of failure.
> > >>> > > > 5. Such a design would also not work if we ever wanted to have task local checkpoints.
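
Point 2b in the list above (a batch reader should consider only "all the data until some watermark") can be sketched in a few lines of Python. This is an illustrative sketch only: `Table`, `committed_watermark`, `read_until` and `consistent_snapshot` are hypothetical names, not Flink or Table Store APIs, and the sketch assumes each table stores the last watermark committed together with its data.

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    """Hypothetical table: rows tagged with event time, plus the last
    watermark that was committed together with the data."""
    name: str
    committed_watermark: int
    rows: list = field(default_factory=list)  # (event_time, value) pairs

    def read_until(self, watermark):
        # Rows beyond the watermark are "early" data and are not exposed.
        return [value for event_time, value in self.rows
                if event_time <= watermark]


def consistent_snapshot(tables):
    """Read every table up to the minimum committed watermark."""
    watermark = min(t.committed_watermark for t in tables)
    return watermark, {t.name: t.read_until(watermark) for t in tables}


# Table1 has committed watermark 2, Table2 lags behind at watermark 1.
t1 = Table("Table1", committed_watermark=2,
           rows=[(1, "a"), (2, "b"), (3, "c")])
t2 = Table("Table2", committed_watermark=1,
           rows=[(1, "a:1"), (2, "b:1")])

watermark, snapshot = consistent_snapshot([t1, t2])
print(watermark, snapshot)
```

Taking the minimum keeps the reader from seeing rows that some upstream table has not yet declared complete. The price is that the snapshot is only as fresh as the slowest table.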
> > >>> > > >
> > >>> > > > All in all, it seems to me like actually the watermarks and event time are the better concept in this context that should have been used for synchronising and data consistency across the whole system.
> > >>> > > >
> > >>> > > > Best,
> > >>> > > > Piotrek
> > >>> > > >
> > >>> > > > On Thu, Dec 1, 2022 at 11:50, Shammon FY <zjur...@gmail.com> wrote:
> > >>> > > >
> > >>> > > >> Hi @Martijn
> > >>> > > >>
> > >>> > > >> Thanks for your comments, and I'd like to reply to them
> > >>> > > >>
> > >>> > > >> 1. It sounds good to me, I'll update the content structure in FLIP later and give the problems first.
> > >>> > > >>
> > >>> > > >> 2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?
> > >>> > > >>
> > >>> > > >> Yes. To support the data consistency in the FLIP, we need to connect checkpoints in Flink with snapshots in the store, which requires a close combination of the Flink and store implementations. In the first stage we plan to implement it based on Flink and Table Store only; snapshots written to external storage don't support consistency.
> > >>> > > >>
> > >>> > > >> 3. If you introduce a MetaService, it becomes the single point of failure because it coordinates everything. But I can't find anything in the FLIP on making the MetaService highly available or how to deal with failovers there.
> > >>> > > >>
> > >>> > > >> I think you raise a very important problem and I missed it in FLIP.
> > >>> > > >> The MetaService is a single point and should support failover. We will do it in the future; in the first stage we only support standalone mode. THX
> > >>> > > >>
> > >>> > > >> 4. The FLIP states under Rejected Alternatives "Currently watermark in Flink cannot align data." which is not true, given that there is FLIP-182
> > >>> > > >>
> > >>> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > >>> > > >>
> > >>> > > >> Watermark alignment in FLIP-182 is different from the "watermark align data" requirement in our FLIP. FLIP-182 aims to fix watermark generation in different sources for "slight imbalance or data skew", which means in some cases the sources must generate watermarks even when they should not. When the operator collects watermarks, the data processing is as described in our FLIP, and the data cannot be aligned through a barrier like Checkpoint.
> > >>> > > >>
> > >>> > > >> 5. Given the MetaService role, it feels like this is introducing a tight dependency between Flink and the Table Store. How pluggable is this solution, given the changes that need to be made to Flink in order to support this?
> > >>> > > >>
> > >>> > > >> This is a good question, and I will try to expand on it. Most of the work will be completed in the Table Store, such as the new SplitEnumerator and Source implementation.
> > >>> > > >> The changes in Flink are as follows:
> > >>> > > >> 1) A Flink job should put its job id in the context when creating a source/sink, to help MetaService create the relationship between source and sink tables. This change is tiny.
> > >>> > > >> 2) Notify a listener when a job is terminated in Flink, and the listener implementation in Table Store will send a "delete event" to MetaService.
> > >>> > > >> 3) The changes related to Flink Checkpoint include
> > >>> > > >>   a) Support triggering checkpoint with checkpoint id by SplitEnumerator
> > >>> > > >>   b) Create the SplitEnumerator in Table Store with a strategy to perform the specific checkpoint when all "SplitEnumerator"s in the job manager trigger it.
> > >>> > > >>
> > >>> > > >>
> > >>> > > >> Best,
> > >>> > > >> Shammon
> > >>> > > >>
> > >>> > > >>
> > >>> > > >> On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > >>> > > >>
> > >>> > > >> > Hi all,
> > >>> > > >> >
> > >>> > > >> > A couple of first comments on this:
> > >>> > > >> > 1. I'm missing the problem statement in the overall introduction. It immediately goes into proposal mode, I would like to first read what is the actual problem, before diving into solutions.
> > >>> > > >> > 2. "Each ETL job creates snapshots with checkpoint info on sink tables in Table Store" -> That reads like you're proposing that snapshots need to be written to Table Store?
> > >>> > > >> > 3. If you introduce a MetaService, it becomes the single point of failure because it coordinates everything.
> > >>> > > >> > But I can't find anything in the FLIP on making the MetaService highly available or how to deal with failovers there.
> > >>> > > >> > 4. The FLIP states under Rejected Alternatives "Currently watermark in Flink cannot align data." which is not true, given that there is FLIP-182
> > >>> > > >> >
> > >>> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > >>> > > >> >
> > >>> > > >> > 5. Given the MetaService role, it feels like this is introducing a tight dependency between Flink and the Table Store. How pluggable is this solution, given the changes that need to be made to Flink in order to support this?
> > >>> > > >> >
> > >>> > > >> > Best regards,
> > >>> > > >> >
> > >>> > > >> > Martijn
> > >>> > > >> >
> > >>> > > >> >
> > >>> > > >> > On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:
> > >>> > > >> >
> > >>> > > >> > > Hi devs:
> > >>> > > >> > >
> > >>> > > >> > > I'd like to start a discussion about FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store[1]. In the whole data stream processing, there are consistency problems such as how to manage the dependencies of multiple jobs and tables, how to define and handle E2E delays, and how to ensure the data consistency of queries on flowing data. This FLIP aims to support data consistency and answer these questions.
> > >>> > > >> > >
> > >>> > > >> > > I've discussed the details of this FLIP with @Jingsong Lee and @libenchao offline several times. We hope to support data consistency of queries on tables, managing relationships between Flink jobs and tables and revising tables on streaming in Flink and Table Store to improve the whole data stream processing.
> > >>> > > >> > >
> > >>> > > >> > > Looking forward to your feedback.
> > >>> > > >> > >
> > >>> > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > >>> > > >> > >
> > >>> > > >> > > Best,
> > >>> > > >> > > Shammon