Hi John,

Thank you for your valuable input. It sounds reasonable to me.

From this point of view, exactly-once is used to guarantee transaction
semantics rather than to avoid duplication/upserts.
This is similar to the JDBC connector, which already supports eventual
consistency with idempotent updates but for which we still added 2PC support[1].
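
To make the transaction-semantics point concrete, here is a minimal sketch of
the reader side, assuming a hypothetical topic, brokers, and schema; it passes
the standard Kafka consumer setting isolation.level through the connector's
'properties.*' options, so the reader only ever sees results from committed
checkpoints:

CREATE TABLE page_views (
  url STRING,
  page_views BIGINT,
  PRIMARY KEY (url) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'page_views',                          -- hypothetical topic
  'properties.bootstrap.servers' = 'broker:9092',  -- hypothetical brokers
  'key.format' = 'json',
  'value.format' = 'json',
  -- standard Kafka consumer setting: skip records from aborted transactions
  'properties.isolation.level' = 'read_committed'
);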

Best,
Jark

[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink

On Wed, 12 Apr 2023 at 10:36, John Roesler <vvcep...@apache.org> wrote:

> Hi Jark,
>
> I hope you don’t mind if I chime in.
>
> You have a good point that the sequence of upserts will eventually
> converge to the correct value under the at-least-once delivery guarantee,
> but it can still be important to avoid passing on uncommitted results. Some
> thoughts, numbered for reference:
>
> 1. Most generally, if some result R is written to the sink topic, but then
> the job fails before a checkpoint, rolls back, and reprocesses, producing
> R’, then it is incorrect to call R an “upsert”. In fact, as far as the
> system is concerned, R never happened at all (because it was part of a
> rolled-back batch of processing).
>
> 2. Readers may reasonably wish to impose some meaning on the sequence of
> upserts itself, so including aborted results can lead to wrong semantics
> downstream. Eg: “how many times has ‘x’ been updated today”?
>
> 3. Note that processing may not be deterministic over failures, and,
> building on (2), readers may have an expectation that every record in the
> topic corresponds to a real value that was associated with that key at some
> point. Eg, if we start with x=1, checkpoint, then produce x=99, crash,
> restart and produce x=2. Under at-least-once, the history of x is [1, 99, 2],
> while exactly-once would give the correct history of [1, 2]. If we set up an
> alert that fires whenever the value of x is greater than 10, then at-least-once
> will erroneously alert us, while exactly-once does not (see the sketch after
> this list).
>
> 4. Sending results from failed processing can also cause operational
> problems: if you’re processing a high volume of data and you get into a
> crash loop, you can create a flood of repeated results. I’ve seen this
> cause real-world pain for people, and it’s nice to have a way to avoid it.
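>
> (To make point 3 concrete, here is a sketch of such a downstream alert as a
> hypothetical continuous query; the table 'metrics' with columns (k, v),
> reading from the sink topic, is made up for illustration:
>
> SELECT k, v
> FROM metrics   -- hypothetical table backed by the sink topic
> WHERE v > 10;
>
> Under at-least-once, the aborted x=99 reaches the topic and trips this alert;
> under exactly-once, read-committed consumers never see it.)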
>
> I hope some of these examples show why a user might reasonably want to
> configure the connector with the exactly-once guarantee.
>
> Thanks!
> -John
>
> On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > Hi Alexander,
> >
> > Yes, Kafka’s exactly-once semantics are used to avoid duplicated records in
> > case of producer retries or failovers. But as I explained above, it can’t
> > avoid intentionally duplicated records. Actually, I would like to call them
> > "upsert records" instead of "duplicates"; that’s why the connector is named
> > "upsert-kafka": to make Kafka work like a database that supports updating
> > and deleting by key.
> >
> > For example, there is a SQL query:
> >
> > SELECT URL, COUNT(*) page_views
> > FROM access_logs
> > GROUP BY URL;
> >
> > This is a continuous query[1] that continuously emits a new <url, page_views>
> > record once a new access log entry for that URL is received. Entries for the
> > same URL may be far apart in the log and be processed in different checkpoints.
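> >
> > For concreteness, a minimal sketch of an upsert-kafka sink this query could
> > write into, reusing the access_logs table above; the sink table name, topic,
> > brokers, and formats are made up:
> >
> > CREATE TABLE page_views_per_url (
> >   url STRING,
> >   page_views BIGINT,
> >   PRIMARY KEY (url) NOT ENFORCED
> > ) WITH (
> >   'connector' = 'upsert-kafka',
> >   'topic' = 'page_views',                          -- hypothetical topic
> >   'properties.bootstrap.servers' = 'broker:9092',  -- hypothetical brokers
> >   'key.format' = 'json',
> >   'value.format' = 'json'
> > );
> >
> > INSERT INTO page_views_per_url
> > SELECT URL, COUNT(*) AS page_views
> > FROM access_logs
> > GROUP BY URL;
> >
> > Every updated count for a URL becomes a new record for the same key, i.e. an
> > upsert, regardless of the delivery guarantee.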
> >
> > It’s easy to make upsert-kafka support the exactly-once delivery guarantee,
> > but as we discussed above, it’s unnecessary, and we intend to expose as few
> > configuration options to users as possible.
> >
> >
> > Best,
> > Jark
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> >
> >
> >
> > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > <asorokou...@confluent.io.invalid> wrote:
> >
> >> Hi Jark,
> >>
> >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
> >> producers prevent duplicated records[1], at least in cases when the upstream
> >> does not produce them intentionally and across checkpoints.
> >>
> >> Could you please elaborate or point me to the docs that explain the reason
> >> for duplicated records upstream and across checkpoints? I am relatively new
> >> to Flink and not aware of it. According to the kafka connector
> >> documentation, it does support exactly-once semantics by configuring
> >> 'sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
> >> can't make upsert-kafka configurable in the same way to support this
> >> delivery guarantee.
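> >>
> >> For reference, a rough sketch of that configuration on the plain kafka
> >> connector; the topic, brokers, schema, and transactional id prefix below
> >> are illustrative only:
> >>
> >> CREATE TABLE page_views_raw (
> >>   url STRING,
> >>   page_views BIGINT
> >> ) WITH (
> >>   'connector' = 'kafka',
> >>   'topic' = 'page_views_raw',                      -- hypothetical topic
> >>   'properties.bootstrap.servers' = 'broker:9092',  -- hypothetical brokers
> >>   'format' = 'json',
> >>   'sink.delivery-guarantee' = 'exactly-once',
> >>   -- typically required with exactly-once so transactional ids don't clash across jobs
> >>   'sink.transactional-id-prefix' = 'page-views-job-1'
> >> );
> >>
> >> The question is whether upsert-kafka could accept an analogous option.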
> >>
> >> Thank you,
> >> Alexander
> >>
> >> 1. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >> 2. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> >>
> >>
> >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> > Hi Alexander,
> >> >
> >> > I’m not sure I fully understand the reasons. I left my comments inline.
> >> >
> >> > > 1. There might be other non-Flink topic consumers that would rather not
> >> > > have duplicated records.
> >> >
> >> > Exactly-once can’t avoid producing duplicated records, because the upstream
> >> > produces duplicated records intentionally and across checkpoints. Exactly-once
> >> > can’t recognize duplicated records and drop them. That means duplicated
> >> > records are written into the topic even if exactly-once mode is enabled.
> >> >
> >> >
> >> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> >> > > previous values.
> >> >
> >> > Sorry, I don’t understand how exactly-once can prevent this rollback
> >> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go to a5,
> >> > then to b5, and then back to a5 if the jobs perform checkpoints after
> >> > producing those records.
> >> >
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> >
> >> > On Apr 5, 2023, at 09:39, Alexander Sorokoumov
> >> > <asorokou...@confluent.io.INVALID> wrote:
> >> > >
> >> > > Hello Flink community,
> >> > >
> >> > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> >> > > semantics to the upsert-kafka connector. According to the upsert-kafka
> >> > > docs[1] and the ReducingUpsertSink javadoc[2], the connector is correct
> >> > > even with duplicate records under AT_LEAST_ONCE, because the records are
> >> > > idempotent and the read path de-duplicates them. However, there are at
> >> > > least 2 reasons to configure the connector with EXACTLY_ONCE:
> >> > >
> >> > > 1. There might be other non-Flink topic consumers that would rather not
> >> > > have duplicated records.
> >> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> >> > > previous values. Consider a scenario where 2 producing jobs A and B write
> >> > > to the same topic with AT_LEAST_ONCE, and a consuming job reads from the
> >> > > topic. Both producers write unique, monotonically increasing sequences to
> >> > > the same key. Job A writes x=a1,a2,a3,a4,a5,... Job B writes
> >> > > x=b1,b2,b3,b4,b5,.... With this setup, we can have the following sequence:
> >> > >
> >> > >   1. Job A produces x=a5.
> >> > >   2. Job B produces x=b5.
> >> > >   3. Job A produces the duplicate write x=a5.
> >> > >
> >> > > The consuming job would observe x going to a5, then to b5, then back to
> >> > > a5. EXACTLY_ONCE would prevent this behavior.
> >> > >
> >> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
> >> > > patch to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
> >> > > community thinks about it before moving forward with it.
> >> > >
> >> > > Thanks,
> >> > > Alexander
> >> > >
> >> > > 1. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> >> > > 2. https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
> >> >
> >> >
> >>
>
