Hi Heise,

This sounds like a good idea. Just a few small questions left:
1. Behavior when ListTransactions is unsupported: If the server does not
support the ListTransactions API, what should the system's behavior be?
Should it throw an exception to signal incompatibility, or fall back to the
legacy Kafka connector behavior (e.g., checkpointing without transactional
checks)? Clear guidance here would help users and implementers understand
how to recover from this pitfall.

2. Impact of frequent checkpoints on write performance: When the checkpoint
interval is small (e.g., configured to occur very frequently), could this
lead to write performance bottlenecks if multiple producers (e.g., 3
producers) block the write pipeline? Could this FLIP include a tunable
configuration parameter to let users adjust the checkpoint interval or the
producers' concurrency limits if needed?

3. Optimizing transaction lookup with producerIdFilters: The proposal
currently states: "The writer uses ListTransactions to receive a list of
all open transactions." Would it be feasible to instead filter transactions
using producerIdFilters (if supported) to retrieve only those transactions
relevant to this writer? Doing so could reduce unnecessary overhead,
especially in environments with many transactions.

I would be grateful if you could answer these questions and add the details
to the FLIP.

Best,
Hongshun

On Thu, Mar 27, 2025 at 1:25 AM Arvid Heise <ar...@apache.org> wrote:
>
> Hi David,
>
> Thank you for taking the time to go over the proposal.
>
> > * Can we check that we have the appropriate permissions and the
> > appropriate Kafka level (at least API key 66) to validate this option.
>
> That's probably possible, but we can do that only at runtime (we may not
> have access to the Kafka cluster during pipeline building). Did you see
> this pattern being used in other connectors? Could you point me to it?
> The alternative is to just let the AdminClient fail with a respective
> error, since it performs these checks under the hood anyway.
>
> > * I am wondering what we do for the 2 errors that the Kafka API can
> > give namely
>
> Could you point me to that? I haven't seen the errors yet. Is this
> relevant only during the time a Kafka cluster starts? How often does that
> happen? If it's really rare, a simple alternative is to rely on Flink
> failover, because transaction abortion happens only during the start of
> the Flink application, so a failover is comparably cheap.
>
> > * Do we need to do anything special for find-hanging transactions?
>
> Yes, that's what this whole proposal and FLIP is about. We look for all
> transactions with the respective transactional id prefix that are open,
> ignore transactions that still should be committed, and abort all the
> others.
>
> > * Reusing transaction ids could be confusing in debugging. I wonder if
> > we should not eagerly reuse transaction ids so the debugging would be
> > more straightforward. Better still could be to use a uuid as part of
> > the identifier; this would seem more normal practice rather than
> > managing an incrementing counter and filling the gaps as they become
> > available.
>
> Please refer to FLINK-34554 [1] on why this is simply necessary. It's an
> unfortunate design decision in the Kafka server such that you should not
> have many unique transactional ids. We can use uuids internally to ease
> debugging, but we can't create transactional ids based on that. (We
> internally use UUIDs for the prefix at Confluent, maybe that's what you
> mean).
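>
> To make the id reuse concrete, here is a rough, untested sketch of how
> such a pooled id scheme could look. This is purely illustrative — the
> class name and the "prefix + counter" layout are assumptions for the
> example, not the actual implementation:
>
>     import java.util.NavigableSet;
>     import java.util.TreeSet;
>
>     // Illustrative sketch of a per-subtask transactional id pool.
>     final class TransactionalIdPool {
>         private final String prefix; // e.g. "myapp-3-" for subtask 3
>         private final NavigableSet<Long> free = new TreeSet<>();
>         private long next = 0;
>
>         TransactionalIdPool(String prefix) {
>             this.prefix = prefix;
>         }
>
>         // Prefer the smallest freed slot so the overall set of unique
>         // ids stays small, which is the point of FLINK-34554.
>         String acquire() {
>             long slot = free.isEmpty() ? next++ : free.pollFirst();
>             return prefix + slot;
>         }
>
>         // Return a slot once its transaction was committed or aborted.
>         void release(String transactionalId) {
>             free.add(Long.parseLong(
>                     transactionalId.substring(prefix.length())));
>         }
>     }
>
> With such a scheme, a subtask cycles through the two to three slots
> described above instead of creating an ever-growing set of unique ids.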
> Alternatively, we attach the current checkpoint id to the log messages
> such that the pair transaction id/checkpoint id is unique, since a
> transaction id is reused for a later checkpoint.
>
> > * I am not very familiar with the KIP - I am unsure what you mean by
> > “it also knows which transactions still need to be re-committed. It
> > aborts all other open transactions.”. When/why do we need to re-commit
> > transactions and when do we abort?
>
> To which KIP do you refer? Flink uses a 2PC to guarantee exactly once.
> On pre-commit, we finalize a transaction and put its state into the
> checkpoint. If all tasks succeed in checkpointing,
> notifyCheckpointCompleted is called and we actually commit. Between
> pre-commit and commit, a failure may happen, so on recovery we need to
> re-commit all transactions that are part of the checkpoint, because for
> consistency they are supposed to be committed.
>
> > * I wonder why “we expect 3 transactional ids to be in use per
> > subtask”.
>
> Per checkpoint and subtask, we have one transaction ongoing for the
> writer at all times. We also have one transaction that is being finished
> by the previous checkpoint and that awaits notifyCheckpointCompleted.
> Lastly, on recovery we also have an id from the previous run that we
> can't yet reuse. In many cases, we probably re-commit the recovered id
> before we finish another checkpoint, so it's rather 2 ids per subtask.
>
> > * I wonder why “There is at least one finalized transaction waiting to
> > be committed”. Do you mean the transaction is in state CompleteCommit?
> > Can we not be the first transaction so have no older ones – or have I
> > misunderstood?
>
> See above. It's unrelated to internal Kafka states - they are all
> Ongoing. You are right that for the very first attempt, we have no such
> transaction until the first checkpoint is snapshotted.
>
> Best,
>
> Arvid
>
> [1] https://issues.apache.org/jira/browse/FLINK-34554
>
> On Wed, Mar 26, 2025 at 5:28 PM David Radley <david_rad...@uk.ibm.com> wrote:
> >
> > Hi Arvid,
> >
> > This sounds like a good idea. I wondered:
> >
> > * Can we check that we have the appropriate permissions and the
> > appropriate Kafka level (at least API key 66) to validate this option.
> > * I am wondering what we do for the 2 errors that the Kafka API can
> > give, namely:
> >   * COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process of
> >     loading its state.
> >   * COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the
> >     request is being shut down.
> >
> > In the case of COORDINATOR_LOAD_IN_PROGRESS I guess we could have some
> > sort of retry loop – governed by a configuration parameter.
> >
> > * Do we need to do anything special for find-hanging transactions?
> > * Reusing transaction ids could be confusing in debugging. I wonder if
> > we should not eagerly reuse transaction ids so the debugging would be
> > more straightforward. Better still could be to use a uuid as part of
> > the identifier; this would seem more normal practice rather than
> > managing an incrementing counter and filling the gaps as they become
> > available.
> > * I am not very familiar with the KIP - I am unsure what you mean by
> > “it also knows which transactions still need to be re-committed. It
> > aborts all other open transactions.”. When/why do we need to re-commit
> > transactions and when do we abort?
> > * I wonder why “we expect 3 transactional ids to be in use per
> > subtask”.
> > * I wonder why “There is at least one finalized transaction waiting
> > to be committed”. Do you mean the transaction is in state
> > CompleteCommit? Can we not be the first transaction so have no older
> > ones – or have I misunderstood?
> >
> > Kind regards, David.
> >
> >
> > From: Arvid Heise <ahe...@confluent.io.INVALID>
> > Date: Wednesday, 26 March 2025 at 06:49
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] Re: [DISCUSS] FLIP-511: Support transaction id
> > pooling in Kafka connector
> >
> > Hi Hongshun,
> >
> > this thread is about addressing your first pain point. It would be
> > awesome if you could read the FLIP and provide feedback on whether
> > this approach would work in your setup.
> >
> > The other issues won't need a FLIP but need a smaller bugfix.
> >
> > Best,
> >
> > Arvid
> >
> > On Tue, Mar 25, 2025 at 7:57 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > >
> > > Hi Heise,
> > >
> > > Thanks for pushing this work. Currently, Kafka sinks using
> > > transactions are not production-ready, which affects the whole
> > > Kafka cluster. As we discussed before, I have three problems when
> > > using the Kafka exactly-once sink:
> > > 1. Too many transaction ids: https://issues.apache.org/jira/browse/FLINK-34554
> > > 2. Too many unclosed producers (if the commit lags too far behind
> > > the pre-commit): https://issues.apache.org/jira/browse/FLINK-36569
> > > 3. Possibly invalid state: https://issues.apache.org/jira/browse/FLINK-37356
> > >
> > > I do hope this FLIP can show how to solve these problems.
> > >
> > > Best,
> > > Hongshun Wang
> > >
> > > On Fri, Feb 28, 2025 at 10:01 PM Arvid Heise <ar...@apache.org> wrote:
> > > >
> > > > Dear Flink devs,
> > > >
> > > > I'd like to start a discussion around an improvement of the
> > > > exactly-once Kafka sink. Because of its current design, the sink
> > > > currently puts a large strain on the main memory of the Kafka
> > > > broker [1], which hinders adoption of the KafkaSink. Since the
> > > > KafkaSink will be the only way to produce into Kafka for Flink
> > > > 2.0, this limitation may in turn also limit Flink 2 adoption.
> > > >
> > > > I appreciate feedback not only on technical aspects but also on
> > > > the requirements of the new approach [2]:
> > > > - Are most users/customers already on Kafka 3.0 brokers?
> > > > - The new approach requires READ permission on the target topic
> > > > (for the DescribeProducer API) and DESCRIBE permission on the
> > > > transaction (for the ListTransactions API). Are there any
> > > > concerns around this?
> > > >
> > > > The new behavior would be opt-in in the first phase while the
> > > > current behavior remains the default. If the community accepts
> > > > the new behavior, we can turn it on by default and provide an
> > > > option to opt out.
> > > >
> > > > Note that this improvement is borderline FLIP-worthy; it only
> > > > touches the public interfaces of the connector. However, by doing
> > > > an official FLIP, I'm positive that we can gather feedback more
> > > > quickly.
> > > >
> > > > I'm also looking for volunteers to test the implementation on
> > > > their dev clusters [3].
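> > > >
> > > > If you want to verify up front that your cluster and credentials
> > > > support the required APIs, a quick probe along the following lines
> > > > should do. This is a rough, untested sketch assuming kafka-clients
> > > > 3.x on the classpath; the class name and bootstrap-server handling
> > > > are just for illustration:
> > > >
> > > >     import java.util.Collection;
> > > >     import java.util.Properties;
> > > >     import java.util.concurrent.ExecutionException;
> > > >
> > > >     import org.apache.kafka.clients.admin.Admin;
> > > >     import org.apache.kafka.clients.admin.ListTransactionsOptions;
> > > >     import org.apache.kafka.clients.admin.TransactionListing;
> > > >     import org.apache.kafka.common.errors.UnsupportedVersionException;
> > > >
> > > >     public class ListTransactionsProbe {
> > > >         public static void main(String[] args) throws Exception {
> > > >             Properties props = new Properties();
> > > >             props.put("bootstrap.servers", args[0]);
> > > >             try (Admin admin = Admin.create(props)) {
> > > >                 try {
> > > >                     // Needs API key 66 (Kafka 3.0+ brokers) and
> > > >                     // DESCRIBE permission on the transactional ids.
> > > >                     Collection<TransactionListing> listings = admin
> > > >                             .listTransactions(new ListTransactionsOptions())
> > > >                             .all()
> > > >                             .get();
> > > >                     System.out.println(
> > > >                             "ListTransactions OK, listings: " + listings.size());
> > > >                 } catch (ExecutionException e) {
> > > >                     if (e.getCause() instanceof UnsupportedVersionException) {
> > > >                         System.err.println(
> > > >                                 "Broker does not support ListTransactions");
> > > >                     } else {
> > > >                         throw e; // e.g. authorization errors surface here
> > > >                     }
> > > >                 }
> > > >             }
> > > >         }
> > > >     }
> > > >
> > > > I believe ListTransactionsOptions can also filter the listing,
> > > > e.g. by transaction state, if you only care about open
> > > > transactions.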
> > > >
> > > > Best,
> > > >
> > > > Arvid
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-34554
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector
> > > > [3] https://github.com/apache/flink-connector-kafka/pull/154
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: Building C, IBM Hursley Office, Hursley Park Road, Winchester, Hampshire SO21 2JN