Thanks Guozhang for the new proposal. I agree that we could deliver https://issues.apache.org/jira/browse/KAFKA-9878 first and measure the following metrics:
1. The total volume of AddPartitionToTxn requests
2. The time used in propagating the transaction state updates during the transaction
3. The time used in transaction marker propagation

If those metrics suggest that we are already doing a pretty good job, then the improvement from delivering the entire KIP-609 would be minimal.

In the meantime, I updated 9878 with more details. Additionally, I realized that we should change the logic for the AddPartitionToTxn call so that we maintain a future queue and wait for all the delta change completions, instead of blocking on the first one sent out. Does that make sense?
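To make that future-queue idea a bit more concrete, a minimal sketch could look like the following; the class and method names here are purely illustrative and are not taken from the actual producer or coordinator code:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the "future queue" idea: collect one future per
// transaction-state delta change triggered by AddPartitionToTxn, and only
// complete once *all* of them have finished, rather than blocking on the
// first one that was sent out.
public class PendingPartitionAdds {

    private final Queue<CompletableFuture<Void>> pendingDeltas = new ConcurrentLinkedQueue<>();

    // Register the completion future of one delta change (e.g. one partition
    // being added to the ongoing transaction).
    public void enqueue(CompletableFuture<Void> deltaCompletion) {
        pendingDeltas.add(deltaCompletion);
    }

    // Wait for every queued delta change, not just the first one.
    public CompletableFuture<Void> awaitAll() {
        List<CompletableFuture<Void>> snapshot = new ArrayList<>(pendingDeltas);
        return CompletableFuture.allOf(snapshot.toArray(new CompletableFuture[0]));
    }
}

The point is simply that the AddPartitionToTxn handling would respond only after awaitAll() completes, i.e. once every pending delta change has been applied, rather than returning after the first append finishes.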
On Wed, May 20, 2020 at 2:28 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Matthias,
>
> I have a quick question regarding the motivation of the long-blocking and batch-add-partitions behavior: do we think the latency primarily comes from the network round-trips, or from the coordinator-side transaction-log appends? If we believe it is coming from the latter, then perhaps we can first consider optimizing that without making any public changes, specifically:
>
> 1) We block on the add-partitions in a purgatory, as proposed in your KIP.
>
> 2) When try-completing the parked add-partitions requests in a purgatory, we consolidate them as a single txn state transition with a single append to the transaction log.
>
> 3) *Optionally*, on the client side, we can further optimize the behavior: instead of blocking on sending any batches as long as there are any txn requests in flight, we would just query which partitions have successfully "registered" as part of the txn from the add-partitions response and then send records for those partitions. By doing this we can reduce the end-to-end blocking time.
>
> None of the above changes actually requires any public API or protocol changes. In addition, it would not make things worse even in edge cases, whereas with the proposed API change, if the producer pre-registered a bunch of partitions but then timed out, the coordinator would need to write abort markers to those pre-registered partitions unnecessarily. We can measure the avg. number of txn log appends per transaction on the broker side and see if it can be reduced to close to 1 already.
>
> Guozhang
>
> On Tue, May 19, 2020 at 10:33 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
>
> > Hey John,
> >
> > thanks for the insights! Replied inline.
> >
> > On Tue, May 19, 2020 at 7:48 AM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks for the KIP, Boyang!
> > >
> > > This looks good and reasonable to me overall.
> > >
> > > J1: One clarification: you mention that the blocking behavior depends on a new broker version, which sounds good to me, but I didn't see why we would need to throw any UnsupportedVersionExceptions. It sounds a little like you just want to implement a kind of long polling on the AddPartitionToTxn API, such that the broker would optimistically block for a while when there is a pending prior transaction.
> > >
> > > Can this just be a behavior change on the broker side, such that both old and new clients would be asked to retry when the broker is older, and both old and new clients would instead see the API call block for longer (but be successful more often) when the broker is newer?
> > >
> > > Related: is it still possible to get back the "please retry" error from the broker, or is it guaranteed to block until the call completes?
> >
> > This is a good observation. I agree the blocking behavior could benefit all the producer versions older than 0.11, which could be retried. Added to the KIP.
> >
> > > J2: Please forgive my ignorance, but is there any ill effect if a producer adds a partition to a transaction and then commits without having added any data to the transaction?
> > >
> > > I can see this happening, e.g., if I know that my application generally sends to 5 TopicPartitions, I would use the new beginTransaction call and just always give it the same list of partitions, and _then_ do the processing, which may or may not send data to all five potential partitions.
> >
> > Yes, that's possible, which is the reason why we discussed bumping the EndTxn request to only include the partitions actually being written to, so that the transaction coordinator will only send markers to the actually-written partitions. The worst case for a misused pre-registration API is to write out more transaction markers than necessary. For one, I do see the benefit of doing that, which is a life-saver for a "lazy user" who doesn't want to infer the output partitions it would write to, but always registers the full set of output partitions. With this observation in mind, bumping EndTxn makes sense.
> >
> > > Thanks again!
> > > -John
> > >
> > > On Mon, May 18, 2020, at 10:25, Boyang Chen wrote:
> > > > Oh, I see your point! Will add that context to the KIP.
> > > >
> > > > Boyang
> > > >
> > > > On Sun, May 17, 2020 at 11:39 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > My point here is only for the first AddPartitionToTxn request of the transaction, since only that request would potentially be blocked on the previous txn to complete. By deferring it we reduce the blocking time.
> > > > >
> > > > > I think StreamsConfigs override the linger.ms to 100ms, not 10ms, so in the best case we can defer the first AddPartitionToTxn of the transaction by 100ms.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sat, May 16, 2020 at 12:20 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Guozhang for the context. The producer batch is either bounded by the size or the linger time. With the default 10ms linger and a 100ms transaction commit time, the producer will be capped by AddPartitionToTxn 10 times in the worst case. I think the improvement here aims at the worst-case scenario for users who don't realize how the internals work and use the API calls in a very inefficient way, such as the scenario where record processing and send() happen concurrently.
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > > On Sat, May 16, 2020 at 10:19 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello Boyang,
> > > > > > >
> > > > > > > Thanks for the proposed KIP, overall it makes sense to me.
> > > > > > >
> > > > > > > One non-public-API-related point that I'd like to make, though, is that in the KafkaProducer.send call we can potentially defer sending the AddPartitionsToTxn request until the sender is about to send the first batch -- this is what I observed from some soak testing investigation: the batching effect actually allows the first record to be sent much later than the send() call, and that can be leveraged to further reduce the time that we would be blocked on the AddPartitionsToTxn request.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Thu, May 14, 2020 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey all,
> > > > > > > >
> > > > > > > > I would like to start the discussion for KIP-609:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-609%3A+Use+Pre-registration+and+Blocking+Calls+for+Better+Transaction+Efficiency
> > > > > > > >
> > > > > > > > This KIP aims to improve the current EOS semantics, making the processing more efficient and consolidated.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
>
> --
> -- Guozhang
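For illustration, the deferral Guozhang describes in the quoted thread (paying for the AddPartitionsToTxn round-trip only when the sender is about to ship the first batch of the transaction) might look roughly like the sketch below on the client side. The names and structure are invented for this example and are not the actual TransactionManager/Sender internals; only org.apache.kafka.common.TopicPartition is assumed from kafka-clients:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of deferred partition registration: send() only records
// which partitions the transaction touches, and the actual AddPartitionsToTxn
// request is issued lazily, right before the first batch goes out.
public class DeferredPartitionRegistration {

    private final Set<TopicPartition> touchedPartitions = ConcurrentHashMap.newKeySet();
    private volatile boolean registered = false;

    // Called from send(): cheap bookkeeping only, no network round-trip yet.
    public void onSend(TopicPartition partition) {
        touchedPartitions.add(partition);
    }

    // Called by the (single) sender thread just before draining the first
    // batch of the transaction; only now is the registration round-trip paid.
    public void beforeFirstBatch(Registrar registrar) {
        if (!registered) {
            registrar.addPartitionsToTxn(touchedPartitions);
            registered = true;
        }
    }

    // Stand-in for whatever actually sends the AddPartitionsToTxn request.
    public interface Registrar {
        void addPartitionsToTxn(Set<TopicPartition> partitions);
    }
}

The design point is that send() stays cheap while the AddPartitionsToTxn round-trip is issued as late as possible, which is what lets it overlap with the linger-time batching discussed above.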