Hi Piotr

Thanks for your feedback.

> - stateless operators, could completely ignore the issue and process the
records normally, as they are doing right now
> - stateful operators, should either:
>     - if the business doesn't require ordering, they could process the
records immediately
>     - or buffer the records internally, like currently windowed/temporal
operators are doing. Non windowed joins/aggregations could also work in a
similar manner, like pre-aggregate data per each "epoch" (as demarcated by
timestamp barriers).
> - sinks implementation would have to match what external system support:
>     - if the external system requires ordered writes (something like
Kafka topic?), the sinks would have to buffer the writes until a "timestamp
barrier" arrives
>     - some sinks might support writing the data simultaneously to
different "epochs". For example writing files bucketed by each epoch. Each
bucket/epoch could be committed independently

It sounds good to me and I totally agree with the proposal. We need to give
users more choices to meet different business needs and storage support. I
have updated the key points in the FLINK section[1]

> Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't
say that the timestamp barrier has been committed, but that all records for
given "epoch" have been processed/written, but not yet committed, so they
can still be rolled-back?

Nice! According to your suggestion, I have updated the FLIP for "epoch" as:
1. It is PROCESSED when records are written to a table
2. It is WRITTEN when the records are in a snapshot
3. It is PRECOMMIT when all tables are PROCESSED but not WRITTEN
4. It is COMMIT when all tables are WRITTEN
Records not WRITTEN in a table will be rolled back due to job failure.


> Why do we need to do that? Only to disallow this? To forbid writing from
two jobs into a single table? If so, can we not push this responsibility
down to the connector? Like sink/source operator coordinators should
negotiate with respective external systems if the given read/write is
allowed? So if there is a need for such meta service, Flink doesn't need to
know about it?

As I mentioned, MetaService will do some atomic operations to check and
disallow some operations when jobs are submitted concurrently. But I'm
sorry that I may not have explained the relationship between it and
sink/source clearly. Generally speaking, the interactive between Flink and
MetaService is as:
1. When the Client submits a flink job (streaming&batch), it interacts with
MetaService through Catalog in CatalogManager, including getting the table
version, registering the source/link table relationship for ETL.
2. When the flink job is running, JobManager collects data processing
progress (Timestamp Barrier and Checkpoint) from source/link subtasks and
reports them to MetaService.
We can implement the above functions in a MetaService node. Of course, it
can also be based on an atomic system (such as Zookeeper), with Client and
JobManager doing their own work.

Of course, source and sink also need some special work, such as reading
timestamp barrier, collecting timestamp barrier, writing timestamp barrier,
etc. But source/sink subtasks will not interact with MetaService directly.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-GlobalTimestampBarrierMechanism


Best,
Shammon


On Tue, Feb 7, 2023 at 1:26 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

