Hi Alexander,

Yes, Kafka’s exactly-once semantics are used to avoid duplicated records in
case of producer retries
or failovers. But as I explained above, it can’t avoid intentionally
duplicated records. Actually, I would
like to call them "upsert records" instead of "duplicates", that's why the
connector is named "upsert-kafka",
to make Kafka work like a database that supports updating and deleting by
key.

For example, there is a SQL query:

SELECT URL, COUNT(*) page_views
FROM access_logs
GROUP BY URL;

This is a continuous query[1] that continuously emits a new <url,
page_views> record once a new URL
access entry is received. The same URLs in the log may be far away and be
processed in different checkpoints.

It's easy to make upsert-kafka to support exactly-once delivery guarantee,
but as we discussed above,
it's unnecessary to support it and we intend to expose as few
configurations to users as possible.


Best,
Jark

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/



On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

> Hi Jark,
>
> To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
> producers prevent duplicated records[1], at least in the cases when
> upstream does not produce them intentionally and across checkpoints.
>
> Could you please elaborate or point me to the docs that explain the reason
> for duplicated records upstream and across checkpoints? I am relatively new
> to Flink and not aware of it. According to the kafka connector
> documentation, it does support exactly once semantics by configuring '
> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
> can't make upsert-kafka configurable in the same way to support this
> delivery guarantee.
>
> Thank you,
> Alexander
>
> 1.
>
> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> 2.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
>
>
> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Alexander,
> >
> > I’m not sure I fully understand the reasons. I left my comments inline.
> >
> > > 1. There might be other non-Flink topic consumers that would rather not
> > have duplicated records.
> >
> > Exactly once can’t avoid producing duplicated records. Because the
> upstream
> > produces duplicated records intentionally and across checkpoints. Exactly
> > once
> > can’t recognize duplicated records and drop duplications.  That means
> > duplicated
> > records are written into topics even if exactly-once mode is enabled.
> >
> >
> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > previous values.
> >
> > Sorry, I don’t understand how exactly once can prevent this rollback
> > behavior.
> > Even in your example with EXACTLY_ONCE enabled, the x will go to a5, and
> > b5,
> > then back a5 if jobs perform checkpoints after producing records.
> >
> >
> > Best,
> > Jark
> >
> >
> > > 2023年4月5日 09:39,Alexander Sorokoumov <asorokou...@confluent.io
> .INVALID>
> > 写道:
> > >
> > > Hello Flink community,
> > >
> > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> > > semantics to upsert-kafka connector. According to upsert-kafka docs[1]
> > and
> > > ReducingUpsertSink javadoc[2], the connector is correct even with
> > duplicate
> > > records under AT_LEAST_ONCE because the records are idempotent, and the
> > > read path de-duplicates them. However, there are at least 2 reasons to
> > > configure the connector with EXACTLY_ONCE:
> > >
> > > 1. There might be other non-Flink topic consumers that would rather not
> > > have duplicated records.
> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > > previous values. Consider a scenario where 2 producing jobs A and B
> write
> > > to the same topic with AT_LEAST_ONCE, and a consuming job reads from
> the
> > > topic. Both producers write unique, monotonically increasing sequences
> to
> > > the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
> > > x=b1,b2,b3,b4,b5,.... With this setup, we can have the following
> > sequence:
> > >
> > >   1. Job A produces x=a5.
> > >   2. Job B produces x=b5.
> > >   3. Job A produces the duplicate write x=5.
> > >
> > > The consuming job would observe x going to a5, then to b5, then back
> a5.
> > > EXACTLY_ONCE would prevent this behavior.
> > >
> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
> > patch
> > > to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
> > > community thinks about it before moving forward with it.
> > >
> > > Thanks,
> > > Alexander
> > >
> > > 1.
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > > 2.
> > >
> >
> https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
> >
> >
>

Reply via email to