Hi Heise, I agree with you. Let us push forward. I will review https://github.com/apache/flink-connector-kafka/pull/154 and discuss it there. It may take me some time to reply, as there is quite a lot of code in this PR.
Best Hongshun On Mon, Apr 7, 2025 at 3:12 PM Arvid Heise <ar...@apache.org> wrote: > Hi Hongshun, > > 1. In case of an unsupported API, the kafka-client library throws an error, > which will cause a failover in Flink. We could make the error more user-friendly, > but I think failing over is correct instead of silently switching to an > undesired strategy. David already asked if we could check that during > validation of the plan, but imho it's too situational, as you may not have > access to the Kafka cluster when you build the plan. I'm worried about > false positives here, but I'm open to suggestions. > > 2. This FLIP is about the cleanup period and is not touching the write path > at all. > Transactional commits happen on notifyCheckpointComplete, where they block > the task thread in the case where committer and writer are chained (the common > case). Have you seen an impact on performance? > I'm also not exactly sure where your concurrency is coming from. Could you > elaborate? I'm assuming you see more producers open because > notifyCheckpointComplete may occur after multiple snapshotState calls. But > I don't see how to improve the situation except forgoing exactly-once and > settling for at-least-once. > The checkpointing interval is an orthogonal configuration. I don't see how > anything on the connector side can change that. That sounds like a radical > new idea and deserves its own FLIP. If I understand you correctly, you want to > decline/delay new checkpoints until old ones are committed, such that you > have no more than X ongoing transactions. That sounds very useful, but it's > about automatically configuring the lowest possible checkpointing interval > that satisfies certain conditions. This is way outside the scope of > connectors because it requires deep changes in the checkpoint coordinator > (only). > > 3. Yes, we will keep the list of returned transactions as small as > possible.
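To make the subtask-ownership filtering from the call chain below concrete, here is a minimal Python sketch. It is purely illustrative: the transactional id format `<prefix>-<subtask>-<counter>`, the function names, and the example ids are my assumptions, not the connector's actual code; only the predicate s == s' % p comes from this thread. The real implementation is Java inside the Kafka connector.

```python
def owned_by_subtask(transactional_id: str, subtask: int, parallelism: int) -> bool:
    """Check whether a transactional id of the (assumed) form
    '<prefix>-<original_subtask>-<counter>' belongs to this subtask.
    Per the thread: owned iff s == s' % p, where s is this subtask,
    s' the original subtask encoded in the id, and p the current parallelism."""
    original_subtask = int(transactional_id.rsplit("-", 2)[1])
    return original_subtask % parallelism == subtask

def transactions_to_abort(open_txns, finalized_ids, subtask, parallelism):
    """Filter open transactions down to those this subtask must abort:
    keep only ids owned by the subtask, drop ids that are part of the
    writer state (finalized, still awaiting commit)."""
    return [
        t for t in open_txns
        if owned_by_subtask(t, subtask, parallelism) and t not in finalized_ids
    ]

# Example: previous run had parallelism 4, current run has parallelism 2.
open_txns = ["sink-0-7", "sink-1-7", "sink-2-6", "sink-3-6"]
# Subtask 0 owns ids from original subtasks 0 and 2 (0 % 2 == 2 % 2 == 0);
# "sink-0-7" is excluded because it still awaits its commit.
print(transactions_to_abort(open_txns, {"sink-0-7"}, subtask=0, parallelism=2))
# → ['sink-2-6']
```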
The current call chain is: > - The Flink sink may be configured to write into a (set of) static topics > or with a target topic pattern (target patterns are supported in OSS Flink > only). In the latter case, run ListTopics to collect the list of topics. > - Run DescribeProducer with the list of topics to get a list of producer > ids. > - Run the ListTransactions API with the list of producer ids. > - Remove all transactions that are closed (ideally filter those with > request options already). > - Remove all transactions that are not owned by the subtask s by extracting > the original subtask id s’ from the transactional id and applying the > predicate s == s’ % p, where p is the current parallelism. > - Remove all transactions that are part of the writer state (finalized > transactions). > - Abort all of these transactions by calling initTransactions. > We are currently working with the Kafka devs to replace the first three calls > with a single call to ListTransactions with a new parameter that filters on > prefix. But that is available in Kafka 4.1 at the earliest, so we need the current > approach anyhow. > > Note that the FLIP is already approved, so I can't change anything in it > anymore. Both 1. and 3. can, however, be discussed on the pull request [1]. > 2. is clearly out of scope. > > Best, > > Arvid > > [1] https://github.com/apache/flink-connector-kafka/pull/154 > > > On Mon, Apr 7, 2025 at 4:05 AM Hongshun Wang <loserwang1...@gmail.com> > wrote: > > > Hi Heise, > > > > This sounds like a good idea. Just some tiny questions left: > > > > 1. > > > > Behavior when ListTransactions is unsupported: > > If the server does not support the ListTransactions API, what should the > > system’s behavior be? > > - Should it throw an exception to signal incompatibility? > > - Or fall back to the legacy Kafka connector behavior (e.g., > > checkpointing without transactional checks)? > > Clear guidance here would help users and implementers understand how > > recovery pitfalls are handled. > > 2.
> > > > Impact of frequent checkpoints on write performance: > > When the checkpoint interval is small (e.g., configured to occur very > > frequently), could this lead to write performance bottlenecks if > > multiple producers (e.g., 3 producers) block the write pipeline? > > Could this FLIP include a tunable configuration parameter to let users > > adjust the checkpoint interval or producers’ concurrency limits if > > needed? > > 3. > > > > Optimizing transaction lookup with producerIdFilters: > > The proposal currently states: *"The writer uses ListTransactions to > > receive a list of all open transactions."* > > Would it be feasible to instead filter transactions using > > producerIdFilters (if supported) to retrieve only those transactions > > relevant to this writer? Doing so could reduce unnecessary overhead, > > especially in environments with many transactions. > > > > > > I would be grateful if you could answer these questions and add the details > > to the FLIP. > > > > Best > > Hongshun > > > > On Thu, Mar 27, 2025 at 1:25 AM Arvid Heise <ar...@apache.org> wrote: > > > > > Hi David, > > > > > > Thank you for taking the time to go over the proposal. > > > > > > > * Can we check that we have the appropriate permissions and the > > > appropriate Kafka level (at least API key 66) to validate this option. > > > That's probably possible, but we can do that only during runtime (we > > > may not have access to the Kafka cluster during pipeline building). > > > Have you seen this pattern used in other connectors? Could you > > > point me to it? > > > So the alternative is to just let the AdminClient fail with a > > > respective error since it performs these checks under the hood > > > anyway. > > > > > > > * I am wondering what we do for the 2 errors that the Kafka API can > > > give, namely > > > Could you point me to that? I haven't seen the errors yet. Is this > > > relevant only during the time a Kafka cluster starts? How often does > > > that happen?
> > > If it's really rare, a simple alternative is to rely on Flink failover, > > > because transaction abortion happens only during the start of the Flink > > > application, so a failover is comparably cheap. > > > > > > > * Do we need to do anything special for find-hanging transactions? > > > Yes, that's what this whole proposal and FLIP is about. We look for > > > all transactions with the respective transactional id prefix that are > > > open, ignore transactions that still should be committed, and abort > > > them all. > > > > > > > * Reusing transaction ids could be confusing in debugging. I > > wonder > > > if we should not eagerly reuse transaction ids so the debugging would be > > > more straightforward. Better still could be to use a uuid as part of the > > > identifier; this would seem more normal practice rather than managing an > > > incrementing counter and filling the gaps as they become available. > > > Please refer to FLINK-34554 [1] on why this is simply necessary. It's > > > an unfortunate design decision in the Kafka server such that you should > > > not have many unique transactional ids. > > > We can use uuids internally to ease debugging, but we can't create > > > transactional ids based on that. (We internally use UUIDs for the > > > prefix at Confluent, maybe that's what you mean.) > > > Alternatively, we attach the current checkpoint id to the log messages > > > such that the pair transaction id/checkpoint id is unique, since a > > > transaction id is reused for a later checkpoint. > > > > > > > * I am not very familiar with the KIP - I am unsure what you mean > > by > > > “it also knows which transactions still need to be re-committed. It > > aborts > > > all other open transactions.”. When/why do we need to re-commit > > > transactions and when do we abort? > > > To which KIP do you refer? > > > Flink uses a 2PC to guarantee exactly once. On pre-commit, we finalize > > > a transaction and put its state into the checkpoint.
If all tasks > > > succeed checkpointing, notifyCheckpointCompleted is called and we > > > actually commit. Between pre-commit and commit, a failure may happen, > > > such that on recovery we need to re-commit all transactions that are > > > part of the checkpoint, because for consistency they are supposed to be > > > committed. > > > > > > > * I wonder why “we expect 3 transactional ids to be in use per > > > subtask”. > > > Per checkpoint and subtask, we have one transaction ongoing for the > > > writer at all times. We also have one transaction that is being > > > finished by the previous checkpoint and that awaits > > > notifyCheckpointCompleted. Lastly, on recovery we also have an id from > > > the previous run that we can't yet reuse. In many cases, we probably > > > re-commit the recovered id before we finish another checkpoint, so > > > it's rather 2 ids per subtask. > > > > > > > * I wonder why “There is at least one finalized transaction waiting > > > to be committed”. Do you mean the transaction is in state CompleteCommit? > > > Could we not be the first transaction and so have no older ones – or have I > > > misunderstood? > > > See above. It's unrelated to internal Kafka states - they are all > > Ongoing. > > > You are right that for the very first attempt, we have no such > > > transaction until the first checkpoint is snapshotted. > > > > > > Best, > > > > > > Arvid > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-34554 > > > > > > On Wed, Mar 26, 2025 at 5:28 PM David Radley <david_rad...@uk.ibm.com> > > > wrote: > > > > > > > > Hi Arvid, > > > > This sounds like a good idea. I wondered: > > > > > > > > * Can we check that we have the appropriate permissions and the > > > appropriate Kafka level (at least API key 66) to validate this option. > > > > * I am wondering what we do for the 2 errors that the Kafka API can > > > give, namely > > > > * COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the > > process > > > of loading its state.
> > > > * COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the > > > request is being shut down. > > > > > > > > In the case of COORDINATOR_LOAD_IN_PROGRESS I guess we could have some > > > sort of retry loop – governed by a configuration parameter. > > > > > > > > * Do we need to do anything special for find-hanging transactions? > > > > * Reusing transaction ids could be confusing in debugging. I > > wonder > > > if we should not eagerly reuse transaction ids so the debugging would be > > > more straightforward. Better still could be to use a uuid as part of the > > > identifier; this would seem more normal practice rather than managing an > > > incrementing counter and filling the gaps as they become available. > > > > * I am not very familiar with the KIP - I am unsure what you mean > > by > > > “it also knows which transactions still need to be re-committed. It > > aborts > > > all other open transactions.”. When/why do we need to re-commit > > > transactions and when do we abort? > > > > * I wonder why “we expect 3 transactional ids to be in use per > > > subtask”. > > > > * I wonder why “There is at least one finalized transaction waiting > > > to be committed”. Do you mean the transaction is in state CompleteCommit? > > > Could we not be the first transaction and so have no older ones – or have I > > > misunderstood? > > > > > > > > Kind regards, David. > > > > > > > > > > > > From: Arvid Heise <ahe...@confluent.io.INVALID> > > > > Date: Wednesday, 26 March 2025 at 06:49 > > > > To: dev@flink.apache.org <dev@flink.apache.org> > > > > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-511: Support transaction id > > > pooling in Kafka connector > > > > Hi Hongshun, > > > > > > > > this thread is about addressing your first pain point. It would be > > > > awesome if you could read the FLIP and provide feedback on whether this > > > > approach would work in your setup. > > > > > > > > The other issues won't need a FLIP, just a smaller bugfix.
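As an aside on the COORDINATOR_LOAD_IN_PROGRESS discussion above: David's suggestion of a retry loop governed by a configuration parameter could look roughly like the following sketch. This is a hypothetical Python illustration, not the connector's actual implementation (which would be Java), and the exception class is merely a stand-in for Kafka's retriable error code.

```python
import time

class CoordinatorLoadInProgress(Exception):
    """Stand-in for Kafka's retriable COORDINATOR_LOAD_IN_PROGRESS error."""

def with_retry(call, max_attempts=5, backoff_seconds=0.0):
    """Retry a call on CoordinatorLoadInProgress, with a configurable
    attempt limit and backoff; re-raise once attempts are exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except CoordinatorLoadInProgress:
            if attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)  # linear backoff between attempts

# Simulated coordinator that is still loading its state for the first two calls.
attempts = {"n": 0}
def list_transactions():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise CoordinatorLoadInProgress()
    return ["txn-a", "txn-b"]

print(with_retry(list_transactions))  # → ['txn-a', 'txn-b'], on the third attempt
```

For COORDINATOR_NOT_AVAILABLE during shutdown, the alternative mentioned in this thread (relying on Flink failover, since aborting happens only at application start) would correspond to simply letting the exception propagate after the retries are exhausted.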
> > > > > > > > Best, > > > > > > > > Arvid > > > > > > > > On Tue, Mar 25, 2025 at 7:57 AM Hongshun Wang < loserwang1...@gmail.com > > > > > > wrote: > > > > > > > > > > Hi Heise, > > > > > Thanks for pushing this work forward. Currently, Kafka sinks using > > > transactions are not efficient, which can affect the whole Kafka cluster. > > > > > As we discussed before, I have three problems when using the Kafka > > > > > exactly-once sink: > > > > > 1. Too many transaction ids: > > > https://issues.apache.org/jira/browse/FLINK-34554 > > > > > 2. Too many unclosed producers (if commit lags too far behind > > > pre-commit): > > > > > https://issues.apache.org/jira/browse/FLINK-36569 > > > > > 3. Possibly invalid state: > > > https://issues.apache.org/jira/browse/FLINK-37356 > > > > > > > > > > I do hope this FLIP can show how to solve these problems. > > > > > > > > > > Best > > > > > Hongshun Wang > > > > > > > > > > > > > > > > > > > > On Fri, Feb 28, 2025 at 10:01 PM Arvid Heise <ar...@apache.org> > > wrote: > > > > > > > > > > > Dear Flink devs, > > > > > > > > > > > > I'd like to start a discussion around an improvement of the > > > exactly-once > > > > > > Kafka sink. Because of its design, the sink currently puts a > > > > > > large strain on the main memory of the Kafka broker [1], which > > > hinders > > > > > > adoption of the KafkaSink. Since the KafkaSink will be the only way > > > to > > > > > > produce into Kafka in Flink 2.0, this limitation may in turn also > > > > > > limit Flink 2 adoption. > > > > > > > > > > > > I appreciate feedback not only on technical aspects but also on the > > > > > > requirements of the new approach [2]: > > > > > > - Are most users/customers already on Kafka 3.0 brokers? > > > > > > - The new approach requires READ permission on the target topic > > (for > > > > > > the DescribeProducer API) and DESCRIBE permission on the transaction > > (for > > > > > > the ListTransactions API). Are there any concerns around this?
> > > > > > > > > > > > The new behavior would be opt-in in the first phase, while the > > current > > > > > > behavior remains the default. If the community accepts the new > > > > > > behavior, we can turn it on by default and provide an option to > > > > > > opt out. > > > > > > > > > > > > Note that this improvement is borderline FLIP-worthy; it touches only > > the > > > > > > public interfaces of the connector. However, by doing an > > > official > > > > > > FLIP, I'm positive that we can gather feedback more quickly. > > > > > > > > > > > > I'm also looking for volunteers to test the implementation on > > their > > > > > > dev clusters [3]. > > > > > > > > > > > > Best, > > > > > > > > > > > > Arvid > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-34554 > > > > > > [2] > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector > > > > > > [3] https://github.com/apache/flink-connector-kafka/pull/154 > > > > > > > > Unless otherwise stated above: > > > > > > > > IBM United Kingdom Limited > > > > Registered in England and Wales with number 741598 > > > > Registered office: Building C, IBM Hursley Office, Hursley Park Road, > > > Winchester, Hampshire SO21 2JN > >
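To close with an illustration of the transaction id pooling this thread debates (an incrementing counter per subtask whose gaps are refilled as ids become available, so the set of ids stays small, per FLINK-34554): below is a minimal, hypothetical Python sketch. The class name, id format, and API are my assumptions; the real implementation is the Java code in the linked PR and may differ.

```python
import heapq

class TransactionalIdPool:
    """Hand out the lowest free counter first, so the set of transactional
    ids in use stays small and dense; reusing ids (rather than minting
    fresh UUIDs) is what keeps Kafka broker memory bounded."""

    def __init__(self, prefix: str, subtask: int):
        self.prefix = prefix
        self.subtask = subtask
        self.free = []      # min-heap of counters returned to the pool
        self.next_new = 0   # next never-used counter

    def acquire(self) -> str:
        if self.free:
            counter = heapq.heappop(self.free)  # fill the lowest gap first
        else:
            counter = self.next_new
            self.next_new += 1
        return f"{self.prefix}-{self.subtask}-{counter}"

    def release(self, transactional_id: str) -> None:
        """Return an id to the pool once its transaction is committed."""
        counter = int(transactional_id.rsplit("-", 1)[1])
        heapq.heappush(self.free, counter)

pool = TransactionalIdPool("sink", subtask=0)
# Roughly the "3 ids per subtask" steady state: one ongoing transaction,
# one awaiting notifyCheckpointCompleted, one recovered and not yet reusable.
a, b, c = pool.acquire(), pool.acquire(), pool.acquire()
pool.release(a)          # the committed transaction frees its id
print(pool.acquire())    # → sink-0-0 (the gap is reused, not a new id)
```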