Hello Matthias,

I have a quick question regarding the motivation for the long-blocking and
batch-add-partitions behavior: do we think the latency comes primarily from
the network round-trips, or from the coordinator-side transaction-log
appends? If we believe it comes from the latter, then perhaps we can
first consider optimizing that without making any public changes,
specifically:

1) We block on the add-partitions in a purgatory, as proposed in your KIP.

2) When try-completing the parked add-partitions requests in the purgatory,
we consolidate them into a single txn state transition with a single append
to the transaction log.

3) *Optionally* on the client side, we can further optimize the behavior:
instead of blocking the sending of any batches while any txn requests are
in flight, we would just check which partitions have been successfully
"registered" as part of the txn according to the add-partitions response,
and then send records for those partitions. By doing this we can reduce the
end-to-end blocking time.
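
To make steps 2) and 3) concrete, here is a rough sketch of what I have in
mind. Everything in it is hypothetical: the class, the method names and the
in-memory "log" are stand-ins for illustration and do not correspond to the
actual coordinator or producer code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: these class and method names are illustrative and are
// not part of the Kafka code base.
public class AddPartitionsSketch {

    /** Stand-in for the transaction log; it only records what was appended. */
    static class TxnLog {
        final List<Set<String>> appends = new ArrayList<>();

        void append(Set<String> partitions) {
            appends.add(partitions);
        }
    }

    /**
     * Step 2: merge every parked add-partitions request into one state
     * transition, append it to the log once, and clear the parked requests.
     */
    static Set<String> completeParked(List<Set<String>> parked, TxnLog log) {
        Set<String> merged = new LinkedHashSet<>();
        for (Set<String> request : parked) {
            merged.addAll(request);
        }
        if (!merged.isEmpty()) {
            log.append(merged); // one append, however many requests were parked
        }
        parked.clear();
        return merged;
    }

    /**
     * Step 3: of the partitions with batches ready, return those already
     * registered with the transaction; the rest keep waiting.
     */
    static List<String> sendable(List<String> ready, Set<String> registered) {
        List<String> result = new ArrayList<>();
        for (String partition : ready) {
            if (registered.contains(partition)) {
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TxnLog log = new TxnLog();
        List<Set<String>> parked = new ArrayList<>();
        parked.add(new LinkedHashSet<>(Arrays.asList("out-0", "out-1")));
        parked.add(new LinkedHashSet<>(Arrays.asList("out-1", "out-2")));

        Set<String> registered = completeParked(parked, log);
        System.out.println("log appends: " + log.appends.size());
        System.out.println("sendable: "
            + sendable(Arrays.asList("out-2", "out-3"), registered));
    }
}
```

The point is only that two parked requests result in one log append, and that
the client gates sends per partition rather than on "any txn request in
flight".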

None of the above changes actually requires any public API or protocol
changes. In addition, they would not make things worse even in edge cases,
whereas with the proposed API change, if the producer pre-registered a
bunch of partitions but then timed out, the coordinator would need to write
abort markers to those pre-registered partitions unnecessarily. We can
measure the avg. number of txn log appends per transaction on the broker
side and see if it can already be reduced to close to 1.
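
For the measurement, something as simple as the following would do. This is
a hypothetical sketch, not an existing broker metric; the class name is made
up, and which appends get counted (for example only those triggered by
add-partitions) is an assumption left to the caller.

```java
// Hypothetical sketch: not an existing Kafka metric or class.
public class TxnLogAppendStats {
    private long appends;
    private long completedTxns;

    /** Call once for every txn log append that should be counted. */
    void recordAppend() {
        appends++;
    }

    /** Call once when a transaction commits or aborts. */
    void recordTxnCompleted() {
        completedTxns++;
    }

    /** Average counted appends per completed transaction (0 if none yet). */
    double avgAppendsPerTxn() {
        return completedTxns == 0 ? 0.0 : (double) appends / completedTxns;
    }

    public static void main(String[] args) {
        TxnLogAppendStats stats = new TxnLogAppendStats();
        stats.recordAppend();
        stats.recordAppend();
        stats.recordTxnCompleted();
        stats.recordAppend();
        stats.recordTxnCompleted();
        System.out.println("avg appends per txn: " + stats.avgAppendsPerTxn());
    }
}
```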


Guozhang


On Tue, May 19, 2020 at 10:33 AM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Hey John,
>
> thanks for the insights! Replied inline.
>
> On Tue, May 19, 2020 at 7:48 AM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for the KIP, Boyang!
> >
> > This looks good and reasonable to me overall.
> >
> > J1: One clarification: you mention that the blocking behavior depends on
> > a new broker version, which sounds good to me, but I didn't see why
> > we would need to throw any UnsupportedVersionExceptions. It sounds
> > a little like you just want to implement a kind of long polling on the
> > AddPartitionsToTxn API, such that the broker would optimistically block
> > for a while when there is a pending prior transaction.
> >
> > Can this just be a behavior change on the broker side, such that both
> > old and new clients would be asked to retry when the broker is older,
> > and both old and new clients would instead see the API call block for
> > longer (but be successful more often) when the broker is newer?
> >
> > Related: is it still possible to get back the "please retry" error from the
> > broker, or is it guaranteed to block until the call completes?
> >
> This is a good observation. I agree the blocking behavior could benefit all
> the producer versions older than 0.11, which could be retried. Added to the
> KIP.
>
>
> > J2: Please forgive my ignorance, but is there any ill effect if a producer
> > adds a partition to a transaction and then commits without having added
> > any data to the transaction?
> >
> > I can see this happening, e.g., if I know that my application generally
> > sends to 5 TopicPartitions, I would use the new beginTransaction call
> > and just always give it the same list of partitions, and _then_ do the
> > processing, which may or may not send data to all five potential
> > partitions.
> >
>
> Yes, that's possible, which is why we discussed bumping the EndTxn request
> to include only the partitions actually written to, so that the transaction
> coordinator sends markers only to those partitions. The worst case for a
> misused pre-registration API is writing more transaction markers than
> necessary. That said, I do see the benefit of allowing it: it is a
> life-saver for a "lazy user" who doesn't want to infer the output partitions
> it will write to and instead always registers the full set of output
> partitions. With this observation in mind, bumping EndTxn makes sense.
>
> >
> > Thanks again!
> > -John
> >
> > On Mon, May 18, 2020, at 10:25, Boyang Chen wrote:
> > > Oh, I see your point! Will add that context to the KIP.
> > >
> > > Boyang
> > >
> > > > On Sun, May 17, 2020 at 11:39 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > >
> > > > > My point here applies only to the first AddPartitionsToTxn request of the
> > > > > transaction, since only that request could be blocked waiting for the
> > > > > previous txn to complete. By deferring it we reduce the blocking time.
> > > > >
> > > > > I think StreamsConfig overrides linger.ms to 100ms, not 10ms, so in the
> > > > > best case we can defer the first AddPartitionsToTxn of the transaction by
> > > > > 100ms.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > > On Sat, May 16, 2020 at 12:20 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > wrote:
> > > >
> > > > > > Thanks Guozhang for the context. A producer batch is bounded either by
> > > > > > size or by linger time. With the default 10ms linger and a 100ms
> > > > > > transaction commit time, the producer will be held up by
> > > > > > AddPartitionsToTxn up to 10 times in the worst case. I think the
> > > > > > improvement here targets the worst-case scenario: users who don't
> > > > > > realize how the internals work and use the API calls in a very
> > > > > > inefficient way, such as when record processing and send() happen
> > > > > > concurrently.
> > > > >
> > > > > Boyang
> > > > >
> > > > > > On Sat, May 16, 2020 at 10:19 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > wrote:
> > > > >
> > > > > > Hello Boyang,
> > > > > >
> > > > > > Thanks for the proposed KIP, overall it makes sense to me.
> > > > > >
> > > > > > > One point I'd like to make, though it is not related to the public API,
> > > > > > > is that in the KafkaProducer.send call we can potentially defer sending
> > > > > > > the AddPartitionsToTxn request until the sender is about to send the
> > > > > > > first batch. This is based on what I observed in some soak-testing
> > > > > > > investigation: batching actually allows the first record to be sent
> > > > > > > much later than the send() call, and that can be leveraged to further
> > > > > > > reduce the time that we would be blocked on the AddPartitionsToTxn
> > > > > > > request.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > > On Thu, May 14, 2020 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > wrote:
> > > > > >
> > > > > > > Hey all,
> > > > > > >
> > > > > > > I would like to start the discussion for KIP-609:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-609%3A+Use+Pre-registration+and+Blocking+Calls+for+Better+Transaction+Efficiency
> > > > > > >
> > > > > > > > This KIP aims to improve the current EOS semantics, making the
> > > > > > > > processing more efficient and consolidated.
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


-- 
-- Guozhang
