> Thanks for the KIP. Nice documentation on all current issues with the
> timeout.
For the KIP writeup, all credit goes to Joel Koshy. I'll follow up on your
comments a little later.

> You also brought up a good use case for timing out a message. For
> applications that collect and send sensor data to Kafka, if the data can't
> be sent to Kafka for some reason, the application may prefer to buffer the
> more recent data in the accumulator. Without a timeout, the accumulator
> will be filled with old records and new records can't be added.
>
> Your proposal makes sense for a developer who is familiar with how the
> producer works. I am not sure it is very intuitive to users, since it may
> not be easy for them to figure out how to configure the new knob to bound
> the time by which a message is completed.
>
> From the users' perspective, Apurva's suggestion of
> max.message.delivery.wait.ms (which bounds the time from when a message
> enters the accumulator to when its callback is called) seems more
> intuitive. You listed this in the rejected alternatives section since it
> requires additional logic to rebatch when a produce request expires.
> However, this may not be too bad. The following are the things we would
> have to do:
>
> 1. The clock starts when a batch is created.
> 2. If the batch can't be drained within max.message.delivery.wait.ms, all
> messages in the batch fail and the callback is called.
> 3. When sending a produce request, we calculate an expireTime for the
> request equal to the remaining expiration time of the oldest batch in the
> request.
> 4. We set the minimum expireTime across all in-flight requests as the
> timeout in the selector poll call (so that the selector can wake up before
> the expiration time).
> 5. If the produce response can't be received within expireTime, we expire
> all batches in the produce request whose expiration time has been reached.
> The rest of the batches are resent in a new produce request.
> 6. If the produce response has a retriable error, we just back off a bit
> and then retry the produce request, as today. The number of retries doesn't
> really matter now; we just keep retrying until the expiration time is
> reached. It's possible that a produce request is never retried due to
> expiration. However, this seems like the right thing to do, since the user
> wants to time out the message at that point.
>
> Implementation-wise, there will be a bit more complexity in steps 3 and 4,
> but probably not too bad. The benefit is that this is more intuitive to the
> end user.
>
> Does that sound reasonable to you?
>
> Thanks,
>
> Jun
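To make the quoted steps 3 and 4 concrete, here is a rough sketch of the
bookkeeping they imply (names like requestExpireTime and deliveryWaitMs are
made up for illustration; this is not the actual Sender/Selector code):

    import java.util.List;

    // Illustrative sketch only, not actual producer code.
    final class ExpiryTimes {

        // Step 3: a request's deadline is the deadline of the oldest batch
        // it carries (batch creation time + max.message.delivery.wait.ms).
        // Assumes the request contains at least one batch.
        static long requestExpireTime(List<Long> batchCreateTimesMs, long deliveryWaitMs) {
            long oldestCreateMs = Long.MAX_VALUE;
            for (long createdMs : batchCreateTimesMs)
                oldestCreateMs = Math.min(oldestCreateMs, createdMs);
            return oldestCreateMs + deliveryWaitMs;
        }

        // Step 4: the selector poll timeout is the time remaining until the
        // earliest deadline across all in-flight requests, so the sender
        // wakes up before anything expires.
        static long pollTimeout(List<Long> inflightExpireTimesMs, long nowMs) {
            long earliestMs = Long.MAX_VALUE;
            for (long deadlineMs : inflightExpireTimesMs)
                earliestMs = Math.min(earliestMs, deadlineMs);
            return Math.max(0L, earliestMs - nowMs);
        }
    }

The only extra state is one deadline per in-flight request, which is why the
added complexity in steps 3 and 4 seems manageable.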
> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > > > There seems to be no relationship with cluster metadata
> > > > > availability or staleness. Expiry is just based on the time since
> > > > > the batch has been ready. Please correct me if I am wrong.
> > > >
> > > > I was not very specific about where we do expiration. I glossed over
> > > > some details because (again) we have other mechanisms to detect
> > > > non-progress. The condition (!muted.contains(tp) && (isMetadataStale
> > > > || cluster.leaderFor(tp) == null)) is used in
> > > > RecordAccumulator.expiredBatches:
> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> > > >
> > > > Effectively, we expire in all the following cases:
> > > > 1) The producer is partitioned from the brokers: when metadata age
> > > > grows beyond 3x its max value, it's safe to say that we're not
> > > > talking to the brokers at all. Report.
> > > > 2) Fresh metadata && the leader for a partition is not known && a
> > > > batch has been sitting there for longer than request.timeout.ms.
> > > > This is one case we would like to improve by using batch.expiry.ms,
> > > > because request.timeout.ms is too small.
> > > > 3) Fresh metadata && the leader for a partition is known && a batch
> > > > has been sitting there for longer than batch.expiry.ms. This is a
> > > > new case, different from #2: the catch-up mode case. Things are
> > > > moving too slowly and pipeline SLAs are broken. Report and shut down
> > > > kmm.
> > > >
> > > > The second and third cases are useful to a real-time app for a
> > > > completely different reason: report, forget about the batch, and
> > > > just move on (without shutting down).
> > >
> > > If I understand correctly, you are talking about a fork of Apache
> > > Kafka which has these additional conditions? Because that check
> > > doesn't exist on trunk today.
> >
> > Right. It is our internal release at LinkedIn.
> >
> > > Or are you proposing to change the behavior of expiry to account for
> > > stale metadata and partitioned producers as part of this KIP?
> >
> > No. It's our temporary solution in the absence of KIP-91. Note that we
> > don't like increasing request.timeout.ms. Without our extra conditions,
> > our batches expire too soon--a problem in kmm catch-up mode.
> >
> > If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire
> > will use that config instead of request.timeout.ms, the extra conditions
> > will be unnecessary, and all three cases will be covered via the
> > batch.expiry.ms timeout.
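To illustrate the simplification described at the end of the quoted thread:
with a single per-batch bound like batch.expiry.ms, the accumulator's expiry
decision reduces to an age check. A rough sketch with made-up names
(batchExpiryMs, batchCreatedMs); this is not the actual RecordAccumulator or
maybeExpire code:

    // Illustrative sketch only, not actual producer code.
    final class BatchExpiryCheck {

        private final long batchExpiryMs;  // e.g. 20 * 60 * 1000L for the kmm catch-up case

        BatchExpiryCheck(long batchExpiryMs) {
            this.batchExpiryMs = batchExpiryMs;
        }

        // True once a batch has sat in the accumulator longer than the bound,
        // regardless of metadata staleness or leader availability.
        boolean isExpired(long batchCreatedMs, long nowMs) {
            return nowMs - batchCreatedMs > batchExpiryMs;
        }
    }

With batch.expiry.ms set to 20 minutes, the same timeout covers all three of
the cases listed above, and the extra metadata/leader conditions from the
internal fork are no longer needed.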