Hi Hongshun,

1. If the API is unsupported, the kafka-client library throws an error,
which triggers a failover in Flink. We could make the error more user
friendly, but I think failing over is correct, as opposed to silently
switching to an undesired strategy. David already asked whether we could
check that during validation of the plan, but imho it's too situational
because you may not have access to the Kafka cluster when you build the
plan. I'm worried about false positives here, but I'm open to suggestions.

2. This FLIP is about the cleanup period and is not touching the write path
at all.
Transactional commits happen on notifyCheckpointComplete, where they block
the task thread if committer and writer are chained (the common case). Have
you seen an impact on performance?
I'm also not exactly sure where your concurrency is coming from. Could you
elaborate? I'm assuming you see more producers open because
notifyCheckpointComplete may occur only after multiple snapshotState calls.
But I don't see how to improve the situation except by forgoing exactly-once
and switching to at-least-once.
The checkpointing interval is an orthogonal configuration. I don't see how
anything on the connector side can change that. That sounds like a radical
new idea and deserves its own FLIP. If I understand you correctly, you want
to decline/delay new checkpoints until old ones are committed such that you
have no more than X ongoing transactions. That sounds very useful, but it's
about automatically configuring the lowest possible checkpointing interval
that satisfies certain conditions. This is way outside of the scope of
connectors because it requires deep changes in the checkpoint coordinator
(only).

3. Yes, we will keep the list of returned transactions as small as
possible. The current call chain is:
- The Flink sink may be configured to write into a (set of) static topics
or with a pattern target topic (target patterns are supported in OSS Flink
only). In the latter case, run ListTopics to collect the list of topics.
- Run DescribeProducers with the list of topics to get a list of producer
ids.
- Run the ListTransactions API with the list of producer ids.
- Remove all transactions that are closed (ideally, filter those out with
request options already).
- Remove all transactions that are not owned by the subtask s by extracting
the original subtask id s’ from the transactional id and applying the
predicate s == s’ % p, where p is the current parallelism.
- Remove all transactions that are part of the writer state (finalized
transactions).
- Abort all of the remaining transactions by calling initTransactions.
We are currently working with Kafka devs to replace the first three calls
with a single call to ListTransactions with a new parameter that filters on
prefix. But that will be available in Kafka 4.1 at the earliest, so we need
the current approach anyhow.
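
The three filtering steps in the call chain above (drop closed transactions,
drop those not owned by the subtask, drop finalized ones) could look roughly
like the sketch below. It is only an illustration, not the connector code: the
class ListedTransaction, the method names, and the transactional id layout
"<prefix>-<subtaskId>-<counter>" are all assumptions made for this example.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TransactionCleanupSketch {

    /** Hypothetical view of one listed transaction; not a Kafka or Flink type. */
    public static final class ListedTransaction {
        final String transactionalId;
        final boolean open;

        public ListedTransaction(String transactionalId, boolean open) {
            this.transactionalId = transactionalId;
            this.open = open;
        }
    }

    /**
     * Extracts the original subtask id s'. Assumes the (hypothetical) layout
     * "<prefix>-<subtaskId>-<counter>"; the prefix may itself contain '-',
     * so the id is read from the end.
     */
    public static int originalSubtaskId(String transactionalId) {
        String[] parts = transactionalId.split("-");
        return Integer.parseInt(parts[parts.length - 2]);
    }

    /** Returns the transactional ids that subtask s should abort. */
    public static List<String> transactionsToAbort(
            List<ListedTransaction> listed,
            int subtask,
            int parallelism,
            Set<String> finalizedIds) {
        return listed.stream()
                // keep only transactions that are still open
                .filter(t -> t.open)
                .map(t -> t.transactionalId)
                // keep only transactions owned by this subtask: s == s' % p
                .filter(id -> subtask == originalSubtaskId(id) % parallelism)
                // keep only transactions that are not part of the writer state
                .filter(id -> !finalizedIds.contains(id))
                .collect(Collectors.toList());
    }
}
```

Because of the modulo in the predicate, a subtask also picks up ids written by
subtasks that no longer exist after downscaling: with p = 2, subtask 0 owns
the ids of a former subtask 2.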

Note that the FLIP is already approved, so I can't change anything on it
anymore. Both 1. and 3. can be discussed on the pull request however [1].
2. is clearly out of scope.

Best,

Arvid

[1] https://github.com/apache/flink-connector-kafka/pull/154


On Mon, Apr 7, 2025 at 4:05 AM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Heise,
>
> This sounds like a good idea. Just some tiny questions left:
>
>    1.
>
>    Behavior when ListTransactions is unsupported:
>    If the server does not support the ListTransactions API, what should the
>    system’s behavior be?
>    - Should it throw an exception to signal incompatibility?
>       - Or fallback to the legacy Kafka connector behavior (e.g.,
>       checkpointing without transactional checks)?
>       Clear guidance here would help users and implementers understand
>       pitfall recovery.
>    2.
>
>    Impact of frequent checkpoints on write performance:
>    When the checkpoint interval is small (e.g., configured to occur very
>    frequently), could this lead to write performance bottlenecks if
>    multiple producers (e.g., 3 producers) block the write pipeline?
>    Could this FLIP include a tunable configuration parameter to let users
>    adjust the checkpoint interval or producers’ concurrency limits if
> needed?
>    3.
>
>    Optimizing transaction lookup with producerIdFilters:
>    The proposal currently states: *"The writer uses ListTransactions to
>    receive a list of all open transactions."*
>    Would it be feasible to instead filter transactions using
>    producerIdFilters (if supported) to retrieve only those transactions
>    relevant to this writer? Doing so could reduce unnecessary overhead,
>    especially in environments with many transactions.
>
>
> I will be grateful if you can answer me and add these details to FLIP.
>
> Best
> Hongshun
>
> On Thu, Mar 27, 2025 at 1:25 AM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi David,
> >
> > Thank you for taking your time to go over the proposal.
> >
> > >  *   Can we check that we have the appropriate permissions and the
> > > appropriate Kafka level (at least API key 66) to validate this option.
> > That's probably possible but we can do that only during runtime (we
> > may not have access to the Kafka cluster during pipeline building).
> > Did you see the pattern being used in other connectors? Could you
> > point me to it?
> > So the alternative is to just let the AdminClient fail with a
> > respective error since it performs these checks under the hood
> > anyways.
> >
> > >  *   I am wondering what we do for the 2 errors that the Kafka API can
> > > give namely
> > Could you point me to that? I haven't seen the errors yet. Is this
> > relevant only during the time a Kafka cluster starts? How often does
> > that happen?
> > If it's really rare, a simple alternative is to rely on Flink failover
> > because transaction abortion happens only during start of the Flink
> > application, so a failover is comparably cheap.
> >
> > > *   Do we need to do anything special for find-hanging transactions?
> > Yes, that's what this whole proposal and FLIP is about. We look for
> > all transactions with the respective transactional id prefix that are
> > open, ignore transactions that still should be committed, and abort
> > them all.
> >
> > >   *   Reusing transaction ids could be confusing in debugging. I wonder
> > > if we should not eagerly reuse transaction ids so the debugging would be
> > > more straightforward. Better still could be to use a uuid as part of the
> > > identifier; this would seem more normal practice rather than managing an
> > > incrementing counter and filling the gaps as they become available.
> > Please refer to FLINK-34554 [1] on why this is simply necessary. It's
> > an unfortunate design decision in Kafka server such that you should
> > not have many unique transactional ids.
> > We can use uuids internally to ease debugging but we can't create
> > transactional ids based on that. (We internally use UUIDs for the
> > prefix at Confluent, maybe that's what you mean).
> > Alternatively, we attach the current checkpoint id to the log messages
> > such that the pair transaction id/checkpoint id is unique since a
> > transaction id is reused for a later checkpoint.
> >
> > >   *   I am not very familiar with the KIP - I am unsure what you mean
> > > by “it also knows which transactions still need to be re-committed. It
> > > aborts all other open transactions.”. When/why do we need to re-commit
> > > transactions and when do we abort?
> > To which KIP do you refer?
> > Flink uses a 2PC to guarantee exactly once. On pre-commit, we finalize
> > a transaction and put its state into the checkpoint. If all tasks
> > succeed checkpointing, notifyCheckpointCompleted is called and we
> > actually commit. Between pre-commit and commit, a failure may happen
> > such that we need to re-commit all transactions on recovery that are
> > part of the checkpoint because for consistency they are supposed to be
> > committed.
> >
> > >   *   I wonder why “we expect 3 transactional ids to be in use per
> > > subtask”.
> > Per checkpoint and subtask, we have one transaction ongoing for the
> > writer at all times. We also have one transaction that is being
> > finished by the previous checkpoint and that awaits the
> > notifyCheckpointCompleted. Lastly on recovery, we also have an id from
> > the previous run that we can't yet reuse. In many cases, we probably
> > re-commit the recovered id before we finish another checkpoint, so
> > it's rather 2 ids per subtask.
> >
> > >  *   I wonder why “There is at least one finalized transaction waiting
> > > to be committed” Do you mean the transaction is in state CompleteCommit?
> > > Can we not be the first transaction so have no older ones – or have I
> > > misunderstood?
> > See above. It's unrelated to internal Kafka states - they are all Ongoing.
> > You are right that for the very first attempt, we have no such
> > transaction until the first checkpoint is snapshotted.
> >
> > Best,
> >
> > Arvid
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-34554
> >
> > On Wed, Mar 26, 2025 at 5:28 PM David Radley <david_rad...@uk.ibm.com>
> > wrote:
> > >
> > > Hi Arvid,
> > > This sounds like a good idea. I wondered:
> > >
> > >   *   Can we check that we have the appropriate permissions and the
> > appropriate Kafka level(at least API key 66)  to validate this option.
> > >   *   I am wondering what we do for the 2 errors that the Kafka API can
> > give namely
> > > >      *   COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process
> > > > of loading its state.
> > > >      *   COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the
> > > > request is being shut down.
> > >
> > > In the case of COORDINATOR_LOAD_IN_PROGRESS I guess we could have some
> > sort of retry loop – governed by a configuration parameter.
> > >
> > >   *   Do we need to do anything special for find-hanging transactions?
> > > >   *   Reusing transaction ids could be confusing in debugging. I wonder
> > > > if we should not eagerly reuse transaction ids so the debugging would be
> > > > more straightforward. Better still could be to use a uuid as part of the
> > > > identifier; this would seem more normal practice rather than managing an
> > > > incrementing counter and filling the gaps as they become available.
> > >   *   I am not very familiar with the KIP - I am unsure what you mean
> by
> > “ it also knows which transactions still need to be re-committed. It
> aborts
> > all other open transactions.”. When/why do we need to re-commit
> > transactions and when do we abort?
> > >   *   I wonder why “we expect 3 transactional ids to be in use per
> > subtask”.
> > >   *   I wonder why “There is at least one finalized transaction waiting
> > to be committed” Do you mean the transaction is in state CompleteCommit?
> > Can we not be the first transaction so have no older ones – or have I
> > misunderstood?
> > >
> > > Kind regards, David.
> > >
> > >
> > > From: Arvid Heise <ahe...@confluent.io.INVALID>
> > > Date: Wednesday, 26 March 2025 at 06:49
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-511: Support transaction id
> > pooling in Kafka connector
> > > Hi Hongshun,
> > >
> > > this thread is about addressing your first pain point. It would be
> > > awesome if you could read the FLIP and provide feedback whether this
> > > approach would work in your setup.
> > >
> > > The other issues won't need a FLIP but need a smaller bugfix.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > On Tue, Mar 25, 2025 at 7:57 AM Hongshun Wang <loserwang1...@gmail.com
> >
> > wrote:
> > > >
> > > > Hi heise,
> > > > Thanks for pushing this job. Currently, kafka sinks in transactions
> > are not
> > > > productive, which will influence the whole kafka cluster.
> > > > As we discussed before, I have three problems when using Kafka
> > > >  exactly-once sink :
> > > > 1. Too much transaction id
> > https://issues.apache.org/jira/browse/FLINK-34554
> > > > 2. Too much unclosed producer ( if commit is lag too much from
> > pre-commit:
> > > > https://issues.apache.org/jira/browse/FLINK-36569
> > > > 3. maybe Invalid state:
> > https://issues.apache.org/jira/browse/FLINK-37356
> > > >
> > > > I do hope this flip can show how to solve this problem.
> > > >
> > > > Best
> > > > Hongshun Wang
> > > >
> > > >
> > > >
> > > > On Fri, Feb 28, 2025 at 10:01 PM Arvid Heise <ar...@apache.org>
> wrote:
> > > >
> > > > > Dear Flink devs,
> > > > >
> > > > > I'd like to start discussion around an improvement of the
> > exactly-once
> > > > > Kafka sink. Because of its current design, the sink currently puts
> a
> > > > > large strain on the main memory of the Kafka broker [1], which
> > hinders
> > > > > adoption of the KafkaSink. Since the KafkaSink will be the only way
> > to
> > > > > produce into Kafka for Flink 2.0, this limitation may in turn also
> > > > > limit Flink 2 adoption.
> > > > >
> > > > > I appreciate feedback not only on technical aspects but also on the
> > > > > requirements of the new approach [2]:
> > > > > - Are most users/customers already on Kafka 3.0 broker?
> > > > > - The new approach requires READ permission on the target topic
> (for
> > > > > DescribeProducer API) and DESCRIBE permission on the transaction
> (for
> > > > > ListTransaction API). Are there any concerns around this?
> > > > >
> > > > > The new behavior would be opt-in in the first phase while the
> current
> > > > > behavior remains the DEFAULT. If the community accepts the new
> > > > > behavior, we can turn it on by default and provide an option to
> > > > > opt-out.
> > > > >
> > > > > Note that this improvement is borderline FLIP-worthy; it touches
> the
> > > > > public interfaces of the connector only. However, by doing an
> > official
> > > > > FLIP, I'm positive that we can gather feedback more quickly.
> > > > >
> > > > > I'm also looking for volunteers that test the implementation on
> their
> > > > > dev clusters [3].
> > > > >
> > > > > Best,
> > > > >
> > > > > Arvid
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-34554
> > > > > [2]
> > > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector
> > > > > [3] https://github.com/apache/flink-connector-kafka/pull/154
> > > > >
> > >
> >
>
