Just to clarify on the implementation details: today inside
Sender#runOnce(), we have the following:

```
if (maybeSendAndPollTransactionalRequest()) {
    return;
}
```


This basically means that as long as we still have any in-flight txn
request, the sender returns without sending any record batches; so we are
not really blocking on just the first sent request but on txn requests of
all types. For add-partitions requests that makes sense, since once
unblocked, all requests would unblock at the same time anyway; but we do
not necessarily need to block on AddOffsetsToTxnRequest and
TxnOffsetCommitRequest.

So we can relax the above restriction: record batches can be sent as long
as 1) the txn coordinator is known, 2) the producer has a valid PID, and
3) all partitions are registered in the txn.
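
To make the difference concrete, here is a rough sketch of the two gating
conditions side by side. It is only an illustration under simplifying
assumptions: `SendGateSketch`, `TxnState`, `canSendToday` and
`canSendRelaxed` are hypothetical names, not the actual Sender or
TransactionManager code, and partitions are modeled as plain strings.

```java
import java.util.Set;

/** Hypothetical sketch; none of these names exist in the real producer code. */
public class SendGateSketch {

    /** Minimal stand-in for the txn state the sender would consult (assumed shape). */
    public static final class TxnState {
        final boolean coordinatorKnown;
        final boolean hasValidProducerId;
        final Set<String> registeredPartitions;
        final boolean anyTxnRequestInFlight;

        public TxnState(boolean coordinatorKnown, boolean hasValidProducerId,
                        Set<String> registeredPartitions, boolean anyTxnRequestInFlight) {
            this.coordinatorKnown = coordinatorKnown;
            this.hasValidProducerId = hasValidProducerId;
            this.registeredPartitions = registeredPartitions;
            this.anyTxnRequestInFlight = anyTxnRequestInFlight;
        }
    }

    /** Simplified view of today's behavior: any in-flight txn request blocks sending. */
    public static boolean canSendToday(TxnState s) {
        return !s.anyTxnRequestInFlight;
    }

    /** Relaxed condition: coordinator known, valid PID, and target partitions registered. */
    public static boolean canSendRelaxed(TxnState s, Set<String> partitionsToSend) {
        return s.coordinatorKnown
            && s.hasValidProducerId
            && s.registeredPartitions.containsAll(partitionsToSend);
    }

    public static void main(String[] args) {
        // A txn request (say, an offset commit) is still in flight,
        // but partition "out-0" is already registered in the txn.
        TxnState s = new TxnState(true, true, Set.of("out-0"), true);
        System.out.println(canSendToday(s));                     // false
        System.out.println(canSendRelaxed(s, Set.of("out-0")));  // true
        System.out.println(canSendRelaxed(s, Set.of("out-1")));  // false
    }
}
```

The point of the sketch is that the relaxed check no longer looks at
whether a txn request is in flight at all, so an outstanding
TxnOffsetCommitRequest would not hold back batches for partitions that are
already registered.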



Guozhang

On Wed, May 20, 2020 at 3:13 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thanks Guozhang for the new proposal. I agree that we could deliver
> https://issues.apache.org/jira/browse/KAFKA-9878
> <https://issues.apache.org/jira/browse/KAFKA-9878?src=confmacro> first and
> measure the following metrics:
>
> 1. The total volume of AddPartitionToTxn requests
> 2. The time used in propagating the transaction state updates during
> transaction
> 3. The time used in transaction marker propagation
>
> If those metrics suggest that we are doing a pretty good job already, then
> the improvement of delivering the entire KIP-609 is minimal. In the
> meantime, I updated 9878 with more details. Additionally, I realized that
> we should change the logic for AddPartitionToTxn call so that we could
> maintain a future queue and wait for all the delta change completions,
> instead of blocking on the first sent out one. Does that make sense?
>
> On Wed, May 20, 2020 at 2:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Matthias,
> >
> > I have a quick question regarding the motivation of the long-blocking and
> > batch-add-partitions behavior: do we think the latency primarily comes
> > from the network round-trips, or from the coordinator-side
> > transaction-log appends? If we believe it is coming from the latter, then
> > perhaps we can first consider optimizing that without making any public
> > changes, specifically:
> >
> > 1) We block on the add-partitions in a purgatory, as proposed in your
> > KIP.
> >
> > 2) When try-completing the parked add-partitions requests in a purgatory,
> > we consolidate them as a single txn state transition with a single append
> > to transaction log.
> >
> > 3) *Optionally* on the client side, we can further optimize the behavior:
> > instead of blocking all batch sends as long as there are any txn requests
> > in flight, we would just query which partitions have successfully
> > "registered" as part of the txn from the add-partitions response and then
> > send records for those partitions. By doing this we can reduce the
> > end-to-end blocking time.
> >
> > None of the above changes actually requires any public API or protocol
> > changes. In addition, it would not make things worse even in edge cases,
> > whereas with the proposed API change, if the producer pre-registered a
> > bunch of partitions but then timed out, the coordinator would need to
> > write abort markers to those pre-registered partitions unnecessarily. We
> > can measure the avg. number of txn log appends per transaction on the
> > broker side and see if it can be reduced to close to 1 already.
> >
> >
> > Guozhang
> >
> >
> > On Tue, May 19, 2020 at 10:33 AM Boyang Chen <reluctanthero...@gmail.com
> >
> > wrote:
> >
> > > Hey John,
> > >
> > > thanks for the insights! Replied inline.
> > >
> > > On Tue, May 19, 2020 at 7:48 AM John Roesler <vvcep...@apache.org>
> > wrote:
> > >
> > > > Thanks for the KIP, Boyang!
> > > >
> > > > This looks good and reasonable to me overall.
> > > >
> > > > J1: One clarification: you mention that the blocking behavior depends
> > > > on a new broker version, which sounds good to me, but I didn't see why
> > > > we would need to throw any UnsupportedVersionExceptions. It sounds
> > > > a little like you just want to implement a kind of long polling on
> > > > the AddPartitionToTxn API, such that the broker would optimistically
> > > > block for a while when there is a pending prior transaction.
> > > >
> > > > Can this just be a behavior change on the broker side, such that both
> > > > old and new clients would be asked to retry when the broker is older,
> > > > and both old and new clients would instead see the API call block for
> > > > longer (but be successful more often) when the broker is newer?
> > > >
> > > > Related: is it still possible to get back the "please retry" error
> > > > from the broker, or is it guaranteed to block until the call
> > > > completes?
> > > >
> > > This is a good observation. I agree the blocking behavior could benefit
> > > all the producer versions older than 0.11, which could be retried. Added
> > > to the KIP.
> > >
> > >
> > > > J2: Please forgive my ignorance, but is there any ill effect if a
> > > > producer adds a partition to a transaction and then commits without
> > > > having added any data to the transaction?
> > > >
> > > > I can see this happening, e.g., if I know that my application
> > > > generally sends to 5 TopicPartitions, I would use the new
> > > > beginTransaction call and just always give it the same list of
> > > > partitions, and _then_ do the processing, which may or may not send
> > > > data to all five potential partitions.
> > > >
> > >
> > > Yes, that's possible, which is the reason why we discussed bumping the
> > > EndTxn request to only include the partitions actually being written to,
> > > so that the transaction coordinator will only send markers to the
> > > actually-written partitions. The worst case for a misused
> > > pre-registration API is to write out more transaction markers than
> > > necessary. For one, I do see the benefit of doing that, which is a
> > > life-saver for a "lazy user" who doesn't want to infer the output
> > > partitions it would write to, but always registers the full set of
> > > output partitions. With this observation in mind, bumping EndTxn makes
> > > sense.
> > >
> > > >
> > > > Thanks again!
> > > > -John
> > > >
> > > > On Mon, May 18, 2020, at 10:25, Boyang Chen wrote:
> > > > > Oh, I see your point! Will add that context to the KIP.
> > > > >
> > > > > Boyang
> > > > >
> > > > > On Sun, May 17, 2020 at 11:39 AM Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > > My point here is only for the first AddPartitionToTxn request of
> > > > > > > the transaction, since only that request would potentially be
> > > > > > > blocked on the previous txn to complete. By deferring it we
> > > > > > > reduce the blocking time.
> > > > > > >
> > > > > > > I think StreamsConfigs override the linger.ms to 100ms not 10ms,
> > > > > > > so in the best case we can defer the first AddPartitionToTxn of
> > > > > > > the transaction by 100ms.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Sat, May 16, 2020 at 12:20 PM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > > Thanks Guozhang for the context. The producer batch is either
> > > > > > > > bounded by the size or the linger time. For the default 10ms
> > > > > > > > linger and 100ms transaction commit time, the producer will be
> > > > > > > > capped by AddPartitionToTxn 10 times in the worst case. I think
> > > > > > > > the improvement here aims for the worst-case scenario: users
> > > > > > > > who don't realize how the internals work and use the API calls
> > > > > > > > in a very inefficient way, such as the scenario where record
> > > > > > > > processing and send() happen concurrently.
> > > > > > >
> > > > > > > Boyang
> > > > > > >
> > > > > > > On Sat, May 16, 2020 at 10:19 AM Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Boyang,
> > > > > > > >
> > > > > > > > Thanks for the proposed KIP, overall it makes sense to me.
> > > > > > > >
> > > > > > > > > One non-public-API-related point that I'd like to make, though,
> > > > > > > > > is that in the KafkaProducer.send call we can potentially defer
> > > > > > > > > sending the AddPartitionsToTxn request until the sender is
> > > > > > > > > about to send the first batch. This is what I observed from
> > > > > > > > > some soak testing investigation: the batching effect actually
> > > > > > > > > allows the first record to be sent much later than the send()
> > > > > > > > > call, and that can be leveraged to further reduce the time
> > > > > > > > > that we would be blocked on the AddPartitionsToTxn request.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, May 14, 2020 at 10:26 PM Boyang Chen <
> > > > > > reluctanthero...@gmail.com
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey all,
> > > > > > > > >
> > > > > > > > > I would like to start the discussion for KIP-609:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-609%3A+Use+Pre-registration+and+Blocking+Calls+for+Better+Transaction+Efficiency
> > > > > > > > >
> > > > > > > > > > This KIP aims to improve the current EOS semantics to make
> > > > > > > > > > the processing more efficient and consolidated.
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
