> Thanks for the KIP. Nice documentation on all current issues with the
> timeout.

For the KIP writeup, all credit goes to Joel Koshy.

I'll follow up on your comments a little later.


>
> You also brought up a good use case for timing out a message. For
> applications that collect and send sensor data to Kafka, if the data can't
> be sent to Kafka for some reason, the application may prefer to buffer the
> more recent data in the accumulator. Without a timeout, the accumulator
> will be filled with old records and new records can't be added.
>
> Your proposal makes sense for a developer who is familiar with how the
> producer works. I am not sure it is very intuitive to users, though, since
> it may not be easy for them to figure out how to configure the new knob to
> bound the amount of time until a message is completed.
>
> From the users' perspective, Apurva's suggestion of
> max.message.delivery.wait.ms (which bounds the time from when a message is
> added to the accumulator to when its callback is called) seems more
> intuitive. You listed this in the rejected alternatives section since it
> requires additional logic to rebatch when a produce request expires.
> However, this may not be too bad. The following are the things that we
> would have to do.
>
> 1. The clock starts when a batch is created.
> 2. If the batch can't be drained within max.message.delivery.wait.ms, all
> messages in the batch will fail and the callback will be called.
> 3. When sending a produce request, we calculate an expireTime for the
> request equal to the remaining expiration time of the oldest batch in the
> request.
> 4. We set the minimum of the expireTime of all inflight requests as the
> timeout in the selector poll call (so that the selector can wake up before
> the expiration time).
> 5. If the produce response can't be received within expireTime, we expire
> all batches in the produce request whose expiration time has been reached.
> For the rest of the batches, we resend them in a new produce request.
> 6. If the produce response has a retriable error, we just back off a bit
> and then retry the produce request as we do today. The number of retries
> doesn't really matter any more; we just keep retrying until the expiration
> time is reached. It's possible that a produce request is never retried
> because it expires first. However, this seems like the right thing to do
> since the user wants the message to be timed out at that point.
>
> Implementation-wise, there will be a bit more complexity in steps 3 and 4,
> but probably not too much. The benefit is that this is more intuitive to
> the end user.
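
For concreteness, here is a minimal sketch of the bookkeeping I believe steps
3 and 4 describe. The class, method, and parameter names below are mine and
purely illustrative; they are not existing producer internals.

import java.util.Collection;
import java.util.Map;

// A minimal sketch only; ExpiryBookkeeping and all names below are illustrative.
final class ExpiryBookkeeping {

    // Step 3: a produce request expires when the delivery deadline of its
    // oldest batch is reached (returned here as an absolute timestamp; the
    // "remaining expiration time" is simply this value minus now).
    static long requestExpireTime(Collection<Long> batchCreateTimesMs,
                                  long maxMessageDeliveryWaitMs) {
        if (batchCreateTimesMs.isEmpty())
            return Long.MAX_VALUE;
        long oldestCreateMs = Long.MAX_VALUE;
        for (long createMs : batchCreateTimesMs)
            oldestCreateMs = Math.min(oldestCreateMs, createMs);
        return oldestCreateMs + maxMessageDeliveryWaitMs;
    }

    // Step 4: bound the selector poll timeout by the earliest expireTime of
    // all in-flight produce requests, so the sender wakes up in time to
    // expire batches and complete their callbacks.
    static long pollTimeoutMs(Map<Integer, Long> inFlightExpireTimes,
                              long defaultTimeoutMs,
                              long nowMs) {
        long earliest = Long.MAX_VALUE;
        for (long expireTime : inFlightExpireTimes.values())
            earliest = Math.min(earliest, expireTime);
        return Math.min(defaultTimeoutMs, Math.max(0, earliest - nowMs));
    }
}
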
>
> Does that sound reasonable to you?
>
> Thanks,
>
> Jun
>
>
> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > > > There seems to be no relationship with cluster metadata availability
> > > > > or staleness. Expiry is just based on the time since the batch has
> > > > > been ready. Please correct me if I am wrong.
> > > > >
> > > >
> > > > I was not very specific about where we do expiration. I glossed over
> > > > some details because (again) we have other mechanisms to detect
> > > > non-progress. The condition (!muted.contains(tp) && (isMetadataStale ||
> > > > cluster.leaderFor(tp) == null)) is used in
> > > > RecordAccumulator.expiredBatches:
> > > > https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > main/java/org/apache/kafka/clients/producer/internals/
> > > > RecordAccumulator.java#L443
> > > >
> > > >
> > > > Effectively, we expire in all of the following cases:
> > > > 1) The producer is partitioned from the brokers, i.e. the metadata age
> > > > grows beyond 3x its max value. It's safe to say that we're not talking
> > > > to the brokers at all. Report.
> > > > 2) Fresh metadata && leader for a partition is not known && a batch is
> > > > sitting there for longer than request.timeout.ms. This is one case we
> > > > would like to improve by using batch.expiry.ms, because
> > > > request.timeout.ms is too small.
> > > > 3) Fresh metadata && leader for a partition is known && a batch is
> > > > sitting there for longer than batch.expiry.ms. This is a new case that
> > > > is different from #2. This is the catch-up mode case: things are moving
> > > > too slowly and pipeline SLAs are broken. Report and shut down kmm.
> > > >
> > > > The second and third cases are useful to a real-time app for a
> > > > completely different reason. Report, forget about the batch, and just
> > > > move on (without shutting down).
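
For concreteness, those three cases might read roughly like this in code.
This is purely an illustrative sketch, not our actual internal patch; names
such as metadataAgeMs, batchAgeMs, and batchExpiryMs are made up for the
example.

// One expiry decision per batch, mirroring cases 1-3 above (sketch only).
final class ExpiryCheckSketch {
    static boolean shouldExpire(boolean mutedPartition,
                                boolean leaderKnown,
                                long metadataAgeMs, long metadataMaxAgeMs,
                                long batchAgeMs,
                                long requestTimeoutMs, long batchExpiryMs) {
        if (mutedPartition)
            return false;                          // batch is in flight; leave it alone
        if (metadataAgeMs > 3 * metadataMaxAgeMs)
            return true;                           // case 1: partitioned from the brokers
        if (!leaderKnown)
            return batchAgeMs > requestTimeoutMs;  // case 2: no leader for too long
        return batchAgeMs > batchExpiryMs;         // case 3: catch-up mode, too slow
    }
}
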
> > > >
> > > >
> > > If I understand correctly, you are talking about a fork of Apache Kafka
> > > which has these additional conditions? Because that check doesn't exist
> > > on trunk today.
> >
> > Right. It is our internal release in LinkedIn.
> >
> > > Or are you proposing to change the behavior of expiry to account for
> > > stale metadata and partitioned producers as part of this KIP?
> >
> >
> > No. It's our temporary solution in the absence of KIP-91. Note that we
> > don't like increasing request.timeout.ms. Without our extra conditions,
> > our batches expire too soon--a problem in kmm catch-up mode.
> >
> > If we get batch.expiry.ms, we will configure it to 20 minutes. maybeExpire
> > will use the config instead of request.timeout.ms. The extra conditions
> > will be unnecessary. All three cases will be covered via the batch.expiry
> > timeout.
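
Concretely, the kind of producer config we would deploy for kmm once the new
knob exists might look roughly like this. A sketch only: batch.expiry.ms is
the property proposed in the KIP, and the other values are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KmmProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Keep request.timeout.ms modest; batch expiry is governed separately.
        props.put("request.timeout.ms", "30000");
        // 20 minutes: a batch stuck in the accumulator longer than this is
        // expired and its callback is completed with a timeout error.
        props.put("batch.expiry.ms", "1200000");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
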
> >
