Hi David,

Thank you for taking your time to go over the proposal.

>  *   Can we check that we have the appropriate permissions and the 
> appropriate Kafka level(at least API key 66)  to validate this option.
That's probably possible but we can do that only during runtime (we
may not have access to the Kafka cluster during pipeline building).
Did you see the pattern being used in other connectors? Could you
point me to it?
So the alternative is to just let the AdminClient fail with a
respective error since it performs these checks under the hood
anyways.

>  *   I am wondering what we do for the 2 errors that the Kafka API can give 
> namely
Could you point me to that? I haven't seen the errors yet. Is this
relevant only during the time a Kafka cluster starts? How often does
that happen?
If it's really rare, a simple alternative is to rely on Flink failover
because transaction abortion happens only during start of the Flink
application, so a failover is comparably cheap.

> *   Do we need to do anything special for find-hanging transactions?
Yes, that's what this whole proposal and FLIP is about. We look for
all transactions with the respective transactional id prefix that are
open, ignore transactions that still should be committed, and abort
them all.

>   *   Reusing transaction ids could be confusing in debugging.  I wonder if 
> we should not eagerly reuse transaction ids so the debugging would be more 
> straight forward. Better still cpi;d be use a uuid as part of the identifier; 
> this would seem more normal practise rather than managing an incrementing 
> counter and filling the gaps as they become available.
Please refer to FLINK-34554 [1] on why this is simply necessary. It's
an unfortunate design decision in Kafka server such that you should
not have many unique transactional ids.
We can use uuids internally to ease debugging but we can't create
transactional ids based on that. (We internally use UUIDs for the
prefix at Confluent, maybe that's what you mean).
Alternatively, we attach the current checkpoint id to the log messages
such that the pair transaction id/checkpoint id is unique since a
transaction id is reused for a later checkpoint.

>   *   I am not very familiar with the KIP - I am unsure what you mean by “ it 
> also knows which transactions still need to be re-committed. It aborts all 
> other open transactions.”. When/why do we need to re-commit transactions and 
> when do we abort?
To which KIP do you refer?
Flink uses a 2PC to guarantee exactly once. On pre-commit, we finalize
a transaction and put its state into the checkpoint. If all tasks
succeed checkpointing, notifyCheckpointCompleted is called and we
actually commit. Between pre-commit and commit, a failure may happen
such that we need to re-commit all transactions on recovery that are
part of the checkpoint because for consistency they are supposed to be
committed.

>   *   I wonder why “we expect 3 transactional ids to be in use per subtask”.
Per checkpoint and subtask, we have one transaction ongoing for the
writer at all times. We also have one transaction that is being
finished by the previous checkpoint and that awaits the
notifyCheckpointCompleted. Lastly on recovery, we also have an id from
the previous run that we can't yet reuse. In many cases, we probably
re-commit the recovered id before we finish another checkpoint, so
it's rather 2 ids per subtask.

>  *   I wonder why “There is at least one finalized transaction waiting to be 
> committed” Do you mean the transaction is in state CompleteCommit? Can we not 
> be the first transaction so have no older ones – or have I misunderstood?
See above. It's unrelated to internal Kafka states - they are all Ongoing.
You are right that for the very first attempt, we have no such
transaction until the first checkpoint is snapshotted.

Best,

Arvid

[1] https://issues.apache.org/jira/browse/FLINK-34554

On Wed, Mar 26, 2025 at 5:28 PM David Radley <david_rad...@uk.ibm.com> wrote:
>
> Hi Arvid,
> This sounds like a good idea. I wondered:
>
>   *   Can we check that we have the appropriate permissions and the 
> appropriate Kafka level(at least API key 66)  to validate this option.
>   *   I am wondering what we do for the 2 errors that the Kafka API can give 
> namely
>      *   COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process of 
> loading its state.
>      *   COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the request 
> is being shutdown.=
>
> In the case of COORDINATOR_LOAD_IN_PROGRESS I guess we could have some sort 
> of retry loop – governed by a configuration parameter.
>
>   *   Do we need to do anything special for find-hanging transactions?
>   *   Reusing transaction ids could be confusing in debugging.  I wonder if 
> we should not eagerly reuse transaction ids so the debugging would be more 
> straight forward. Better still cpi;d be use a uuid as part of the identifier; 
> this would seem more normal practise rather than managing an incrementing 
> counter and filling the gaps as they become available.
>   *   I am not very familiar with the KIP - I am unsure what you mean by “ it 
> also knows which transactions still need to be re-committed. It aborts all 
> other open transactions.”. When/why do we need to re-commit transactions and 
> when do we abort?
>   *   I wonder why “we expect 3 transactional ids to be in use per subtask”.
>   *   I wonder why “There is at least one finalized transaction waiting to be 
> committed” Do you mean the transaction is in state CompleteCommit? Can we not 
> be the first transaction so have no older ones – or have I misunderstood?
>
> Kind regards, David.
>
>
> From: Arvid Heise <ahe...@confluent.io.INVALID>
> Date: Wednesday, 26 March 2025 at 06:49
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-511: Support transaction id pooling in 
> Kafka connector
> Hi Hongshun,
>
> this thread is about addressing your first pain point. It would be
> awesome if you could read the FLIP and provide feedback whether this
> approach would work in your setup.
>
> The other issues won't need a FLIP but need a smaller bugfix.
>
> Best,
>
> Arvid
>
> On Tue, Mar 25, 2025 at 7:57 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >
> > Hi heise,
> > Thanks for pushing this job. Currently, kafka sinks in transactions are not
> > productive, which will influence the whole kafka cluster.
> > As we discussed before, I have three problems when using Kafka
> >  exactly-once sink :
> > 1. Too much transaction id https://issues.apache.org/jira/browse/FLINK-34554
> > 2. Too much unclosed producer ( if commit is lag too much from pre-commit:
> > https://issues.apache.org/jira/browse/FLINK-36569
> > 3. maybe Invalid state: https://issues.apache.org/jira/browse/FLINK-37356
> >
> > I do hope this flip can show how to solve this problem.
> >
> > Best
> > Hongshun Wang
> >
> >
> >
> > On Fri, Feb 28, 2025 at 10:01 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Dear Flink devs,
> > >
> > > I'd like to start discussion around an improvement of the exactly-once
> > > Kafka sink. Because of its current design, the sink currently puts a
> > > large strain on the main memory of the Kafka broker [1], which hinders
> > > adoption of the KafkaSink. Since the KafkaSink will be the only way to
> > > produce into Kafka for Flink 2.0, this limitation may in turn also
> > > limit Flink 2 adoption.
> > >
> > > I appreciate feedback not only on technical aspects but also on the
> > > requirements of the new approach [2]:
> > > - Are most users/customers already on Kafka 3.0 broker?
> > > - The new approach requires READ permission on the target topic (for
> > > DescribeProducer API) and DESCRIBE permission on the transaction (for
> > > ListTransaction API). Are there any concerns around this?
> > >
> > > The new behavior would be opt-in in the first phase while the current
> > > behavior remains the DEFAULT. If the community accepts the new
> > > behavior, we can turn it on by default and provide an option to
> > > opt-out.
> > >
> > > Note that this improvement is borderline FLIP-worthy; it touches the
> > > public interfaces of the connector only. However, by doing an official
> > > FLIP, I'm positive that we can gather feedback more quickly.
> > >
> > > I'm also looking for volunteers that test the implementation on their
> > > dev clusters [3].
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-34554
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector
> > > [3] https://github.com/apache/flink-connector-kafka/pull/154
> > >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: Building C, IBM Hursley Office, Hursley Park Road, 
> Winchester, Hampshire SO21 2JN

Reply via email to