Hi all

Thanks for all the feedback. If there are no more questions related to this
FLIP, we'd like to start the next work. After discussed with @JingsongLee, we
will create some FLIPs to provide detailed design for `MetaService` and
`Timestamp Barrier Mechanism`. Please feel free to comment in this thread
if you have any questions later. THX


Best,
Shammon


On Tue, Feb 7, 2023 at 11:53 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi Piotr
>
> Thanks for your feedback.
>
> > - stateless operators, could completely ignore the issue and process the
> records normally, as they are doing right now
> > - stateful operators, should either:
> >     - if the business doesn't require ordering, they could process the
> records immediately
> >     - or buffer the records internally, like currently windowed/temporal
> operators are doing. Non windowed joins/aggregations could also work in a
> similar manner, like pre-aggregate data per each "epoch" (as demarcated by
> timestamp barriers).
> > - sinks implementation would have to match what external system support:
> >     - if the external system requires ordered writes (something like
> Kafka topic?), the sinks would have to buffer the writes until a "timestamp
> barrier" arrives
> >     - some sinks might support writing the data simultaneously to
> different "epochs". For example writing files bucketed by each epoch. Each
> bucket/epoch could be committed independently
>
> It sounds good to me and I totally agree with the proposal. We need to
> give users more choices to meet different business needs and storage
> support. I have updated the key points in the FLINK section[1]
>
> > Ok, I get it now. Indeed the terminology is confusing. Maybe we
> shouldn't say that the timestamp barrier has been committed, but that all
> records for given "epoch" have been processed/written, but not yet
> committed, so they can still be rolled-back?
>
> Nice! According to your suggestion, I have updated the FLIP for "epoch" as:
> 1. It is PROCESSED when records are written to a table
> 2. It is WRITTEN when the records are in a snapshot
> 3. It is PRECOMMIT when all tables are PROCESSED but not WRITTEN
> 4. It is COMMIT when all tables are WRITTEN
> Records not WRITTEN in a table will be rolled back due to job failure.
>
>
> > Why do we need to do that? Only to disallow this? To forbid writing from
> two jobs into a single table? If so, can we not push this responsibility
> down to the connector? Like sink/source operator coordinators should
> negotiate with respective external systems if the given read/write is
> allowed? So if there is a need for such meta service, Flink doesn't need to
> know about it?
>
> As I mentioned, MetaService will do some atomic operations to check and
> disallow some operations when jobs are submitted concurrently. But I'm
> sorry that I may not have explained the relationship between it and
> sink/source clearly. Generally speaking, the interactive between Flink and
> MetaService is as:
> 1. When the Client submits a flink job (streaming&batch), it interacts
> with MetaService through Catalog in CatalogManager, including getting the
> table version, registering the source/link table relationship for ETL.
> 2. When the flink job is running, JobManager collects data processing
> progress (Timestamp Barrier and Checkpoint) from source/link subtasks and
> reports them to MetaService.
> We can implement the above functions in a MetaService node. Of course, it
> can also be based on an atomic system (such as Zookeeper), with Client and
> JobManager doing their own work.
>
> Of course, source and sink also need some special work, such as reading
> timestamp barrier, collecting timestamp barrier, writing timestamp barrier,
> etc. But source/sink subtasks will not interact with MetaService directly.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-GlobalTimestampBarrierMechanism
>
>
> Best,
> Shammon
>
>
> On Tue, Feb 7, 2023 at 1:26 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>>  Hi,
>>
>> Thanks for the answers.
>>
>> >> Are you proposing that all of the inputs to stateful operators would
>> have to be sorted?
>> >>
>> > Records in stream don't need to be sorted, but it should be managed by
>> `Timestamp Barrier`, which means
>> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
>> > 2. Computations in different timestamp barriers are ordered. For the
>> above
>> > example, each stateful subtask can start computation for T2 only after
>> it
>> > finishes computation for T1. Subtasks are independent of each other.
>>
>> Wouldn't that add significant latency to processing the records? You would
>> basically introduce a batch processing concept in Flink?
>>
>> Have you considered some alternative solutions? Like for example letting
>> each operator/function/sink to take care of the data disorder? For
>> example:
>> - stateless operators, could completely ignore the issue and process the
>> records normally, as they are doing right now
>> - stateful operators, should either:
>>     - if the business doesn't require ordering, they could process the
>> records immediately
>>     - or buffer the records internally, like currently windowed/temporal
>> operators are doing. Non windowed joins/aggregations could also work in a
>> similar manner, like pre-aggregate data per each "epoch" (as demarcated by
>> timestamp barriers).
>> - sinks implementation would have to match what external system support:
>>     - if the external system requires ordered writes (something like Kafka
>> topic?), the sinks would have to buffer the writes until a "timestamp
>> barrier" arrives
>>     - some sinks might support writing the data simultaneously to
>> different
>> "epochs". For example writing files bucketed by each epoch. Each
>> bucket/epoch could be committed independently
>>
>> This way, latency would be behaving very much like it currently does in
>> Flink. For example if we have a following streaming SQL:
>>
>> INSERT INTO alerts_with_user SELECT * FROM alerts a, users u WHERE
>> a.user_id = u.id
>>
>> If there is some lag in the users table, alerts would be still generated.
>> Downstream applications could process and react to newly generated
>> `alerts_with_user`, while at the same time, we could have a consistent
>> view
>> across those three tables (users, alerts, alerts_with_user) if needed.
>>
>> > I call the data of the timetamp barrier "committed" if the data
>> > is written to a table according to the barrier without a snapshot, and
>> the
>> > data may be "rolled back" due to job failure. (sorry that the
>> "committed"
>> > here may not be appropriate)
>>
>> Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't
>> say that the timestamp barrier has been committed, but that all records
>> for
>> given "epoch" have been processed/written, but not yet committed, so they
>> can still be rolled-back?
>>
>> > For example, when multiple jobs start at the same time and register
>> themselves in `MetaService`,
>> > it needs to serially check whether they write to the same table
>>
>> Why do we need to do that? Only to disallow this? To forbid writing from
>> two jobs into a single table? If so, can we not push this responsibility
>> down to the connector? Like sink/source operator coordinators should
>> negotiate with respective external systems if the given read/write is
>> allowed? So if there is a need for such meta service, Flink doesn't need
>> to
>> know about it?
>>
>> Best,
>> Piotrek
>>
>> pon., 6 lut 2023 o 10:44 Shammon FY <zjur...@gmail.com> napisał(a):
>>
>> > Hi Piotr,
>> >
>> > Thanks for your feedback. In general, I think `Timesamp Barrier` is a
>> > special `Watermark` that all sources send watermarks with the same
>> > timestamp as `Timestamp Barrier` and aggregation operators will align
>> data
>> > by it. For example, all source subtasks are assigned two unified
>> watermarks
>> > T1 and T2, T1 < T2. All records with timestamp <= T1 will be aligned by
>> T1,
>> > and records with timestamp (T1, T2] will be aligned by T2.
>> >
>> > > Are you proposing that all of the inputs to stateful operators would
>> have
>> > to be sorted?
>> >
>> > Records in stream don't need to be sorted, but it should be managed by
>> > `Timestamp Barrier`, which means
>> > 1. Records belonging to a specific `Timestamp Barrier` are disordered.
>> > 2. Computations in different timestamp barriers are ordered. For the
>> above
>> > example, each stateful subtask can start computation for T2 only after
>> it
>> > finishes computation for T1. Subtasks are independent of each other.
>> >
>> > > Can you explain why do you need those 3 states? Why can committed
>> records
>> > be rolled back?
>> >
>> > Here I try to define the states of data in tables according to Timestamp
>> > Barrier and Snapshot, and I found that the 3 states are incomplete. For
>> > example, there is timestamp barrier T associated with checkpoint P, and
>> > sink operator will create snapshot S for P in tables. The data states in
>> > tables are as follows
>> > 1. Sink finishes writing data of timestamp barrier T to a table, but
>> > snapshot P is not created in the table and T is not finished in all
>> tables.
>> > 2. Sink finishes writing data of timestamp barrier T to a table, creates
>> > snapshot P according to checkpoint C, but the T1 is not finished in all
>> > tables.
>> > 3. Timestamp barrier T is finished in all tables, but snapshot P is not
>> > created in all tables.
>> > 4. Timestamp barrier T is finished in all tables, and snapshot P is
>> created
>> > in all tables too.
>> >
>> > Currently users can only get data from snapshots in Table Store and
>> other
>> > storages such as Iceberg. Users can get different "versioned" data from
>> > tables according to their data freshness and consistency requirements.
>> > I think we should support getting data with a timestamp barrier even
>> before
>> > the sink operator finishes creating the snapshot in the future. In this
>> > situation, I call the data of the timetamp barrier "committed" if the
>> data
>> > is written to a table according to the barrier without a snapshot, and
>> the
>> > data may be "rolled back" due to job failure. (sorry that the
>> "committed"
>> > here may not be appropriate)
>> >
>> > > I'm not sure if I follow. Generally speaking, why do we need
>> MetaService
>> > at all? Why can we only support writes to and reads from TableStore, and
>> > not any source/sink that implements some specific interface?
>> >
>> > It's a good point. I added a `MetaService` node in FLIP mainly to
>> perform
>> > some atomic operations. For example, when multiple jobs start at the
>> same
>> > time and register themselves in `MetaService`, it needs to serially
>> check
>> > whether they write to the same table. If we do not use an
>> > independent `MetaService Node`, we may need to introduce some other
>> "atomic
>> > dependency" such as ZooKeeper. But removing `MetaService Node` can make
>> the
>> > system more flexible, I think it's also valuable. Maybe we can carefully
>> > design MetaService API and support different deployment modes in the
>> next
>> > FLIP? WDYT?
>> >
>> >
>> > Best,
>> > Shammon
>> >
>> >
>> > On Fri, Feb 3, 2023 at 10:43 PM Piotr Nowojski <pnowoj...@apache.org>
>> > wrote:
>> >
>> > > Hi Shammon,
>> > >
>> > > Thanks for pushing the topic further. I'm not sure how this new
>> proposal
>> > is
>> > > supposed to be working? How should timestamp barrier interplay with
>> event
>> > > time and watermarks? Or is timestamp barrier supposed to completely
>> > replace
>> > > watermarks?
>> > >
>> > > > stateful and temporal operators should align them (records)
>> according
>> > to
>> > > their timestamp field.
>> > >
>> > > Are you proposing that all of the inputs to stateful operators would
>> have
>> > > to be sorted?
>> > >
>> > > > There're three states in a table for specific transaction :
>> PreCommit,
>> > > Commit and Snapshot
>> > >
>> > > Can you explain why do you need those 3 states? Why can committed
>> records
>> > > be rolled back?
>> > >
>> > > >> 10. Have you considered proposing a general consistency mechanism
>> > > instead
>> > > >> of restricting it to TableStore+ETL graphs? For example, it seems
>> to
>> > me
>> > > to
>> > > >> be possible and valuable to define instead the contract that
>> > > sources/sinks
>> > > >> need to implement in order to participate in globally consistent
>> > > snapshots.
>> > > >
>> > > > A general consistency mechanism is cool! In my mind, the overall
>> > > > `consistency system` consists of three components: Streaming & Batch
>> > ETL,
>> > > > Streaming & Batch Storage and MetaService. MetaService is decoupled
>> > from
>> > > > Storage Layer, but it stores consistency information in persistent
>> > > storage.
>> > > > It can be started as an independent node or a component in a large
>> > Flink
>> > > > cluster. In the FLIP we use TableStore as the Storage Layer. As you
>> > > > mentioned, we plan to implement specific source and sink on the
>> > > TableStore
>> > > > in the first phase, and may consider other storage in the future
>> > >
>> > > I'm not sure if I follow. Generally speaking, why do we need
>> MetaService
>> > at
>> > > all? Why can we only support writes to and reads from TableStore, and
>> not
>> > > any source/sink that implements some specific interface?
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > niedz., 29 sty 2023 o 12:11 Shammon FY <zjur...@gmail.com>
>> napisał(a):
>> > >
>> > > > Hi @Vicky
>> > > >
>> > > > Thank you for your suggestions about consistency and they're very
>> nice
>> > to
>> > > > me!
>> > > >
>> > > > I have updated the examples and consistency types[1] in FLIP. In
>> > > general, I
>> > > > regard the Timestamp Barrier processing as a transaction and divide
>> the
>> > > > data consistency supported in FLIP into three types
>> > > >
>> > > > 1. Read Uncommitted: Read data from tables even when a transaction
>> is
>> > not
>> > > > committed.
>> > > > 2. Read Committed: Read data from tables according to the committed
>> > > > transaction.
>> > > > 3. Repeatable Read: Read data from tables according to the committed
>> > > > transaction in snapshots.
>> > > >
>> > > > You can get more information from the updated FLIP. Looking forward
>> to
>> > > your
>> > > > feedback, THX
>> > > >
>> > > >
>> > > > [1]
>> > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-DataConsistencyType
>> > > >
>> > > > Best,
>> > > > Shammon
>> > > >
>> > > >
>> > > > On Sat, Jan 28, 2023 at 4:42 AM Vasiliki Papavasileiou
>> > > > <vpapavasile...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hi Shammon,
>> > > > >
>> > > > >
>> > > > > Thank you for opening this FLIP which is very interesting and
>> such an
>> > > > > important feature to add to the Flink ecosystem. I have a couple
>> of
>> > > > > suggestions/questions:
>> > > > >
>> > > > >
>> > > > >
>> > > > >    -
>> > > > >
>> > > > >    Consistency is a very broad term with different meanings. There
>> > are
>> > > > many
>> > > > >    variations between the two extremes of weak and strong
>> consistency
>> > > > that
>> > > > >    tradeoff latency for consistency.
>> https://jepsen.io/consistency
>> > It
>> > > > > would
>> > > > >    be great if we could devise an approach that allows the user to
>> > > choose
>> > > > >    which consistency level they want to use for a query.
>> > > > >
>> > > > >
>> > > > > Example: In your figure where you have a DAG, assume a user
>> queries
>> > > only
>> > > > > Table1 for a specific key. Then, a failure happens and the table
>> > > restores
>> > > > > from a checkpoint. The user issues the same query, looking up the
>> > same
>> > > > key.
>> > > > > What value does she see? With monotonic-reads, the system
>> guarantees
>> > > that
>> > > > > she will only see the same or newer values but not older, hence
>> will
>> > > not
>> > > > > experience time-travel. This is a very useful property for a
>> system
>> > to
>> > > > have
>> > > > > albeit it is at the weaker-end of consistency guarantees. But it
>> is a
>> > > > good
>> > > > > stepping stone.
>> > > > >
>> > > > >
>> > > > > Another example, assume the user queries Table1 for key K1 and
>> gets
>> > the
>> > > > > value V11. Then, she queries Table2 that is derived from Table1
>> for
>> > the
>> > > > > same key, K1, that returns value V21. What is the relationship
>> > between
>> > > > V21
>> > > > > and V11? Is V21 derived from V11 or can it be an older value V1
>> (the
>> > > > > previous value of K1)? What if value V21 is not yet in table
>> Table2?
>> > > What
>> > > > > should she see when she queries Table1? Should she see the key
>> V11 or
>> > > > not?
>> > > > > Should the requirement be that a record is not visible in any of
>> the
>> > > > tables
>> > > > > in a DAG unless it is available in all of them?
>> > > > >
>> > > > >
>> > > > >
>> > > > >    -
>> > > > >
>> > > > >    It would we good to have a set of examples with consistency
>> > > anomalies
>> > > > >    that can happen (like the examples above) and what consistency
>> > > levels
>> > > > we
>> > > > >    want the system to offer to prevent them.
>> > > > >    Moreover, for each such example, it would be good to have a
>> > > > description
>> > > > >    of how the approach (Timestamp Barriers) will work in practice
>> to
>> > > > > prevent
>> > > > >    such anomalies.
>> > > > >
>> > > > >
>> > > > > Thank you,
>> > > > > Vicky
>> > > > >
>> > > > >
>> > > > > On Fri, Jan 27, 2023 at 4:46 PM John Roesler <vvcep...@apache.org
>> >
>> > > > wrote:
>> > > > >
>> > > > > > Hello Shammon and all,
>> > > > > >
>> > > > > > Thanks for this FLIP! I've been working toward this kind of
>> global
>> > > > > > consistency across large scale data infrastructure for a long
>> time,
>> > > and
>> > > > > > it's fantastic to see a high-profile effort like this come into
>> > play.
>> > > > > >
>> > > > > > I have been lurking in the discussion for a while and delaying
>> my
>> > > > > response
>> > > > > > while I collected my thoughts. However, I've realized at some
>> > point,
>> > > > > > delaying more is not as useful as just asking a few questions,
>> so
>> > I'm
>> > > > > sorry
>> > > > > > if some of this seems beside the point. I'll number these to not
>> > > > collide
>> > > > > > with prior discussion points:
>> > > > > >
>> > > > > > 10. Have you considered proposing a general consistency
>> mechanism
>> > > > instead
>> > > > > > of restricting it to TableStore+ETL graphs? For example, it
>> seems
>> > to
>> > > me
>> > > > > to
>> > > > > > be possible and valuable to define instead the contract that
>> > > > > sources/sinks
>> > > > > > need to implement in order to participate in globally consistent
>> > > > > snapshots.
>> > > > > >
>> > > > > > 11. It seems like this design is assuming that the "ETL
>> Topology"
>> > > under
>> > > > > > the envelope of the consistency model is a well-ordered set of
>> > jobs,
>> > > > but
>> > > > > I
>> > > > > > suspect this is not the case for many organizations. It may be
>> > > > > > aspirational, but I think the gold-standard here would be to
>> > provide
>> > > an
>> > > > > > entire organization with a consistency model spanning a loosely
>> > > coupled
>> > > > > > ecosystem of jobs and data flows spanning teams and systems that
>> > are
>> > > > > > organizationally far apart.
>> > > > > >
>> > > > > > I realize that may be kind of abstract. Here's some examples of
>> > > what's
>> > > > on
>> > > > > > my mind here:
>> > > > > >
>> > > > > > 11a. Engineering may operate one Flink cluster, and some other
>> org,
>> > > > like
>> > > > > > Finance may operate another. In most cases, those are separate
>> > > domains
>> > > > > that
>> > > > > > don't typically get mixed together in jobs, but some people,
>> like
>> > the
>> > > > > CEO,
>> > > > > > would still benefit from being able to make a consistent query
>> that
>> > > > spans
>> > > > > > arbitrary contexts within the business. How well can a feature
>> like
>> > > > this
>> > > > > > transcend a single Flink infrastructure? Does it make sense to
>> > > > consider a
>> > > > > > model in which snapshots from different domains can be
>> composable?
>> > > > > >
>> > > > > > 11b. Some groups may have a relatively stable set of
>> long-running
>> > > jobs,
>> > > > > > while others (like data science, skunkworks, etc) may adopt a
>> more
>> > > > > > experimental, iterative approach with lots of jobs entering and
>> > > exiting
>> > > > > the
>> > > > > > ecosystem over time. It's still valuable to have them
>> participate
>> > in
>> > > > the
>> > > > > > consistency model, but it seems like the consistency system will
>> > have
>> > > > to
>> > > > > > deal with more chaos than I see in the design. For example, how
>> can
>> > > > this
>> > > > > > feature tolerate things like zombie jobs (which are registered
>> in
>> > the
>> > > > > > system, but fail to check in for a long time, and then come back
>> > > > later).
>> > > > > >
>> > > > > > 12. I didn't see any statements about patterns like cycles in
>> the
>> > ETL
>> > > > > > Topology. I'm aware that there are fundamental constraints on
>> how
>> > > well
>> > > > > > cyclic topologies can be supported by a distributed snapshot
>> > > algorithm.
>> > > > > > However, there are a range of approaches/compromises that we can
>> > > apply
>> > > > to
>> > > > > > cyclic topologies. At the very least, we can state that we will
>> > > detect
>> > > > > > cycles and produce a warning, etc.
>> > > > > >
>> > > > > > 13. I'm not sure how heavily you're waiting the query syntax
>> part
>> > of
>> > > > the
>> > > > > > proposal, so please feel free to defer this point. It looked to
>> me
>> > > like
>> > > > > the
>> > > > > > proposal assumes people want to query either the latest
>> consistent
>> > > > > snapshot
>> > > > > > or the latest inconsistent state. However, it seems like
>> there's a
>> > > > > > significant opportunity to maintain a manifest of historical
>> > > snapshots
>> > > > > and
>> > > > > > allow people to query as of old points in time. That can be
>> > valuable
>> > > > for
>> > > > > > individuals answering data questions, building products, and
>> > > crucially
>> > > > > > supporting auditability use cases. To that latter point, it
>> seems
>> > > nice
>> > > > to
>> > > > > > provide not only a mechanism to query arbitrary snapshots, but
>> also
>> > > to
>> > > > > > define a TTL/GC model that allows users to keep hourly snapshots
>> > for
>> > > N
>> > > > > > hours, daily snapshots for N days, weekly snapshots for N weeks,
>> > and
>> > > > the
>> > > > > > same for monthly, quarterly, and yearly snapshots.
>> > > > > >
>> > > > > > Ok, that's all I have for now :) I'd also like to understand
>> some
>> > > > > > lower-level details, but I wanted to get these high-level
>> questions
>> > > off
>> > > > > my
>> > > > > > chest.
>> > > > > >
>> > > > > > Thanks again for the FLIP!
>> > > > > > -John
>> > > > > >
>> > > > > > On 2023/01/13 11:43:28 Shammon FY wrote:
>> > > > > > > Hi Piotr,
>> > > > > > >
>> > > > > > > I discussed with @jinsong lee about `Timestamp Barrier` and
>> > > `Aligned
>> > > > > > > Checkpoint` for data consistency in FLIP, we think there are
>> many
>> > > > > defects
>> > > > > > > indeed in using `Aligned Checkpoint` to support data
>> consistency
>> > as
>> > > > you
>> > > > > > > mentioned.
>> > > > > > >
>> > > > > > > According to our historical discussion, I think we have
>> reached
>> > an
>> > > > > > > agreement on an important point: we finally need `Timestamp
>> > Barrier
>> > > > > > > Mechanism` to support data consistency. But according to our
>> > > > (@jinsong
>> > > > > > lee
>> > > > > > > and I) opinions, the total design and implementation based on
>> > > > > 'Timestamp
>> > > > > > > Barrier' will be too complex, and it's also too big in one
>> FLIP.
>> > > > > > >
>> > > > > > > So we‘d like to use FLIP-276[1] as an overview design of data
>> > > > > consistency
>> > > > > > > in Flink Streaming and Batch ETL based on `Timestamp Barrier`.
>> > > > @jinsong
>> > > > > > and
>> > > > > > > I hope that we can reach an agreement on the overall design in
>> > > > > FLINK-276
>> > > > > > > first, and then on the basic of FLIP-276 we can create other
>> > FLIPs
>> > > > with
>> > > > > > > detailed design according to modules and drive them. Finally,
>> we
>> > > can
>> > > > > > > support data consistency based on Timestamp in Flink.
>> > > > > > >
>> > > > > > > I have updated FLIP-276, deleted the Checkpoint section, and
>> > added
>> > > > the
>> > > > > > > overall design of  `Timestamp Barrier`. Here I briefly
>> describe
>> > the
>> > > > > > modules
>> > > > > > > of `Timestamp Barrier` as follows
>> > > > > > > 1. Generation: JobManager must coordinate all source subtasks
>> and
>> > > > > > generate
>> > > > > > > a unified timestamp barrier from System Time or Event Time for
>> > them
>> > > > > > > 2. Checkpoint: Store <checkpoint, timestamp barrier> when the
>> > > > timestamp
>> > > > > > > barrier is generated, so that the job can recover the same
>> > > timestamp
>> > > > > > > barrier for the uncompleted checkpoint.
>> > > > > > > 3. Replay data: Store <timestamp barrier, offset> for source
>> when
>> > > it
>> > > > > > > broadcasts timestamp barrier, so that the source can replay
>> the
>> > > same
>> > > > > data
>> > > > > > > according to the same timestamp barrier.
>> > > > > > > 4. Align data: Align data for stateful operator(aggregation,
>> join
>> > > and
>> > > > > > etc.)
>> > > > > > > and temporal operator(window)
>> > > > > > > 5. Computation: Operator computation for a specific timestamp
>> > > barrier
>> > > > > > based
>> > > > > > > on the results of a previous timestamp barrier.
>> > > > > > > 6. Output: Operator outputs or commits results when it
>> collects
>> > all
>> > > > the
>> > > > > > > timestamp barriers, including operators with data buffer or
>> async
>> > > > > > > operations.
>> > > > > > >
>> > > > > > > I also list the main work in Flink and Table Store in
>> FLIP-276.
>> > > > Please
>> > > > > > help
>> > > > > > > to review the FLIP when you're free and feel free to give any
>> > > > comments.
>> > > > > > >
>> > > > > > > Looking forward for your feedback, THX
>> > > > > > >
>> > > > > > > [1]
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Shammon
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Dec 20, 2022 at 10:01 AM Shammon FY <
>> zjur...@gmail.com>
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Piotr,
>> > > > > > > >
>> > > > > > > > Thanks for your syncing. I will update the FLIP later and
>> keep
>> > > this
>> > > > > > > > discussion open. Looking forward to your feedback, thanks
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Shammon
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski <
>> > > > > pnowoj...@apache.org>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > >> Hi Shammon,
>> > > > > > > >>
>> > > > > > > >> I've tried to sync with Timo, David Moravek and Dawid
>> > Wysakowicz
>> > > > > about
>> > > > > > > >> this
>> > > > > > > >> subject. We have only briefly chatted and exchanged some
>> > > > > > thoughts/ideas,
>> > > > > > > >> but unfortunately we were not able to finish the
>> discussions
>> > > > before
>> > > > > > the
>> > > > > > > >> holiday season/vacations. Can we get back to this topic in
>> > > > January?
>> > > > > > > >>
>> > > > > > > >> Best,
>> > > > > > > >> Piotrek
>> > > > > > > >>
>> > > > > > > >> pt., 16 gru 2022 o 10:53 Shammon FY <zjur...@gmail.com>
>> > > > napisał(a):
>> > > > > > > >>
>> > > > > > > >> > Hi Piotr,
>> > > > > > > >> >
>> > > > > > > >> > I found there may be several points in our discussion, it
>> > will
>> > > > > cause
>> > > > > > > >> > misunderstanding between us when we focus on different
>> one.
>> > I
>> > > > list
>> > > > > > each
>> > > > > > > >> > point in our discussion as follows
>> > > > > > > >> >
>> > > > > > > >> > > Point 1: Is "Aligned Checkpoint" the only mechanism to
>> > > > guarantee
>> > > > > > data
>> > > > > > > >> > consistency in the current Flink implementation, and
>> > > "Watermark"
>> > > > > and
>> > > > > > > >> > "Aligned Checkpoint cannot do that?
>> > > > > > > >> > My answer is "Yes", the "Aligned Checkpoint" is the only
>> one
>> > > due
>> > > > > to
>> > > > > > its
>> > > > > > > >> > "Align Data" ability, we can do it in the first stage.
>> > > > > > > >> >
>> > > > > > > >> > > Point2: Can the combination of "Checkpoint Barrier" and
>> > > > > > "Watermark"
>> > > > > > > >> > support the complete consistency semantics based on
>> > > "Timestamp"
>> > > > in
>> > > > > > the
>> > > > > > > >> > current Flink implementation?
>> > > > > > > >> > My answer is "No", we need a new "Timestamp Barrier"
>> > mechanism
>> > > > to
>> > > > > do
>> > > > > > > >> that
>> > > > > > > >> > which may be upgraded from current "Watermark" or a new
>> > > > mechanism,
>> > > > > > we
>> > > > > > > >> can
>> > > > > > > >> > do it in the next second or third stage.
>> > > > > > > >> >
>> > > > > > > >> > > Point3: Are the "Checkpoint" and the new "Timestamp
>> > Barrier"
>> > > > > > > >> completely
>> > > > > > > >> > independent? The "Checkpoint" whatever "Aligned" or
>> > > "Unaligned"
>> > > > or
>> > > > > > "Task
>> > > > > > > >> > Local" supports the "Exactly-Once" between ETLs, and the
>> > > > > "Timestamp
>> > > > > > > >> > Barrier" mechanism guarantees data consistency between
>> > tables
>> > > > > > according
>> > > > > > > >> to
>> > > > > > > >> > timestamp for queries.
>> > > > > > > >> > My answer is "Yes", I totally agree with you. Let
>> > "Checkpoint"
>> > > > be
>> > > > > > > >> > responsible for fault tolerance and "Timestamp Barrier"
>> for
>> > > > > > consistency
>> > > > > > > >> > independently.
>> > > > > > > >> >
>> > > > > > > >> > @Piotr, What do you think? If I am missing or
>> > misunderstanding
>> > > > > > anything,
>> > > > > > > >> > please correct me, thanks
>> > > > > > > >> >
>> > > > > > > >> > Best,
>> > > > > > > >> > Shammon
>> > > > > > > >> >
>> > > > > > > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski <
>> > > > > > pnowoj...@apache.org>
>> > > > > > > >> > wrote:
>> > > > > > > >> >
>> > > > > > > >> > > Hi Shammon,
>> > > > > > > >> > >
>> > > > > > > >> > > > I don't think we can combine watermarks and
>> checkpoint
>> > > > > barriers
>> > > > > > > >> > together
>> > > > > > > >> > > to
>> > > > > > > >> > > > guarantee data consistency. There will be a
>> "Timestamp
>> > > > > Barrier"
>> > > > > > in
>> > > > > > > >> our
>> > > > > > > >> > > > system to "commit data", "single etl failover", "low
>> > > latency
>> > > > > > between
>> > > > > > > >> > > ETLs"
>> > > > > > > >> > > > and "strong data consistency with completed
>> semantics"
>> > in
>> > > > the
>> > > > > > end.
>> > > > > > > >> > >
>> > > > > > > >> > > Why do you think so? I've described to you above an
>> > > > alternative
>> > > > > > where
>> > > > > > > >> we
>> > > > > > > >> > > could be using watermarks for data consistency,
>> regardless
>> > > of
>> > > > > what
>> > > > > > > >> > > checkpointing/fault tolerance mechanism Flink would be
>> > > using.
>> > > > > Can
>> > > > > > you
>> > > > > > > >> > > explain what's wrong with that approach? Let me
>> rephrase
>> > it:
>> > > > > > > >> > >
>> > > > > > > >> > > 1. There is an independent mechanism that provides
>> > > > exactly-once
>> > > > > > > >> > guarantees,
>> > > > > > > >> > > committing records/watermarks/events and taking care of
>> > the
>> > > > > > failover.
>> > > > > > > >> It
>> > > > > > > >> > > might be aligned, unaligned or task local
>> checkpointing -
>> > > this
>> > > > > > doesn't
>> > > > > > > >> > > matter. Let's just assume we have such a mechanism.
>> > > > > > > >> > > 2. There is a watermarking mechanism (it can be some
>> kind
>> > of
>> > > > > > system
>> > > > > > > >> > > versioning re-using watermarks code path if a user
>> didn't
>> > > > > > configure
>> > > > > > > >> > > watermarks), that takes care of the data consistency.
>> > > > > > > >> > >
>> > > > > > > >> > > Because watermarks from 2. are also subject to the
>> > > > exactly-once
>> > > > > > > >> > guarantees
>> > > > > > > >> > > from the 1., once they are committed downstream systems
>> > > (Flink
>> > > > > > jobs or
>> > > > > > > >> > > other 3rd party systems) could just easily work with
>> the
>> > > > > committed
>> > > > > > > >> > > watermarks to provide consistent view/snapshot of the
>> > > tables.
>> > > > > Any
>> > > > > > > >> > > downstream system could always check what are the
>> > committed
>> > > > > > > >> watermarks,
>> > > > > > > >> > > select the watermark value (for example min across all
>> > used
>> > > > > > tables),
>> > > > > > > >> and
>> > > > > > > >> > > ask every table: please give me all of the data up
>> until
>> > the
>> > > > > > selected
>> > > > > > > >> > > watermark. Or give me all tables in the version for the
>> > > > selected
>> > > > > > > >> > watermark.
>> > > > > > > >> > >
>> > > > > > > >> > > Am I missing something? To me it seems like this way we
>> > can
>> > > > > fully
>> > > > > > > >> > decouple
>> > > > > > > >> > > the fault tolerance mechanism from the subject of the
>> data
>> > > > > > > >> consistency.
>> > > > > > > >> > >
>> > > > > > > >> > > Best,
>> > > > > > > >> > > Piotrek
>> > > > > > > >> > >
>> > > > > > > >> > > czw., 15 gru 2022 o 13:01 Shammon FY <
>> zjur...@gmail.com>
>> > > > > > napisał(a):
>> > > > > > > >> > >
>> > > > > > > >> > > > Hi Piotr,
>> > > > > > > >> > > >
>> > > > > > > >> > > > It's kind of amazing about the image, it's a simple
>> > > example
>> > > > > and
>> > > > > > I
>> > > > > > > >> have
>> > > > > > > >> > to
>> > > > > > > >> > > > put it in a document
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink
>> > > > > > > >> > > > :)
>> > > > > > > >> > > >
>> > > > > > > >> > > > > Does it have to be combining watermarks and
>> checkpoint
>> > > > > > barriers
>> > > > > > > >> > > together?
>> > > > > > > >> > > >
>> > > > > > > >> > > > It's an interesting question. As we discussed above,
>> > what
>> > > we
>> > > > > > need
>> > > > > > > >> from
>> > > > > > > >> > > > "Checkpoint" is the "Align Data Ability", and from
>> > > > "Watermark"
>> > > > > > is
>> > > > > > > >> the
>> > > > > > > >> > > > "Consistency Semantics",
>> > > > > > > >> > > >
>> > > > > > > >> > > > 1) Only "Align Data" can reach data consistency when
>> > > > > performing
>> > > > > > > >> queries
>> > > > > > > >> > > on
>> > > > > > > >> > > > upstream and downstream tables. I gave an example of
>> > > "Global
>> > > > > > Count
>> > > > > > > >> > > Tables"
>> > > > > > > >> > > > in our previous discussion. We need a "Align Event"
>> in
>> > the
>> > > > > > streaming
>> > > > > > > >> > > > processing, it's the most basic.
>> > > > > > > >> > > >
>> > > > > > > >> > > > 2) Only "Timestamp" can provide complete consistency
>> > > > > semantics.
>> > > > > > You
>> > > > > > > >> > gave
>> > > > > > > >> > > > some good examples about "Window" and ect operators.
>> > > > > > > >> > > >
>> > > > > > > >> > > > I don't think we can combine watermarks and
>> checkpoint
>> > > > > barriers
>> > > > > > > >> > together
>> > > > > > > >> > > to
>> > > > > > > >> > > > guarantee data consistency. There will be a
>> "Timestamp
>> > > > > Barrier"
>> > > > > > in
>> > > > > > > >> our
>> > > > > > > >> > > > system to "commit data", "single etl failover", "low
>> > > latency
>> > > > > > between
>> > > > > > > >> > > ETLs"
>> > > > > > > >> > > > and "strong data consistency with completed
>> semantics"
>> > in
>> > > > the
>> > > > > > end.
>> > > > > > > >> > > >
>> > > > > > > >> > > > At the beginning I think we can do the simplest thing
>> > > first:
>> > > > > > > >> guarantee
>> > > > > > > >> > > the
>> > > > > > > >> > > > basic data consistency with a "Barrier Mechanism". In
>> > the
>> > > > > > current
>> > > > > > > >> Flink
>> > > > > > > >> > > > there's "Aligned Checkpoint" only, that's why we
>> choose
>> > > > > > > >> "Checkpoint" in
>> > > > > > > >> > > our
>> > > > > > > >> > > > FLIP.
>> > > > > > > >> > > >
>> > > > > > > >> > > > > I don't see an actual connection in the the
>> > > implementation
>> > > > > > steps
>> > > > > > > >> > > between
>> > > > > > > >> > > > the checkpoint barriers approach and the
>> watermark-like
>> > > > > approach
>> > > > > > > >> > > >
>> > > > > > > >> > > > As I mentioned above, we choose "Checkpoint" to
>> > guarantee
>> > > > the
>> > > > > > basic
>> > > > > > > >> > data
>> > > > > > > >> > > > consistency. But as we discussed, the most ideal
>> > solution
>> > > is
>> > > > > > > >> "Timestamp
>> > > > > > > >> > > > Barrier". After the first stage is completed based on
>> > the
>> > > > > > > >> "Checkpoint",
>> > > > > > > >> > > we
>> > > > > > > >> > > > need to evolve it to our ideal solution "Timestamp
>> > > Barrier"
>> > > > > > > >> > > (watermark-like
>> > > > > > > >> > > > approach) in the next second or third stage. This
>> does
>> > not
>> > > > > mean
>> > > > > > > >> > upgrading
>> > > > > > > >> > > > "Checkpoint Mechanism" in Flink. It means that after
>> we
>> > > > > > implement a
>> > > > > > > >> new
>> > > > > > > >> > > > "Timestamp Barrier" or upgrade "Watermark" to support
>> > it,
>> > > we
>> > > > > can
>> > > > > > > >> use it
>> > > > > > > >> > > > instead of the current "Checkpoint Mechanism"
>> directly
>> > in
>> > > > our
>> > > > > > > >> > > "MetaService"
>> > > > > > > >> > > > and "Table Store".
>> > > > > > > >> > > >
>> > > > > > > >> > > > In the discussion between @David and me, I summarized
>> > the
>> > > > work
>> > > > > > of
>> > > > > > > >> > > upgrading
>> > > > > > > >> > > > "Watermark" to support "Timestamp Barrier". It looks
>> > like
>> > > a
>> > > > > big
>> > > > > > job
>> > > > > > > >> and
>> > > > > > > >> > > you
>> > > > > > > >> > > > can find the details in our discussion. I think we
>> don't
>> > > > need
>> > > > > > to do
>> > > > > > > >> > that
>> > > > > > > >> > > in
>> > > > > > > >> > > > our first stage.
>> > > > > > > >> > > >
>> > > > > > > >> > > > Also in that discussion (my reply to @David) too, I
>> > > briefly
>> > > > > > > >> summarized
>> > > > > > > >> > > the
>> > > > > > > >> > > > work that needs to be done to use the new mechanism
>> > > > (Timestamp
>> > > > > > > >> Barrier)
>> > > > > > > >> > > > after we implement the basic function on
>> "Checkpoint".
>> > It
>> > > > > seems
>> > > > > > that
>> > > > > > > >> > the
>> > > > > > > >> > > > work is not too big on my side, and it is feasible on
>> > the
>> > > > > whole.
>> > > > > > > >> > > >
>> > > > > > > >> > > > Based on the above points, I think we can support
>> basic
>> > > data
>> > > > > > > >> > consistency
>> > > > > > > >> > > on
>> > > > > > > >> > > > "Checkpoint" in the first stage which is described in
>> > > FLIP,
>> > > > > and
>> > > > > > > >> > continue
>> > > > > > > >> > > to
>> > > > > > > >> > > > evolve it to "Timestamp Barrier" to support low
>> latency
>> > > > > between
>> > > > > > ETLs
>> > > > > > > >> > and
>> > > > > > > >> > > > completed semantics in the second or third stage
>> later.
>> > > > What
>> > > > > > do you
>> > > > > > > >> > > think?
>> > > > > > > >> > > >
>> > > > > > > >> > > > Best,
>> > > > > > > >> > > > Shammon
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski <
>> > > > > > > >> pnowoj...@apache.org>
>> > > > > > > >> > > > wrote:
>> > > > > > > >> > > >
>> > > > > > > >> > > > > Hi Shammon,
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > > The following is a simple example. Data is
>> > transferred
>> > > > > > between
>> > > > > > > >> > ETL1,
>> > > > > > > >> > > > ETL2
>> > > > > > > >> > > > > and ETL3 in Intermediate Table by Timestamp.
>> > > > > > > >> > > > > > [image: simple_example.jpg]
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > This time it's your image that doesn't want to
>> load :)
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > >  Timestamp Barrier
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Does it have to be combining watermarks and
>> checkpoint
>> > > > > > barriers
>> > > > > > > >> > > together?
>> > > > > > > >> > > > > Can we not achieve the same result with two
>> > independent
>> > > > > > processes
>> > > > > > > >> > > > > checkpointing (regardless if this is a global
>> > > > > > aligned/unaligned
>> > > > > > > >> > > > checkpoint,
>> > > > > > > >> > > > > or a task local checkpoint) plus watermarking?
>> > > > Checkpointing
>> > > > > > would
>> > > > > > > >> > > > provide
>> > > > > > > >> > > > > exactly-once guarantees, and actually committing
>> the
>> > > > > results,
>> > > > > > and
>> > > > > > > >> it
>> > > > > > > >> > > > would
>> > > > > > > >> > > > > be actually committing the last emitted watermark?
>> > From
>> > > > the
>> > > > > > > >> > perspective
>> > > > > > > >> > > > of
>> > > > > > > >> > > > > the sink/table, it shouldn't really matter how the
>> > > > > > exactly-once is
>> > > > > > > >> > > > > achieved, and whether the job has performed an
>> > unaligned
>> > > > > > > >> checkpoint
>> > > > > > > >> > or
>> > > > > > > >> > > > > something completely different. It seems to me that
>> > the
>> > > > > > sink/table
>> > > > > > > >> > > > > could/should be able to understand/work with only
>> the
>> > > > basic
>> > > > > > > >> > > information:
>> > > > > > > >> > > > > here are records and watermarks (with at that
>> point of
>> > > > time
>> > > > > > > >> already
>> > > > > > > >> > > fixed
>> > > > > > > >> > > > > order), they are committed and will never change.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > > However, from the perspective of implementation
>> > > > > complexity,
>> > > > > > I
>> > > > > > > >> > > > personally
>> > > > > > > >> > > > > think using Checkpoint in the first phase makes
>> sense,
>> > > > what
>> > > > > > do you
>> > > > > > > >> > > think?
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Maybe I'm missing something, but I don't see an
>> actual
>> > > > > > connection
>> > > > > > > >> in
>> > > > > > > >> > > the
>> > > > > > > >> > > > > implementation steps between the checkpoint
>> barriers
>> > > > > approach
>> > > > > > and
>> > > > > > > >> the
>> > > > > > > >> > > > > watermark-like approach. They seem to me (from the
>> > > > > > perspective of
>> > > > > > > >> > Flink
>> > > > > > > >> > > > > runtime at least) like two completely different
>> > > > mechanisms.
>> > > > > > Not
>> > > > > > > >> one
>> > > > > > > >> > > > leading
>> > > > > > > >> > > > > to the other.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Best,
>> > > > > > > >> > > > > Piotrek
>> > > > > > > >> > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > śr., 14 gru 2022 o 15:19 Shammon FY <
>> > zjur...@gmail.com>
>> > > > > > > >> napisał(a):
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > > Hi Piotr,
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Thanks for your valuable input which makes me
>> > consider
>> > > > the
>> > > > > > core
>> > > > > > > >> > point
>> > > > > > > >> > > > of
>> > > > > > > >> > > > > > data consistency in deep. I'd like to define the
>> > data
>> > > > > > > >> consistency
>> > > > > > > >> > on
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > > whole streaming & batch processing as follows
>> and I
>> > > hope
>> > > > > > that we
>> > > > > > > >> > can
>> > > > > > > >> > > > have
>> > > > > > > >> > > > > > an agreement on it:
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > BOutput = Fn(BInput), BInput is a bounded input
>> > which
>> > > is
>> > > > > > > >> splitted
>> > > > > > > >> > > from
>> > > > > > > >> > > > > > unbounded streaming, Fn is the computation of a
>> node
>> > > or
>> > > > > ETL,
>> > > > > > > >> > BOutput
>> > > > > > > >> > > is
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > > bounded output of BInput. All the data in BInput
>> and
>> > > > > > BOutput are
>> > > > > > > >> > > > > unordered,
>> > > > > > > >> > > > > > and BInput and BOutput are data consistent.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > The key points above include 1) the segment
>> > semantics
>> > > of
>> > > > > > > >> BInput; 2)
>> > > > > > > >> > > the
>> > > > > > > >> > > > > > computation semantics of Fn
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 1. The segment semantics of BInput
>> > > > > > > >> > > > > > a) Transactionality of data. It is necessary to
>> > ensure
>> > > > the
>> > > > > > > >> semantic
>> > > > > > > >> > > > > > transaction of the bounded data set when it is
>> > > splitted
>> > > > > > from the
>> > > > > > > >> > > > > unbounded
>> > > > > > > >> > > > > > streaming. For example, we cannot split multiple
>> > > records
>> > > > > in
>> > > > > > one
>> > > > > > > >> > > > > transaction
>> > > > > > > >> > > > > > to different bounded data sets.
>> > > > > > > >> > > > > > b) Timeliness of data. Some data is related with
>> > time,
>> > > > > such
>> > > > > > as
>> > > > > > > >> > > boundary
>> > > > > > > >> > > > > > data for a window. It is necessary to consider
>> > whether
>> > > > the
>> > > > > > > >> bounded
>> > > > > > > >> > > data
>> > > > > > > >> > > > > set
>> > > > > > > >> > > > > > needs to include a watermark which can trigger
>> the
>> > > > window
>> > > > > > > >> result.
>> > > > > > > >> > > > > > c) Constraints of data. The Timestamp Barrier
>> should
>> > > > > perform
>> > > > > > > >> some
>> > > > > > > >> > > > > specific
>> > > > > > > >> > > > > > operations after computation in operators, for
>> > > example,
>> > > > > > force
>> > > > > > > >> flush
>> > > > > > > >> > > > data.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Checkpoint Barrier misses all the semantics
>> above,
>> > and
>> > > > we
>> > > > > > should
>> > > > > > > >> > > > support
>> > > > > > > >> > > > > > user to define Timestamp for data on Event Time
>> or
>> > > > System
>> > > > > > Time
>> > > > > > > >> > > > according
>> > > > > > > >> > > > > to
>> > > > > > > >> > > > > > the job and computation later.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 2. The computation semantics of Fn
>> > > > > > > >> > > > > > a) Deterministic computation
>> > > > > > > >> > > > > > Most computations are deterministic such as map,
>> > > filter,
>> > > > > > count,
>> > > > > > > >> sum
>> > > > > > > >> > > and
>> > > > > > > >> > > > > > ect. They generate the same unordered result from
>> > the
>> > > > same
>> > > > > > > >> > unordered
>> > > > > > > >> > > > > input
>> > > > > > > >> > > > > > every time, and we can easily define data
>> > consistency
>> > > on
>> > > > > the
>> > > > > > > >> input
>> > > > > > > >> > > and
>> > > > > > > >> > > > > > output for them.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > b) Non-deterministic computation
>> > > > > > > >> > > > > > Some computations are non-deterministic. They
>> will
>> > > > produce
>> > > > > > > >> > different
>> > > > > > > >> > > > > > results from the same input every time. I try to
>> > > divide
>> > > > > them
>> > > > > > > >> into
>> > > > > > > >> > the
>> > > > > > > >> > > > > > following types:
>> > > > > > > >> > > > > > 1) Non-deterministic computation semantics, such
>> as
>> > > rank
>> > > > > > > >> operator.
>> > > > > > > >> > > When
>> > > > > > > >> > > > > it
>> > > > > > > >> > > > > > computes multiple times (for example, failover),
>> the
>> > > > first
>> > > > > > or
>> > > > > > > >> last
>> > > > > > > >> > > > output
>> > > > > > > >> > > > > > results can both be the final result which will
>> > cause
>> > > > > > different
>> > > > > > > >> > > > failover
>> > > > > > > >> > > > > > handlers for downstream jobs. I will expand it
>> > later.
>> > > > > > > >> > > > > > 2) Non-deterministic computation optimization,
>> such
>> > as
>> > > > > async
>> > > > > > > >> io. It
>> > > > > > > >> > > is
>> > > > > > > >> > > > > > necessary to sync these operations when the
>> barrier
>> > of
>> > > > > input
>> > > > > > > >> > arrives.
>> > > > > > > >> > > > > > 3) Deviation caused by data segmentat and
>> > computation
>> > > > > > semantics,
>> > > > > > > >> > such
>> > > > > > > >> > > > as
>> > > > > > > >> > > > > > Window. This requires that the users should
>> > customize
>> > > > the
>> > > > > > data
>> > > > > > > >> > > > > segmentation
>> > > > > > > >> > > > > > according to their needs correctly.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Checkpoint Barrier matches a) and Timestamp
>> Barrier
>> > > can
>> > > > > > match
>> > > > > > > >> all
>> > > > > > > >> > a)
>> > > > > > > >> > > > and
>> > > > > > > >> > > > > > b).
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > We define data consistency of BInput and BOutput
>> > based
>> > > > all
>> > > > > > > >> above.
>> > > > > > > >> > The
>> > > > > > > >> > > > > > BOutput of upstream ETL will be the BInput of the
>> > next
>> > > > > ETL,
>> > > > > > and
>> > > > > > > >> > > > multiple
>> > > > > > > >> > > > > > ETL jobs form a complex "ETL Topology".
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Based on the above definitions, I'd like to give
>> a
>> > > > general
>> > > > > > > >> proposal
>> > > > > > > >> > > > with
>> > > > > > > >> > > > > > "Timetamp Barrier" in my mind, it's not very
>> > detailed
>> > > > and
>> > > > > > please
>> > > > > > > >> > help
>> > > > > > > >> > > > to
>> > > > > > > >> > > > > > review it and feel free to comment @David, @Piotr
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 1. Data segment with Timestamp
>> > > > > > > >> > > > > > a) Users can define the Timestamp Barrier with
>> > System
>> > > > > Time,
>> > > > > > > >> Event
>> > > > > > > >> > > Time.
>> > > > > > > >> > > > > > b) Source nodes generate the same Timestamp
>> Barrier
>> > > > after
>> > > > > > > >> reading
>> > > > > > > >> > > data
>> > > > > > > >> > > > > > from RootTable
>> > > > > > > >> > > > > > c) There is a same Timetamp data in each record
>> > > > according
>> > > > > to
>> > > > > > > >> > > Timestamp
>> > > > > > > >> > > > > > Barrier, such as (a, T), (b, T), (c, T), (T,
>> > barrier)
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 2. Computation with Timestamp
>> > > > > > > >> > > > > > a) Records are unordered with the same Timestamp.
>> > > > > Stateless
>> > > > > > > >> > operators
>> > > > > > > >> > > > > such
>> > > > > > > >> > > > > > as map/flatmap/filter can process data without
>> > > aligning
>> > > > > > > >> Timestamp
>> > > > > > > >> > > > > Barrier,
>> > > > > > > >> > > > > > which is different from Checkpoint Barrier.
>> > > > > > > >> > > > > > b) Records between Timestamp are ordered.
>> Stateful
>> > > > > operators
>> > > > > > > >> must
>> > > > > > > >> > > align
>> > > > > > > >> > > > > > data and compute by each Timestamp, then compute
>> by
>> > > > > Timetamp
>> > > > > > > >> > > sequence.
>> > > > > > > >> > > > > > c) Stateful operators will output results of
>> > specific
>> > > > > > Timestamp
>> > > > > > > >> > after
>> > > > > > > >> > > > > > computation.
>> > > > > > > >> > > > > > d) Sink operator "commit records" with specific
>> > > > Timestamp
>> > > > > > and
>> > > > > > > >> > report
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > > status to JobManager
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 3. Read data with Timestamp
>> > > > > > > >> > > > > > a) Downstream ETL reads data according to
>> Timestamp
>> > > > after
>> > > > > > > >> upstream
>> > > > > > > >> > > ETL
>> > > > > > > >> > > > > > "commit" it.
>> > > > > > > >> > > > > > b) Stateful operators interact with state when
>> > > computing
>> > > > > > data of
>> > > > > > > >> > > > > > Timestamp, but they won't trigger checkpoint for
>> > every
>> > > > > > > >> Timestamp.
>> > > > > > > >> > > > > Therefore
>> > > > > > > >> > > > > > source ETL job can generate Timestamp every few
>> > > seconds
>> > > > or
>> > > > > > even
>> > > > > > > >> > > > hundreds
>> > > > > > > >> > > > > of
>> > > > > > > >> > > > > > milliseconds
>> > > > > > > >> > > > > > c) Based on Timestamp the delay between ETL jobs
>> > will
>> > > be
>> > > > > > very
>> > > > > > > >> > small,
>> > > > > > > >> > > > and
>> > > > > > > >> > > > > > in the best case the E2E latency maybe only tens
>> of
>> > > > > seconds.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 4. Failover and Recovery
>> > > > > > > >> > > > > > ETL jobs are cascaded through the Intermediate
>> > Table.
>> > > > > After
>> > > > > > a
>> > > > > > > >> > single
>> > > > > > > >> > > > ETL
>> > > > > > > >> > > > > > job fails, it needs to replay the input data and
>> > > > recompute
>> > > > > > the
>> > > > > > > >> > > results.
>> > > > > > > >> > > > > As
>> > > > > > > >> > > > > > you mentioned, whether the cascaded ETL jobs are
>> > > > restarted
>> > > > > > > >> depends
>> > > > > > > >> > on
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > > determinacy of the intermediate data between
>> them.
>> > > > > > > >> > > > > > a) An ETL job will rollback and reread data from
>> > > > upstream
>> > > > > > ETL by
>> > > > > > > >> > > > specific
>> > > > > > > >> > > > > > Timestamp according to the Checkpoint.
>> > > > > > > >> > > > > > b) According to the management of Checkpoint and
>> > > > > Timestamp,
>> > > > > > ETL
>> > > > > > > >> can
>> > > > > > > >> > > > > replay
>> > > > > > > >> > > > > > all Timestamp and data after failover, which
>> means
>> > > > BInput
>> > > > > > is the
>> > > > > > > >> > same
>> > > > > > > >> > > > > > before and after failover.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > c) For deterministic Fn, it generates the same
>> > BOutput
>> > > > > from
>> > > > > > the
>> > > > > > > >> > same
>> > > > > > > >> > > > > BInput
>> > > > > > > >> > > > > > 1) If there's no data of the specific Timestamp
>> in
>> > the
>> > > > > sink
>> > > > > > > >> table,
>> > > > > > > >> > > ETL
>> > > > > > > >> > > > > > just "commit" it as normal.
>> > > > > > > >> > > > > > 2) If the Timestamp data exists in the sink
>> table,
>> > ETL
>> > > > can
>> > > > > > just
>> > > > > > > >> > > discard
>> > > > > > > >> > > > > > the new data.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > d) For non-deterministic Fn, it generates
>> different
>> > > > > BOutput
>> > > > > > from
>> > > > > > > >> > the
>> > > > > > > >> > > > same
>> > > > > > > >> > > > > > BInput before and after failover. For example,
>> > > BOutput1
>> > > > > > before
>> > > > > > > >> > > failover
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > > BOutput2 after failover. The state in ETL is
>> > > consistent
>> > > > > with
>> > > > > > > >> > > BOutput2.
>> > > > > > > >> > > > > > There are two cases according to users'
>> requirements
>> > > > > > > >> > > > > > 1) Users can accept BOutput1 as the final output
>> and
>> > > > > > downstream
>> > > > > > > >> > ETLs
>> > > > > > > >> > > > > don't
>> > > > > > > >> > > > > > need to restart. Sink in ETL can discard BOutput2
>> > > > directly
>> > > > > > if
>> > > > > > > >> the
>> > > > > > > >> > > > > Timestamp
>> > > > > > > >> > > > > > exists in the sink table.
>> > > > > > > >> > > > > > 2) Users only accept BOutput2 as the final
>> output,
>> > > then
>> > > > > all
>> > > > > > the
>> > > > > > > >> > > > > downstream
>> > > > > > > >> > > > > > ETLs and Intermediate Table should rollback to
>> > > specific
>> > > > > > > >> Timestamp,
>> > > > > > > >> > > the
>> > > > > > > >> > > > > > downstream ETLs should be restarted too.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > The following is a simple example. Data is
>> > transferred
>> > > > > > between
>> > > > > > > >> > ETL1,
>> > > > > > > >> > > > ETL2
>> > > > > > > >> > > > > > and ETL3 in Intermediate Table by Timestamp.
>> > > > > > > >> > > > > > [image: simple_example.jpg]
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Besides Timestamp, there's a big challenge in
>> > > > Intermediate
>> > > > > > > >> Table.
>> > > > > > > >> > It
>> > > > > > > >> > > > > > should support a highly implemented "commit
>> > Timestamp
>> > > > > > snapshot"
>> > > > > > > >> > with
>> > > > > > > >> > > > high
>> > > > > > > >> > > > > > throughput, which requires the Table Store to
>> > enhance
>> > > > > > streaming
>> > > > > > > >> > > > > > capabilities like pulsar or kafka.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > In this FLIP, we plan to implement the proposal
>> with
>> > > > > > Checkpoint,
>> > > > > > > >> > the
>> > > > > > > >> > > > > above
>> > > > > > > >> > > > > > Timestamp can be replaced by Checkpoint. Of
>> course,
>> > > > > > Checkpoint
>> > > > > > > >> has
>> > > > > > > >> > > some
>> > > > > > > >> > > > > > problems. I think we have reached some consensus
>> in
>> > > the
>> > > > > > > >> discussion
>> > > > > > > >> > > > about
>> > > > > > > >> > > > > > the Checkpoint problems, including data segment
>> > > > semantics,
>> > > > > > flush
>> > > > > > > >> > data
>> > > > > > > >> > > > of
>> > > > > > > >> > > > > > some operators, and the increase of E2E delay.
>> > > However,
>> > > > > > from the
>> > > > > > > >> > > > > > perspective of implementation complexity, I
>> > personally
>> > > > > think
>> > > > > > > >> using
>> > > > > > > >> > > > > > Checkpoint in the first phase makes sense, what
>> do
>> > you
>> > > > > > think?
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Finally, I think I misunderstood the "Rolling
>> > > > Checkpoint"
>> > > > > > and
>> > > > > > > >> "All
>> > > > > > > >> > at
>> > > > > > > >> > > > > once
>> > > > > > > >> > > > > > Checkpoint" in my last explanation which you and
>> > > @David
>> > > > > > > >> mentioned.
>> > > > > > > >> > I
>> > > > > > > >> > > > > > thought their differences were mainly to select
>> > > > different
>> > > > > > table
>> > > > > > > >> > > > versions
>> > > > > > > >> > > > > > for queries. According to your reply, I think it
>> is
>> > > > > whether
>> > > > > > > >> there
>> > > > > > > >> > are
>> > > > > > > >> > > > > > multiple "rolling checkpoints" in each ETL job,
>> > right?
>> > > > If
>> > > > > I
>> > > > > > > >> > > understand
>> > > > > > > >> > > > > > correctly, the "Rolling Checkpoint" is a good
>> idea,
>> > > and
>> > > > we
>> > > > > > can
>> > > > > > > >> > > > guarantee
>> > > > > > > >> > > > > > "Strong Data Consistency" between multiple
>> tables in
>> > > > > > MetaService
>> > > > > > > >> > for
>> > > > > > > >> > > > > > queries. Thanks.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Best,
>> > > > > > > >> > > > > > Shammon
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <
>> > > > > > > >> > pnowoj...@apache.org
>> > > > > > > >> > > >
>> > > > > > > >> > > > > > wrote:
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >> Hi Shammon,
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Thanks for the explanations, I think I
>> understand
>> > the
>> > > > > > problem
>> > > > > > > >> > better
>> > > > > > > >> > > > > now.
>> > > > > > > >> > > > > >> I have a couple of follow up questions, but
>> first:
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> >> 3. I'm pretty sure there are counter
>> examples,
>> > > where
>> > > > > > your
>> > > > > > > >> > > proposed
>> > > > > > > >> > > > > >> mechanism of using checkpoints (even aligned!)
>> will
>> > > > > produce
>> > > > > > > >> > > > > >> inconsistent data from the perspective of the
>> event
>> > > > time.
>> > > > > > > >> > > > > >> >>  a) For example what if one of your "ETL"
>> jobs,
>> > > has
>> > > > > the
>> > > > > > > >> > following
>> > > > > > > >> > > > > DAG:
>> > > > > > > >> > > > > >> >>
>> > > > > > > >> > > > > >> >>  Even if you use aligned checkpoints for
>> > > committing
>> > > > > the
>> > > > > > > >> data to
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >> sink table, the watermarks of "Window1" and
>> > "Window2"
>> > > > are
>> > > > > > > >> > completely
>> > > > > > > >> > > > > >> independent. The sink table might easily have
>> data
>> > > from
>> > > > > the
>> > > > > > > >> > > > Src1/Window1
>> > > > > > > >> > > > > >> from the event time T1 and Src2/Window2 from
>> later
>> > > > event
>> > > > > > time
>> > > > > > > >> T2.
>> > > > > > > >> > > > > >> >>  b) I think the same applies if you have two
>> > > > > completely
>> > > > > > > >> > > > > >> independent ETL jobs writing either to the same
>> > sink
>> > > > > > table, or
>> > > > > > > >> two
>> > > > > > > >> > > to
>> > > > > > > >> > > > > >> different sink tables (that are both later used
>> in
>> > > the
>> > > > > same
>> > > > > > > >> > > downstream
>> > > > > > > >> > > > > job).
>> > > > > > > >> > > > > >> >
>> > > > > > > >> > > > > >> > Thank you for your feedback. I cannot see the
>> DAG
>> > > in
>> > > > > 3.a
>> > > > > > in
>> > > > > > > >> your
>> > > > > > > >> > > > > reply,
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> I've attached the image directly. I hope you can
>> > see
>> > > it
>> > > > > > now.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Basically what I meant is that if you have a
>> > topology
>> > > > > like
>> > > > > > > >> (from
>> > > > > > > >> > the
>> > > > > > > >> > > > > >> attached image):
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> window1 = src1.keyBy(...).window(...)
>> > > > > > > >> > > > > >> window2 = src2.keyBy(...).window(...)
>> > > > > > > >> > > > > >> window1.join(window2, ...).addSink(sink)
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> or with even simpler (note no keyBy between
>> `src`
>> > and
>> > > > > > > >> `process`):
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >>
>> > > > > src.process(some_function_that_buffers_data)..addSink(sink)
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> you will have the same problem. Generally
>> speaking
>> > if
>> > > > > > there is
>> > > > > > > >> an
>> > > > > > > >> > > > > >> operator buffering some data, and if the data
>> are
>> > not
>> > > > > > flushed
>> > > > > > > >> on
>> > > > > > > >> > > every
>> > > > > > > >> > > > > >> checkpoint (any windowed or temporal operator,
>> > > > > > > >> AsyncWaitOperator,
>> > > > > > > >> > > CEP,
>> > > > > > > >> > > > > >> ...), you can design a graph that will produce
>> > > > > > "inconsistent"
>> > > > > > > >> data
>> > > > > > > >> > > as
>> > > > > > > >> > > > > part
>> > > > > > > >> > > > > >> of a checkpoint.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Apart from that a couple of other
>> questions/issues.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> > 1) Global Checkpoint Commit: a) "rolling
>> fashion"
>> > > or
>> > > > b)
>> > > > > > > >> > altogether
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Do we need to support the "altogether" one?
>> Rolling
>> > > > > > > >> checkpoint, as
>> > > > > > > >> > > > it's
>> > > > > > > >> > > > > >> more independent, I could see it scale much
>> better,
>> > > and
>> > > > > > avoid a
>> > > > > > > >> > lot
>> > > > > > > >> > > of
>> > > > > > > >> > > > > >> problems that I mentioned before.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> > 1) Checkpoint VS Watermark
>> > > > > > > >> > > > > >> >
>> > > > > > > >> > > > > >> > 1. Stateful Computation is aligned according
>> to
>> > > > > Timestamp
>> > > > > > > >> > Barrier
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Indeed the biggest obstacle I see here, is that
>> we
>> > > > would
>> > > > > > indeed
>> > > > > > > >> > most
>> > > > > > > >> > > > > >> likely have:
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> > b) Similar to the window operator, align data
>> in
>> > > > memory
>> > > > > > > >> > according
>> > > > > > > >> > > to
>> > > > > > > >> > > > > >> Timestamp.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> for every operator.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> > 4. Failover supports Timestamp fine-grained
>> data
>> > > > > recovery
>> > > > > > > >> > > > > >> >
>> > > > > > > >> > > > > >> > As we mentioned in the FLIP, each ETL is a
>> > complex
>> > > > > single
>> > > > > > > >> node.
>> > > > > > > >> > A
>> > > > > > > >> > > > > single
>> > > > > > > >> > > > > >> > ETL job failover should not cause the failure
>> of
>> > > the
>> > > > > > entire
>> > > > > > > >> "ETL
>> > > > > > > >> > > > > >> Topology".
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> I don't understand this point. Regardless if we
>> are
>> > > > using
>> > > > > > > >> > > > > >> rolling checkpoints, all at once checkpoints or
>> > > > > > watermarks, I
>> > > > > > > >> see
>> > > > > > > >> > > the
>> > > > > > > >> > > > > same
>> > > > > > > >> > > > > >> problems with non determinism, if we want to
>> > preserve
>> > > > the
>> > > > > > > >> > > requirement
>> > > > > > > >> > > > to
>> > > > > > > >> > > > > >> not fail over the whole topology at once.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Both Watermarks and "rolling checkpoint" I think
>> > have
>> > > > the
>> > > > > > same
>> > > > > > > >> > > issue,
>> > > > > > > >> > > > > >> that either require deterministic logic, or
>> global
>> > > > > > failover, or
>> > > > > > > >> > > > > downstream
>> > > > > > > >> > > > > >> jobs can only work on the already committed by
>> the
>> > > > > upstream
>> > > > > > > >> > records.
>> > > > > > > >> > > > But
>> > > > > > > >> > > > > >> working with only "committed records" would
>> either
>> > > > brake
>> > > > > > > >> > consistency
>> > > > > > > >> > > > > >> between different jobs, or would cause huge
>> delay
>> > in
>> > > > > > > >> checkpointing
>> > > > > > > >> > > and
>> > > > > > > >> > > > > e2e
>> > > > > > > >> > > > > >> latency, as:
>> > > > > > > >> > > > > >> 1. upstream job has to produce some data,
>> > downstream
>> > > > can
>> > > > > > not
>> > > > > > > >> > process
>> > > > > > > >> > > > it,
>> > > > > > > >> > > > > >> downstream can not process this data yet
>> > > > > > > >> > > > > >> 2. checkpoint 42 is triggered on the upstream
>> job
>> > > > > > > >> > > > > >> 3. checkpoint 42 is completed on the upstream
>> job,
>> > > data
>> > > > > > > >> processed
>> > > > > > > >> > > > since
>> > > > > > > >> > > > > >> last checkpoint has been committed
>> > > > > > > >> > > > > >> 4. upstream job can continue producing more data
>> > > > > > > >> > > > > >> 5. only now downstream can start processing the
>> > data
>> > > > > > produced
>> > > > > > > >> in
>> > > > > > > >> > 1.,
>> > > > > > > >> > > > but
>> > > > > > > >> > > > > >> it can not read the not-yet-committed data from
>> 4.
>> > > > > > > >> > > > > >> 6. once downstream finishes processing data from
>> > 1.,
>> > > it
>> > > > > can
>> > > > > > > >> > trigger
>> > > > > > > >> > > > > >> checkpoint 42
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> The "all at once checkpoint", I can see only
>> > working
>> > > > with
>> > > > > > > >> global
>> > > > > > > >> > > > > failover
>> > > > > > > >> > > > > >> of everything.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> This is assuming exactly-once mode.
>> at-least-once
>> > > would
>> > > > > be
>> > > > > > much
>> > > > > > > >> > > > easier.
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> Best,
>> > > > > > > >> > > > > >> Piotrek
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >> wt., 13 gru 2022 o 08:57 Shammon FY <
>> > > zjur...@gmail.com
>> > > > >
>> > > > > > > >> > napisał(a):
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > > >>> Hi David,
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> Thanks for the comments from you and @Piotr.
>> I'd
>> > > like
>> > > > to
>> > > > > > > >> explain
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> details about the FLIP first.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 1) Global Checkpoint Commit: a) "rolling
>> fashion"
>> > or
>> > > > b)
>> > > > > > > >> > altogether
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> This mainly depends on the needs of users.
>> Users
>> > can
>> > > > > > decide
>> > > > > > > >> the
>> > > > > > > >> > > data
>> > > > > > > >> > > > > >>> version of tables in their queries according to
>> > > > > different
>> > > > > > > >> > > > requirements
>> > > > > > > >> > > > > >>> for
>> > > > > > > >> > > > > >>> data consistency and freshness. Since we manage
>> > > > multiple
>> > > > > > > >> versions
>> > > > > > > >> > > for
>> > > > > > > >> > > > > >>> each
>> > > > > > > >> > > > > >>> table, this will not bring too much complexity
>> to
>> > > the
>> > > > > > system.
>> > > > > > > >> We
>> > > > > > > >> > > only
>> > > > > > > >> > > > > >>> need
>> > > > > > > >> > > > > >>> to support different strategies when
>> calculating
>> > > table
>> > > > > > > >> versions
>> > > > > > > >> > for
>> > > > > > > >> > > > > >>> query.
>> > > > > > > >> > > > > >>> So we give this decision to users, who can use
>> > > > > > > >> "consistency.type"
>> > > > > > > >> > > to
>> > > > > > > >> > > > > set
>> > > > > > > >> > > > > >>> different consistency in "Catalog". We can
>> > continue
>> > > to
>> > > > > > refine
>> > > > > > > >> > this
>> > > > > > > >> > > > > later.
>> > > > > > > >> > > > > >>> For example, dynamic parameters support
>> different
>> > > > > > consistency
>> > > > > > > >> > > > > >>> requirements
>> > > > > > > >> > > > > >>> for each query
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 2) MetaService module
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> Many Flink streaming jobs use application mode,
>> > and
>> > > > they
>> > > > > > are
>> > > > > > > >> > > > > independent
>> > > > > > > >> > > > > >>> of
>> > > > > > > >> > > > > >>> each other. So we currently assume that
>> > MetaService
>> > > is
>> > > > > an
>> > > > > > > >> > > independent
>> > > > > > > >> > > > > >>> node.
>> > > > > > > >> > > > > >>> In the first phase, it will be started in
>> > > standalone,
>> > > > > and
>> > > > > > HA
>> > > > > > > >> will
>> > > > > > > >> > > be
>> > > > > > > >> > > > > >>> supported later. This node will reuse many
>> Flink
>> > > > > modules,
>> > > > > > > >> > including
>> > > > > > > >> > > > > REST,
>> > > > > > > >> > > > > >>> Gateway-RpcServer, etc. We hope that the core
>> > > > functions
>> > > > > of
>> > > > > > > >> > > > MetaService
>> > > > > > > >> > > > > >>> can
>> > > > > > > >> > > > > >>> be developed as a component. When Flink
>> > subsequently
>> > > > > uses
>> > > > > > a
>> > > > > > > >> large
>> > > > > > > >> > > > > session
>> > > > > > > >> > > > > >>> cluster to support various computations, it
>> can be
>> > > > > > integrated
>> > > > > > > >> > into
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> "ResourceManager" as a plug-in component.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> Besides above, I'd like to describe the
>> Checkpoint
>> > > and
>> > > > > > > >> Watermark
>> > > > > > > >> > > > > >>> mechanisms
>> > > > > > > >> > > > > >>> in detail as follows.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 1) Checkpoint VS Watermark
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> As you mentioned, I think it's very correct
>> that
>> > > what
>> > > > we
>> > > > > > want
>> > > > > > > >> in
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> Checkpoint is to align streaming computation
>> and
>> > > data
>> > > > > > > >> according
>> > > > > > > >> > to
>> > > > > > > >> > > > > >>> certain
>> > > > > > > >> > > > > >>> semantics. Timestamp is a very ideal solution.
>> To
>> > > > > achieve
>> > > > > > this
>> > > > > > > >> > > goal,
>> > > > > > > >> > > > we
>> > > > > > > >> > > > > >>> can
>> > > > > > > >> > > > > >>> think of the following functions that need to
>> be
>> > > > > > supported in
>> > > > > > > >> the
>> > > > > > > >> > > > > >>> Watermark
>> > > > > > > >> > > > > >>> mechanism:
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 1. Stateful Computation is aligned according to
>> > > > > Timestamp
>> > > > > > > >> Barrier
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> As the "three tables example" we discussed
>> above,
>> > we
>> > > > > need
>> > > > > > to
>> > > > > > > >> > align
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> stateful operator computation according to the
>> > > barrier
>> > > > > to
>> > > > > > > >> ensure
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> consistency of the result data. In order to
>> align
>> > > the
>> > > > > > > >> > computation,
>> > > > > > > >> > > > > there
>> > > > > > > >> > > > > >>> are two ways in my mind
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> a) Similar to the Aligned Checkpoint Barrier.
>> > > > Timestamp
>> > > > > > > >> Barrier
>> > > > > > > >> > > > aligns
>> > > > > > > >> > > > > >>> data
>> > > > > > > >> > > > > >>> according to the channel, which will lead to
>> > > > > backpressure
>> > > > > > just
>> > > > > > > >> > like
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> aligned checkpoint. It seems not a good idea.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> b) Similar to the window operator, align data
>> in
>> > > > memory
>> > > > > > > >> according
>> > > > > > > >> > > to
>> > > > > > > >> > > > > >>> Timestamp. Two steps need to be supported here:
>> > > first,
>> > > > > > data is
>> > > > > > > >> > > > aligned
>> > > > > > > >> > > > > by
>> > > > > > > >> > > > > >>> timestamp for state operators; secondly,
>> Timestamp
>> > > is
>> > > > > > strictly
>> > > > > > > >> > > > > >>> sequential,
>> > > > > > > >> > > > > >>> global aggregation operators need to perform
>> > > > aggregation
>> > > > > > in
>> > > > > > > >> > > timestamp
>> > > > > > > >> > > > > >>> order
>> > > > > > > >> > > > > >>> and output the final results.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 2. Coordinate multiple source nodes to assign
>> > > unified
>> > > > > > > >> Timestamp
>> > > > > > > >> > > > > Barriers
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> Since the stateful operator needs to be aligned
>> > > > > according
>> > > > > > to
>> > > > > > > >> the
>> > > > > > > >> > > > > >>> Timestamp
>> > > > > > > >> > > > > >>> Barrier, source subtasks of multiple jobs
>> should
>> > > > > generate
>> > > > > > the
>> > > > > > > >> > same
>> > > > > > > >> > > > > >>> Timestamp Barrier. ETL jobs consuming RootTable
>> > > should
>> > > > > > > >> interact
>> > > > > > > >> > > with
>> > > > > > > >> > > > > >>> "MetaService" to generate the same Timestamp
>> T1,
>> > T2,
>> > > > T3
>> > > > > > ...
>> > > > > > > >> and
>> > > > > > > >> > so
>> > > > > > > >> > > > on.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 3. JobManager needs to manage the completed
>> > > Timestamp
>> > > > > > Barrier
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> When the Timestamp Barrier of the ETL job has
>> been
>> > > > > > completed,
>> > > > > > > >> it
>> > > > > > > >> > > > means
>> > > > > > > >> > > > > >>> that
>> > > > > > > >> > > > > >>> the data of the specified Timestamp can be
>> queried
>> > > by
>> > > > > > users.
>> > > > > > > >> > > > JobManager
>> > > > > > > >> > > > > >>> needs to summarize its Timestamp processing and
>> > > report
>> > > > > the
>> > > > > > > >> > > completed
>> > > > > > > >> > > > > >>> Timestamp and data snapshots to the MetaServer.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 4. Failover supports Timestamp fine-grained
>> data
>> > > > > recovery
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> As we mentioned in the FLIP, each ETL is a
>> complex
>> > > > > single
>> > > > > > > >> node. A
>> > > > > > > >> > > > > single
>> > > > > > > >> > > > > >>> ETL job failover should not cause the failure
>> of
>> > the
>> > > > > > entire
>> > > > > > > >> "ETL
>> > > > > > > >> > > > > >>> Topology".
>> > > > > > > >> > > > > >>> This requires that the result data of Timestamp
>> > > > > generated
>> > > > > > by
>> > > > > > > >> > > upstream
>> > > > > > > >> > > > > ETL
>> > > > > > > >> > > > > >>> should be deterministic.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> a) The determinacy of Timestamp, that is,
>> before
>> > and
>> > > > > > after ETL
>> > > > > > > >> > job
>> > > > > > > >> > > > > >>> failover, the same Timestamp sequence must be
>> > > > generated.
>> > > > > > Each
>> > > > > > > >> > > > > Checkpoint
>> > > > > > > >> > > > > >>> needs to record the included Timestamp list,
>> > > > especially
>> > > > > > the
>> > > > > > > >> > source
>> > > > > > > >> > > > node
>> > > > > > > >> > > > > >>> of
>> > > > > > > >> > > > > >>> the RootTable. After Failover, it needs to
>> > > regenerate
>> > > > > > > >> Timestamp
>> > > > > > > >> > > > > according
>> > > > > > > >> > > > > >>> to the Timestamp list.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> b) The determinacy of Timestamp data, that is,
>> the
>> > > > same
>> > > > > > > >> Timestamp
>> > > > > > > >> > > > needs
>> > > > > > > >> > > > > >>> to
>> > > > > > > >> > > > > >>> replay the same data before and after Failover,
>> > and
>> > > > > > generate
>> > > > > > > >> the
>> > > > > > > >> > > same
>> > > > > > > >> > > > > >>> results in Sink Table. Each Timestamp must save
>> > > start
>> > > > > and
>> > > > > > end
>> > > > > > > >> > > offsets
>> > > > > > > >> > > > > (or
>> > > > > > > >> > > > > >>> snapshot id) of RootTable. After failover, the
>> > > source
>> > > > > > nodes
>> > > > > > > >> need
>> > > > > > > >> > to
>> > > > > > > >> > > > > >>> replay
>> > > > > > > >> > > > > >>> the data according to the offset to ensure that
>> > the
>> > > > data
>> > > > > > of
>> > > > > > > >> each
>> > > > > > > >> > > > > >>> Timestamp
>> > > > > > > >> > > > > >>> is consistent before and after Failover.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> For the specific requirements and complexity,
>> > please
>> > > > > help
>> > > > > > to
>> > > > > > > >> > review
>> > > > > > > >> > > > > when
>> > > > > > > >> > > > > >>> you are free @David @Piotr, thanks :)
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> 2) Evolution from Checkpoint to Timestamp
>> > Mechanism
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> You give a very important question in your
>> reply
>> > > > which I
>> > > > > > > >> missed
>> > > > > > > >> > > > before:
>> > > > > > > >> > > > > >>> if
>> > > > > > > >> > > > > >>> Aligned Checkpoint is used in the first stage,
>> how
>> > > > > > complex is
>> > > > > > > >> the
>> > > > > > > >> > > > > >>> evolution
>> > > > > > > >> > > > > >>> from Checkpoint to Timestamp later? I made a
>> > general
>> > > > > > > >> comparison
>> > > > > > > >> > > here,
>> > > > > > > >> > > > > >>> which
>> > > > > > > >> > > > > >>> may not be very detailed. There are three
>> roles in
>> > > the
>> > > > > > whole
>> > > > > > > >> > > system:
>> > > > > > > >> > > > > >>> MetaService, Flink ETL Job and Table Store.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> a) MetaService
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> It manages the data consistency among multiple
>> ETL
>> > > > jobs,
>> > > > > > > >> > including
>> > > > > > > >> > > > > >>> coordinating the Barrier for the Source ETL
>> nodes,
>> > > > > > setting the
>> > > > > > > >> > > > starting
>> > > > > > > >> > > > > >>> Barrier for ETL job startup, and calculating
>> the
>> > > Table
>> > > > > > version
>> > > > > > > >> > for
>> > > > > > > >> > > > > >>> queries
>> > > > > > > >> > > > > >>> according to different strategies. It has
>> little
>> > to
>> > > do
>> > > > > > with
>> > > > > > > >> > > > Checkpoint
>> > > > > > > >> > > > > in
>> > > > > > > >> > > > > >>> fact, we can pay attention to it when designing
>> > the
>> > > > API
>> > > > > > and
>> > > > > > > >> > > > > implementing
>> > > > > > > >> > > > > >>> the functions.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> b) Flink ETL Job
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> At present, the workload is relatively small
>> and
>> > we
>> > > > need
>> > > > > > to
>> > > > > > > >> > trigger
>> > > > > > > >> > > > > >>> checkpoints in CheckpointCoordinator manually
>> by
>> > > > > > > >> SplitEnumerator.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> c) Table Store
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> Table Store mainly provides the ability to
>> write
>> > and
>> > > > > read
>> > > > > > > >> data.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> c.1) Write data. At present, Table Store
>> generates
>> > > > > > snapshots
>> > > > > > > >> > > > according
>> > > > > > > >> > > > > to
>> > > > > > > >> > > > > >>> two phases in Flink. When using Checkpoint as
>> > > > > consistency
>> > > > > > > >> > > management,
>> > > > > > > >> > > > > we
>> > > > > > > >> > > > > >>> need to write checkpoint information to
>> snapshots.
>> > > > After
>> > > > > > using
>> > > > > > > >> > > > > Timestamp
>> > > > > > > >> > > > > >>> Barrier, the snapshot in Table Store may be
>> > > > disassembled
>> > > > > > more
>> > > > > > > >> > > finely,
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > >>> we need to write Timestamp information to the
>> data
>> > > > > file. A
>> > > > > > > >> > > > > "checkpointed
>> > > > > > > >> > > > > >>> snapshot" may contain multiple "Timestamp
>> > > snapshots".
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> c.2) Read data. The SplitEnumerator that reads
>> > data
>> > > > from
>> > > > > > the
>> > > > > > > >> > Table
>> > > > > > > >> > > > > Store
>> > > > > > > >> > > > > >>> will manage multiple splits according to the
>> > version
>> > > > > > number.
>> > > > > > > >> > After
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> specified splits are completed, it sends a
>> Barrier
>> > > > > > command to
>> > > > > > > >> > > > trigger a
>> > > > > > > >> > > > > >>> checkpoint in the ETL job. The source node will
>> > > > > broadcast
>> > > > > > the
>> > > > > > > >> > > > > checkpoint
>> > > > > > > >> > > > > >>> barrier downstream after receiving it. When
>> using
>> > > > > > Timestamp
>> > > > > > > >> > > Barrier,
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > >>> overall process is similar, but the
>> > SplitEnumerator
>> > > > does
>> > > > > > not
>> > > > > > > >> need
>> > > > > > > >> > > to
>> > > > > > > >> > > > > >>> trigger a checkpoint to the Flink ETL, and the
>> > > Source
>> > > > > node
>> > > > > > > >> needs
>> > > > > > > >> > to
>> > > > > > > >> > > > > >>> support
>> > > > > > > >> > > > > >>> broadcasting Timestamp Barrier to the
>> downstream
>> > at
>> > > > that
>> > > > > > time.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> From the above overall, the evolution
>> complexity
>> > > from
>> > > > > > > >> Checkpoint
>> > > > > > > >> > to
>> > > > > > > >> > > > > >>> Timestamp seems controllable, but the specific
>> > > > > > implementation
>> > > > > > > >> > needs
>> > > > > > > >> > > > > >>> careful
>> > > > > > > >> > > > > >>> design, and the concept and features of
>> Checkpoint
>> > > > > should
>> > > > > > not
>> > > > > > > >> be
>> > > > > > > >> > > > > >>> introduced
>> > > > > > > >> > > > > >>> too much into relevant interfaces and
>> functions.
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> What do you think of it? Looking forward to
>> your
>> > > > > feedback,
>> > > > > > > >> thanks
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> Best,
>> > > > > > > >> > > > > >>> Shammon
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> On Mon, Dec 12, 2022 at 11:46 PM David Morávek
>> <
>> > > > > > > >> d...@apache.org>
>> > > > > > > >> > > > > wrote:
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>> > Hi Shammon,
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > I'm starting to see what you're trying to
>> > achieve,
>> > > > and
>> > > > > > it's
>> > > > > > > >> > > really
>> > > > > > > >> > > > > >>> > exciting. I share Piotr's concerns about e2e
>> > > latency
>> > > > > and
>> > > > > > > >> > > disability
>> > > > > > > >> > > > > to
>> > > > > > > >> > > > > >>> use
>> > > > > > > >> > > > > >>> > unaligned checkpoints.
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > I have a couple of questions that are not
>> clear
>> > to
>> > > > me
>> > > > > > from
>> > > > > > > >> > going
>> > > > > > > >> > > > over
>> > > > > > > >> > > > > >>> the
>> > > > > > > >> > > > > >>> > FLIP:
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > 1) Global Checkpoint Commit
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > Are you planning on committing the
>> checkpoints
>> > in
>> > > > a) a
>> > > > > > > >> "rolling
>> > > > > > > >> > > > > >>> fashion" -
>> > > > > > > >> > > > > >>> > one pipeline after another, or b) altogether
>> -
>> > > once
>> > > > > the
>> > > > > > data
>> > > > > > > >> > have
>> > > > > > > >> > > > > been
>> > > > > > > >> > > > > >>> > processed by all pipelines?
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > Option a) would be eventually consistent (for
>> > > batch
>> > > > > > queries,
>> > > > > > > >> > > you'd
>> > > > > > > >> > > > > >>> need to
>> > > > > > > >> > > > > >>> > use the last checkpoint produced by the most
>> > > > > downstream
>> > > > > > > >> table),
>> > > > > > > >> > > > > >>> whereas b)
>> > > > > > > >> > > > > >>> > would be strongly consistent at the cost of
>> > > > increasing
>> > > > > > the
>> > > > > > > >> e2e
>> > > > > > > >> > > > > latency
>> > > > > > > >> > > > > >>> even
>> > > > > > > >> > > > > >>> > more.
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > I feel that option a) is what this should be
>> > > headed
>> > > > > for.
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > 2) MetaService
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > Should this be a new general Flink component
>> or
>> > > one
>> > > > > > > >> specific to
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> Flink
>> > > > > > > >> > > > > >>> > Table Store?
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > 3) Follow-ups
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > From the above discussion, there is a
>> consensus
>> > > > that,
>> > > > > > in the
>> > > > > > > >> > > ideal
>> > > > > > > >> > > > > >>> case,
>> > > > > > > >> > > > > >>> > watermarks would be a way to go, but there is
>> > some
>> > > > > > > >> underlying
>> > > > > > > >> > > > > mechanism
>> > > > > > > >> > > > > >>> > missing. It would be great to discuss this
>> > option
>> > > in
>> > > > > > more
>> > > > > > > >> > detail
>> > > > > > > >> > > to
>> > > > > > > >> > > > > >>> compare
>> > > > > > > >> > > > > >>> > the solutions in terms of implementation
>> cost,
>> > > maybe
>> > > > > it
>> > > > > > > >> could
>> > > > > > > >> > not
>> > > > > > > >> > > > be
>> > > > > > > >> > > > > as
>> > > > > > > >> > > > > >>> > complex.
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > All in all, I don't feel that checkpoints are
>> > > > suitable
>> > > > > > for
>> > > > > > > >> > > > providing
>> > > > > > > >> > > > > >>> > consistent table versioning between multiple
>> > > > > pipelines.
>> > > > > > The
>> > > > > > > >> > main
>> > > > > > > >> > > > > >>> reason is
>> > > > > > > >> > > > > >>> > that they are designed to be a fault
>> tolerance
>> > > > > > mechanism.
>> > > > > > > >> > > Somewhere
>> > > > > > > >> > > > > >>> between
>> > > > > > > >> > > > > >>> > the lines, you've already noted that the
>> > primitive
>> > > > > > you're
>> > > > > > > >> > looking
>> > > > > > > >> > > > for
>> > > > > > > >> > > > > >>> is
>> > > > > > > >> > > > > >>> > cross-pipeline barrier alignment, which is
>> the
>> > > > > > mechanism a
>> > > > > > > >> > subset
>> > > > > > > >> > > > of
>> > > > > > > >> > > > > >>> > currently supported checkpointing
>> > implementations
>> > > > > > happen to
>> > > > > > > >> be
>> > > > > > > >> > > > using.
>> > > > > > > >> > > > > >>> Is
>> > > > > > > >> > > > > >>> > that correct?
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > My biggest concern is that tying this with a
>> > > > > > "side-effect"
>> > > > > > > >> of
>> > > > > > > >> > the
>> > > > > > > >> > > > > >>> > checkpointing mechanism could block us from
>> > > evolving
>> > > > > it
>> > > > > > > >> > further.
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > Best,
>> > > > > > > >> > > > > >>> > D.
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <
>> > > > > > > >> zjur...@gmail.com>
>> > > > > > > >> > > > > wrote:
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>> > > Hi Piotr,
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Thank you for your feedback. I cannot see
>> the
>> > > DAG
>> > > > in
>> > > > > > 3.a
>> > > > > > > >> in
>> > > > > > > >> > > your
>> > > > > > > >> > > > > >>> reply,
>> > > > > > > >> > > > > >>> > but
>> > > > > > > >> > > > > >>> > > I'd like to answer some questions first.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Your understanding is very correct. We
>> want to
>> > > > align
>> > > > > > the
>> > > > > > > >> data
>> > > > > > > >> > > > > >>> versions of
>> > > > > > > >> > > > > >>> > > all intermediate tables through checkpoint
>> > > > mechanism
>> > > > > > in
>> > > > > > > >> > Flink.
>> > > > > > > >> > > > I'm
>> > > > > > > >> > > > > >>> sorry
>> > > > > > > >> > > > > >>> > > that I have omitted some default
>> constraints
>> > in
>> > > > > FLIP,
>> > > > > > > >> > including
>> > > > > > > >> > > > > only
>> > > > > > > >> > > > > >>> > > supporting aligned checkpoints; one table
>> can
>> > > only
>> > > > > be
>> > > > > > > >> written
>> > > > > > > >> > > by
>> > > > > > > >> > > > > one
>> > > > > > > >> > > > > >>> ETL
>> > > > > > > >> > > > > >>> > > job. I will add these later.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Why can't the watermark mechanism achieve
>> the
>> > > data
>> > > > > > > >> > consistency
>> > > > > > > >> > > we
>> > > > > > > >> > > > > >>> wanted?
>> > > > > > > >> > > > > >>> > > For example, there are 3 tables, Table1 is
>> > word
>> > > > > table,
>> > > > > > > >> Table2
>> > > > > > > >> > > is
>> > > > > > > >> > > > > >>> > word->cnt
>> > > > > > > >> > > > > >>> > > table and Table3 is cnt1->cnt2 table.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 1. ETL1 from Table1 to Table2: INSERT INTO
>> > > Table2
>> > > > > > SELECT
>> > > > > > > >> > word,
>> > > > > > > >> > > > > >>> count(*)
>> > > > > > > >> > > > > >>> > > FROM Table1 GROUP BY word
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 2. ETL2 from Table2 to Table3: INSERT INTO
>> > > Table3
>> > > > > > SELECT
>> > > > > > > >> cnt,
>> > > > > > > >> > > > > >>> count(*)
>> > > > > > > >> > > > > >>> > FROM
>> > > > > > > >> > > > > >>> > > Table2 GROUP BY cnt
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > ETL1 has 2 subtasks to read multiple
>> buckets
>> > > from
>> > > > > > Table1,
>> > > > > > > >> > where
>> > > > > > > >> > > > > >>> subtask1
>> > > > > > > >> > > > > >>> > > reads streaming data as [a, b, c, a, d, a,
>> b,
>> > > c, d
>> > > > > > ...]
>> > > > > > > >> and
>> > > > > > > >> > > > > subtask2
>> > > > > > > >> > > > > >>> > reads
>> > > > > > > >> > > > > >>> > > streaming data as [a, c, d, q, a, v, c, d
>> > ...].
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 1. Unbounded streaming data is divided into
>> > > > multiple
>> > > > > > sets
>> > > > > > > >> > > > according
>> > > > > > > >> > > > > >>> to
>> > > > > > > >> > > > > >>> > some
>> > > > > > > >> > > > > >>> > > semantic requirements. The most extreme
>> may be
>> > > one
>> > > > > > set for
>> > > > > > > >> > each
>> > > > > > > >> > > > > data.
>> > > > > > > >> > > > > >>> > > Assume that the sets of subtask1 and
>> subtask2
>> > > > > > separated by
>> > > > > > > >> > the
>> > > > > > > >> > > > same
>> > > > > > > >> > > > > >>> > > semantics are [a, b, c, a, d] and [a, c, d,
>> > q],
>> > > > > > > >> respectively.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 2. After the above two sets are computed by
>> > > ETL1,
>> > > > > the
>> > > > > > > >> result
>> > > > > > > >> > > data
>> > > > > > > >> > > > > >>> > generated
>> > > > > > > >> > > > > >>> > > in Table 2 is [(a, 3), (b, 1), (c, 1), (d,
>> 2),
>> > > (q,
>> > > > > > 1)].
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 3. The result data generated in Table 3
>> after
>> > > the
>> > > > > > data in
>> > > > > > > >> > > Table 2
>> > > > > > > >> > > > > is
>> > > > > > > >> > > > > >>> > > computed by ETL2 is [(1, 3), (2, 1), (3,
>> 1)]
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > We want to align the data of Table1, Table2
>> > and
>> > > > > > Table3 and
>> > > > > > > >> > > manage
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > >>> > data
>> > > > > > > >> > > > > >>> > > versions. When users execute OLAP/Batch
>> > queries
>> > > > join
>> > > > > > on
>> > > > > > > >> these
>> > > > > > > >> > > > > >>> tables, the
>> > > > > > > >> > > > > >>> > > following consistency data can be found
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 1. Table1: [a, b, c, a, d] and [a, c, d, q]
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 2. Table2: [a, 3], [b, 1], [c, 1], [d, 2],
>> [q,
>> > > 1]
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 3. Table3: [1, 3], [2, 1], [3, 1]
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Users can perform query: SELECT t1.word,
>> > t2.cnt,
>> > > > > > t3.cnt2
>> > > > > > > >> from
>> > > > > > > >> > > > > Table1
>> > > > > > > >> > > > > >>> t1
>> > > > > > > >> > > > > >>> > > JOIN Table2 t2 JOIN Table3 t3 on
>> > t1.word=t2.word
>> > > > and
>> > > > > > > >> > > > > t2.cnt=t3.cnt1;
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > In the view of users, the data is
>> consistent
>> > on
>> > > a
>> > > > > > unified
>> > > > > > > >> > > > "version"
>> > > > > > > >> > > > > >>> > between
>> > > > > > > >> > > > > >>> > > Table1, Table2 and Table3.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > In the current Flink implementation, the
>> > aligned
>> > > > > > > >> checkpoint
>> > > > > > > >> > can
>> > > > > > > >> > > > > >>> achieve
>> > > > > > > >> > > > > >>> > the
>> > > > > > > >> > > > > >>> > > above capabilities (let's ignore the
>> > > segmentation
>> > > > > > > >> semantics
>> > > > > > > >> > of
>> > > > > > > >> > > > > >>> checkpoint
>> > > > > > > >> > > > > >>> > > first). Because the Checkpoint Barrier will
>> > > align
>> > > > > the
>> > > > > > data
>> > > > > > > >> > when
>> > > > > > > >> > > > > >>> > performing
>> > > > > > > >> > > > > >>> > > the global Count aggregation, we can
>> associate
>> > > the
>> > > > > > > >> snapshot
>> > > > > > > >> > > with
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > >>> > > checkpoint in the Table Store, query the
>> > > specified
>> > > > > > > >> snapshot
>> > > > > > > >> > of
>> > > > > > > >> > > > > >>> > > Table1/Table2/Table3 through the
>> checkpoint,
>> > and
>> > > > > > achieve
>> > > > > > > >> the
>> > > > > > > >> > > > > >>> consistency
>> > > > > > > >> > > > > >>> > > requirements of the above unified
>> "version".
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Current watermark mechanism in Flink cannot
>> > > > achieve
>> > > > > > the
>> > > > > > > >> above
>> > > > > > > >> > > > > >>> > consistency.
>> > > > > > > >> > > > > >>> > > For example, we use watermark to divide
>> data
>> > > into
>> > > > > > multiple
>> > > > > > > >> > sets
>> > > > > > > >> > > > in
>> > > > > > > >> > > > > >>> > subtask1
>> > > > > > > >> > > > > >>> > > and subtask2 as followed
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 1. subtask1:[(a, T1), (b, T1), (c, T1), (a,
>> > T1),
>> > > > (d,
>> > > > > > T1)],
>> > > > > > > >> > T1,
>> > > > > > > >> > > > [(a,
>> > > > > > > >> > > > > >>> T2),
>> > > > > > > >> > > > > >>> > > (b, T2), (c, T2), (d, T2)], T2
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > 2. subtask2: [(a, T1), (c, T1), (d, T1),
>> (q,
>> > > T1)],
>> > > > > T1,
>> > > > > > > >> ....
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > As Flink watermark does not have barriers
>> and
>> > > > cannot
>> > > > > > align
>> > > > > > > >> > > data,
>> > > > > > > >> > > > > ETL1
>> > > > > > > >> > > > > >>> > Count
>> > > > > > > >> > > > > >>> > > operator may compute the data of subtask1
>> > first:
>> > > > > [(a,
>> > > > > > T1),
>> > > > > > > >> > (b,
>> > > > > > > >> > > > T1),
>> > > > > > > >> > > > > >>> (c,
>> > > > > > > >> > > > > >>> > > T1), (a, T1), (d, T1)], T1, [(a, T2), (b,
>> > T2)],
>> > > > then
>> > > > > > > >> compute
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> data of
>> > > > > > > >> > > > > >>> > > subtask2: [(a, T1), (c, T1), (d, T1), (q,
>> > T1)],
>> > > > T1,
>> > > > > > which
>> > > > > > > >> is
>> > > > > > > >> > > not
>> > > > > > > >> > > > > >>> possible
>> > > > > > > >> > > > > >>> > > in aligned checkpoint.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > In this order, the result output to Table2
>> > after
>> > > > the
>> > > > > > Count
>> > > > > > > >> > > > > >>> aggregation
>> > > > > > > >> > > > > >>> > will
>> > > > > > > >> > > > > >>> > > be: (a, 1, T1), (b, 1, T1), (c, 1, T1),
>> (a, 2,
>> > > > T1),
>> > > > > > (a, 3,
>> > > > > > > >> > T2),
>> > > > > > > >> > > > (b,
>> > > > > > > >> > > > > >>> 2,
>> > > > > > > >> > > > > >>> > T2),
>> > > > > > > >> > > > > >>> > > (a, 4, T1), (c, 2, T1), (d, 1, T1), (q, 1,
>> > T1),
>> > > > > which
>> > > > > > can
>> > > > > > > >> be
>> > > > > > > >> > > > > >>> simplified
>> > > > > > > >> > > > > >>> > as:
>> > > > > > > >> > > > > >>> > > [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4,
>> > T1),
>> > > > (c,
>> > > > > > 2,
>> > > > > > > >> T1),
>> > > > > > > >> > > (d,
>> > > > > > > >> > > > 1,
>> > > > > > > >> > > > > >>> T1),
>> > > > > > > >> > > > > >>> > > (q, 1, T1)]
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > There's no (a, 3, T1), we have been unable
>> to
>> > > > query
>> > > > > > > >> > consistent
>> > > > > > > >> > > > data
>> > > > > > > >> > > > > >>> > results
>> > > > > > > >> > > > > >>> > > on Table1 and Table2 according to T1.
>> Table 3
>> > > has
>> > > > > the
>> > > > > > same
>> > > > > > > >> > > > problem.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > In addition to using Checkpoint Barrier,
>> the
>> > > other
>> > > > > > > >> > > implementation
>> > > > > > > >> > > > > >>> > > supporting watermark above is to convert
>> Count
>> > > > > > aggregation
>> > > > > > > >> > into
>> > > > > > > >> > > > > >>> Window
>> > > > > > > >> > > > > >>> > > Count. After the global Count is converted
>> > into
>> > > > > window
>> > > > > > > >> > > operator,
>> > > > > > > >> > > > it
>> > > > > > > >> > > > > >>> needs
>> > > > > > > >> > > > > >>> > > to support cross window data computation.
>> > > Similar
>> > > > to
>> > > > > > the
>> > > > > > > >> data
>> > > > > > > >> > > > > >>> > relationship
>> > > > > > > >> > > > > >>> > > between the previous and the current
>> > Checkpoint,
>> > > > it
>> > > > > is
>> > > > > > > >> > > equivalent
>> > > > > > > >> > > > > to
>> > > > > > > >> > > > > >>> > > introducing the Watermark Barrier, which
>> > > requires
>> > > > > > > >> adjustments
>> > > > > > > >> > > to
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > >>> > > current Flink Watermark mechanism.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Besides the above global aggregation, there
>> > are
>> > > > > window
>> > > > > > > >> > > operators
>> > > > > > > >> > > > in
>> > > > > > > >> > > > > >>> > Flink.
>> > > > > > > >> > > > > >>> > > I don't know if my understanding is
>> correct(I
>> > > > cannot
>> > > > > > see
>> > > > > > > >> the
>> > > > > > > >> > > DAG
>> > > > > > > >> > > > in
>> > > > > > > >> > > > > >>> your
>> > > > > > > >> > > > > >>> > > example), please correct me if it's wrong.
>> I
>> > > think
>> > > > > you
>> > > > > > > >> raise
>> > > > > > > >> > a
>> > > > > > > >> > > > very
>> > > > > > > >> > > > > >>> > > important and interesting question: how to
>> > > define
>> > > > > data
>> > > > > > > >> > > > consistency
>> > > > > > > >> > > > > in
>> > > > > > > >> > > > > >>> > > different window computations which will
>> > > generate
>> > > > > > > >> different
>> > > > > > > >> > > > > >>> timestamps of
>> > > > > > > >> > > > > >>> > > the same data. This situation also occurs
>> when
>> > > > using
>> > > > > > event
>> > > > > > > >> > time
>> > > > > > > >> > > > to
>> > > > > > > >> > > > > >>> align
>> > > > > > > >> > > > > >>> > > data. At present, what I can think of is to
>> > > store
>> > > > > > these
>> > > > > > > >> > > > information
>> > > > > > > >> > > > > >>> in
>> > > > > > > >> > > > > >>> > > Table Store, users can perform filter or
>> join
>> > on
>> > > > > data
>> > > > > > with
>> > > > > > > >> > > them.
>> > > > > > > >> > > > > This
>> > > > > > > >> > > > > >>> > FLIP
>> > > > > > > >> > > > > >>> > > is our first phase, and the specific
>> > > > implementation
>> > > > > of
>> > > > > > > >> this
>> > > > > > > >> > > will
>> > > > > > > >> > > > be
>> > > > > > > >> > > > > >>> > > designed and considered in the next phase
>> and
>> > > > FLIP.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Although the Checkpoint Barrier can achieve
>> > the
>> > > > most
>> > > > > > basic
>> > > > > > > >> > > > > >>> consistency,
>> > > > > > > >> > > > > >>> > as
>> > > > > > > >> > > > > >>> > > you mentioned, using the Checkpoint
>> mechanism
>> > > will
>> > > > > > cause
>> > > > > > > >> many
>> > > > > > > >> > > > > >>> problems,
>> > > > > > > >> > > > > >>> > > including the increase of checkpoint time
>> for
>> > > > > multiple
>> > > > > > > >> > cascade
>> > > > > > > >> > > > > jobs,
>> > > > > > > >> > > > > >>> the
>> > > > > > > >> > > > > >>> > > increase of E2E data freshness time
>> (several
>> > > > minutes
>> > > > > > or
>> > > > > > > >> even
>> > > > > > > >> > > > dozens
>> > > > > > > >> > > > > >>> of
>> > > > > > > >> > > > > >>> > > minutes), and the increase of the overall
>> > system
>> > > > > > > >> complexity.
>> > > > > > > >> > At
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> same
>> > > > > > > >> > > > > >>> > > time, the semantics of Checkpoint data
>> > > > segmentation
>> > > > > is
>> > > > > > > >> > unclear.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > The current FLIP is the first phase of our
>> > whole
>> > > > > > proposal,
>> > > > > > > >> > and
>> > > > > > > >> > > > you
>> > > > > > > >> > > > > >>> can
>> > > > > > > >> > > > > >>> > find
>> > > > > > > >> > > > > >>> > > the follow-up plan in our future worker. In
>> > the
>> > > > > first
>> > > > > > > >> stage,
>> > > > > > > >> > we
>> > > > > > > >> > > > do
>> > > > > > > >> > > > > >>> not
>> > > > > > > >> > > > > >>> > want
>> > > > > > > >> > > > > >>> > > to modify the Flink mechanism. We'd like to
>> > > > realize
>> > > > > > basic
>> > > > > > > >> > > system
>> > > > > > > >> > > > > >>> > functions
>> > > > > > > >> > > > > >>> > > based on existing mechanisms in Flink,
>> > including
>> > > > the
>> > > > > > > >> > > relationship
>> > > > > > > >> > > > > >>> > > management of ETL and tables, and the basic
>> > data
>> > > > > > > >> consistency,
>> > > > > > > >> > > so
>> > > > > > > >> > > > we
>> > > > > > > >> > > > > >>> > choose
>> > > > > > > >> > > > > >>> > > Global Checkpoint in our FLIP.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > We agree with you very much that event
>> time is
>> > > > more
>> > > > > > > >> suitable
>> > > > > > > >> > > for
>> > > > > > > >> > > > > data
>> > > > > > > >> > > > > >>> > > consistency management. We'd like consider
>> > this
>> > > > > > matter in
>> > > > > > > >> the
>> > > > > > > >> > > > > second
>> > > > > > > >> > > > > >>> or
>> > > > > > > >> > > > > >>> > > third stage after the current FLIP. We
>> hope to
>> > > > > > improve the
>> > > > > > > >> > > > > watermark
>> > > > > > > >> > > > > >>> > > mechanism in Flink to support barriers. As
>> you
>> > > > > > mentioned
>> > > > > > > >> in
>> > > > > > > >> > > your
>> > > > > > > >> > > > > >>> reply,
>> > > > > > > >> > > > > >>> > we
>> > > > > > > >> > > > > >>> > > can achieve data consistency based on
>> > timestamp,
>> > > > > while
>> > > > > > > >> > > > maintaining
>> > > > > > > >> > > > > >>> E2E
>> > > > > > > >> > > > > >>> > data
>> > > > > > > >> > > > > >>> > > freshness of seconds or even milliseconds
>> for
>> > > 10+
>> > > > > > cascaded
>> > > > > > > >> > > jobs.
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > What do you think? Thanks
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > Best,
>> > > > > > > >> > > > > >>> > > Shammon
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > On Fri, Dec 9, 2022 at 6:13 PM Piotr
>> Nowojski
>> > <
>> > > > > > > >> > > > > pnowoj...@apache.org>
>> > > > > > > >> > > > > >>> > > wrote:
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> > > > Hi Shammon,
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > Do I understand it correctly, that you
>> > > > effectively
>> > > > > > want
>> > > > > > > >> to
>> > > > > > > >> > > > expand
>> > > > > > > >> > > > > >>> the
>> > > > > > > >> > > > > >>> > > > checkpoint alignment mechanism across
>> many
>> > > > > different
>> > > > > > > >> jobs
>> > > > > > > >> > and
>> > > > > > > >> > > > > hand
>> > > > > > > >> > > > > >>> over
>> > > > > > > >> > > > > >>> > > > checkpoint barriers from upstream to
>> > > downstream
>> > > > > jobs
>> > > > > > > >> using
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> > > intermediate
>> > > > > > > >> > > > > >>> > > > tables?
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > Re the watermarks for the "Rejected
>> > > > > Alternatives". I
>> > > > > > > >> don't
>> > > > > > > >> > > > > >>> understand
>> > > > > > > >> > > > > >>> > why
>> > > > > > > >> > > > > >>> > > > this has been rejected. Could you
>> elaborate
>> > on
>> > > > > this
>> > > > > > > >> point?
>> > > > > > > >> > > Here
>> > > > > > > >> > > > > >>> are a
>> > > > > > > >> > > > > >>> > > > couple of my thoughts on this matter, but
>> > > please
>> > > > > > > >> correct me
>> > > > > > > >> > > if
>> > > > > > > >> > > > > I'm
>> > > > > > > >> > > > > >>> > wrong,
>> > > > > > > >> > > > > >>> > > > as I haven't dived deeper into this
>> topic.
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > > As shown above, there are 2 watermarks
>> T1
>> > > and
>> > > > > T2,
>> > > > > > T1 <
>> > > > > > > >> > T2.
>> > > > > > > >> > > > > >>> > > > > The StreamTask reads data in order:
>> > > > > > > >> > > > > >>> > > >
>> V11,V12,V21,T1(channel1),V13,T1(channel2).
>> > > > > > > >> > > > > >>> > > > > At this time, StreamTask will confirm
>> that
>> > > > > > watermark
>> > > > > > > >> T1
>> > > > > > > >> > is
>> > > > > > > >> > > > > >>> completed,
>> > > > > > > >> > > > > >>> > > > but the data beyond
>> > > > > > > >> > > > > >>> > > > > T1 has been processed(V13) and the
>> results
>> > > are
>> > > > > > > >> written to
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> sink
>> > > > > > > >> > > > > >>> > > > table.
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > 1. I see the same "problem" with
>> unaligned
>> > > > > > checkpoints
>> > > > > > > >> in
>> > > > > > > >> > > your
>> > > > > > > >> > > > > >>> current
>> > > > > > > >> > > > > >>> > > > proposal.
>> > > > > > > >> > > > > >>> > > > 2. I don't understand why this is a
>> problem?
>> > > > Just
>> > > > > > store
>> > > > > > > >> in
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> "sink
>> > > > > > > >> > > > > >>> > > > table" what's the watermark (T1), and
>> > > downstream
>> > > > > > jobs
>> > > > > > > >> > should
>> > > > > > > >> > > > > >>> process
>> > > > > > > >> > > > > >>> > the
>> > > > > > > >> > > > > >>> > > > data with that "watermark" anyway. Record
>> > > "V13"
>> > > > > > should
>> > > > > > > >> be
>> > > > > > > >> > > > treated
>> > > > > > > >> > > > > >>> as
>> > > > > > > >> > > > > >>> > > > "early" data. Downstream jobs if:
>> > > > > > > >> > > > > >>> > > >  a) they are streaming jobs, for example
>> > they
>> > > > > should
>> > > > > > > >> > > aggregate
>> > > > > > > >> > > > it
>> > > > > > > >> > > > > >>> in
>> > > > > > > >> > > > > >>> > > > windowed/temporal state, but they
>> shouldn't
>> > > > > produce
>> > > > > > the
>> > > > > > > >> > > result
>> > > > > > > >> > > > > that
>> > > > > > > >> > > > > >>> > > > contains it, as the watermark T2 was not
>> yet
>> > > > > > processed.
>> > > > > > > >> Or
>> > > > > > > >> > > they
>> > > > > > > >> > > > > >>> would
>> > > > > > > >> > > > > >>> > > just
>> > > > > > > >> > > > > >>> > > > pass that record as "early" data.
>> > > > > > > >> > > > > >>> > > >  b) they are batch jobs, it looks to me
>> like
>> > > > batch
>> > > > > > jobs
>> > > > > > > >> > > > shouldn't
>> > > > > > > >> > > > > >>> take
>> > > > > > > >> > > > > >>> > > > "all available data", but only consider
>> "all
>> > > the
>> > > > > > data
>> > > > > > > >> until
>> > > > > > > >> > > > some
>> > > > > > > >> > > > > >>> > > > watermark", for example the latest
>> > available:
>> > > T1
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > 3. I'm pretty sure there are counter
>> > examples,
>> > > > > where
>> > > > > > > >> your
>> > > > > > > >> > > > > proposed
>> > > > > > > >> > > > > >>> > > > mechanism of using checkpoints (even
>> > aligned!)
>> > > > > will
>> > > > > > > >> produce
>> > > > > > > >> > > > > >>> > > > inconsistent data from the perspective of
>> > the
>> > > > > event
>> > > > > > > >> time.
>> > > > > > > >> > > > > >>> > > >   a) For example what if one of your
>> "ETL"
>> > > jobs,
>> > > > > > has the
>> > > > > > > >> > > > > following
>> > > > > > > >> > > > > >>> DAG:
>> > > > > > > >> > > > > >>> > > > [image: flip276.jpg]
>> > > > > > > >> > > > > >>> > > >   Even if you use aligned checkpoints for
>> > > > > > committing the
>> > > > > > > >> > data
>> > > > > > > >> > > > to
>> > > > > > > >> > > > > >>> the
>> > > > > > > >> > > > > >>> > sink
>> > > > > > > >> > > > > >>> > > > table, the watermarks of "Window1" and
>> > > "Window2"
>> > > > > are
>> > > > > > > >> > > completely
>> > > > > > > >> > > > > >>> > > > independent. The sink table might easily
>> > have
>> > > > data
>> > > > > > from
>> > > > > > > >> the
>> > > > > > > >> > > > > >>> > Src1/Window1
>> > > > > > > >> > > > > >>> > > > from the event time T1 and Src2/Window2
>> from
>> > > > later
>> > > > > > event
>> > > > > > > >> > time
>> > > > > > > >> > > > T2.
>> > > > > > > >> > > > > >>> > > >   b) I think the same applies if you have
>> > two
>> > > > > > completely
>> > > > > > > >> > > > > >>> independent
>> > > > > > > >> > > > > >>> > ETL
>> > > > > > > >> > > > > >>> > > > jobs writing either to the same sink
>> table,
>> > or
>> > > > two
>> > > > > > to
>> > > > > > > >> > > different
>> > > > > > > >> > > > > >>> sink
>> > > > > > > >> > > > > >>> > > tables
>> > > > > > > >> > > > > >>> > > > (that are both later used in the same
>> > > downstream
>> > > > > > job).
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > 4a) I'm not sure if I like the idea of
>> > > > > centralising
>> > > > > > the
>> > > > > > > >> > whole
>> > > > > > > >> > > > > >>> system in
>> > > > > > > >> > > > > >>> > > > this way. If you have 10 jobs, the
>> > likelihood
>> > > of
>> > > > > the
>> > > > > > > >> > > checkpoint
>> > > > > > > >> > > > > >>> failure
>> > > > > > > >> > > > > >>> > > > will be 10 times higher, and/or the
>> duration
>> > > of
>> > > > > the
>> > > > > > > >> > > checkpoint
>> > > > > > > >> > > > > can
>> > > > > > > >> > > > > >>> be
>> > > > > > > >> > > > > >>> > > much
>> > > > > > > >> > > > > >>> > > > much longer (especially under
>> backpressure).
>> > > And
>> > > > > > this is
>> > > > > > > >> > > > actually
>> > > > > > > >> > > > > >>> > > already a
>> > > > > > > >> > > > > >>> > > > limitation of Apache Flink (global
>> > checkpoints
>> > > > are
>> > > > > > more
>> > > > > > > >> > prone
>> > > > > > > >> > > > to
>> > > > > > > >> > > > > >>> fail
>> > > > > > > >> > > > > >>> > the
>> > > > > > > >> > > > > >>> > > > larger the scale), so I would be anxious
>> > about
>> > > > > > making it
>> > > > > > > >> > > > > >>> potentially
>> > > > > > > >> > > > > >>> > > even a
>> > > > > > > >> > > > > >>> > > > larger issue.
>> > > > > > > >> > > > > >>> > > > 4b) I'm also worried about increased
>> > > complexity
>> > > > of
>> > > > > > the
>> > > > > > > >> > system
>> > > > > > > >> > > > > after
>> > > > > > > >> > > > > >>> > > adding
>> > > > > > > >> > > > > >>> > > > the global checkpoint, and additional
>> > > (single?)
>> > > > > > point of
>> > > > > > > >> > > > failure.
>> > > > > > > >> > > > > >>> > > > 5. Such a design would also not work if
>> we
>> > > ever
>> > > > > > wanted
>> > > > > > > >> to
>> > > > > > > >> > > have
>> > > > > > > >> > > > > task
>> > > > > > > >> > > > > >>> > local
>> > > > > > > >> > > > > >>> > > > checkpoints.
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > All in all, it seems to me like actually
>> the
>> > > > > > watermarks
>> > > > > > > >> and
>> > > > > > > >> > > > even
>> > > > > > > >> > > > > >>> time
>> > > > > > > >> > > > > >>> > are
>> > > > > > > >> > > > > >>> > > > the better concept in this context that
>> > should
>> > > > > have
>> > > > > > been
>> > > > > > > >> > used
>> > > > > > > >> > > > for
>> > > > > > > >> > > > > >>> > > > synchronising and data consistency across
>> > the
>> > > > > whole
>> > > > > > > >> system.
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > Best,
>> > > > > > > >> > > > > >>> > > > Piotrek
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > > czw., 1 gru 2022 o 11:50 Shammon FY <
>> > > > > > zjur...@gmail.com>
>> > > > > > > >> > > > > >>> napisał(a):
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > >> Hi @Martijn
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> Thanks for your comments, and I'd like
>> to
>> > > reply
>> > > > > to
>> > > > > > them
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> 1. It sounds good to me, I'll update the
>> > > > content
>> > > > > > > >> structure
>> > > > > > > >> > > in
>> > > > > > > >> > > > > FLIP
>> > > > > > > >> > > > > >>> > later
>> > > > > > > >> > > > > >>> > > >> and give the problems first.
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> 2. "Each ETL job creates snapshots with
>> > > > > checkpoint
>> > > > > > > >> info on
>> > > > > > > >> > > > sink
>> > > > > > > >> > > > > >>> tables
>> > > > > > > >> > > > > >>> > > in
>> > > > > > > >> > > > > >>> > > >> Table Store"  -> That reads like you're
>> > > > proposing
>> > > > > > that
>> > > > > > > >> > > > snapshots
>> > > > > > > >> > > > > >>> need
>> > > > > > > >> > > > > >>> > to
>> > > > > > > >> > > > > >>> > > >> be
>> > > > > > > >> > > > > >>> > > >> written to Table Store?
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> Yes. To support the data consistency in
>> the
>> > > > FLIP,
>> > > > > > we
>> > > > > > > >> need
>> > > > > > > >> > to
>> > > > > > > >> > > > get
>> > > > > > > >> > > > > >>> > through
>> > > > > > > >> > > > > >>> > > >> checkpoints in Flink and snapshots in
>> > store,
>> > > > this
>> > > > > > > >> > requires a
>> > > > > > > >> > > > > close
>> > > > > > > >> > > > > >>> > > >> combination of Flink and store
>> > > implementation.
>> > > > In
>> > > > > > the
>> > > > > > > >> > first
>> > > > > > > >> > > > > stage
>> > > > > > > >> > > > > >>> we
>> > > > > > > >> > > > > >>> > > plan
>> > > > > > > >> > > > > >>> > > >> to implement it based on Flink and Table
>> > > Store
>> > > > > > only,
>> > > > > > > >> > > snapshots
>> > > > > > > >> > > > > >>> written
>> > > > > > > >> > > > > >>> > > to
>> > > > > > > >> > > > > >>> > > >> external storage don't support
>> consistency.
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> 3. If you introduce a MetaService, it
>> > becomes
>> > > > the
>> > > > > > > >> single
>> > > > > > > >> > > point
>> > > > > > > >> > > > > of
>> > > > > > > >> > > > > >>> > > failure
>> > > > > > > >> > > > > >>> > > >> because it coordinates everything. But I
>> > > can't
>> > > > > find
>> > > > > > > >> > anything
>> > > > > > > >> > > > in
>> > > > > > > >> > > > > >>> the
>> > > > > > > >> > > > > >>> > FLIP
>> > > > > > > >> > > > > >>> > > >> on
>> > > > > > > >> > > > > >>> > > >> making the MetaService high available or
>> > how
>> > > to
>> > > > > > deal
>> > > > > > > >> with
>> > > > > > > >> > > > > >>> failovers
>> > > > > > > >> > > > > >>> > > there.
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> I think you raise a very important
>> problem
>> > > and
>> > > > I
>> > > > > > > >> missed it
>> > > > > > > >> > > in
>> > > > > > > >> > > > > >>> FLIP.
>> > > > > > > >> > > > > >>> > The
>> > > > > > > >> > > > > >>> > > >> MetaService is a single point and should
>> > > > support
>> > > > > > > >> failover,
>> > > > > > > >> > > we
>> > > > > > > >> > > > > >>> will do
>> > > > > > > >> > > > > >>> > it
>> > > > > > > >> > > > > >>> > > >> in
>> > > > > > > >> > > > > >>> > > >> future in the first stage we only
>> support
>> > > > > > standalone
>> > > > > > > >> mode,
>> > > > > > > >> > > THX
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> 4. The FLIP states under Rejected
>> > > Alternatives
>> > > > > > > >> "Currently
>> > > > > > > >> > > > > >>> watermark in
>> > > > > > > >> > > > > >>> > > >> Flink cannot align data." which is not
>> > true,
>> > > > > given
>> > > > > > that
>> > > > > > > >> > > there
>> > > > > > > >> > > > is
>> > > > > > > >> > > > > >>> > > FLIP-182
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> Watermark alignment in FLIP-182 is
>> > different
>> > > > from
>> > > > > > > >> > > requirements
>> > > > > > > >> > > > > >>> > > "watermark
>> > > > > > > >> > > > > >>> > > >> align data" in our FLIP. FLIP-182 aims
>> to
>> > fix
>> > > > > > watermark
>> > > > > > > >> > > > > >>> generation in
>> > > > > > > >> > > > > >>> > > >> different sources for "slight imbalance
>> or
>> > > data
>> > > > > > skew",
>> > > > > > > >> > which
>> > > > > > > >> > > > > >>> means in
>> > > > > > > >> > > > > >>> > > some
>> > > > > > > >> > > > > >>> > > >> cases the source must generate watermark
>> > even
>> > > > if
>> > > > > > they
>> > > > > > > >> > should
>> > > > > > > >> > > > > not.
>> > > > > > > >> > > > > >>> When
>> > > > > > > >> > > > > >>> > > the
>> > > > > > > >> > > > > >>> > > >> operator collects watermarks, the data
>> > > > processing
>> > > > > > is as
>> > > > > > > >> > > > > described
>> > > > > > > >> > > > > >>> in
>> > > > > > > >> > > > > >>> > our
>> > > > > > > >> > > > > >>> > > >> FLIP, and the data cannot be aligned
>> > through
>> > > > the
>> > > > > > > >> barrier
>> > > > > > > >> > > like
>> > > > > > > >> > > > > >>> > > Checkpoint.
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> 5. Given the MetaService role, it feels
>> > like
>> > > > this
>> > > > > > is
>> > > > > > > >> > > > > introducing a
>> > > > > > > >> > > > > >>> > tight
>> > > > > > > >> > > > > >>> > > >> dependency between Flink and the Table
>> > Store.
>> > > > How
>> > > > > > > >> > pluggable
>> > > > > > > >> > > is
>> > > > > > > >> > > > > >>> this
>> > > > > > > >> > > > > >>> > > >> solution, given the changes that need
>> to be
>> > > > made
>> > > > > to
>> > > > > > > >> Flink
>> > > > > > > >> > in
>> > > > > > > >> > > > > >>> order to
>> > > > > > > >> > > > > >>> > > >> support this?
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> This is a good question, and I will try
>> to
>> > > > expand
>> > > > > > it.
>> > > > > > > >> Most
>> > > > > > > >> > > of
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > >>> work
>> > > > > > > >> > > > > >>> > > >> will
>> > > > > > > >> > > > > >>> > > >> be completed in the Table Store, such as
>> > the
>> > > > new
>> > > > > > > >> > > > SplitEnumerator
>> > > > > > > >> > > > > >>> and
>> > > > > > > >> > > > > >>> > > >> Source
>> > > > > > > >> > > > > >>> > > >> implementation. The changes in Flink
>> are as
>> > > > > > followed:
>> > > > > > > >> > > > > >>> > > >> 1) Flink job should put its job id in
>> > context
>> > > > > when
>> > > > > > > >> > creating
>> > > > > > > >> > > > > >>> > source/sink
>> > > > > > > >> > > > > >>> > > to
>> > > > > > > >> > > > > >>> > > >> help MetaService to create relationship
>> > > between
>> > > > > > source
>> > > > > > > >> and
>> > > > > > > >> > > > sink
>> > > > > > > >> > > > > >>> > tables,
>> > > > > > > >> > > > > >>> > > >> it's tiny
>> > > > > > > >> > > > > >>> > > >> 2) Notify a listener when job is
>> terminated
>> > > in
>> > > > > > Flink,
>> > > > > > > >> and
>> > > > > > > >> > > the
>> > > > > > > >> > > > > >>> listener
>> > > > > > > >> > > > > >>> > > >> implementation in Table Store will send
>> > > "delete
>> > > > > > event"
>> > > > > > > >> to
>> > > > > > > >> > > > > >>> MetaService.
>> > > > > > > >> > > > > >>> > > >> 3) The changes are related to Flink
>> > > Checkpoint
>> > > > > > includes
>> > > > > > > >> > > > > >>> > > >>   a) Support triggering checkpoint with
>> > > > > checkpoint
>> > > > > > id
>> > > > > > > >> by
>> > > > > > > >> > > > > >>> > SplitEnumerator
>> > > > > > > >> > > > > >>> > > >>   b) Create the SplitEnumerator in Table
>> > > Store
>> > > > > > with a
>> > > > > > > >> > > strategy
>> > > > > > > >> > > > > to
>> > > > > > > >> > > > > >>> > > perform
>> > > > > > > >> > > > > >>> > > >> the specific checkpoint when all
>> > > > > > "SplitEnumerator"s in
>> > > > > > > >> the
>> > > > > > > >> > > job
>> > > > > > > >> > > > > >>> manager
>> > > > > > > >> > > > > >>> > > >> trigger it.
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> Best,
>> > > > > > > >> > > > > >>> > > >> Shammon
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> On Thu, Dec 1, 2022 at 3:43 PM Martijn
>> > > Visser <
>> > > > > > > >> > > > > >>> > martijnvis...@apache.org
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > > >> wrote:
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >> > Hi all,
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> > A couple of first comments on this:
>> > > > > > > >> > > > > >>> > > >> > 1. I'm missing the problem statement
>> in
>> > the
>> > > > > > overall
>> > > > > > > >> > > > > >>> introduction. It
>> > > > > > > >> > > > > >>> > > >> > immediately goes into proposal mode, I
>> > > would
>> > > > > > like to
>> > > > > > > >> > first
>> > > > > > > >> > > > > read
>> > > > > > > >> > > > > >>> what
>> > > > > > > >> > > > > >>> > > is
>> > > > > > > >> > > > > >>> > > >> the
>> > > > > > > >> > > > > >>> > > >> > actual problem, before diving into
>> > > solutions.
>> > > > > > > >> > > > > >>> > > >> > 2. "Each ETL job creates snapshots
>> with
>> > > > > > checkpoint
>> > > > > > > >> info
>> > > > > > > >> > on
>> > > > > > > >> > > > > sink
>> > > > > > > >> > > > > >>> > tables
>> > > > > > > >> > > > > >>> > > >> in
>> > > > > > > >> > > > > >>> > > >> > Table Store"  -> That reads like
>> you're
>> > > > > proposing
>> > > > > > > >> that
>> > > > > > > >> > > > > snapshots
>> > > > > > > >> > > > > >>> > need
>> > > > > > > >> > > > > >>> > > >> to be
>> > > > > > > >> > > > > >>> > > >> > written to Table Store?
>> > > > > > > >> > > > > >>> > > >> > 3. If you introduce a MetaService, it
>> > > becomes
>> > > > > the
>> > > > > > > >> single
>> > > > > > > >> > > > point
>> > > > > > > >> > > > > >>> of
>> > > > > > > >> > > > > >>> > > >> failure
>> > > > > > > >> > > > > >>> > > >> > because it coordinates everything.
>> But I
>> > > > can't
>> > > > > > find
>> > > > > > > >> > > anything
>> > > > > > > >> > > > > in
>> > > > > > > >> > > > > >>> the
>> > > > > > > >> > > > > >>> > > >> FLIP on
>> > > > > > > >> > > > > >>> > > >> > making the MetaService high available
>> or
>> > > how
>> > > > to
>> > > > > > deal
>> > > > > > > >> > with
>> > > > > > > >> > > > > >>> failovers
>> > > > > > > >> > > > > >>> > > >> there.
>> > > > > > > >> > > > > >>> > > >> > 4. The FLIP states under Rejected
>> > > > Alternatives
>> > > > > > > >> > "Currently
>> > > > > > > >> > > > > >>> watermark
>> > > > > > > >> > > > > >>> > in
>> > > > > > > >> > > > > >>> > > >> > Flink cannot align data." which is not
>> > > true,
>> > > > > > given
>> > > > > > > >> that
>> > > > > > > >> > > > there
>> > > > > > > >> > > > > is
>> > > > > > > >> > > > > >>> > > >> FLIP-182
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> > 5. Given the MetaService role, it
>> feels
>> > > like
>> > > > > > this is
>> > > > > > > >> > > > > >>> introducing a
>> > > > > > > >> > > > > >>> > > tight
>> > > > > > > >> > > > > >>> > > >> > dependency between Flink and the Table
>> > > Store.
>> > > > > How
>> > > > > > > >> > > pluggable
>> > > > > > > >> > > > is
>> > > > > > > >> > > > > >>> this
>> > > > > > > >> > > > > >>> > > >> > solution, given the changes that need
>> to
>> > be
>> > > > > made
>> > > > > > to
>> > > > > > > >> > Flink
>> > > > > > > >> > > in
>> > > > > > > >> > > > > >>> order
>> > > > > > > >> > > > > >>> > to
>> > > > > > > >> > > > > >>> > > >> > support this?
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> > Best regards,
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> > Martijn
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> > On Thu, Dec 1, 2022 at 4:49 AM Shammon
>> > FY <
>> > > > > > > >> > > > zjur...@gmail.com>
>> > > > > > > >> > > > > >>> > wrote:
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >> > > Hi devs:
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > > I'd like to start a discussion about
>> > > > > FLIP-276:
>> > > > > > Data
>> > > > > > > >> > > > > >>> Consistency of
>> > > > > > > >> > > > > >>> > > >> > > Streaming and Batch ETL in Flink and
>> > > Table
>> > > > > > > >> Store[1].
>> > > > > > > >> > In
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> whole
>> > > > > > > >> > > > > >>> > > data
>> > > > > > > >> > > > > >>> > > >> > > stream processing, there are
>> > consistency
>> > > > > > problems
>> > > > > > > >> such
>> > > > > > > >> > > as
>> > > > > > > >> > > > > how
>> > > > > > > >> > > > > >>> to
>> > > > > > > >> > > > > >>> > > >> manage
>> > > > > > > >> > > > > >>> > > >> > the
>> > > > > > > >> > > > > >>> > > >> > > dependencies of multiple jobs and
>> > tables,
>> > > > how
>> > > > > > to
>> > > > > > > >> > define
>> > > > > > > >> > > > and
>> > > > > > > >> > > > > >>> handle
>> > > > > > > >> > > > > >>> > > E2E
>> > > > > > > >> > > > > >>> > > >> > > delays, and how to ensure the data
>> > > > > consistency
>> > > > > > of
>> > > > > > > >> > > queries
>> > > > > > > >> > > > on
>> > > > > > > >> > > > > >>> > flowing
>> > > > > > > >> > > > > >>> > > >> > data?
>> > > > > > > >> > > > > >>> > > >> > > This FLIP aims to support data
>> > > consistency
>> > > > > and
>> > > > > > > >> answer
>> > > > > > > >> > > > these
>> > > > > > > >> > > > > >>> > > questions.
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > > I'v discussed the details of this
>> FLIP
>> > > with
>> > > > > > > >> @Jingsong
>> > > > > > > >> > > Lee
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > >>> > > >> @libenchao
>> > > > > > > >> > > > > >>> > > >> > > offline several times. We hope to
>> > support
>> > > > > data
>> > > > > > > >> > > consistency
>> > > > > > > >> > > > > of
>> > > > > > > >> > > > > >>> > > queries
>> > > > > > > >> > > > > >>> > > >> on
>> > > > > > > >> > > > > >>> > > >> > > tables, managing relationships
>> between
>> > > > Flink
>> > > > > > jobs
>> > > > > > > >> and
>> > > > > > > >> > > > tables
>> > > > > > > >> > > > > >>> and
>> > > > > > > >> > > > > >>> > > >> revising
>> > > > > > > >> > > > > >>> > > >> > > tables on streaming in Flink and
>> Table
>> > > > Store
>> > > > > to
>> > > > > > > >> > improve
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > >>> whole
>> > > > > > > >> > > > > >>> > > data
>> > > > > > > >> > > > > >>> > > >> > > stream processing.
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > > Looking forward to your feedback.
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > > [1]
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> > > Best,
>> > > > > > > >> > > > > >>> > > >> > > Shammon
>> > > > > > > >> > > > > >>> > > >> > >
>> > > > > > > >> > > > > >>> > > >> >
>> > > > > > > >> > > > > >>> > > >>
>> > > > > > > >> > > > > >>> > > >
>> > > > > > > >> > > > > >>> > >
>> > > > > > > >> > > > > >>> >
>> > > > > > > >> > > > > >>>
>> > > > > > > >> > > > > >>
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to