>  Hi,
>
> Thanks for the answers.
>
> >> Are you proposing that all of the inputs to stateful operators would
> have to be sorted?
> >>
> > Records in stream don't need to be sorted, but it should be managed by
> `Timestamp Barrier`, which means
> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> > 2. Computations in different timestamp barriers are ordered. For the
> above
> > example, each stateful subtask can start computation for T2 only after it
> > finishes computation for T1. Subtasks are independent of each other.
>
> Wouldn't that add significant latency to processing the records? You would
> basically introduce a batch processing concept in Flink?
>
> Have you considered some alternative solutions? Like for example letting
> each operator/function/sink to take care of the data disorder? For example:
> - stateless operators, could completely ignore the issue and process the
> records normally, as they are doing right now
> - stateful operators, should either:
>     - if the business doesn't require ordering, they could process the
> records immediately
>     - or buffer the records internally, like currently windowed/temporal
> operators are doing. Non windowed joins/aggregations could also work in a
> similar manner, like pre-aggregate data per each "epoch" (as demarcated by
> timestamp barriers).
> - sinks implementation would have to match what external system support:
>     - if the external system requires ordered writes (something like Kafka
> topic?), the sinks would have to buffer the writes until a "timestamp
> barrier" arrives
>     - some sinks might support writing the data simultaneously to different
> "epochs". For example writing files bucketed by each epoch. Each
> bucket/epoch could be committed independently
>
> This way, latency would be behaving very much like it currently does in
> Flink. For example if we have a following streaming SQL:
>
> INSERT INTO alerts_with_user SELECT * FROM alerts a, users u WHERE
> a.user_id = u.id
>
> If there is some lag in the users table, alerts would be still generated.
> Downstream applications could process and react to newly generated
> `alerts_with_user`, while at the same time, we could have a consistent view
> across those three tables (users, alerts, alerts_with_user) if needed.
>
> > I call the data of the timetamp barrier "committed" if the data
> > is written to a table according to the barrier without a snapshot, and
> the
> > data may be "rolled back" due to job failure. (sorry that the "committed"
> > here may not be appropriate)
>
> Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't
> say that the timestamp barrier has been committed, but that all records for
> given "epoch" have been processed/written, but not yet committed, so they
> can still be rolled-back?
>
> > For example, when multiple jobs start at the same time and register
> themselves in `MetaService`,
> > it needs to serially check whether they write to the same table
>
> Why do we need to do that? Only to disallow this? To forbid writing from
> two jobs into a single table? If so, can we not push this responsibility
> down to the connector? Like sink/source operator coordinators should
> negotiate with respective external systems if the given read/write is
> allowed? So if there is a need for such meta service, Flink doesn't need to
> know about it?
>
> Best,
> Piotrek
>
> pon., 6 lut 2023 o 10:44 Shammon FY <zjur...@gmail.com> napisał(a):
>
> > Hi Piotr,
> >
> > Thanks for your feedback. In general, I think `Timesamp Barrier` is a
> > special `Watermark` that all sources send watermarks with the same
> > timestamp as `Timestamp Barrier` and aggregation operators will align
> data
> > by it. For example, all source subtasks are assigned two unified
> watermarks
> > T1 and T2, T1 < T2. All records with timestamp <= T1 will be aligned by
> T1,
> > and records with timestamp (T1, T2] will be aligned by T2.
> >
> > > Are you proposing that all of the inputs to stateful operators would
> have
> > to be sorted?
> >
> > Records in stream don't need to be sorted, but it should be managed by
> > `Timestamp Barrier`, which means
> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> > 2. Computations in different timestamp barriers are ordered. For the
> above
> > example, each stateful subtask can start computation for T2 only after it
> > finishes computation for T1. Subtasks are independent of each other.
> >
> > > Can you explain why do you need those 3 states? Why can committed
> records
> > be rolled back?
> >
> > Here I try to define the states of data in tables according to Timestamp
> > Barrier and Snapshot, and I found that the 3 states are incomplete. For
> > example, there is timestamp barrier T associated with checkpoint P, and
> > sink operator will create snapshot S for P in tables. The data states in
> > tables are as follows
> > 1. Sink finishes writing data of timestamp barrier T to a table, but
> > snapshot P is not created in the table and T is not finished in all
> tables.
> > 2. Sink finishes writing data of timestamp barrier T to a table, creates
> > snapshot P according to checkpoint C, but the T1 is not finished in all
> > tables.
> > 3. Timestamp barrier T is finished in all tables, but snapshot P is not
> > created in all tables.
> > 4. Timestamp barrier T is finished in all tables, and snapshot P is
> created
> > in all tables too.
> >
> > Currently users can only get data from snapshots in Table Store and other
> > storages such as Iceberg. Users can get different "versioned" data from
> > tables according to their data freshness and consistency requirements.
> > I think we should support getting data with a timestamp barrier even
> before
> > the sink operator finishes creating the snapshot in the future. In this
> > situation, I call the data of the timetamp barrier "committed" if the
> data
> > is written to a table according to the barrier without a snapshot, and
> the
> > data may be "rolled back" due to job failure. (sorry that the "committed"
> > here may not be appropriate)
> >
> > > I'm not sure if I follow. Generally speaking, why do we need
> MetaService
> > at all? Why can we only support writes to and reads from TableStore, and
> > not any source/sink that implements some specific interface?
> >
> > It's a good point. I added a `MetaService` node in FLIP mainly to perform
> > some atomic operations. For example, when multiple jobs start at the same
> > time and register themselves in `MetaService`, it needs to serially check
> > whether they write to the same table. If we do not use an
> > independent `MetaService Node`, we may need to introduce some other
> "atomic
> > dependency" such as ZooKeeper. But removing `MetaService Node` can make
> the
> > system more flexible, I think it's also valuable. Maybe we can carefully
> > design MetaService API and support different deployment modes in the next
> > FLIP? WDYT?
> >
> >
> > Best,
> > Shammon
> >
> >
> > On Fri, Feb 3, 2023 at 10:43 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi Shammon,
> > >
> > > Thanks for pushing the topic further. I'm not sure how this new
> proposal
> > is
> > > supposed to be working? How should timestamp barrier interplay with
> event
> > > time and watermarks? Or is timestamp barrier supposed to completely
> > replace
> > > watermarks?
> > >
> > > > stateful and temporal operators should align them (records) according
> > to
> > > their timestamp field.
> > >
> > > Are you proposing that all of the inputs to stateful operators would
> have
> > > to be sorted?
> > >
> > > > There're three states in a table for specific transaction :
> PreCommit,
> > > Commit and Snapshot
> > >
> > > Can you explain why do you need those 3 states? Why can committed
> records
> > > be rolled back?
> > >
> > > >> 10. Have you considered proposing a general consistency mechanism
> > > instead
> > > >> of restricting it to TableStore+ETL graphs? For example, it seems to
> > me
> > > to
> > > >> be possible and valuable to define instead the contract that
> > > sources/sinks
> > > >> need to implement in order to participate in globally consistent
> > > snapshots.
> > > >
> > > > A general consistency mechanism is cool! In my mind, the overall
> > > > `consistency system` consists of three components: Streaming & Batch
> > ETL,
> > > > Streaming & Batch Storage and MetaService. MetaService is decoupled
> > from
> > > > Storage Layer, but it stores consistency information in persistent
> > > storage.
> > > > It can be started as an independent node or a component in a large
> > Flink
> > > > cluster. In the FLIP we use TableStore as the Storage Layer. As you
> > > > mentioned, we plan to implement specific source and sink on the
> > > TableStore
> > > > in the first phase, and may consider other storage in the future
> > >
> > > I'm not sure if I follow. Generally speaking, why do we need
> MetaService
> > at
> > > all? Why can we only support writes to and reads from TableStore, and
> not
> > > any source/sink that implements some specific interface?
> > >
> > > Best,
> > > Piotrek
> > >
> > > niedz., 29 sty 2023 o 12:11 Shammon FY <zjur...@gmail.com> napisał(a):
> > >
> > > > Hi @Vicky
> > > >
> > > > Thank you for your suggestions about consistency and they're very
> nice
> > to
> > > > me!
> > > >
> > > > I have updated the examples and consistency types[1] in FLIP. In
> > > general, I
> > > > regard the Timestamp Barrier processing as a transaction and divide
> the
> > > > data consistency supported in FLIP into three types
> > > >
> > > > 1. Read Uncommitted: Read data from tables even when a transaction is
> > not
> > > > committed.
> > > > 2. Read Committed: Read data from tables according to the committed
> > > > transaction.
> > > > 3. Repeatable Read: Read data from tables according to the committed
> > > > transaction in snapshots.
> > > >
> > > > You can get more information from the updated FLIP. Looking forward
> to
> > > your
> > > > feedback, THX
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-DataConsistencyType
> > > >
> > > > Best,
> > > > Shammon
> > > >
> > > >
> > > > On Sat, Jan 28, 2023 at 4:42 AM Vasiliki Papavasileiou
> > > > <vpapavasile...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > >
> > > > > Thank you for opening this FLIP which is very interesting and such
> an
> > > > > important feature to add to the Flink ecosystem. I have a couple of
> > > > > suggestions/questions:
> > > > >
> > > > >
> > > > >
> > > > >    -
> > > > >
> > > > >    Consistency is a very broad term with different meanings. There
> > are
> > > > many
> > > > >    variations between the two extremes of weak and strong
> consistency
> > > > that
> > > > >    tradeoff latency for consistency. https://jepsen.io/consistency
> > It
> > > > > would
> > > > >    be great if we could devise an approach that allows the user to
> > > choose
> > > > >    which consistency level they want to use for a query.
> > > > >
> > > > >
> > > > > Example: In your figure where you have a DAG, assume a user queries
> > > only
> > > > > Table1 for a specific key. Then, a failure happens and the table
> > > restores
> > > > > from a checkpoint. The user issues the same query, looking up the
> > same
> > > > key.
> > > > > What value does she see? With monotonic-reads, the system
> guarantees
> > > that
> > > > > she will only see the same or newer values but not older, hence
> will
> > > not
> > > > > experience time-travel. This is a very useful property for a system
> > to
> > > > have
> > > > > albeit it is at the weaker-end of consistency guarantees. But it
> is a
> > > > good
> > > > > stepping stone.
> > > > >
> > > > >
> > > > > Another example, assume the user queries Table1 for key K1 and gets
> > the
> > > > > value V11. Then, she queries Table2 that is derived from Table1 for
> > the
> > > > > same key, K1, that returns value V21. What is the relationship
> > between
> > > > V21
> > > > > and V11? Is V21 derived from V11 or can it be an older value V1
> (the
> > > > > previous value of K1)? What if value V21 is not yet in table
> Table2?
> > > What
> > > > > should she see when she queries Table1? Should she see the key V11
> or
> > > > not?
> > > > > Should the requirement be that a record is not visible in any of
> the
> > > > tables
> > > > > in a DAG unless it is available in all of them?
> > > > >
> > > > >
> > > > >
> > > > >    -
> > > > >
> > > > >    It would we good to have a set of examples with consistency
> > > anomalies
> > > > >    that can happen (like the examples above) and what consistency
> > > levels
> > > > we
> > > > >    want the system to offer to prevent them.
> > > > >    Moreover, for each such example, it would be good to have a
> > > > description
> > > > >    of how the approach (Timestamp Barriers) will work in practice
> to
> > > > > prevent
> > > > >    such anomalies.
> > > > >
> > > > >
> > > > > Thank you,
> > > > > Vicky
> > > > >
> > > > >
> > > > > On Fri, Jan 27, 2023 at 4:46 PM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Hello Shammon and all,
> > > > > >
> > > > > > Thanks for this FLIP! I've been working toward this kind of
> global
> > > > > > consistency across large scale data infrastructure for a long
> time,
> > > and
> > > > > > it's fantastic to see a high-profile effort like this come into
> > play.
> > > > > >
> > > > > > I have been lurking in the discussion for a while and delaying my
> > > > > response
> > > > > > while I collected my thoughts. However, I've realized at some
> > point,
> > > > > > delaying more is not as useful as just asking a few questions, so
> > I'm
> > > > > sorry
> > > > > > if some of this seems beside the point. I'll number these to not
> > > > collide
> > > > > > with prior discussion points:
> > > > > >
> > > > > > 10. Have you considered proposing a general consistency mechanism
> > > > instead
> > > > > > of restricting it to TableStore+ETL graphs? For example, it seems
> > to
> > > me
> > > > > to
> > > > > > be possible and valuable to define instead the contract that
> > > > > sources/sinks
> > > > > > need to implement in order to participate in globally consistent
> > > > > snapshots.
> > > > > >
> > > > > > 11. It seems like this design is assuming that the "ETL Topology"
> > > under
> > > > > > the envelope of the consistency model is a well-ordered set of
> > jobs,
> > > > but
> > > > > I
> > > > > > suspect this is not the case for many organizations. It may be
> > > > > > aspirational, but I think the gold-standard here would be to
> > provide
> > > an
> > > > > > entire organization with a consistency model spanning a loosely
> > > coupled
> > > > > > ecosystem of jobs and data flows spanning teams and systems that
> > are
> > > > > > organizationally far apart.
> > > > > >
> > > > > > I realize that may be kind of abstract. Here's some examples of
> > > what's
> > > > on
> > > > > > my mind here:
> > > > > >
> > > > > > 11a. Engineering may operate one Flink cluster, and some other
> org,
> > > > like
> > > > > > Finance may operate another. In most cases, those are separate
> > > domains
> > > > > that
> > > > > > don't typically get mixed together in jobs, but some people, like
> > the
> > > > > CEO,
> > > > > > would still benefit from being able to make a consistent query
> that
> > > > spans
> > > > > > arbitrary contexts within the business. How well can a feature
> like
> > > > this
> > > > > > transcend a single Flink infrastructure? Does it make sense to
> > > > consider a
> > > > > > model in which snapshots from different domains can be
> composable?
> > > > > >
> > > > > > 11b. Some groups may have a relatively stable set of long-running
> > > jobs,
> > > > > > while others (like data science, skunkworks, etc) may adopt a
> more
> > > > > > experimental, iterative approach with lots of jobs entering and
> > > exiting
> > > > > the
> > > > > > ecosystem over time. It's still valuable to have them participate
> > in
> > > > the
> > > > > > consistency model, but it seems like the consistency system will
> > have
> > > > to
> > > > > > deal with more chaos than I see in the design. For example, how
> can
> > > > this
> > > > > > feature tolerate things like zombie jobs (which are registered in
> > the
> > > > > > system, but fail to check in for a long time, and then come back
> > > > later).
> > > > > >
> > > > > > 12. I didn't see any statements about patterns like cycles in the
> > ETL
> > > > > > Topology. I'm aware that there are fundamental constraints on how
> > > well
> > > > > > cyclic topologies can be supported by a distributed snapshot
> > > algorithm.
> > > > > > However, there are a range of approaches/compromises that we can
> > > apply
> > > > to
> > > > > > cyclic topologies. At the very least, we can state that we will
> > > detect
> > > > > > cycles and produce a warning, etc.
> > > > > >
> > > > > > 13. I'm not sure how heavily you're waiting the query syntax part
> > of
> > > > the
> > > > > > proposal, so please feel free to defer this point. It looked to
> me
> > > like
> > > > > the
> > > > > > proposal assumes people want to query either the latest
> consistent
> > > > > snapshot
> > > > > > or the latest inconsistent state. However, it seems like there's
> a
> > > > > > significant opportunity to maintain a manifest of historical
> > > snapshots
> > > > > and
> > > > > > allow people to query as of old points in time. That can be
> > valuable
> > > > for
> > > > > > individuals answering data questions, building products, and
> > > crucially
> > > > > > supporting auditability use cases. To that latter point, it seems
> > > nice
> > > > to
> > > > > > provide not only a mechanism to query arbitrary snapshots, but
> also
> > > to
> > > > > > define a TTL/GC model that allows users to keep hourly snapshots
> > for
> > > N
> > > > > > hours, daily snapshots for N days, weekly snapshots for N weeks,
> > and
> > > > the
> > > > > > same for monthly, quarterly, and yearly snapshots.
> > > > > >
> > > > > > Ok, that's all I have for now :) I'd also like to understand some
> > > > > > lower-level details, but I wanted to get these high-level
> questions
> > > off
> > > > > my
> > > > > > chest.
> > > > > >
> > > > > > Thanks again for the FLIP!
> > > > > > -John
> > > > > >
> > > > > > On 2023/01/13 11:43:28 Shammon FY wrote:
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > I discussed with @jinsong lee about `Timestamp Barrier` and
> > > `Aligned
> > > > > > > Checkpoint` for data consistency in FLIP, we think there are
> many
> > > > > defects
> > > > > > > indeed in using `Aligned Checkpoint` to support data
> consistency
> > as
> > > > you
> > > > > > > mentioned.
> > > > > > >
> > > > > > > According to our historical discussion, I think we have reached
> > an
> > > > > > > agreement on an important point: we finally need `Timestamp
> > Barrier
> > > > > > > Mechanism` to support data consistency. But according to our
> > > > (@jinsong
> > > > > > lee
> > > > > > > and I) opinions, the total design and implementation based on
> > > > > 'Timestamp
> > > > > > > Barrier' will be too complex, and it's also too big in one
> FLIP.
> > > > > > >
> > > > > > > So we‘d like to use FLIP-276[1] as an overview design of data
> > > > > consistency
> > > > > > > in Flink Streaming and Batch ETL based on `Timestamp Barrier`.
> > > > @jinsong
> > > > > > and
> > > > > > > I hope that we can reach an agreement on the overall design in
> > > > > FLINK-276
> > > > > > > first, and then on the basic of FLIP-276 we can create other
> > FLIPs
> > > > with
> > > > > > > detailed design according to modules and drive them. Finally,
> we
> > > can
> > > > > > > support data consistency based on Timestamp in Flink.
> > > > > > >
> > > > > > > I have updated FLIP-276, deleted the Checkpoint section, and
> > added
> > > > the
> > > > > > > overall design of  `Timestamp Barrier`. Here I briefly describe
> > the
> > > > > > modules
> > > > > > > of `Timestamp Barrier` as follows
> > > > > > > 1. Generation: JobManager must coordinate all source subtasks
> and
> > > > > > generate
> > > > > > > a unified timestamp barrier from System Time or Event Time for
> > them
> > > > > > > 2. Checkpoint: Store <checkpoint, timestamp barrier> when the
> > > > timestamp
> > > > > > > barrier is generated, so that the job can recover the same
> > > timestamp
> > > > > > > barrier for the uncompleted checkpoint.
> > > > > > > 3. Replay data: Store <timestamp barrier, offset> for source
> when
> > > it
> > > > > > > broadcasts timestamp barrier, so that the source can replay the
> > > same
> > > > > data
> > > > > > > according to the same timestamp barrier.
> > > > > > > 4. Align data: Align data for stateful operator(aggregation,
> join
> > > and
> > > > > > etc.)
> > > > > > > and temporal operator(window)
> > > > > > > 5. Computation: Operator computation for a specific timestamp
> > > barrier
> > > > > > based
> > > > > > > on the results of a previous timestamp barrier.
> > > > > > > 6. Output: Operator outputs or commits results when it collects
> > all
> > > > the
> > > > > > > timestamp barriers, including operators with data buffer or
> async
> > > > > > > operations.
> > > > > > >
> > > > > > > I also list the main work in Flink and Table Store in FLIP-276.
> > > > Please
> > > > > > help
> > > > > > > to review the FLIP when you're free and feel free to give any
> > > > comments.
> > > > > > >
> > > > > > > Looking forward for your feedback, THX
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > > > > > >
> > > > > > > Best,
> > > > > > > Shammon
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Dec 20, 2022 at 10:01 AM Shammon FY <zjur...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > Thanks for your syncing. I will update the FLIP later and
> keep
> > > this
> > > > > > > > discussion open. Looking forward to your feedback, thanks
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski <
> > > > > pnowoj...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi Shammon,
> > > > > > > >>
> > > > > > > >> I've tried to sync with Timo, David Moravek and Dawid
> > Wysakowicz
> > > > > about
> > > > > > > >> this
> > > > > > > >> subject. We have only briefly chatted and exchanged some
> > > > > > thoughts/ideas,
> > > > > > > >> but unfortunately we were not able to finish the discussions
> > > > before
> > > > > > the
> > > > > > > >> holiday season/vacations. Can we get back to this topic in
> > > > January?
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Piotrek
> > > > > > > >>
> > > > > > > >> pt., 16 gru 2022 o 10:53 Shammon FY <zjur...@gmail.com>
> > > > napisał(a):
> > > > > > > >>
> > > > > > > >> > Hi Piotr,
> > > > > > > >> >
> > > > > > > >> > I found there may be several points in our discussion, it
> > will
> > > > > cause
> > > > > > > >> > misunderstanding between us when we focus on different
> one.
> > I
> > > > list
> > > > > > each
> > > > > > > >> > point in our discussion as follows
> > > > > > > >> >
> > > > > > > >> > > Point 1: Is "Aligned Checkpoint" the only mechanism to
> > > > guarantee
> > > > > > data
> > > > > > > >> > consistency in the current Flink implementation, and
> > > "Watermark"
> > > > > and
> > > > > > > >> > "Aligned Checkpoint cannot do that?
> > > > > > > >> > My answer is "Yes", the "Aligned Checkpoint" is the only
> one
> > > due
> > > > > to
> > > > > > its
> > > > > > > >> > "Align Data" ability, we can do it in the first stage.
> > > > > > > >> >
> > > > > > > >> > > Point2: Can the combination of "Checkpoint Barrier" and
> > > > > > "Watermark"
> > > > > > > >> > support the complete consistency semantics based on
> > > "Timestamp"
> > > > in
> > > > > > the
> > > > > > > >> > current Flink implementation?
> > > > > > > >> > My answer is "No", we need a new "Timestamp Barrier"
> > mechanism
> > > > to
> > > > > do
> > > > > > > >> that
> > > > > > > >> > which may be upgraded from current "Watermark" or a new
> > > > mechanism,
> > > > > > we
> > > > > > > >> can
> > > > > > > >> > do it in the next second or third stage.
> > > > > > > >> >
> > > > > > > >> > > Point3: Are the "Checkpoint" and the new "Timestamp
> > Barrier"
> > > > > > > >> completely
> > > > > > > >> > independent? The "Checkpoint" whatever "Aligned" or
> > > "Unaligned"
> > > > or
> > > > > > "Task
> > > > > > > >> > Local" supports the "Exactly-Once" between ETLs, and the
> > > > > "Timestamp
> > > > > > > >> > Barrier" mechanism guarantees data consistency between
> > tables
> > > > > > according
> > > > > > > >> to
> > > > > > > >> > timestamp for queries.
> > > > > > > >> > My answer is "Yes", I totally agree with you. Let
> > "Checkpoint"
> > > > be
> > > > > > > >> > responsible for fault tolerance and "Timestamp Barrier"
> for
> > > > > > consistency
> > > > > > > >> > independently.
> > > > > > > >> >
> > > > > > > >> > @Piotr, What do you think? If I am missing or
> > misunderstanding
> > > > > > anything,
> > > > > > > >> > please correct me, thanks
> > > > > > > >> >
> > > > > > > >> > Best,
> > > > > > > >> > Shammon
> > > > > > > >> >
> > > > > > > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski <
> > > > > > pnowoj...@apache.org>
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi Shammon,
> > > > > > > >> > >
> > > > > > > >> > > > I don't think we can combine watermarks and checkpoint
> > > > > barriers
> > > > > > > >> > together
> > > > > > > >> > > to
> > > > > > > >> > > > guarantee data consistency. There will be a "Timestamp
> > > > > Barrier"
> > > > > > in
> > > > > > > >> our
> > > > > > > >> > > > system to "commit data", "single etl failover", "low
> > > latency
> > > > > > between
> > > > > > > >> > > ETLs"
> > > > > > > >> > > > and "strong data consistency with completed semantics"
> > in
> > > > the
> > > > > > end.
> > > > > > > >> > >
> > > > > > > >> > > Why do you think so? I've described to you above an
> > > > alternative
> > > > > > where
> > > > > > > >> we
> > > > > > > >> > > could be using watermarks for data consistency,
> regardless
> > > of
> > > > > what
> > > > > > > >> > > checkpointing/fault tolerance mechanism Flink would be
> > > using.
> > > > > Can
> > > > > > you
> > > > > > > >> > > explain what's wrong with that approach? Let me rephrase
> > it:
> > > > > > > >> > >
> > > > > > > >> > > 1. There is an independent mechanism that provides
> > > > exactly-once
> > > > > > > >> > guarantees,
> > > > > > > >> > > committing records/watermarks/events and taking care of
> > the
> > > > > > failover.
> > > > > > > >> It
> > > > > > > >> > > might be aligned, unaligned or task local checkpointing
> -
> > > this
> > > > > > doesn't
> > > > > > > >> > > matter. Let's just assume we have such a mechanism.
> > > > > > > >> > > 2. There is a watermarking mechanism (it can be some
> kind
> > of
> > > > > > system
> > > > > > > >> > > versioning re-using watermarks code path if a user
> didn't
> > > > > > configure
> > > > > > > >> > > watermarks), that takes care of the data consistency.
> > > > > > > >> > >
> > > > > > > >> > > Because watermarks from 2. are also subject to the
> > > > exactly-once
> > > > > > > >> > guarantees
> > > > > > > >> > > from the 1., once they are committed downstream systems
> > > (Flink
> > > > > > jobs or
> > > > > > > >> > > other 3rd party systems) could just easily work with the
> > > > > committed
> > > > > > > >> > > watermarks to provide consistent view/snapshot of the
> > > tables.
> > > > > Any
> > > > > > > >> > > downstream system could always check what are the
> > committed
> > > > > > > >> watermarks,
> > > > > > > >> > > select the watermark value (for example min across all
> > used
> > > > > > tables),
> > > > > > > >> and
> > > > > > > >> > > ask every table: please give me all of the data up until
> > the
> > > > > > selected
> > > > > > > >> > > watermark. Or give me all tables in the version for the
> > > > selected
> > > > > > > >> > watermark.
> > > > > > > >> > >
> > > > > > > >> > > Am I missing something? To me it seems like this way we
> > can
> > > > > fully
> > > > > > > >> > decouple
> > > > > > > >> > > the fault tolerance mechanism from the subject of the
> data
> > > > > > > >> consistency.
> > > > > > > >> > >
> > > > > > > >> > > Best,
> > > > > > > >> > > Piotrek
> > > > > > > >> > >
> > > > > > > >> > > czw., 15 gru 2022 o 13:01 Shammon FY <zjur...@gmail.com
> >
> > > > > > napisał(a):
> > > > > > > >> > >
> > > > > > > >> > > > Hi Piotr,
> > > > > > > >> > > >
> > > > > > > >> > > > It's kind of amazing about the image, it's a simple
> > > example
> > > > > and
> > > > > > I
> > > > > > > >> have
> > > > > > > >> > to
> > > > > > > >> > > > put it in a document
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink
> > > > > > > >> > > > :)
> > > > > > > >> > > >
> > > > > > > >> > > > > Does it have to be combining watermarks and
> checkpoint
> > > > > > barriers
> > > > > > > >> > > together?
> > > > > > > >> > > >
> > > > > > > >> > > > It's an interesting question. As we discussed above,
> > what
> > > we
> > > > > > need
> > > > > > > >> from
> > > > > > > >> > > > "Checkpoint" is the "Align Data Ability", and from
> > > > "Watermark"
> > > > > > is
> > > > > > > >> the
> > > > > > > >> > > > "Consistency Semantics",
> > > > > > > >> > > >
> > > > > > > >> > > > 1) Only "Align Data" can reach data consistency when
> > > > > performing
> > > > > > > >> queries
> > > > > > > >> > > on
> > > > > > > >> > > > upstream and downstream tables. I gave an example of
> > > "Global
> > > > > > Count
> > > > > > > >> > > Tables"
> > > > > > > >> > > > in our previous discussion. We need a "Align Event" in
> > the
> > > > > > streaming
> > > > > > > >> > > > processing, it's the most basic.
> > > > > > > >> > > >
> > > > > > > >> > > > 2) Only "Timestamp" can provide complete consistency
> > > > > semantics.
> > > > > > You
> > > > > > > >> > gave
> > > > > > > >> > > > some good examples about "Window" and ect operators.
> > > > > > > >> > > >
> > > > > > > >> > > > I don't think we can combine watermarks and checkpoint
> > > > > barriers
> > > > > > > >> > together
> > > > > > > >> > > to
> > > > > > > >> > > > guarantee data consistency. There will be a "Timestamp
> > > > > Barrier"
> > > > > > in
> > > > > > > >> our
> > > > > > > >> > > > system to "commit data", "single etl failover", "low
> > > latency
> > > > > > between
> > > > > > > >> > > ETLs"
> > > > > > > >> > > > and "strong data consistency with completed semantics"
> > in
> > > > the
> > > > > > end.
> > > > > > > >> > > >
> > > > > > > >> > > > At the beginning I think we can do the simplest thing
> > > first:
> > > > > > > >> guarantee
> > > > > > > >> > > the
> > > > > > > >> > > > basic data consistency with a "Barrier Mechanism". In
> > the
> > > > > > current
> > > > > > > >> Flink
> > > > > > > >> > > > there's "Aligned Checkpoint" only, that's why we
> choose
> > > > > > > >> "Checkpoint" in
> > > > > > > >> > > our
> > > > > > > >> > > > FLIP.
> > > > > > > >> > > >
> > > > > > > >> > > > > I don't see an actual connection in the the
> > > implementation
> > > > > > steps
> > > > > > > >> > > between
> > > > > > > >> > > > the checkpoint barriers approach and the
> watermark-like
> > > > > approach
> > > > > > > >> > > >
> > > > > > > >> > > > As I mentioned above, we choose "Checkpoint" to
> > guarantee
> > > > the
> > > > > > basic
> > > > > > > >> > data
> > > > > > > >> > > > consistency. But as we discussed, the most ideal
> > solution
> > > is
> > > > > > > >> "Timestamp
> > > > > > > >> > > > Barrier". After the first stage is completed based on
> > the
> > > > > > > >> "Checkpoint",
> > > > > > > >> > > we
> > > > > > > >> > > > need to evolve it to our ideal solution "Timestamp
> > > Barrier"
> > > > > > > >> > > (watermark-like
> > > > > > > >> > > > approach) in the next second or third stage. This does
> > not
> > > > > mean
> > > > > > > >> > upgrading
> > > > > > > >> > > > "Checkpoint Mechanism" in Flink. It means that after
> we
> > > > > > implement a
> > > > > > > >> new
> > > > > > > >> > > > "Timestamp Barrier" or upgrade "Watermark" to support
> > it,
> > > we
> > > > > can
> > > > > > > >> use it
> > > > > > > >> > > > instead of the current "Checkpoint Mechanism" directly
> > in
> > > > our
> > > > > > > >> > > "MetaService"
> > > > > > > >> > > > and "Table Store".
> > > > > > > >> > > >
> > > > > > > >> > > > In the discussion between @David and me, I summarized
> > the
> > > > work
> > > > > > of
> > > > > > > >> > > upgrading
> > > > > > > >> > > > "Watermark" to support "Timestamp Barrier". It looks
> > like
> > > a
> > > > > big
> > > > > > job
> > > > > > > >> and
> > > > > > > >> > > you
> > > > > > > >> > > > can find the details in our discussion. I think we
> don't
> > > > need
> > > > > > to do
> > > > > > > >> > that
> > > > > > > >> > > in
> > > > > > > >> > > > our first stage.
> > > > > > > >> > > >
> > > > > > > >> > > > Also in that discussion (my reply to @David) too, I
> > > briefly
> > > > > > > >> summarized
> > > > > > > >> > > the
> > > > > > > >> > > > work that needs to be done to use the new mechanism
> > > > (Timestamp
> > > > > > > >> Barrier)
> > > > > > > >> > > > after we implement the basic function on "Checkpoint".
> > It
> > > > > seems
> > > > > > that
> > > > > > > >> > the
> > > > > > > >> > > > work is not too big on my side, and it is feasible on
> > the
> > > > > whole.
> > > > > > > >> > > >
> > > > > > > >> > > > Based on the above points, I think we can support
> basic
> > > data
> > > > > > > >> > consistency
> > > > > > > >> > > on
> > > > > > > >> > > > "Checkpoint" in the first stage which is described in
> > > FLIP,
> > > > > and
> > > > > > > >> > continue
> > > > > > > >> > > to
> > > > > > > >> > > > evolve it to "Timestamp Barrier" to support low
> latency
> > > > > between
> > > > > > ETLs
> > > > > > > >> > and
> > > > > > > >> > > > completed semantics in the second or third stage
> later.
> > > > What
> > > > > > do you
> > > > > > > >> > > think?
> > > > > > > >> > > >
> > > > > > > >> > > > Best,
> > > > > > > >> > > > Shammon
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski <
> > > > > > > >> pnowoj...@apache.org>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hi Shammon,
> > > > > > > >> > > > >
> > > > > > > >> > > > > > The following is a simple example. Data is
> > transferred
> > > > > > between
> > > > > > > >> > ETL1,
> > > > > > > >> > > > ETL2
> > > > > > > >> > > > > and ETL3 in Intermediate Table by Timestamp.
> > > > > > > >> > > > > > [image: simple_example.jpg]
> > > > > > > >> > > > >
> > > > > > > >> > > > > This time it's your image that doesn't want to load
> :)
> > > > > > > >> > > > >
> > > > > > > >> > > > > >  Timestamp Barrier
> > > > > > > >> > > > >
> > > > > > > >> > > > > Does it have to be combining watermarks and
> checkpoint
> > > > > > barriers
> > > > > > > >> > > together?
> > > > > > > >> > > > > Can we not achieve the same result with two
> > independent
> > > > > > processes
> > > > > > > >> > > > > checkpointing (regardless if this is a global
> > > > > > aligned/unaligned
> > > > > > > >> > > > checkpoint,
> > > > > > > >> > > > > or a task local checkpoint) plus watermarking?
> > > > Checkpointing
> > > > > > would
> > > > > > > >> > > > provide
> > > > > > > >> > > > > exactly-once guarantees, and actually committing the
> > > > > results,
> > > > > > and
> > > > > > > >> it
> > > > > > > >> > > > would
> > > > > > > >> > > > > be actually committing the last emitted watermark?
> > From
> > > > the
> > > > > > > >> > perspective
> > > > > > > >> > > > of
> > > > > > > >> > > > > the sink/table, it shouldn't really matter how the
> > > > > > exactly-once is
> > > > > > > >> > > > > achieved, and whether the job has performed an
> > unaligned
> > > > > > > >> checkpoint
> > > > > > > >> > or
> > > > > > > >> > > > > something completely different. It seems to me that
> > the
> > > > > > sink/table
> > > > > > > >> > > > > could/should be able to understand/work with only
> the
> > > > basic
> > > > > > > >> > > information:
> > > > > > > >> > > > > here are records and watermarks (with at that point
> of
> > > > time
> > > > > > > >> already
> > > > > > > >> > > fixed
> > > > > > > >> > > > > order), they are committed and will never change.
> > > > > > > >> > > > >
> > > > > > > >> > > > > > However, from the perspective of implementation
> > > > > complexity,
> > > > > > I
> > > > > > > >> > > > personally
> > > > > > > >> > > > > think using Checkpoint in the first phase makes
> sense,
> > > > what
> > > > > > do you
> > > > > > > >> > > think?
> > > > > > > >> > > > >
> > > > > > > >> > > > > Maybe I'm missing something, but I don't see an
> actual
> > > > > > connection
> > > > > > > >> in
> > > > > > > >> > > the
> > > > > > > >> > > > > implementation steps between the checkpoint barriers
> > > > > approach
> > > > > > and
> > > > > > > >> the
> > > > > > > >> > > > > watermark-like approach. They seem to me (from the
> > > > > > perspective of
> > > > > > > >> > Flink
> > > > > > > >> > > > > runtime at least) like two completely different
> > > > mechanisms.
> > > > > > Not
> > > > > > > >> one
> > > > > > > >> > > > leading
> > > > > > > >> > > > > to the other.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Best,
> > > > > > > >> > > > > Piotrek
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > > śr., 14 gru 2022 o 15:19 Shammon FY <
> > zjur...@gmail.com>
> > > > > > > >> napisał(a):
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Hi Piotr,
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks for your valuable input which makes me
> > consider
> > > > the
> > > > > > core
> > > > > > > >> > point
> > > > > > > >> > > > of
> > > > > > > >> > > > > > data consistency in deep. I'd like to define the
> > data
> > > > > > > >> consistency
> > > > > > > >> > on
> > > > > > > >> > > > the
> > > > > > > >> > > > > > whole streaming & batch processing as follows and
> I
> > > hope
> > > > > > that we
> > > > > > > >> > can
> > > > > > > >> > > > have
> > > > > > > >> > > > > > an agreement on it:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > BOutput = Fn(BInput), BInput is a bounded input
> > which
> > > is
> > > > > > > >> splitted
> > > > > > > >> > > from
> > > > > > > >> > > > > > unbounded streaming, Fn is the computation of a
> node
> > > or
> > > > > ETL,
> > > > > > > >> > BOutput
> > > > > > > >> > > is
> > > > > > > >> > > > > the
> > > > > > > >> > > > > > bounded output of BInput. All the data in BInput
> and
> > > > > > BOutput are
> > > > > > > >> > > > > unordered,
> > > > > > > >> > > > > > and BInput and BOutput are data consistent.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > The key points above include 1) the segment
> > semantics
> > > of
> > > > > > > >> BInput; 2)
> > > > > > > >> > > the
> > > > > > > >> > > > > > computation semantics of Fn
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 1. The segment semantics of BInput
> > > > > > > >> > > > > > a) Transactionality of data. It is necessary to
> > ensure
> > > > the
> > > > > > > >> semantic
> > > > > > > >> > > > > > transaction of the bounded data set when it is
> > > splitted
> > > > > > from the
> > > > > > > >> > > > > unbounded
> > > > > > > >> > > > > > streaming. For example, we cannot split multiple
> > > records
> > > > > in
> > > > > > one
> > > > > > > >> > > > > transaction
> > > > > > > >> > > > > > to different bounded data sets.
> > > > > > > >> > > > > > b) Timeliness of data. Some data is related with
> > time,
> > > > > such
> > > > > > as
> > > > > > > >> > > boundary
> > > > > > > >> > > > > > data for a window. It is necessary to consider
> > whether
> > > > the
> > > > > > > >> bounded
> > > > > > > >> > > data
> > > > > > > >> > > > > set
> > > > > > > >> > > > > > needs to include a watermark which can trigger the
> > > > window
> > > > > > > >> result.
> > > > > > > >> > > > > > c) Constraints of data. The Timestamp Barrier
> should
> > > > > perform
> > > > > > > >> some
> > > > > > > >> > > > > specific
> > > > > > > >> > > > > > operations after computation in operators, for
> > > example,
> > > > > > force
> > > > > > > >> flush
> > > > > > > >> > > > data.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Checkpoint Barrier misses all the semantics above,
> > and
> > > > we
> > > > > > should
> > > > > > > >> > > > support
> > > > > > > >> > > > > > user to define Timestamp for data on Event Time or
> > > > System
> > > > > > Time
> > > > > > > >> > > > according
> > > > > > > >> > > > > to
> > > > > > > >> > > > > > the job and computation later.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 2. The computation semantics of Fn
> > > > > > > >> > > > > > a) Deterministic computation
> > > > > > > >> > > > > > Most computations are deterministic such as map,
> > > filter,
> > > > > > count,
> > > > > > > >> sum
> > > > > > > >> > > and
> > > > > > > >> > > > > > ect. They generate the same unordered result from
> > the
> > > > same
> > > > > > > >> > unordered
> > > > > > > >> > > > > input
> > > > > > > >> > > > > > every time, and we can easily define data
> > consistency
> > > on
> > > > > the
> > > > > > > >> input
> > > > > > > >> > > and
> > > > > > > >> > > > > > output for them.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > b) Non-deterministic computation
> > > > > > > >> > > > > > Some computations are non-deterministic. They will
> > > > produce
> > > > > > > >> > different
> > > > > > > >> > > > > > results from the same input every time. I try to
> > > divide
> > > > > them
> > > > > > > >> into
> > > > > > > >> > the
> > > > > > > >> > > > > > following types:
> > > > > > > >> > > > > > 1) Non-deterministic computation semantics, such
> as
> > > rank
> > > > > > > >> operator.
> > > > > > > >> > > When
> > > > > > > >> > > > > it
> > > > > > > >> > > > > > computes multiple times (for example, failover),
> the
> > > > first
> > > > > > or
> > > > > > > >> last
> > > > > > > >> > > > output
> > > > > > > >> > > > > > results can both be the final result which will
> > cause
> > > > > > different
> > > > > > > >> > > > failover
> > > > > > > >> > > > > > handlers for downstream jobs. I will expand it
> > later.
> > > > > > > >> > > > > > 2) Non-deterministic computation optimization,
> such
> > as
> > > > > async
> > > > > > > >> io. It
> > > > > > > >> > > is
> > > > > > > >> > > > > > necessary to sync these operations when the
> barrier
> > of
> > > > > input
> > > > > > > >> > arrives.
> > > > > > > >> > > > > > 3) Deviation caused by data segmentat and
> > computation
> > > > > > semantics,
> > > > > > > >> > such
> > > > > > > >> > > > as
> > > > > > > >> > > > > > Window. This requires that the users should
> > customize
> > > > the
> > > > > > data
> > > > > > > >> > > > > segmentation
> > > > > > > >> > > > > > according to their needs correctly.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Checkpoint Barrier matches a) and Timestamp
> Barrier
> > > can
> > > > > > match
> > > > > > > >> all
> > > > > > > >> > a)
> > > > > > > >> > > > and
> > > > > > > >> > > > > > b).
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > We define data consistency of BInput and BOutput
> > based
> > > > all
> > > > > > > >> above.
> > > > > > > >> > The
> > > > > > > >> > > > > > BOutput of upstream ETL will be the BInput of the
> > next
> > > > > ETL,
> > > > > > and
> > > > > > > >> > > > multiple
> > > > > > > >> > > > > > ETL jobs form a complex "ETL Topology".
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Based on the above definitions, I'd like to give a
> > > > general
> > > > > > > >> proposal
> > > > > > > >> > > > with
> > > > > > > >> > > > > > "Timetamp Barrier" in my mind, it's not very
> > detailed
> > > > and
> > > > > > please
> > > > > > > >> > help
> > > > > > > >> > > > to
> > > > > > > >> > > > > > review it and feel free to comment @David, @Piotr
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 1. Data segment with Timestamp
> > > > > > > >> > > > > > a) Users can define the Timestamp Barrier with
> > System
> > > > > Time,
> > > > > > > >> Event
> > > > > > > >> > > Time.
> > > > > > > >> > > > > > b) Source nodes generate the same Timestamp
> Barrier
> > > > after
> > > > > > > >> reading
> > > > > > > >> > > data
> > > > > > > >> > > > > > from RootTable
> > > > > > > >> > > > > > c) There is a same Timetamp data in each record
> > > > according
> > > > > to
> > > > > > > >> > > Timestamp
> > > > > > > >> > > > > > Barrier, such as (a, T), (b, T), (c, T), (T,
> > barrier)
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 2. Computation with Timestamp
> > > > > > > >> > > > > > a) Records are unordered with the same Timestamp.
> > > > > Stateless
> > > > > > > >> > operators
> > > > > > > >> > > > > such
> > > > > > > >> > > > > > as map/flatmap/filter can process data without
> > > aligning
> > > > > > > >> Timestamp
> > > > > > > >> > > > > Barrier,
> > > > > > > >> > > > > > which is different from Checkpoint Barrier.
> > > > > > > >> > > > > > b) Records between Timestamp are ordered. Stateful
> > > > > operators
> > > > > > > >> must
> > > > > > > >> > > align
> > > > > > > >> > > > > > data and compute by each Timestamp, then compute
> by
> > > > > Timetamp
> > > > > > > >> > > sequence.
> > > > > > > >> > > > > > c) Stateful operators will output results of
> > specific
> > > > > > Timestamp
> > > > > > > >> > after
> > > > > > > >> > > > > > computation.
> > > > > > > >> > > > > > d) Sink operator "commit records" with specific
> > > > Timestamp
> > > > > > and
> > > > > > > >> > report
> > > > > > > >> > > > the
> > > > > > > >> > > > > > status to JobManager
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 3. Read data with Timestamp
> > > > > > > >> > > > > > a) Downstream ETL reads data according to
> Timestamp
> > > > after
> > > > > > > >> upstream
> > > > > > > >> > > ETL
> > > > > > > >> > > > > > "commit" it.
> > > > > > > >> > > > > > b) Stateful operators interact with state when
> > > computing
> > > > > > data of
> > > > > > > >> > > > > > Timestamp, but they won't trigger checkpoint for
> > every
> > > > > > > >> Timestamp.
> > > > > > > >> > > > > Therefore
> > > > > > > >> > > > > > source ETL job can generate Timestamp every few
> > > seconds
> > > > or
> > > > > > even
> > > > > > > >> > > > hundreds
> > > > > > > >> > > > > of
> > > > > > > >> > > > > > milliseconds
> > > > > > > >> > > > > > c) Based on Timestamp the delay between ETL jobs
> > will
> > > be
> > > > > > very
> > > > > > > >> > small,
> > > > > > > >> > > > and
> > > > > > > >> > > > > > in the best case the E2E latency maybe only tens
> of
> > > > > seconds.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 4. Failover and Recovery
> > > > > > > >> > > > > > ETL jobs are cascaded through the Intermediate
> > Table.
> > > > > After
> > > > > > a
> > > > > > > >> > single
> > > > > > > >> > > > ETL
> > > > > > > >> > > > > > job fails, it needs to replay the input data and
> > > > recompute
> > > > > > the
> > > > > > > >> > > results.
> > > > > > > >> > > > > As
> > > > > > > >> > > > > > you mentioned, whether the cascaded ETL jobs are
> > > > restarted
> > > > > > > >> depends
> > > > > > > >> > on
> > > > > > > >> > > > the
> > > > > > > >> > > > > > determinacy of the intermediate data between them.
> > > > > > > >> > > > > > a) An ETL job will rollback and reread data from
> > > > upstream
> > > > > > ETL by
> > > > > > > >> > > > specific
> > > > > > > >> > > > > > Timestamp according to the Checkpoint.
> > > > > > > >> > > > > > b) According to the management of Checkpoint and
> > > > > Timestamp,
> > > > > > ETL
> > > > > > > >> can
> > > > > > > >> > > > > replay
> > > > > > > >> > > > > > all Timestamp and data after failover, which means
> > > > BInput
> > > > > > is the
> > > > > > > >> > same
> > > > > > > >> > > > > > before and after failover.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > c) For deterministic Fn, it generates the same
> > BOutput
> > > > > from
> > > > > > the
> > > > > > > >> > same
> > > > > > > >> > > > > BInput
> > > > > > > >> > > > > > 1) If there's no data of the specific Timestamp in
> > the
> > > > > sink
> > > > > > > >> table,
> > > > > > > >> > > ETL
> > > > > > > >> > > > > > just "commit" it as normal.
> > > > > > > >> > > > > > 2) If the Timestamp data exists in the sink table,
> > ETL
> > > > can
> > > > > > just
> > > > > > > >> > > discard
> > > > > > > >> > > > > > the new data.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > d) For non-deterministic Fn, it generates
> different
> > > > > BOutput
> > > > > > from
> > > > > > > >> > the
> > > > > > > >> > > > same
> > > > > > > >> > > > > > BInput before and after failover. For example,
> > > BOutput1
> > > > > > before
> > > > > > > >> > > failover
> > > > > > > >> > > > > and
> > > > > > > >> > > > > > BOutput2 after failover. The state in ETL is
> > > consistent
> > > > > with
> > > > > > > >> > > BOutput2.
> > > > > > > >> > > > > > There are two cases according to users'
> requirements
> > > > > > > >> > > > > > 1) Users can accept BOutput1 as the final output
> and
> > > > > > downstream
> > > > > > > >> > ETLs
> > > > > > > >> > > > > don't
> > > > > > > >> > > > > > need to restart. Sink in ETL can discard BOutput2
> > > > directly
> > > > > > if
> > > > > > > >> the
> > > > > > > >> > > > > Timestamp
> > > > > > > >> > > > > > exists in the sink table.
> > > > > > > >> > > > > > 2) Users only accept BOutput2 as the final output,
> > > then
> > > > > all
> > > > > > the
> > > > > > > >> > > > > downstream
> > > > > > > >> > > > > > ETLs and Intermediate Table should rollback to
> > > specific
> > > > > > > >> Timestamp,
> > > > > > > >> > > the
> > > > > > > >> > > > > > downstream ETLs should be restarted too.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > The following is a simple example. Data is
> > transferred
> > > > > > between
> > > > > > > >> > ETL1,
> > > > > > > >> > > > ETL2
> > > > > > > >> > > > > > and ETL3 in Intermediate Table by Timestamp.
> > > > > > > >> > > > > > [image: simple_example.jpg]
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Besides Timestamp, there's a big challenge in
> > > > Intermediate
> > > > > > > >> Table.
> > > > > > > >> > It
> > > > > > > >> > > > > > should support a highly implemented "commit
> > Timestamp
> > > > > > snapshot"
> > > > > > > >> > with
> > > > > > > >> > > > high
> > > > > > > >> > > > > > throughput, which requires the Table Store to
> > enhance
> > > > > > streaming
> > > > > > > >> > > > > > capabilities like pulsar or kafka.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > In this FLIP, we plan to implement the proposal
> with
> > > > > > Checkpoint,
> > > > > > > >> > the
> > > > > > > >> > > > > above
> > > > > > > >> > > > > > Timestamp can be replaced by Checkpoint. Of
> course,
> > > > > > Checkpoint
> > > > > > > >> has
> > > > > > > >> > > some
> > > > > > > >> > > > > > problems. I think we have reached some consensus
> in
> > > the
> > > > > > > >> discussion
> > > > > > > >> > > > about
> > > > > > > >> > > > > > the Checkpoint problems, including data segment
> > > > semantics,
> > > > > > flush
> > > > > > > >> > data
> > > > > > > >> > > > of
> > > > > > > >> > > > > > some operators, and the increase of E2E delay.
> > > However,
> > > > > > from the
> > > > > > > >> > > > > > perspective of implementation complexity, I
> > personally
> > > > > think
> > > > > > > >> using
> > > > > > > >> > > > > > Checkpoint in the first phase makes sense, what do
> > you
> > > > > > think?
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Finally, I think I misunderstood the "Rolling
> > > > Checkpoint"
> > > > > > and
> > > > > > > >> "All
> > > > > > > >> > at
> > > > > > > >> > > > > once
> > > > > > > >> > > > > > Checkpoint" in my last explanation which you and
> > > @David
> > > > > > > >> mentioned.
> > > > > > > >> > I
> > > > > > > >> > > > > > thought their differences were mainly to select
> > > > different
> > > > > > table
> > > > > > > >> > > > versions
> > > > > > > >> > > > > > for queries. According to your reply, I think it
> is
> > > > > whether
> > > > > > > >> there
> > > > > > > >> > are
> > > > > > > >> > > > > > multiple "rolling checkpoints" in each ETL job,
> > right?
> > > > If
> > > > > I
> > > > > > > >> > > understand
> > > > > > > >> > > > > > correctly, the "Rolling Checkpoint" is a good
> idea,
> > > and
> > > > we
> > > > > > can
> > > > > > > >> > > > guarantee
> > > > > > > >> > > > > > "Strong Data Consistency" between multiple tables
> in
> > > > > > MetaService
> > > > > > > >> > for
> > > > > > > >> > > > > > queries. Thanks.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Best,
> > > > > > > >> > > > > > Shammon
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <
> > > > > > > >> > pnowoj...@apache.org
> > > > > > > >> > > >
> > > > > > > >> > > > > > wrote:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >> Hi Shammon,
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Thanks for the explanations, I think I understand
> > the
> > > > > > problem
> > > > > > > >> > better
> > > > > > > >> > > > > now.
> > > > > > > >> > > > > >> I have a couple of follow up questions, but
> first:
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> >> 3. I'm pretty sure there are counter examples,
> > > where
> > > > > > your
> > > > > > > >> > > proposed
> > > > > > > >> > > > > >> mechanism of using checkpoints (even aligned!)
> will
> > > > > produce
> > > > > > > >> > > > > >> inconsistent data from the perspective of the
> event
> > > > time.
> > > > > > > >> > > > > >> >>  a) For example what if one of your "ETL"
> jobs,
> > > has
> > > > > the
> > > > > > > >> > following
> > > > > > > >> > > > > DAG:
> > > > > > > >> > > > > >> >>
> > > > > > > >> > > > > >> >>  Even if you use aligned checkpoints for
> > > committing
> > > > > the
> > > > > > > >> data to
> > > > > > > >> > > the
> > > > > > > >> > > > > >> sink table, the watermarks of "Window1" and
> > "Window2"
> > > > are
> > > > > > > >> > completely
> > > > > > > >> > > > > >> independent. The sink table might easily have
> data
> > > from
> > > > > the
> > > > > > > >> > > > Src1/Window1
> > > > > > > >> > > > > >> from the event time T1 and Src2/Window2 from
> later
> > > > event
> > > > > > time
> > > > > > > >> T2.
> > > > > > > >> > > > > >> >>  b) I think the same applies if you have two
> > > > > completely
> > > > > > > >> > > > > >> independent ETL jobs writing either to the same
> > sink
> > > > > > table, or
> > > > > > > >> two
> > > > > > > >> > > to
> > > > > > > >> > > > > >> different sink tables (that are both later used
> in
> > > the
> > > > > same
> > > > > > > >> > > downstream
> > > > > > > >> > > > > job).
> > > > > > > >> > > > > >> >
> > > > > > > >> > > > > >> > Thank you for your feedback. I cannot see the
> DAG
> > > in
> > > > > 3.a
> > > > > > in
> > > > > > > >> your
> > > > > > > >> > > > > reply,
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> I've attached the image directly. I hope you can
> > see
> > > it
> > > > > > now.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Basically what I meant is that if you have a
> > topology
> > > > > like
> > > > > > > >> (from
> > > > > > > >> > the
> > > > > > > >> > > > > >> attached image):
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> window1 = src1.keyBy(...).window(...)
> > > > > > > >> > > > > >> window2 = src2.keyBy(...).window(...)
> > > > > > > >> > > > > >> window1.join(window2, ...).addSink(sink)
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> or with even simpler (note no keyBy between `src`
> > and
> > > > > > > >> `process`):
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >>
> > > > > src.process(some_function_that_buffers_data)..addSink(sink)
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> you will have the same problem. Generally
> speaking
> > if
> > > > > > there is
> > > > > > > >> an
> > > > > > > >> > > > > >> operator buffering some data, and if the data are
> > not
> > > > > > flushed
> > > > > > > >> on
> > > > > > > >> > > every
> > > > > > > >> > > > > >> checkpoint (any windowed or temporal operator,
> > > > > > > >> AsyncWaitOperator,
> > > > > > > >> > > CEP,
> > > > > > > >> > > > > >> ...), you can design a graph that will produce
> > > > > > "inconsistent"
> > > > > > > >> data
> > > > > > > >> > > as
> > > > > > > >> > > > > part
> > > > > > > >> > > > > >> of a checkpoint.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Apart from that a couple of other
> questions/issues.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> > 1) Global Checkpoint Commit: a) "rolling
> fashion"
> > > or
> > > > b)
> > > > > > > >> > altogether
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Do we need to support the "altogether" one?
> Rolling
> > > > > > > >> checkpoint, as
> > > > > > > >> > > > it's
> > > > > > > >> > > > > >> more independent, I could see it scale much
> better,
> > > and
> > > > > > avoid a
> > > > > > > >> > lot
> > > > > > > >> > > of
> > > > > > > >> > > > > >> problems that I mentioned before.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> > 1) Checkpoint VS Watermark
> > > > > > > >> > > > > >> >
> > > > > > > >> > > > > >> > 1. Stateful Computation is aligned according to
> > > > > Timestamp
> > > > > > > >> > Barrier
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Indeed the biggest obstacle I see here, is that
> we
> > > > would
> > > > > > indeed
> > > > > > > >> > most
> > > > > > > >> > > > > >> likely have:
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> > b) Similar to the window operator, align data
> in
> > > > memory
> > > > > > > >> > according
> > > > > > > >> > > to
> > > > > > > >> > > > > >> Timestamp.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> for every operator.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> > 4. Failover supports Timestamp fine-grained
> data
> > > > > recovery
> > > > > > > >> > > > > >> >
> > > > > > > >> > > > > >> > As we mentioned in the FLIP, each ETL is a
> > complex
> > > > > single
> > > > > > > >> node.
> > > > > > > >> > A
> > > > > > > >> > > > > single
> > > > > > > >> > > > > >> > ETL job failover should not cause the failure
> of
> > > the
> > > > > > entire
> > > > > > > >> "ETL
> > > > > > > >> > > > > >> Topology".
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> I don't understand this point. Regardless if we
> are
> > > > using
> > > > > > > >> > > > > >> rolling checkpoints, all at once checkpoints or
> > > > > > watermarks, I
> > > > > > > >> see
> > > > > > > >> > > the
> > > > > > > >> > > > > same
> > > > > > > >> > > > > >> problems with non determinism, if we want to
> > preserve
> > > > the
> > > > > > > >> > > requirement
> > > > > > > >> > > > to
> > > > > > > >> > > > > >> not fail over the whole topology at once.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Both Watermarks and "rolling checkpoint" I think
> > have
> > > > the
> > > > > > same
> > > > > > > >> > > issue,
> > > > > > > >> > > > > >> that either require deterministic logic, or
> global
> > > > > > failover, or
> > > > > > > >> > > > > downstream
> > > > > > > >> > > > > >> jobs can only work on the already committed by
> the
> > > > > upstream
> > > > > > > >> > records.
> > > > > > > >> > > > But
> > > > > > > >> > > > > >> working with only "committed records" would
> either
> > > > brake
> > > > > > > >> > consistency
> > > > > > > >> > > > > >> between different jobs, or would cause huge delay
> > in
> > > > > > > >> checkpointing
> > > > > > > >> > > and
> > > > > > > >> > > > > e2e
> > > > > > > >> > > > > >> latency, as:
> > > > > > > >> > > > > >> 1. upstream job has to produce some data,
> > downstream
> > > > can
> > > > > > not
> > > > > > > >> > process
> > > > > > > >> > > > it,
> > > > > > > >> > > > > >> downstream can not process this data yet
> > > > > > > >> > > > > >> 2. checkpoint 42 is triggered on the upstream job
> > > > > > > >> > > > > >> 3. checkpoint 42 is completed on the upstream
> job,
> > > data
> > > > > > > >> processed
> > > > > > > >> > > > since
> > > > > > > >> > > > > >> last checkpoint has been committed
> > > > > > > >> > > > > >> 4. upstream job can continue producing more data
> > > > > > > >> > > > > >> 5. only now downstream can start processing the
> > data
> > > > > > produced
> > > > > > > >> in
> > > > > > > >> > 1.,
> > > > > > > >> > > > but
> > > > > > > >> > > > > >> it can not read the not-yet-committed data from
> 4.
> > > > > > > >> > > > > >> 6. once downstream finishes processing data from
> > 1.,
> > > it
> > > > > can
> > > > > > > >> > trigger
> > > > > > > >> > > > > >> checkpoint 42
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> The "all at once checkpoint", I can see only
> > working
> > > > with
> > > > > > > >> global
> > > > > > > >> > > > > failover
> > > > > > > >> > > > > >> of everything.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> This is assuming exactly-once mode. at-least-once
> > > would
> > > > > be
> > > > > > much
> > > > > > > >> > > > easier.
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> Best,
> > > > > > > >> > > > > >> Piotrek
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >> wt., 13 gru 2022 o 08:57 Shammon FY <
> > > zjur...@gmail.com
> > > > >
> > > > > > > >> > napisał(a):
> > > > > > > >> > > > > >>
> > > > > > > >> > > > > >>> Hi David,
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> Thanks for the comments from you and @Piotr. I'd
> > > like
> > > > to
> > > > > > > >> explain
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> details about the FLIP first.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 1) Global Checkpoint Commit: a) "rolling
> fashion"
> > or
> > > > b)
> > > > > > > >> > altogether
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> This mainly depends on the needs of users. Users
> > can
> > > > > > decide
> > > > > > > >> the
> > > > > > > >> > > data
> > > > > > > >> > > > > >>> version of tables in their queries according to
> > > > > different
> > > > > > > >> > > > requirements
> > > > > > > >> > > > > >>> for
> > > > > > > >> > > > > >>> data consistency and freshness. Since we manage
> > > > multiple
> > > > > > > >> versions
> > > > > > > >> > > for
> > > > > > > >> > > > > >>> each
> > > > > > > >> > > > > >>> table, this will not bring too much complexity
> to
> > > the
> > > > > > system.
> > > > > > > >> We
> > > > > > > >> > > only
> > > > > > > >> > > > > >>> need
> > > > > > > >> > > > > >>> to support different strategies when calculating
> > > table
> > > > > > > >> versions
> > > > > > > >> > for
> > > > > > > >> > > > > >>> query.
> > > > > > > >> > > > > >>> So we give this decision to users, who can use
> > > > > > > >> "consistency.type"
> > > > > > > >> > > to
> > > > > > > >> > > > > set
> > > > > > > >> > > > > >>> different consistency in "Catalog". We can
> > continue
> > > to
> > > > > > refine
> > > > > > > >> > this
> > > > > > > >> > > > > later.
> > > > > > > >> > > > > >>> For example, dynamic parameters support
> different
> > > > > > consistency
> > > > > > > >> > > > > >>> requirements
> > > > > > > >> > > > > >>> for each query
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 2) MetaService module
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> Many Flink streaming jobs use application mode,
> > and
> > > > they
> > > > > > are
> > > > > > > >> > > > > independent
> > > > > > > >> > > > > >>> of
> > > > > > > >> > > > > >>> each other. So we currently assume that
> > MetaService
> > > is
> > > > > an
> > > > > > > >> > > independent
> > > > > > > >> > > > > >>> node.
> > > > > > > >> > > > > >>> In the first phase, it will be started in
> > > standalone,
> > > > > and
> > > > > > HA
> > > > > > > >> will
> > > > > > > >> > > be
> > > > > > > >> > > > > >>> supported later. This node will reuse many Flink
> > > > > modules,
> > > > > > > >> > including
> > > > > > > >> > > > > REST,
> > > > > > > >> > > > > >>> Gateway-RpcServer, etc. We hope that the core
> > > > functions
> > > > > of
> > > > > > > >> > > > MetaService
> > > > > > > >> > > > > >>> can
> > > > > > > >> > > > > >>> be developed as a component. When Flink
> > subsequently
> > > > > uses
> > > > > > a
> > > > > > > >> large
> > > > > > > >> > > > > session
> > > > > > > >> > > > > >>> cluster to support various computations, it can
> be
> > > > > > integrated
> > > > > > > >> > into
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> "ResourceManager" as a plug-in component.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> Besides above, I'd like to describe the
> Checkpoint
> > > and
> > > > > > > >> Watermark
> > > > > > > >> > > > > >>> mechanisms
> > > > > > > >> > > > > >>> in detail as follows.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 1) Checkpoint VS Watermark
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> As you mentioned, I think it's very correct that
> > > what
> > > > we
> > > > > > want
> > > > > > > >> in
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> Checkpoint is to align streaming computation and
> > > data
> > > > > > > >> according
> > > > > > > >> > to
> > > > > > > >> > > > > >>> certain
> > > > > > > >> > > > > >>> semantics. Timestamp is a very ideal solution.
> To
> > > > > achieve
> > > > > > this
> > > > > > > >> > > goal,
> > > > > > > >> > > > we
> > > > > > > >> > > > > >>> can
> > > > > > > >> > > > > >>> think of the following functions that need to be
> > > > > > supported in
> > > > > > > >> the
> > > > > > > >> > > > > >>> Watermark
> > > > > > > >> > > > > >>> mechanism:
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 1. Stateful Computation is aligned according to
> > > > > Timestamp
> > > > > > > >> Barrier
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> As the "three tables example" we discussed
> above,
> > we
> > > > > need
> > > > > > to
> > > > > > > >> > align
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> stateful operator computation according to the
> > > barrier
> > > > > to
> > > > > > > >> ensure
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> consistency of the result data. In order to
> align
> > > the
> > > > > > > >> > computation,
> > > > > > > >> > > > > there
> > > > > > > >> > > > > >>> are two ways in my mind
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> a) Similar to the Aligned Checkpoint Barrier.
> > > > Timestamp
> > > > > > > >> Barrier
> > > > > > > >> > > > aligns
> > > > > > > >> > > > > >>> data
> > > > > > > >> > > > > >>> according to the channel, which will lead to
> > > > > backpressure
> > > > > > just
> > > > > > > >> > like
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> aligned checkpoint. It seems not a good idea.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> b) Similar to the window operator, align data in
> > > > memory
> > > > > > > >> according
> > > > > > > >> > > to
> > > > > > > >> > > > > >>> Timestamp. Two steps need to be supported here:
> > > first,
> > > > > > data is
> > > > > > > >> > > > aligned
> > > > > > > >> > > > > by
> > > > > > > >> > > > > >>> timestamp for state operators; secondly,
> Timestamp
> > > is
> > > > > > strictly
> > > > > > > >> > > > > >>> sequential,
> > > > > > > >> > > > > >>> global aggregation operators need to perform
> > > > aggregation
> > > > > > in
> > > > > > > >> > > timestamp
> > > > > > > >> > > > > >>> order
> > > > > > > >> > > > > >>> and output the final results.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 2. Coordinate multiple source nodes to assign
> > > unified
> > > > > > > >> Timestamp
> > > > > > > >> > > > > Barriers
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> Since the stateful operator needs to be aligned
> > > > > according
> > > > > > to
> > > > > > > >> the
> > > > > > > >> > > > > >>> Timestamp
> > > > > > > >> > > > > >>> Barrier, source subtasks of multiple jobs should
> > > > > generate
> > > > > > the
> > > > > > > >> > same
> > > > > > > >> > > > > >>> Timestamp Barrier. ETL jobs consuming RootTable
> > > should
> > > > > > > >> interact
> > > > > > > >> > > with
> > > > > > > >> > > > > >>> "MetaService" to generate the same Timestamp T1,
> > T2,
> > > > T3
> > > > > > ...
> > > > > > > >> and
> > > > > > > >> > so
> > > > > > > >> > > > on.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 3. JobManager needs to manage the completed
> > > Timestamp
> > > > > > Barrier
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> When the Timestamp Barrier of the ETL job has
> been
> > > > > > completed,
> > > > > > > >> it
> > > > > > > >> > > > means
> > > > > > > >> > > > > >>> that
> > > > > > > >> > > > > >>> the data of the specified Timestamp can be
> queried
> > > by
> > > > > > users.
> > > > > > > >> > > > JobManager
> > > > > > > >> > > > > >>> needs to summarize its Timestamp processing and
> > > report
> > > > > the
> > > > > > > >> > > completed
> > > > > > > >> > > > > >>> Timestamp and data snapshots to the MetaServer.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 4. Failover supports Timestamp fine-grained data
> > > > > recovery
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> As we mentioned in the FLIP, each ETL is a
> complex
> > > > > single
> > > > > > > >> node. A
> > > > > > > >> > > > > single
> > > > > > > >> > > > > >>> ETL job failover should not cause the failure of
> > the
> > > > > > entire
> > > > > > > >> "ETL
> > > > > > > >> > > > > >>> Topology".
> > > > > > > >> > > > > >>> This requires that the result data of Timestamp
> > > > > generated
> > > > > > by
> > > > > > > >> > > upstream
> > > > > > > >> > > > > ETL
> > > > > > > >> > > > > >>> should be deterministic.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> a) The determinacy of Timestamp, that is, before
> > and
> > > > > > after ETL
> > > > > > > >> > job
> > > > > > > >> > > > > >>> failover, the same Timestamp sequence must be
> > > > generated.
> > > > > > Each
> > > > > > > >> > > > > Checkpoint
> > > > > > > >> > > > > >>> needs to record the included Timestamp list,
> > > > especially
> > > > > > the
> > > > > > > >> > source
> > > > > > > >> > > > node
> > > > > > > >> > > > > >>> of
> > > > > > > >> > > > > >>> the RootTable. After Failover, it needs to
> > > regenerate
> > > > > > > >> Timestamp
> > > > > > > >> > > > > according
> > > > > > > >> > > > > >>> to the Timestamp list.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> b) The determinacy of Timestamp data, that is,
> the
> > > > same
> > > > > > > >> Timestamp
> > > > > > > >> > > > needs
> > > > > > > >> > > > > >>> to
> > > > > > > >> > > > > >>> replay the same data before and after Failover,
> > and
> > > > > > generate
> > > > > > > >> the
> > > > > > > >> > > same
> > > > > > > >> > > > > >>> results in Sink Table. Each Timestamp must save
> > > start
> > > > > and
> > > > > > end
> > > > > > > >> > > offsets
> > > > > > > >> > > > > (or
> > > > > > > >> > > > > >>> snapshot id) of RootTable. After failover, the
> > > source
> > > > > > nodes
> > > > > > > >> need
> > > > > > > >> > to
> > > > > > > >> > > > > >>> replay
> > > > > > > >> > > > > >>> the data according to the offset to ensure that
> > the
> > > > data
> > > > > > of
> > > > > > > >> each
> > > > > > > >> > > > > >>> Timestamp
> > > > > > > >> > > > > >>> is consistent before and after Failover.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> For the specific requirements and complexity,
> > please
> > > > > help
> > > > > > to
> > > > > > > >> > review
> > > > > > > >> > > > > when
> > > > > > > >> > > > > >>> you are free @David @Piotr, thanks :)
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> 2) Evolution from Checkpoint to Timestamp
> > Mechanism
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> You give a very important question in your reply
> > > > which I
> > > > > > > >> missed
> > > > > > > >> > > > before:
> > > > > > > >> > > > > >>> if
> > > > > > > >> > > > > >>> Aligned Checkpoint is used in the first stage,
> how
> > > > > > complex is
> > > > > > > >> the
> > > > > > > >> > > > > >>> evolution
> > > > > > > >> > > > > >>> from Checkpoint to Timestamp later? I made a
> > general
> > > > > > > >> comparison
> > > > > > > >> > > here,
> > > > > > > >> > > > > >>> which
> > > > > > > >> > > > > >>> may not be very detailed. There are three roles
> in
> > > the
> > > > > > whole
> > > > > > > >> > > system:
> > > > > > > >> > > > > >>> MetaService, Flink ETL Job and Table Store.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> a) MetaService
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> It manages the data consistency among multiple
> ETL
> > > > jobs,
> > > > > > > >> > including
> > > > > > > >> > > > > >>> coordinating the Barrier for the Source ETL
> nodes,
> > > > > > setting the
> > > > > > > >> > > > starting
> > > > > > > >> > > > > >>> Barrier for ETL job startup, and calculating the
> > > Table
> > > > > > version
> > > > > > > >> > for
> > > > > > > >> > > > > >>> queries
> > > > > > > >> > > > > >>> according to different strategies. It has little
> > to
> > > do
> > > > > > with
> > > > > > > >> > > > Checkpoint
> > > > > > > >> > > > > in
> > > > > > > >> > > > > >>> fact, we can pay attention to it when designing
> > the
> > > > API
> > > > > > and
> > > > > > > >> > > > > implementing
> > > > > > > >> > > > > >>> the functions.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> b) Flink ETL Job
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> At present, the workload is relatively small and
> > we
> > > > need
> > > > > > to
> > > > > > > >> > trigger
> > > > > > > >> > > > > >>> checkpoints in CheckpointCoordinator manually by
> > > > > > > >> SplitEnumerator.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> c) Table Store
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> Table Store mainly provides the ability to write
> > and
> > > > > read
> > > > > > > >> data.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> c.1) Write data. At present, Table Store
> generates
> > > > > > snapshots
> > > > > > > >> > > > according
> > > > > > > >> > > > > to
> > > > > > > >> > > > > >>> two phases in Flink. When using Checkpoint as
> > > > > consistency
> > > > > > > >> > > management,
> > > > > > > >> > > > > we
> > > > > > > >> > > > > >>> need to write checkpoint information to
> snapshots.
> > > > After
> > > > > > using
> > > > > > > >> > > > > Timestamp
> > > > > > > >> > > > > >>> Barrier, the snapshot in Table Store may be
> > > > disassembled
> > > > > > more
> > > > > > > >> > > finely,
> > > > > > > >> > > > > and
> > > > > > > >> > > > > >>> we need to write Timestamp information to the
> data
> > > > > file. A
> > > > > > > >> > > > > "checkpointed
> > > > > > > >> > > > > >>> snapshot" may contain multiple "Timestamp
> > > snapshots".
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> c.2) Read data. The SplitEnumerator that reads
> > data
> > > > from
> > > > > > the
> > > > > > > >> > Table
> > > > > > > >> > > > > Store
> > > > > > > >> > > > > >>> will manage multiple splits according to the
> > version
> > > > > > number.
> > > > > > > >> > After
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> specified splits are completed, it sends a
> Barrier
> > > > > > command to
> > > > > > > >> > > > trigger a
> > > > > > > >> > > > > >>> checkpoint in the ETL job. The source node will
> > > > > broadcast
> > > > > > the
> > > > > > > >> > > > > checkpoint
> > > > > > > >> > > > > >>> barrier downstream after receiving it. When
> using
> > > > > > Timestamp
> > > > > > > >> > > Barrier,
> > > > > > > >> > > > > the
> > > > > > > >> > > > > >>> overall process is similar, but the
> > SplitEnumerator
> > > > does
> > > > > > not
> > > > > > > >> need
> > > > > > > >> > > to
> > > > > > > >> > > > > >>> trigger a checkpoint to the Flink ETL, and the
> > > Source
> > > > > node
> > > > > > > >> needs
> > > > > > > >> > to
> > > > > > > >> > > > > >>> support
> > > > > > > >> > > > > >>> broadcasting Timestamp Barrier to the downstream
> > at
> > > > that
> > > > > > time.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> From the above overall, the evolution complexity
> > > from
> > > > > > > >> Checkpoint
> > > > > > > >> > to
> > > > > > > >> > > > > >>> Timestamp seems controllable, but the specific
> > > > > > implementation
> > > > > > > >> > needs
> > > > > > > >> > > > > >>> careful
> > > > > > > >> > > > > >>> design, and the concept and features of
> Checkpoint
> > > > > should
> > > > > > not
> > > > > > > >> be
> > > > > > > >> > > > > >>> introduced
> > > > > > > >> > > > > >>> too much into relevant interfaces and functions.
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> What do you think of it? Looking forward to your
> > > > > feedback,
> > > > > > > >> thanks
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> Best,
> > > > > > > >> > > > > >>> Shammon
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> On Mon, Dec 12, 2022 at 11:46 PM David Morávek <
> > > > > > > >> d...@apache.org>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>> > Hi Shammon,
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > I'm starting to see what you're trying to
> > achieve,
> > > > and
> > > > > > it's
> > > > > > > >> > > really
> > > > > > > >> > > > > >>> > exciting. I share Piotr's concerns about e2e
> > > latency
> > > > > and
> > > > > > > >> > > disability
> > > > > > > >> > > > > to
> > > > > > > >> > > > > >>> use
> > > > > > > >> > > > > >>> > unaligned checkpoints.
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > I have a couple of questions that are not
> clear
> > to
> > > > me
> > > > > > from
> > > > > > > >> > going
> > > > > > > >> > > > over
> > > > > > > >> > > > > >>> the
> > > > > > > >> > > > > >>> > FLIP:
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > 1) Global Checkpoint Commit
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > Are you planning on committing the checkpoints
> > in
> > > > a) a
> > > > > > > >> "rolling
> > > > > > > >> > > > > >>> fashion" -
> > > > > > > >> > > > > >>> > one pipeline after another, or b) altogether -
> > > once
> > > > > the
> > > > > > data
> > > > > > > >> > have
> > > > > > > >> > > > > been
> > > > > > > >> > > > > >>> > processed by all pipelines?
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > Option a) would be eventually consistent (for
> > > batch
> > > > > > queries,
> > > > > > > >> > > you'd
> > > > > > > >> > > > > >>> need to
> > > > > > > >> > > > > >>> > use the last checkpoint produced by the most
> > > > > downstream
> > > > > > > >> table),
> > > > > > > >> > > > > >>> whereas b)
> > > > > > > >> > > > > >>> > would be strongly consistent at the cost of
> > > > increasing
> > > > > > the
> > > > > > > >> e2e
> > > > > > > >> > > > > latency
> > > > > > > >> > > > > >>> even
> > > > > > > >> > > > > >>> > more.
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > I feel that option a) is what this should be
> > > headed
> > > > > for.
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > 2) MetaService
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > Should this be a new general Flink component
> or
> > > one
> > > > > > > >> specific to
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> Flink
> > > > > > > >> > > > > >>> > Table Store?
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > 3) Follow-ups
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > From the above discussion, there is a
> consensus
> > > > that,
> > > > > > in the
> > > > > > > >> > > ideal
> > > > > > > >> > > > > >>> case,
> > > > > > > >> > > > > >>> > watermarks would be a way to go, but there is
> > some
> > > > > > > >> underlying
> > > > > > > >> > > > > mechanism
> > > > > > > >> > > > > >>> > missing. It would be great to discuss this
> > option
> > > in
> > > > > > more
> > > > > > > >> > detail
> > > > > > > >> > > to
> > > > > > > >> > > > > >>> compare
> > > > > > > >> > > > > >>> > the solutions in terms of implementation cost,
> > > maybe
> > > > > it
> > > > > > > >> could
> > > > > > > >> > not
> > > > > > > >> > > > be
> > > > > > > >> > > > > as
> > > > > > > >> > > > > >>> > complex.
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > All in all, I don't feel that checkpoints are
> > > > suitable
> > > > > > for
> > > > > > > >> > > > providing
> > > > > > > >> > > > > >>> > consistent table versioning between multiple
> > > > > pipelines.
> > > > > > The
> > > > > > > >> > main
> > > > > > > >> > > > > >>> reason is
> > > > > > > >> > > > > >>> > that they are designed to be a fault tolerance
> > > > > > mechanism.
> > > > > > > >> > > Somewhere
> > > > > > > >> > > > > >>> between
> > > > > > > >> > > > > >>> > the lines, you've already noted that the
> > primitive
> > > > > > you're
> > > > > > > >> > looking
> > > > > > > >> > > > for
> > > > > > > >> > > > > >>> is
> > > > > > > >> > > > > >>> > cross-pipeline barrier alignment, which is the
> > > > > > mechanism a
> > > > > > > >> > subset
> > > > > > > >> > > > of
> > > > > > > >> > > > > >>> > currently supported checkpointing
> > implementations
> > > > > > happen to
> > > > > > > >> be
> > > > > > > >> > > > using.
> > > > > > > >> > > > > >>> Is
> > > > > > > >> > > > > >>> > that correct?
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > My biggest concern is that tying this with a
> > > > > > "side-effect"
> > > > > > > >> of
> > > > > > > >> > the
> > > > > > > >> > > > > >>> > checkpointing mechanism could block us from
> > > evolving
> > > > > it
> > > > > > > >> > further.
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > Best,
> > > > > > > >> > > > > >>> > D.
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <
> > > > > > > >> zjur...@gmail.com>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>> > > Hi Piotr,
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Thank you for your feedback. I cannot see
> the
> > > DAG
> > > > in
> > > > > > 3.a
> > > > > > > >> in
> > > > > > > >> > > your
> > > > > > > >> > > > > >>> reply,
> > > > > > > >> > > > > >>> > but
> > > > > > > >> > > > > >>> > > I'd like to answer some questions first.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Your understanding is very correct. We want
> to
> > > > align
> > > > > > the
> > > > > > > >> data
> > > > > > > >> > > > > >>> versions of
> > > > > > > >> > > > > >>> > > all intermediate tables through checkpoint
> > > > mechanism
> > > > > > in
> > > > > > > >> > Flink.
> > > > > > > >> > > > I'm
> > > > > > > >> > > > > >>> sorry
> > > > > > > >> > > > > >>> > > that I have omitted some default constraints
> > in
> > > > > FLIP,
> > > > > > > >> > including
> > > > > > > >> > > > > only
> > > > > > > >> > > > > >>> > > supporting aligned checkpoints; one table
> can
> > > only
> > > > > be
> > > > > > > >> written
> > > > > > > >> > > by
> > > > > > > >> > > > > one
> > > > > > > >> > > > > >>> ETL
> > > > > > > >> > > > > >>> > > job. I will add these later.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Why can't the watermark mechanism achieve
> the
> > > data
> > > > > > > >> > consistency
> > > > > > > >> > > we
> > > > > > > >> > > > > >>> wanted?
> > > > > > > >> > > > > >>> > > For example, there are 3 tables, Table1 is
> > word
> > > > > table,
> > > > > > > >> Table2
> > > > > > > >> > > is
> > > > > > > >> > > > > >>> > word->cnt
> > > > > > > >> > > > > >>> > > table and Table3 is cnt1->cnt2 table.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 1. ETL1 from Table1 to Table2: INSERT INTO
> > > Table2
> > > > > > SELECT
> > > > > > > >> > word,
> > > > > > > >> > > > > >>> count(*)
> > > > > > > >> > > > > >>> > > FROM Table1 GROUP BY word
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 2. ETL2 from Table2 to Table3: INSERT INTO
> > > Table3
> > > > > > SELECT
> > > > > > > >> cnt,
> > > > > > > >> > > > > >>> count(*)
> > > > > > > >> > > > > >>> > FROM
> > > > > > > >> > > > > >>> > > Table2 GROUP BY cnt
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > ETL1 has 2 subtasks to read multiple buckets
> > > from
> > > > > > Table1,
> > > > > > > >> > where
> > > > > > > >> > > > > >>> subtask1
> > > > > > > >> > > > > >>> > > reads streaming data as [a, b, c, a, d, a,
> b,
> > > c, d
> > > > > > ...]
> > > > > > > >> and
> > > > > > > >> > > > > subtask2
> > > > > > > >> > > > > >>> > reads
> > > > > > > >> > > > > >>> > > streaming data as [a, c, d, q, a, v, c, d
> > ...].
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 1. Unbounded streaming data is divided into
> > > > multiple
> > > > > > sets
> > > > > > > >> > > > according
> > > > > > > >> > > > > >>> to
> > > > > > > >> > > > > >>> > some
> > > > > > > >> > > > > >>> > > semantic requirements. The most extreme may
> be
> > > one
> > > > > > set for
> > > > > > > >> > each
> > > > > > > >> > > > > data.
> > > > > > > >> > > > > >>> > > Assume that the sets of subtask1 and
> subtask2
> > > > > > separated by
> > > > > > > >> > the
> > > > > > > >> > > > same
> > > > > > > >> > > > > >>> > > semantics are [a, b, c, a, d] and [a, c, d,
> > q],
> > > > > > > >> respectively.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 2. After the above two sets are computed by
> > > ETL1,
> > > > > the
> > > > > > > >> result
> > > > > > > >> > > data
> > > > > > > >> > > > > >>> > generated
> > > > > > > >> > > > > >>> > > in Table 2 is [(a, 3), (b, 1), (c, 1), (d,
> 2),
> > > (q,
> > > > > > 1)].
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 3. The result data generated in Table 3
> after
> > > the
> > > > > > data in
> > > > > > > >> > > Table 2
> > > > > > > >> > > > > is
> > > > > > > >> > > > > >>> > > computed by ETL2 is [(1, 3), (2, 1), (3, 1)]
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > We want to align the data of Table1, Table2
> > and
> > > > > > Table3 and
> > > > > > > >> > > manage
> > > > > > > >> > > > > the
> > > > > > > >> > > > > >>> > data
> > > > > > > >> > > > > >>> > > versions. When users execute OLAP/Batch
> > queries
> > > > join
> > > > > > on
> > > > > > > >> these
> > > > > > > >> > > > > >>> tables, the
> > > > > > > >> > > > > >>> > > following consistency data can be found
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 1. Table1: [a, b, c, a, d] and [a, c, d, q]
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 2. Table2: [a, 3], [b, 1], [c, 1], [d, 2],
> [q,
> > > 1]
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 3. Table3: [1, 3], [2, 1], [3, 1]
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Users can perform query: SELECT t1.word,
> > t2.cnt,
> > > > > > t3.cnt2
> > > > > > > >> from
> > > > > > > >> > > > > Table1
> > > > > > > >> > > > > >>> t1
> > > > > > > >> > > > > >>> > > JOIN Table2 t2 JOIN Table3 t3 on
> > t1.word=t2.word
> > > > and
> > > > > > > >> > > > > t2.cnt=t3.cnt1;
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > In the view of users, the data is consistent
> > on
> > > a
> > > > > > unified
> > > > > > > >> > > > "version"
> > > > > > > >> > > > > >>> > between
> > > > > > > >> > > > > >>> > > Table1, Table2 and Table3.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > In the current Flink implementation, the
> > aligned
> > > > > > > >> checkpoint
> > > > > > > >> > can
> > > > > > > >> > > > > >>> achieve
> > > > > > > >> > > > > >>> > the
> > > > > > > >> > > > > >>> > > above capabilities (let's ignore the
> > > segmentation
> > > > > > > >> semantics
> > > > > > > >> > of
> > > > > > > >> > > > > >>> checkpoint
> > > > > > > >> > > > > >>> > > first). Because the Checkpoint Barrier will
> > > align
> > > > > the
> > > > > > data
> > > > > > > >> > when
> > > > > > > >> > > > > >>> > performing
> > > > > > > >> > > > > >>> > > the global Count aggregation, we can
> associate
> > > the
> > > > > > > >> snapshot
> > > > > > > >> > > with
> > > > > > > >> > > > > the
> > > > > > > >> > > > > >>> > > checkpoint in the Table Store, query the
> > > specified
> > > > > > > >> snapshot
> > > > > > > >> > of
> > > > > > > >> > > > > >>> > > Table1/Table2/Table3 through the checkpoint,
> > and
> > > > > > achieve
> > > > > > > >> the
> > > > > > > >> > > > > >>> consistency
> > > > > > > >> > > > > >>> > > requirements of the above unified "version".
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Current watermark mechanism in Flink cannot
> > > > achieve
> > > > > > the
> > > > > > > >> above
> > > > > > > >> > > > > >>> > consistency.
> > > > > > > >> > > > > >>> > > For example, we use watermark to divide data
> > > into
> > > > > > multiple
> > > > > > > >> > sets
> > > > > > > >> > > > in
> > > > > > > >> > > > > >>> > subtask1
> > > > > > > >> > > > > >>> > > and subtask2 as followed
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 1. subtask1:[(a, T1), (b, T1), (c, T1), (a,
> > T1),
> > > > (d,
> > > > > > T1)],
> > > > > > > >> > T1,
> > > > > > > >> > > > [(a,
> > > > > > > >> > > > > >>> T2),
> > > > > > > >> > > > > >>> > > (b, T2), (c, T2), (d, T2)], T2
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > 2. subtask2: [(a, T1), (c, T1), (d, T1), (q,
> > > T1)],
> > > > > T1,
> > > > > > > >> ....
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > As Flink watermark does not have barriers
> and
> > > > cannot
> > > > > > align
> > > > > > > >> > > data,
> > > > > > > >> > > > > ETL1
> > > > > > > >> > > > > >>> > Count
> > > > > > > >> > > > > >>> > > operator may compute the data of subtask1
> > first:
> > > > > [(a,
> > > > > > T1),
> > > > > > > >> > (b,
> > > > > > > >> > > > T1),
> > > > > > > >> > > > > >>> (c,
> > > > > > > >> > > > > >>> > > T1), (a, T1), (d, T1)], T1, [(a, T2), (b,
> > T2)],
> > > > then
> > > > > > > >> compute
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> data of
> > > > > > > >> > > > > >>> > > subtask2: [(a, T1), (c, T1), (d, T1), (q,
> > T1)],
> > > > T1,
> > > > > > which
> > > > > > > >> is
> > > > > > > >> > > not
> > > > > > > >> > > > > >>> possible
> > > > > > > >> > > > > >>> > > in aligned checkpoint.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > In this order, the result output to Table2
> > after
> > > > the
> > > > > > Count
> > > > > > > >> > > > > >>> aggregation
> > > > > > > >> > > > > >>> > will
> > > > > > > >> > > > > >>> > > be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a,
> 2,
> > > > T1),
> > > > > > (a, 3,
> > > > > > > >> > T2),
> > > > > > > >> > > > (b,
> > > > > > > >> > > > > >>> 2,
> > > > > > > >> > > > > >>> > T2),
> > > > > > > >> > > > > >>> > > (a, 4, T1), (c, 2, T1), (d, 1, T1), (q, 1,
> > T1),
> > > > > which
> > > > > > can
> > > > > > > >> be
> > > > > > > >> > > > > >>> simplified
> > > > > > > >> > > > > >>> > as:
> > > > > > > >> > > > > >>> > > [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4,
> > T1),
> > > > (c,
> > > > > > 2,
> > > > > > > >> T1),
> > > > > > > >> > > (d,
> > > > > > > >> > > > 1,
> > > > > > > >> > > > > >>> T1),
> > > > > > > >> > > > > >>> > > (q, 1, T1)]
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > There's no (a, 3, T1), we have been unable
> to
> > > > query
> > > > > > > >> > consistent
> > > > > > > >> > > > data
> > > > > > > >> > > > > >>> > results
> > > > > > > >> > > > > >>> > > on Table1 and Table2 according to T1. Table
> 3
> > > has
> > > > > the
> > > > > > same
> > > > > > > >> > > > problem.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > In addition to using Checkpoint Barrier, the
> > > other
> > > > > > > >> > > implementation
> > > > > > > >> > > > > >>> > > supporting watermark above is to convert
> Count
> > > > > > aggregation
> > > > > > > >> > into
> > > > > > > >> > > > > >>> Window
> > > > > > > >> > > > > >>> > > Count. After the global Count is converted
> > into
> > > > > window
> > > > > > > >> > > operator,
> > > > > > > >> > > > it
> > > > > > > >> > > > > >>> needs
> > > > > > > >> > > > > >>> > > to support cross window data computation.
> > > Similar
> > > > to
> > > > > > the
> > > > > > > >> data
> > > > > > > >> > > > > >>> > relationship
> > > > > > > >> > > > > >>> > > between the previous and the current
> > Checkpoint,
> > > > it
> > > > > is
> > > > > > > >> > > equivalent
> > > > > > > >> > > > > to
> > > > > > > >> > > > > >>> > > introducing the Watermark Barrier, which
> > > requires
> > > > > > > >> adjustments
> > > > > > > >> > > to
> > > > > > > >> > > > > the
> > > > > > > >> > > > > >>> > > current Flink Watermark mechanism.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Besides the above global aggregation, there
> > are
> > > > > window
> > > > > > > >> > > operators
> > > > > > > >> > > > in
> > > > > > > >> > > > > >>> > Flink.
> > > > > > > >> > > > > >>> > > I don't know if my understanding is
> correct(I
> > > > cannot
> > > > > > see
> > > > > > > >> the
> > > > > > > >> > > DAG
> > > > > > > >> > > > in
> > > > > > > >> > > > > >>> your
> > > > > > > >> > > > > >>> > > example), please correct me if it's wrong. I
> > > think
> > > > > you
> > > > > > > >> raise
> > > > > > > >> > a
> > > > > > > >> > > > very
> > > > > > > >> > > > > >>> > > important and interesting question: how to
> > > define
> > > > > data
> > > > > > > >> > > > consistency
> > > > > > > >> > > > > in
> > > > > > > >> > > > > >>> > > different window computations which will
> > > generate
> > > > > > > >> different
> > > > > > > >> > > > > >>> timestamps of
> > > > > > > >> > > > > >>> > > the same data. This situation also occurs
> when
> > > > using
> > > > > > event
> > > > > > > >> > time
> > > > > > > >> > > > to
> > > > > > > >> > > > > >>> align
> > > > > > > >> > > > > >>> > > data. At present, what I can think of is to
> > > store
> > > > > > these
> > > > > > > >> > > > information
> > > > > > > >> > > > > >>> in
> > > > > > > >> > > > > >>> > > Table Store, users can perform filter or
> join
> > on
> > > > > data
> > > > > > with
> > > > > > > >> > > them.
> > > > > > > >> > > > > This
> > > > > > > >> > > > > >>> > FLIP
> > > > > > > >> > > > > >>> > > is our first phase, and the specific
> > > > implementation
> > > > > of
> > > > > > > >> this
> > > > > > > >> > > will
> > > > > > > >> > > > be
> > > > > > > >> > > > > >>> > > designed and considered in the next phase
> and
> > > > FLIP.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Although the Checkpoint Barrier can achieve
> > the
> > > > most
> > > > > > basic
> > > > > > > >> > > > > >>> consistency,
> > > > > > > >> > > > > >>> > as
> > > > > > > >> > > > > >>> > > you mentioned, using the Checkpoint
> mechanism
> > > will
> > > > > > cause
> > > > > > > >> many
> > > > > > > >> > > > > >>> problems,
> > > > > > > >> > > > > >>> > > including the increase of checkpoint time
> for
> > > > > multiple
> > > > > > > >> > cascade
> > > > > > > >> > > > > jobs,
> > > > > > > >> > > > > >>> the
> > > > > > > >> > > > > >>> > > increase of E2E data freshness time (several
> > > > minutes
> > > > > > or
> > > > > > > >> even
> > > > > > > >> > > > dozens
> > > > > > > >> > > > > >>> of
> > > > > > > >> > > > > >>> > > minutes), and the increase of the overall
> > system
> > > > > > > >> complexity.
> > > > > > > >> > At
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> same
> > > > > > > >> > > > > >>> > > time, the semantics of Checkpoint data
> > > > segmentation
> > > > > is
> > > > > > > >> > unclear.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > The current FLIP is the first phase of our
> > whole
> > > > > > proposal,
> > > > > > > >> > and
> > > > > > > >> > > > you
> > > > > > > >> > > > > >>> can
> > > > > > > >> > > > > >>> > find
> > > > > > > >> > > > > >>> > > the follow-up plan in our future worker. In
> > the
> > > > > first
> > > > > > > >> stage,
> > > > > > > >> > we
> > > > > > > >> > > > do
> > > > > > > >> > > > > >>> not
> > > > > > > >> > > > > >>> > want
> > > > > > > >> > > > > >>> > > to modify the Flink mechanism. We'd like to
> > > > realize
> > > > > > basic
> > > > > > > >> > > system
> > > > > > > >> > > > > >>> > functions
> > > > > > > >> > > > > >>> > > based on existing mechanisms in Flink,
> > including
> > > > the
> > > > > > > >> > > relationship
> > > > > > > >> > > > > >>> > > management of ETL and tables, and the basic
> > data
> > > > > > > >> consistency,
> > > > > > > >> > > so
> > > > > > > >> > > > we
> > > > > > > >> > > > > >>> > choose
> > > > > > > >> > > > > >>> > > Global Checkpoint in our FLIP.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > We agree with you very much that event time
> is
> > > > more
> > > > > > > >> suitable
> > > > > > > >> > > for
> > > > > > > >> > > > > data
> > > > > > > >> > > > > >>> > > consistency management. We'd like consider
> > this
> > > > > > matter in
> > > > > > > >> the
> > > > > > > >> > > > > second
> > > > > > > >> > > > > >>> or
> > > > > > > >> > > > > >>> > > third stage after the current FLIP. We hope
> to
> > > > > > improve the
> > > > > > > >> > > > > watermark
> > > > > > > >> > > > > >>> > > mechanism in Flink to support barriers. As
> you
> > > > > > mentioned
> > > > > > > >> in
> > > > > > > >> > > your
> > > > > > > >> > > > > >>> reply,
> > > > > > > >> > > > > >>> > we
> > > > > > > >> > > > > >>> > > can achieve data consistency based on
> > timestamp,
> > > > > while
> > > > > > > >> > > > maintaining
> > > > > > > >> > > > > >>> E2E
> > > > > > > >> > > > > >>> > data
> > > > > > > >> > > > > >>> > > freshness of seconds or even milliseconds
> for
> > > 10+
> > > > > > cascaded
> > > > > > > >> > > jobs.
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > What do you think? Thanks
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > Best,
> > > > > > > >> > > > > >>> > > Shammon
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > On Fri, Dec 9, 2022 at 6:13 PM Piotr
> Nowojski
> > <
> > > > > > > >> > > > > pnowoj...@apache.org>
> > > > > > > >> > > > > >>> > > wrote:
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> > > > Hi Shammon,
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > Do I understand it correctly, that you
> > > > effectively
> > > > > > want
> > > > > > > >> to
> > > > > > > >> > > > expand
> > > > > > > >> > > > > >>> the
> > > > > > > >> > > > > >>> > > > checkpoint alignment mechanism across many
> > > > > different
> > > > > > > >> jobs
> > > > > > > >> > and
> > > > > > > >> > > > > hand
> > > > > > > >> > > > > >>> over
> > > > > > > >> > > > > >>> > > > checkpoint barriers from upstream to
> > > downstream
> > > > > jobs
> > > > > > > >> using
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> > > intermediate
> > > > > > > >> > > > > >>> > > > tables?
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > Re the watermarks for the "Rejected
> > > > > Alternatives". I
> > > > > > > >> don't
> > > > > > > >> > > > > >>> understand
> > > > > > > >> > > > > >>> > why
> > > > > > > >> > > > > >>> > > > this has been rejected. Could you
> elaborate
> > on
> > > > > this
> > > > > > > >> point?
> > > > > > > >> > > Here
> > > > > > > >> > > > > >>> are a
> > > > > > > >> > > > > >>> > > > couple of my thoughts on this matter, but
> > > please
> > > > > > > >> correct me
> > > > > > > >> > > if
> > > > > > > >> > > > > I'm
> > > > > > > >> > > > > >>> > wrong,
> > > > > > > >> > > > > >>> > > > as I haven't dived deeper into this topic.
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > > As shown above, there are 2 watermarks
> T1
> > > and
> > > > > T2,
> > > > > > T1 <
> > > > > > > >> > T2.
> > > > > > > >> > > > > >>> > > > > The StreamTask reads data in order:
> > > > > > > >> > > > > >>> > > > V11,V12,V21,T1(channel1),V13,T1(channel2).
> > > > > > > >> > > > > >>> > > > > At this time, StreamTask will confirm
> that
> > > > > > watermark
> > > > > > > >> T1
> > > > > > > >> > is
> > > > > > > >> > > > > >>> completed,
> > > > > > > >> > > > > >>> > > > but the data beyond
> > > > > > > >> > > > > >>> > > > > T1 has been processed(V13) and the
> results
> > > are
> > > > > > > >> written to
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> sink
> > > > > > > >> > > > > >>> > > > table.
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > 1. I see the same "problem" with unaligned
> > > > > > checkpoints
> > > > > > > >> in
> > > > > > > >> > > your
> > > > > > > >> > > > > >>> current
> > > > > > > >> > > > > >>> > > > proposal.
> > > > > > > >> > > > > >>> > > > 2. I don't understand why this is a
> problem?
> > > > Just
> > > > > > store
> > > > > > > >> in
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> "sink
> > > > > > > >> > > > > >>> > > > table" what's the watermark (T1), and
> > > downstream
> > > > > > jobs
> > > > > > > >> > should
> > > > > > > >> > > > > >>> process
> > > > > > > >> > > > > >>> > the
> > > > > > > >> > > > > >>> > > > data with that "watermark" anyway. Record
> > > "V13"
> > > > > > should
> > > > > > > >> be
> > > > > > > >> > > > treated
> > > > > > > >> > > > > >>> as
> > > > > > > >> > > > > >>> > > > "early" data. Downstream jobs if:
> > > > > > > >> > > > > >>> > > >  a) they are streaming jobs, for example
> > they
> > > > > should
> > > > > > > >> > > aggregate
> > > > > > > >> > > > it
> > > > > > > >> > > > > >>> in
> > > > > > > >> > > > > >>> > > > windowed/temporal state, but they
> shouldn't
> > > > > produce
> > > > > > the
> > > > > > > >> > > result
> > > > > > > >> > > > > that
> > > > > > > >> > > > > >>> > > > contains it, as the watermark T2 was not
> yet
> > > > > > processed.
> > > > > > > >> Or
> > > > > > > >> > > they
> > > > > > > >> > > > > >>> would
> > > > > > > >> > > > > >>> > > just
> > > > > > > >> > > > > >>> > > > pass that record as "early" data.
> > > > > > > >> > > > > >>> > > >  b) they are batch jobs, it looks to me
> like
> > > > batch
> > > > > > jobs
> > > > > > > >> > > > shouldn't
> > > > > > > >> > > > > >>> take
> > > > > > > >> > > > > >>> > > > "all available data", but only consider
> "all
> > > the
> > > > > > data
> > > > > > > >> until
> > > > > > > >> > > > some
> > > > > > > >> > > > > >>> > > > watermark", for example the latest
> > available:
> > > T1
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > 3. I'm pretty sure there are counter
> > examples,
> > > > > where
> > > > > > > >> your
> > > > > > > >> > > > > proposed
> > > > > > > >> > > > > >>> > > > mechanism of using checkpoints (even
> > aligned!)
> > > > > will
> > > > > > > >> produce
> > > > > > > >> > > > > >>> > > > inconsistent data from the perspective of
> > the
> > > > > event
> > > > > > > >> time.
> > > > > > > >> > > > > >>> > > >   a) For example what if one of your "ETL"
> > > jobs,
> > > > > > has the
> > > > > > > >> > > > > following
> > > > > > > >> > > > > >>> DAG:
> > > > > > > >> > > > > >>> > > > [image: flip276.jpg]
> > > > > > > >> > > > > >>> > > >   Even if you use aligned checkpoints for
> > > > > > committing the
> > > > > > > >> > data
> > > > > > > >> > > > to
> > > > > > > >> > > > > >>> the
> > > > > > > >> > > > > >>> > sink
> > > > > > > >> > > > > >>> > > > table, the watermarks of "Window1" and
> > > "Window2"
> > > > > are
> > > > > > > >> > > completely
> > > > > > > >> > > > > >>> > > > independent. The sink table might easily
> > have
> > > > data
> > > > > > from
> > > > > > > >> the
> > > > > > > >> > > > > >>> > Src1/Window1
> > > > > > > >> > > > > >>> > > > from the event time T1 and Src2/Window2
> from
> > > > later
> > > > > > event
> > > > > > > >> > time
> > > > > > > >> > > > T2.
> > > > > > > >> > > > > >>> > > >   b) I think the same applies if you have
> > two
> > > > > > completely
> > > > > > > >> > > > > >>> independent
> > > > > > > >> > > > > >>> > ETL
> > > > > > > >> > > > > >>> > > > jobs writing either to the same sink
> table,
> > or
> > > > two
> > > > > > to
> > > > > > > >> > > different
> > > > > > > >> > > > > >>> sink
> > > > > > > >> > > > > >>> > > tables
> > > > > > > >> > > > > >>> > > > (that are both later used in the same
> > > downstream
> > > > > > job).
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > 4a) I'm not sure if I like the idea of
> > > > > centralising
> > > > > > the
> > > > > > > >> > whole
> > > > > > > >> > > > > >>> system in
> > > > > > > >> > > > > >>> > > > this way. If you have 10 jobs, the
> > likelihood
> > > of
> > > > > the
> > > > > > > >> > > checkpoint
> > > > > > > >> > > > > >>> failure
> > > > > > > >> > > > > >>> > > > will be 10 times higher, and/or the
> duration
> > > of
> > > > > the
> > > > > > > >> > > checkpoint
> > > > > > > >> > > > > can
> > > > > > > >> > > > > >>> be
> > > > > > > >> > > > > >>> > > much
> > > > > > > >> > > > > >>> > > > much longer (especially under
> backpressure).
> > > And
> > > > > > this is
> > > > > > > >> > > > actually
> > > > > > > >> > > > > >>> > > already a
> > > > > > > >> > > > > >>> > > > limitation of Apache Flink (global
> > checkpoints
> > > > are
> > > > > > more
> > > > > > > >> > prone
> > > > > > > >> > > > to
> > > > > > > >> > > > > >>> fail
> > > > > > > >> > > > > >>> > the
> > > > > > > >> > > > > >>> > > > larger the scale), so I would be anxious
> > about
> > > > > > making it
> > > > > > > >> > > > > >>> potentially
> > > > > > > >> > > > > >>> > > even a
> > > > > > > >> > > > > >>> > > > larger issue.
> > > > > > > >> > > > > >>> > > > 4b) I'm also worried about increased
> > > complexity
> > > > of
> > > > > > the
> > > > > > > >> > system
> > > > > > > >> > > > > after
> > > > > > > >> > > > > >>> > > adding
> > > > > > > >> > > > > >>> > > > the global checkpoint, and additional
> > > (single?)
> > > > > > point of
> > > > > > > >> > > > failure.
> > > > > > > >> > > > > >>> > > > 5. Such a design would also not work if we
> > > ever
> > > > > > wanted
> > > > > > > >> to
> > > > > > > >> > > have
> > > > > > > >> > > > > task
> > > > > > > >> > > > > >>> > local
> > > > > > > >> > > > > >>> > > > checkpoints.
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > All in all, it seems to me like actually
> the
> > > > > > watermarks
> > > > > > > >> and
> > > > > > > >> > > > even
> > > > > > > >> > > > > >>> time
> > > > > > > >> > > > > >>> > are
> > > > > > > >> > > > > >>> > > > the better concept in this context that
> > should
> > > > > have
> > > > > > been
> > > > > > > >> > used
> > > > > > > >> > > > for
> > > > > > > >> > > > > >>> > > > synchronising and data consistency across
> > the
> > > > > whole
> > > > > > > >> system.
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > Best,
> > > > > > > >> > > > > >>> > > > Piotrek
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > > czw., 1 gru 2022 o 11:50 Shammon FY <
> > > > > > zjur...@gmail.com>
> > > > > > > >> > > > > >>> napisał(a):
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > >> Hi @Martijn
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> Thanks for your comments, and I'd like to
> > > reply
> > > > > to
> > > > > > them
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> 1. It sounds good to me, I'll update the
> > > > content
> > > > > > > >> structure
> > > > > > > >> > > in
> > > > > > > >> > > > > FLIP
> > > > > > > >> > > > > >>> > later
> > > > > > > >> > > > > >>> > > >> and give the problems first.
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> 2. "Each ETL job creates snapshots with
> > > > > checkpoint
> > > > > > > >> info on
> > > > > > > >> > > > sink
> > > > > > > >> > > > > >>> tables
> > > > > > > >> > > > > >>> > > in
> > > > > > > >> > > > > >>> > > >> Table Store"  -> That reads like you're
> > > > proposing
> > > > > > that
> > > > > > > >> > > > snapshots
> > > > > > > >> > > > > >>> need
> > > > > > > >> > > > > >>> > to
> > > > > > > >> > > > > >>> > > >> be
> > > > > > > >> > > > > >>> > > >> written to Table Store?
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> Yes. To support the data consistency in
> the
> > > > FLIP,
> > > > > > we
> > > > > > > >> need
> > > > > > > >> > to
> > > > > > > >> > > > get
> > > > > > > >> > > > > >>> > through
> > > > > > > >> > > > > >>> > > >> checkpoints in Flink and snapshots in
> > store,
> > > > this
> > > > > > > >> > requires a
> > > > > > > >> > > > > close
> > > > > > > >> > > > > >>> > > >> combination of Flink and store
> > > implementation.
> > > > In
> > > > > > the
> > > > > > > >> > first
> > > > > > > >> > > > > stage
> > > > > > > >> > > > > >>> we
> > > > > > > >> > > > > >>> > > plan
> > > > > > > >> > > > > >>> > > >> to implement it based on Flink and Table
> > > Store
> > > > > > only,
> > > > > > > >> > > snapshots
> > > > > > > >> > > > > >>> written
> > > > > > > >> > > > > >>> > > to
> > > > > > > >> > > > > >>> > > >> external storage don't support
> consistency.
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> 3. If you introduce a MetaService, it
> > becomes
> > > > the
> > > > > > > >> single
> > > > > > > >> > > point
> > > > > > > >> > > > > of
> > > > > > > >> > > > > >>> > > failure
> > > > > > > >> > > > > >>> > > >> because it coordinates everything. But I
> > > can't
> > > > > find
> > > > > > > >> > anything
> > > > > > > >> > > > in
> > > > > > > >> > > > > >>> the
> > > > > > > >> > > > > >>> > FLIP
> > > > > > > >> > > > > >>> > > >> on
> > > > > > > >> > > > > >>> > > >> making the MetaService high available or
> > how
> > > to
> > > > > > deal
> > > > > > > >> with
> > > > > > > >> > > > > >>> failovers
> > > > > > > >> > > > > >>> > > there.
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> I think you raise a very important
> problem
> > > and
> > > > I
> > > > > > > >> missed it
> > > > > > > >> > > in
> > > > > > > >> > > > > >>> FLIP.
> > > > > > > >> > > > > >>> > The
> > > > > > > >> > > > > >>> > > >> MetaService is a single point and should
> > > > support
> > > > > > > >> failover,
> > > > > > > >> > > we
> > > > > > > >> > > > > >>> will do
> > > > > > > >> > > > > >>> > it
> > > > > > > >> > > > > >>> > > >> in
> > > > > > > >> > > > > >>> > > >> future in the first stage we only support
> > > > > > standalone
> > > > > > > >> mode,
> > > > > > > >> > > THX
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> 4. The FLIP states under Rejected
> > > Alternatives
> > > > > > > >> "Currently
> > > > > > > >> > > > > >>> watermark in
> > > > > > > >> > > > > >>> > > >> Flink cannot align data." which is not
> > true,
> > > > > given
> > > > > > that
> > > > > > > >> > > there
> > > > > > > >> > > > is
> > > > > > > >> > > > > >>> > > FLIP-182
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> Watermark alignment in FLIP-182 is
> > different
> > > > from
> > > > > > > >> > > requirements
> > > > > > > >> > > > > >>> > > "watermark
> > > > > > > >> > > > > >>> > > >> align data" in our FLIP. FLIP-182 aims to
> > fix
> > > > > > watermark
> > > > > > > >> > > > > >>> generation in
> > > > > > > >> > > > > >>> > > >> different sources for "slight imbalance
> or
> > > data
> > > > > > skew",
> > > > > > > >> > which
> > > > > > > >> > > > > >>> means in
> > > > > > > >> > > > > >>> > > some
> > > > > > > >> > > > > >>> > > >> cases the source must generate watermark
> > even
> > > > if
> > > > > > they
> > > > > > > >> > should
> > > > > > > >> > > > > not.
> > > > > > > >> > > > > >>> When
> > > > > > > >> > > > > >>> > > the
> > > > > > > >> > > > > >>> > > >> operator collects watermarks, the data
> > > > processing
> > > > > > is as
> > > > > > > >> > > > > described
> > > > > > > >> > > > > >>> in
> > > > > > > >> > > > > >>> > our
> > > > > > > >> > > > > >>> > > >> FLIP, and the data cannot be aligned
> > through
> > > > the
> > > > > > > >> barrier
> > > > > > > >> > > like
> > > > > > > >> > > > > >>> > > Checkpoint.
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> 5. Given the MetaService role, it feels
> > like
> > > > this
> > > > > > is
> > > > > > > >> > > > > introducing a
> > > > > > > >> > > > > >>> > tight
> > > > > > > >> > > > > >>> > > >> dependency between Flink and the Table
> > Store.
> > > > How
> > > > > > > >> > pluggable
> > > > > > > >> > > is
> > > > > > > >> > > > > >>> this
> > > > > > > >> > > > > >>> > > >> solution, given the changes that need to
> be
> > > > made
> > > > > to
> > > > > > > >> Flink
> > > > > > > >> > in
> > > > > > > >> > > > > >>> order to
> > > > > > > >> > > > > >>> > > >> support this?
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> This is a good question, and I will try
> to
> > > > expand
> > > > > > it.
> > > > > > > >> Most
> > > > > > > >> > > of
> > > > > > > >> > > > > the
> > > > > > > >> > > > > >>> work
> > > > > > > >> > > > > >>> > > >> will
> > > > > > > >> > > > > >>> > > >> be completed in the Table Store, such as
> > the
> > > > new
> > > > > > > >> > > > SplitEnumerator
> > > > > > > >> > > > > >>> and
> > > > > > > >> > > > > >>> > > >> Source
> > > > > > > >> > > > > >>> > > >> implementation. The changes in Flink are
> as
> > > > > > followed:
> > > > > > > >> > > > > >>> > > >> 1) Flink job should put its job id in
> > context
> > > > > when
> > > > > > > >> > creating
> > > > > > > >> > > > > >>> > source/sink
> > > > > > > >> > > > > >>> > > to
> > > > > > > >> > > > > >>> > > >> help MetaService to create relationship
> > > between
> > > > > > source
> > > > > > > >> and
> > > > > > > >> > > > sink
> > > > > > > >> > > > > >>> > tables,
> > > > > > > >> > > > > >>> > > >> it's tiny
> > > > > > > >> > > > > >>> > > >> 2) Notify a listener when job is
> terminated
> > > in
> > > > > > Flink,
> > > > > > > >> and
> > > > > > > >> > > the
> > > > > > > >> > > > > >>> listener
> > > > > > > >> > > > > >>> > > >> implementation in Table Store will send
> > > "delete
> > > > > > event"
> > > > > > > >> to
> > > > > > > >> > > > > >>> MetaService.
> > > > > > > >> > > > > >>> > > >> 3) The changes are related to Flink
> > > Checkpoint
> > > > > > includes
> > > > > > > >> > > > > >>> > > >>   a) Support triggering checkpoint with
> > > > > checkpoint
> > > > > > id
> > > > > > > >> by
> > > > > > > >> > > > > >>> > SplitEnumerator
> > > > > > > >> > > > > >>> > > >>   b) Create the SplitEnumerator in Table
> > > Store
> > > > > > with a
> > > > > > > >> > > strategy
> > > > > > > >> > > > > to
> > > > > > > >> > > > > >>> > > perform
> > > > > > > >> > > > > >>> > > >> the specific checkpoint when all
> > > > > > "SplitEnumerator"s in
> > > > > > > >> the
> > > > > > > >> > > job
> > > > > > > >> > > > > >>> manager
> > > > > > > >> > > > > >>> > > >> trigger it.
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> Best,
> > > > > > > >> > > > > >>> > > >> Shammon
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> On Thu, Dec 1, 2022 at 3:43 PM Martijn
> > > Visser <
> > > > > > > >> > > > > >>> > martijnvis...@apache.org
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > > >> wrote:
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >> > Hi all,
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> > A couple of first comments on this:
> > > > > > > >> > > > > >>> > > >> > 1. I'm missing the problem statement in
> > the
> > > > > > overall
> > > > > > > >> > > > > >>> introduction. It
> > > > > > > >> > > > > >>> > > >> > immediately goes into proposal mode, I
> > > would
> > > > > > like to
> > > > > > > >> > first
> > > > > > > >> > > > > read
> > > > > > > >> > > > > >>> what
> > > > > > > >> > > > > >>> > > is
> > > > > > > >> > > > > >>> > > >> the
> > > > > > > >> > > > > >>> > > >> > actual problem, before diving into
> > > solutions.
> > > > > > > >> > > > > >>> > > >> > 2. "Each ETL job creates snapshots with
> > > > > > checkpoint
> > > > > > > >> info
> > > > > > > >> > on
> > > > > > > >> > > > > sink
> > > > > > > >> > > > > >>> > tables
> > > > > > > >> > > > > >>> > > >> in
> > > > > > > >> > > > > >>> > > >> > Table Store"  -> That reads like you're
> > > > > proposing
> > > > > > > >> that
> > > > > > > >> > > > > snapshots
> > > > > > > >> > > > > >>> > need
> > > > > > > >> > > > > >>> > > >> to be
> > > > > > > >> > > > > >>> > > >> > written to Table Store?
> > > > > > > >> > > > > >>> > > >> > 3. If you introduce a MetaService, it
> > > becomes
> > > > > the
> > > > > > > >> single
> > > > > > > >> > > > point
> > > > > > > >> > > > > >>> of
> > > > > > > >> > > > > >>> > > >> failure
> > > > > > > >> > > > > >>> > > >> > because it coordinates everything. But
> I
> > > > can't
> > > > > > find
> > > > > > > >> > > anything
> > > > > > > >> > > > > in
> > > > > > > >> > > > > >>> the
> > > > > > > >> > > > > >>> > > >> FLIP on
> > > > > > > >> > > > > >>> > > >> > making the MetaService high available
> or
> > > how
> > > > to
> > > > > > deal
> > > > > > > >> > with
> > > > > > > >> > > > > >>> failovers
> > > > > > > >> > > > > >>> > > >> there.
> > > > > > > >> > > > > >>> > > >> > 4. The FLIP states under Rejected
> > > > Alternatives
> > > > > > > >> > "Currently
> > > > > > > >> > > > > >>> watermark
> > > > > > > >> > > > > >>> > in
> > > > > > > >> > > > > >>> > > >> > Flink cannot align data." which is not
> > > true,
> > > > > > given
> > > > > > > >> that
> > > > > > > >> > > > there
> > > > > > > >> > > > > is
> > > > > > > >> > > > > >>> > > >> FLIP-182
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> > 5. Given the MetaService role, it feels
> > > like
> > > > > > this is
> > > > > > > >> > > > > >>> introducing a
> > > > > > > >> > > > > >>> > > tight
> > > > > > > >> > > > > >>> > > >> > dependency between Flink and the Table
> > > Store.
> > > > > How
> > > > > > > >> > > pluggable
> > > > > > > >> > > > is
> > > > > > > >> > > > > >>> this
> > > > > > > >> > > > > >>> > > >> > solution, given the changes that need
> to
> > be
> > > > > made
> > > > > > to
> > > > > > > >> > Flink
> > > > > > > >> > > in
> > > > > > > >> > > > > >>> order
> > > > > > > >> > > > > >>> > to
> > > > > > > >> > > > > >>> > > >> > support this?
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> > Best regards,
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> > Martijn
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> > On Thu, Dec 1, 2022 at 4:49 AM Shammon
> > FY <
> > > > > > > >> > > > zjur...@gmail.com>
> > > > > > > >> > > > > >>> > wrote:
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >> > > Hi devs:
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > > I'd like to start a discussion about
> > > > > FLIP-276:
> > > > > > Data
> > > > > > > >> > > > > >>> Consistency of
> > > > > > > >> > > > > >>> > > >> > > Streaming and Batch ETL in Flink and
> > > Table
> > > > > > > >> Store[1].
> > > > > > > >> > In
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> whole
> > > > > > > >> > > > > >>> > > data
> > > > > > > >> > > > > >>> > > >> > > stream processing, there are
> > consistency
> > > > > > problems
> > > > > > > >> such
> > > > > > > >> > > as
> > > > > > > >> > > > > how
> > > > > > > >> > > > > >>> to
> > > > > > > >> > > > > >>> > > >> manage
> > > > > > > >> > > > > >>> > > >> > the
> > > > > > > >> > > > > >>> > > >> > > dependencies of multiple jobs and
> > tables,
> > > > how
> > > > > > to
> > > > > > > >> > define
> > > > > > > >> > > > and
> > > > > > > >> > > > > >>> handle
> > > > > > > >> > > > > >>> > > E2E
> > > > > > > >> > > > > >>> > > >> > > delays, and how to ensure the data
> > > > > consistency
> > > > > > of
> > > > > > > >> > > queries
> > > > > > > >> > > > on
> > > > > > > >> > > > > >>> > flowing
> > > > > > > >> > > > > >>> > > >> > data?
> > > > > > > >> > > > > >>> > > >> > > This FLIP aims to support data
> > > consistency
> > > > > and
> > > > > > > >> answer
> > > > > > > >> > > > these
> > > > > > > >> > > > > >>> > > questions.
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > > I'v discussed the details of this
> FLIP
> > > with
> > > > > > > >> @Jingsong
> > > > > > > >> > > Lee
> > > > > > > >> > > > > and
> > > > > > > >> > > > > >>> > > >> @libenchao
> > > > > > > >> > > > > >>> > > >> > > offline several times. We hope to
> > support
> > > > > data
> > > > > > > >> > > consistency
> > > > > > > >> > > > > of
> > > > > > > >> > > > > >>> > > queries
> > > > > > > >> > > > > >>> > > >> on
> > > > > > > >> > > > > >>> > > >> > > tables, managing relationships
> between
> > > > Flink
> > > > > > jobs
> > > > > > > >> and
> > > > > > > >> > > > tables
> > > > > > > >> > > > > >>> and
> > > > > > > >> > > > > >>> > > >> revising
> > > > > > > >> > > > > >>> > > >> > > tables on streaming in Flink and
> Table
> > > > Store
> > > > > to
> > > > > > > >> > improve
> > > > > > > >> > > > the
> > > > > > > >> > > > > >>> whole
> > > > > > > >> > > > > >>> > > data
> > > > > > > >> > > > > >>> > > >> > > stream processing.
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > > Looking forward to your feedback.
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > > [1]
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> > > Best,
> > > > > > > >> > > > > >>> > > >> > > Shammon
> > > > > > > >> > > > > >>> > > >> > >
> > > > > > > >> > > > > >>> > > >> >
> > > > > > > >> > > > > >>> > > >>
> > > > > > > >> > > > > >>> > > >
> > > > > > > >> > > > > >>> > >
> > > > > > > >> > > > > >>> >
> > > > > > > >> > > > > >>>
> > > > > > > >> > > > > >>
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to