Hi Heise,
I agree with you. Let us push forward. I will review
https://github.com/apache/flink-connector-kafka/pull/154 and discuss it
there. It will take me some time to reply because there is quite a lot of
code in this PR.

Best
Hongshun


On Mon, Apr 7, 2025 at 3:12 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Hongshun,
>
> 1. In case of unsupported API, the kafka-client library throws an error,
> which will trigger a failover in Flink. We could make the error more user friendly
> but I think fail over is correct instead of silently switching to an
> undesired strategy. David already asked if we could check that during
> validation of the plan, but imho it's too situational as you may not have
> access to the Kafka cluster when you build the plan. I'm worried about
> false positives here but I'm open to suggestions.
>
> 2. This FLIP is about the cleanup period and is not touching the write path
> at all.
> Transactional commits happen on notifyCheckpointComplete where they block
> the task thread in case where committer and writer are chained (common
> case). Have you seen an impact on performance?
> I'm also not exactly sure where your concurrency is coming from. Could you
> elaborate? I'm assuming you see more producers open because
> notifyCheckpointComplete may occur after having multiple snapshotState. But
> I don't see how to improve the situation except forgoing exactly-once and
> setting it to at-least-once.
> Checkpointing interval is an orthogonal configuration. I don't see how
> anything on the connector side can change that. That sounds like a radical
> new idea and deserves its own FLIP. If I get you correctly, you want to
> decline/delay new checkpoints until old ones are committed such that you
> have no more than X ongoing transactions. That sounds very useful but it's
> about automatically configuring the lowest possible checkpointing interval
> that satisfies certain conditions. This is way outside of the scope of
> connectors because it requires deep changes in the checkpoint coordinator
> (only).
>
> 3. Yes, we will keep the list of returned transactions as small as
> possible. The current call chain is
> - The Flink sink may be configured to write into a (set of) static topics
> or with a target topic pattern (patterns are supported in OSS Flink
> only). In the latter case, run ListTopics to collect the list of topics.
> - Run DescribeProducers with the list of topics to get a list of producer
> ids.
> - Run the ListTransactions API with the list of producer ids.
> - Remove all transactions that are closed (ideally filter those with
> request options already).
> - Remove all transactions that are not owned by subtask s, by extracting
> the original subtask id s’ from the transactional id and applying the
> predicate s == s’ % p, where p is the current parallelism.
> - Remove all transactions that are part of the writer state (finalized
> transactions).
> - Abort all of these transactions by calling initTransactions.
> We are currently working with Kafka devs to replace the first three calls
> with a single call to ListTransactions with a new parameter that filters on
> prefix. But that will be available in Kafka 4.1 at the earliest, so we need
> the current approach anyhow.
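To make the filter chain concrete, here is a rough sketch of the pure filtering logic in Python (the transactional id layout `<prefix>-<subtaskId>-<counter>` and all helper names are assumptions for illustration only; the actual implementation drives the Kafka AdminClient and is not shown here):

```python
def parse_subtask_id(transactional_id: str, prefix: str) -> int:
    # Assumed id layout: "<prefix>-<originalSubtaskId>-<counter>".
    suffix = transactional_id[len(prefix) + 1:]
    return int(suffix.split("-")[0])

def transactions_to_abort(open_txn_ids, prefix, subtask, parallelism, writer_state):
    """Model of the filter chain: keep only open transactions that
    (a) match our prefix, (b) are owned by this subtask under the
    predicate s == s' % p, and (c) are not finalized in writer state."""
    result = []
    for txn_id in open_txn_ids:
        if not txn_id.startswith(prefix + "-"):
            continue  # not written by this sink
        original_subtask = parse_subtask_id(txn_id, prefix)
        if subtask != original_subtask % parallelism:
            continue  # owned by a different subtask after rescaling
        if txn_id in writer_state:
            continue  # finalized transaction; will be (re-)committed instead
        result.append(txn_id)
    return result
```

The modulo predicate is what makes the scheme rescale-safe: after a change of parallelism, each old subtask id deterministically maps to exactly one current subtask, so every orphaned transaction has exactly one owner responsible for aborting it.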
>
> Note that the FLIP is already approved, so I can't change anything on it
> anymore. Both 1. and 3. can be discussed on the pull request however [1].
> 2. is clearly out of scope.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink-connector-kafka/pull/154
>
>
> On Mon, Apr 7, 2025 at 4:05 AM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Heise,
> >
> > This sounds like a good idea. Just some tiny questions left:
> >
> >    1.
> >
> >    Behavior when ListTransactions is unsupported:
> >    If the server does not support the ListTransactions API, what should
> >    the system’s behavior be?
> >    - Should it throw an exception to signal incompatibility?
> >       - Or fall back to the legacy Kafka connector behavior (e.g.,
> >       checkpointing without transactional checks)?
> >       Clear guidance here would help users and implementers avoid
> >       recovery pitfalls.
> >    2.
> >
> >    Impact of frequent checkpoints on write performance:
> >    When the checkpoint interval is small (e.g., configured to occur very
> >    frequently), could this lead to write performance bottlenecks if
> >    multiple producers (e.g., 3 producers) block the write pipeline?
> >    Could this FLIP include a tunable configuration parameter to let users
> >    adjust the checkpoint interval or producers’ concurrency limits if
> >    needed?
> >    3.
> >
> >    Optimizing transaction lookup with producerIdFilters:
> >    The proposal currently states: *"The writer uses ListTransactions to
> >    receive a list of all open transactions."*
> >    Would it be feasible to instead filter transactions using
> >    producerIdFilters (if supported) to retrieve only those transactions
> >    relevant to this writer? Doing so could reduce unnecessary overhead,
> >    especially in environments with many transactions.
> >
> >
> > I would be grateful if you could answer these questions and add the
> > details to the FLIP.
> >
> > Best
> > Hongshun
> >
> > On Thu, Mar 27, 2025 at 1:25 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi David,
> > >
> > > Thank you for taking your time to go over the proposal.
> > >
> > > >  *   Can we check that we have the appropriate permissions and the
> > > appropriate Kafka level (at least API key 66) to validate this option.
> > > That's probably possible but we can do that only during runtime (we
> > > may not have access to the Kafka cluster during pipeline building).
> > > Did you see the pattern being used in other connectors? Could you
> > > point me to it?
> > > So the alternative is to just let the AdminClient fail with a
> > > respective error since it performs these checks under the hood
> > > anyways.
> > >
> > > >  *   I am wondering what we do for the 2 errors that the Kafka API
> > > can give, namely
> > > Could you point me to that? I haven't seen the errors yet. Is this
> > > relevant only during the time a Kafka cluster starts? How often does
> > > that happen?
> > > If it's really rare, a simple alternative is to rely on Flink failover
> > > because transaction abortion happens only during start of the Flink
> > > application, so a failover is comparably cheap.
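If a bounded retry for the transient COORDINATOR_LOAD_IN_PROGRESS case were desired before falling back to Flink failover, it could look roughly like this (a hypothetical sketch: the error names mirror the Kafka protocol, but the retry policy and the RuntimeError-based signaling are made up for illustration):

```python
import time

# Transient coordinator errors worth retrying; anything else fails fast.
RETRIABLE = {"COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"}

def call_with_retry(call, max_attempts=5, backoff_seconds=0.5):
    """Retry transient coordinator errors with linear backoff; any other
    error (or exhausted attempts) is re-raised so that Flink's regular
    failover kicks in."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except RuntimeError as e:
            if str(e) not in RETRIABLE or attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)
```

Since transaction abortion only happens at job start, even the fallback of not retrying at all and letting the failover restart the attempt is comparably cheap, as noted above.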
> > >
> > > > *   Do we need to do anything special for find-hanging transactions?
> > > Yes, that's what this whole proposal and FLIP is about. We look for
> > > all transactions with the respective transactional id prefix that are
> > > open, ignore transactions that still should be committed, and abort
> > > them all.
> > >
> > >   *   Reusing transaction ids could be confusing in debugging. I wonder
> > > if we should not eagerly reuse transaction ids so the debugging would be
> > > more straightforward. Better still could be to use a uuid as part of the
> > > identifier; this would seem more normal practice rather than managing an
> > > incrementing counter and filling the gaps as they become available.
> > > Please refer to FLINK-34554 [1] on why this is necessary. It's
> > > an unfortunate design decision in Kafka server such that you should
> > > not have many unique transactional ids.
> > > We can use uuids internally to ease debugging but we can't create
> > > transactional ids based on that. (We internally use UUIDs for the
> > > prefix at Confluent, maybe that's what you mean).
> > > Alternatively, we attach the current checkpoint id to the log messages
> > > such that the pair transaction id/checkpoint id is unique since a
> > > transaction id is reused for a later checkpoint.
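The "incrementing counter and filling the gaps" scheme under discussion can be sketched as a tiny pool (a hypothetical model, not the connector's actual code; the id layout `<prefix>-<subtask>-<counter>` is an assumption):

```python
class TransactionalIdPool:
    """Hands out the lowest free counter per subtask so the set of distinct
    transactional ids stays small and ids are reused eagerly (see
    FLINK-34554 for why many unique ids strain the Kafka broker)."""

    def __init__(self, prefix: str, subtask: int):
        self.prefix = prefix
        self.subtask = subtask
        self.in_use = set()

    def acquire(self) -> str:
        # Lowest-free-slot strategy: fill gaps before growing the counter.
        counter = 0
        while counter in self.in_use:
            counter += 1
        self.in_use.add(counter)
        return f"{self.prefix}-{self.subtask}-{counter}"

    def release(self, transactional_id: str) -> None:
        counter = int(transactional_id.rsplit("-", 1)[1])
        self.in_use.discard(counter)
```

Reuse keeps the id space bounded, at the cost of the debugging ambiguity raised above, which is why pairing the transactional id with the checkpoint id in log messages restores uniqueness.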
> > >
> > >   *   I am not very familiar with the KIP - I am unsure what you mean by
> > > “it also knows which transactions still need to be re-committed. It aborts
> > > all other open transactions.”. When/why do we need to re-commit
> > > transactions and when do we abort?
> > > To which KIP do you refer?
> > > Flink uses a 2PC to guarantee exactly once. On pre-commit, we finalize
> > > a transaction and put its state into the checkpoint. If all tasks
> > > succeed checkpointing, notifyCheckpointCompleted is called and we
> > > actually commit. Between pre-commit and commit, a failure may happen
> > > such that we need to re-commit all transactions on recovery that are
> > > part of the checkpoint because for consistency they are supposed to be
> > > committed.
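The recovery rule described above — re-commit what is part of the checkpoint, abort the rest — can be modeled in a few lines (a conceptual Python sketch, not the actual sink code):

```python
def recover(checkpointed_txn_ids, open_txn_ids_on_broker):
    """On recovery: transactions that were pre-committed (i.e. stored in the
    last completed checkpoint) must be re-committed for consistency; every
    other open transaction is a leftover and gets aborted."""
    to_recommit = [t for t in open_txn_ids_on_broker if t in checkpointed_txn_ids]
    to_abort = [t for t in open_txn_ids_on_broker if t not in checkpointed_txn_ids]
    return to_recommit, to_abort
```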
> > >
> > > >   *   I wonder why “we expect 3 transactional ids to be in use per
> > > subtask”.
> > > Per checkpoint and subtask, we have one transaction ongoing for the
> > > writer at all times. We also have one transaction that is being
> > > finished by the previous checkpoint and that awaits the
> > > notifyCheckpointCompleted. Lastly on recovery, we also have an id from
> > > the previous run that we can't yet reuse. In many cases, we probably
> > > re-commit the recovered id before we finish another checkpoint, so
> > > it's rather 2 ids per subtask.
> > >
> > >  *   I wonder why “There is at least one finalized transaction waiting
> > > to be committed”. Do you mean the transaction is in state CompleteCommit?
> > > Can we not be the first transaction so have no older ones – or have I
> > > misunderstood?
> > > See above. It's unrelated to internal Kafka states - they are all
> > > Ongoing.
> > > You are right that for the very first attempt, we have no such
> > > transaction until the first checkpoint is snapshotted.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-34554
> > >
> > > On Wed, Mar 26, 2025 at 5:28 PM David Radley <david_rad...@uk.ibm.com>
> > > wrote:
> > > >
> > > > Hi Arvid,
> > > > This sounds like a good idea. I wondered:
> > > >
> > > >   *   Can we check that we have the appropriate permissions and the
> > > appropriate Kafka level (at least API key 66) to validate this option.
> > > >   *   I am wondering what we do for the 2 errors that the Kafka API
> > > can give, namely
> > > >      *   COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the
> > > process of loading its state.
> > > >      *   COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the
> > > request is being shut down.
> > > >
> > > > In the case of COORDINATOR_LOAD_IN_PROGRESS I guess we could have some
> > > sort of retry loop – governed by a configuration parameter.
> > > >
> > > >   *   Do we need to do anything special for find-hanging transactions?
> > > >   *   Reusing transaction ids could be confusing in debugging. I wonder
> > > if we should not eagerly reuse transaction ids so the debugging would be
> > > more straightforward. Better still could be to use a uuid as part of the
> > > identifier; this would seem more normal practice rather than managing an
> > > incrementing counter and filling the gaps as they become available.
> > > >   *   I am not very familiar with the KIP - I am unsure what you mean by
> > > “it also knows which transactions still need to be re-committed. It aborts
> > > all other open transactions.”. When/why do we need to re-commit
> > > transactions and when do we abort?
> > > >   *   I wonder why “we expect 3 transactional ids to be in use per
> > > subtask”.
> > > >   *   I wonder why “There is at least one finalized transaction waiting
> > > to be committed”. Do you mean the transaction is in state CompleteCommit?
> > > Can we not be the first transaction so have no older ones – or have I
> > > misunderstood?
> > > >
> > > > Kind regards, David.
> > > >
> > > >
> > > > From: Arvid Heise <ahe...@confluent.io.INVALID>
> > > > Date: Wednesday, 26 March 2025 at 06:49
> > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-511: Support transaction id
> > > pooling in Kafka connector
> > > > Hi Hongshun,
> > > >
> > > > this thread is about addressing your first pain point. It would be
> > > > awesome if you could read the FLIP and provide feedback whether this
> > > > approach would work in your setup.
> > > >
> > > > The other issues won't need a FLIP but need a smaller bugfix.
> > > >
> > > > Best,
> > > >
> > > > Arvid
> > > >
> > > > On Tue, Mar 25, 2025 at 7:57 AM Hongshun Wang <loserwang1...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Heise,
> > > > > Thanks for pushing this work forward. Currently, Kafka sinks using
> > > > > transactions are not performant, which can affect the whole Kafka
> > > > > cluster.
> > > > > As we discussed before, I have three problems when using the Kafka
> > > > > exactly-once sink:
> > > > > 1. Too many transaction ids:
> > > https://issues.apache.org/jira/browse/FLINK-34554
> > > > > 2. Too many unclosed producers (if commit lags too far behind
> > > pre-commit):
> > > > > https://issues.apache.org/jira/browse/FLINK-36569
> > > > > 3. Possibly invalid state:
> > > https://issues.apache.org/jira/browse/FLINK-37356
> > > > >
> > > > > I do hope this FLIP can show how to solve these problems.
> > > > >
> > > > > Best
> > > > > Hongshun Wang
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Feb 28, 2025 at 10:01 PM Arvid Heise <ar...@apache.org>
> > wrote:
> > > > >
> > > > > > Dear Flink devs,
> > > > > >
> > > > > > I'd like to start a discussion around an improvement of the
> > > > > > exactly-once Kafka sink. Because of its current design, the sink
> > > > > > currently puts a large strain on the main memory of the Kafka
> > > > > > broker [1], which hinders adoption of the KafkaSink. Since the
> > > > > > KafkaSink will be the only way to produce into Kafka for Flink 2.0,
> > > > > > this limitation may in turn also limit Flink 2 adoption.
> > > > > >
> > > > > > I appreciate feedback not only on technical aspects but also on the
> > > > > > requirements of the new approach [2]:
> > > > > > - Are most users/customers already on a Kafka 3.0 broker?
> > > > > > - The new approach requires READ permission on the target topic (for
> > > > > > the DescribeProducers API) and DESCRIBE permission on the transaction
> > > > > > (for the ListTransactions API). Are there any concerns around this?
> > > > > >
> > > > > > The new behavior would be opt-in in the first phase while the
> > > > > > current behavior remains the default. If the community accepts the
> > > > > > new behavior, we can turn it on by default and provide an option to
> > > > > > opt out.
> > > > > >
> > > > > > Note that this improvement is borderline FLIP-worthy; it touches the
> > > > > > public interfaces of the connector only. However, by doing an
> > > > > > official FLIP, I'm positive that we can gather feedback more quickly.
> > > > > >
> > > > > > I'm also looking for volunteers to test the implementation on their
> > > > > > dev clusters [3].
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Arvid
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-34554
> > > > > > [2]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector
> > > > > > [3] https://github.com/apache/flink-connector-kafka/pull/154
> > > > > >
> > > >
> > > > Unless otherwise stated above:
> > > >
> > > > IBM United Kingdom Limited
> > > > Registered in England and Wales with number 741598
> > > > Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> > > Winchester, Hampshire SO21 2JN
> > >
> >
>
