Hi all,

The discussion has been going on for a while; would it help to have a call to
discuss this? I'd like to start a vote soonish so that we can include this in
1.0.0.

I personally prefer max.message.delivery.wait.ms. It seems like Jun, Apurva
and Jason also prefer that. Sumant, it seems like you still prefer
batch.expiry.ms, is that right? What are your thoughts, Joel and Becket?
Ismael

On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:
> Hi, Sumant,
>
> The semantics of linger.ms is a bit subtle. The reasoning for the current
> implementation is the following. Let's say one sets linger.ms to 0 (our
> current default value). Creating a batch for every message would be bad for
> throughput. Instead, the current implementation only forms a batch when the
> batch is sendable (i.e., the broker is available, the inflight request
> limit is not exceeded, etc.). That way, the producer has more chance for
> batching. The implication is that a batch could be closed later than
> linger.ms.
>
> Now, on your concern about not having a precise way to control delay in
> the accumulator: it seems the batch.expiry.ms approach will have the same
> issue. If you start the clock when a batch is initialized, you may expire
> some messages in the same batch earlier than batch.expiry.ms. If you start
> the clock when the batch is closed, the expiration time could be unbounded
> because of the linger.ms implementation described above. Starting the
> expiration clock on batch initialization will at least guarantee that the
> time to expire the first message is precise, which is probably good enough.
>
> Thanks,
>
> Jun
>
>
> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > Question about "the closing of a batch can be delayed longer than
> > linger.ms":
> > Is it possible to cause an indefinite delay? At some point the bytes
> > limit might kick in. Also, why is the closing of a batch coupled with the
> > availability of its destination? In this approach, a batch chosen for
> > eviction due to delay needs to "close" anyway, right (without regard to
> > destination availability)?
> >
> > I'm not too worried about notifying at the super-exact time specified in
> > the configs. But expiring before the full wait-span has elapsed sounds a
> > little weird. So the expiration time has a +/- spread. It works more like
> > a hint than a max. So why not message.delivery.wait.hint.ms?
> >
> > Yeah, a cancellable future will be similar in complexity.
> >
> > I'm unsure if max.message.delivery.wait.ms will be the final nail for
> > producer timeouts. We still won't have a precise way to control delay in
> > just the accumulator segment. batch.expiry.ms does not try to abstract.
> > It's very specific.
> >
> > My biggest concern at the moment is implementation complexity.
> >
> > At this stage, I would like to encourage other independent opinions.
> >
> > Regards,
> > Sumant
> >
> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Sumant,
> > >
> > > 1. Yes, it's probably reasonable to require
> > > max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we
> > > can set the default retries to infinite or just ignore it. Then the
> > > latency will be bounded by max.message.delivery.wait.ms.
> > > request.timeout.ms is the max time the request will spend on the
> > > server. The client can expire an inflight request early if needed.
> > >
> > > 2. Well, since max.message.delivery.wait.ms specifies the max, calling
> > > the callback a bit early may be ok? Note that
> > > max.message.delivery.wait.ms only comes into play in the rare error
> > > case. So, I am not sure if we need to be very precise. The issue with
> > > starting the clock on closing a batch is that currently, if the leader
> > > is not available, the closing of a batch can be delayed longer than
> > > linger.ms.
> > >
> > > 4. As you said, future.get(timeout) itself doesn't solve the problem,
> > > since you still need a way to expire the record in the sender. The
> > > amount of work to implement a cancellable future is probably the same?
> > >
> > > Overall, my concern with patch work is that we have iterated on the
> > > produce request timeout multiple times and new issues keep coming
> > > back. Ideally, this time, we want to have a solution that covers all
> > > cases, even though that requires a bit more work.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for looking into it.
> > > >
> > > > Yes, we did consider this message-level timeout approach and
> > > > expiring batches selectively in a request, but rejected it due to
> > > > the added complexity without a strong benefit to counterweigh it.
> > > > Your proposal is a slight variation, so I'll mention some issues
> > > > here.
> > > >
> > > > 1. It sounds like max.message.delivery.wait.ms will overlap with the
> > > > "time segments" of both linger.ms and retries * (request.timeout.ms
> > > > + retry.backoff.ms). In that case, which config set takes
> > > > precedence? It would not make sense to configure configs from both
> > > > sets. Especially, we discussed exhaustively internally that retries
> > > > and max.message.delivery.wait.ms can't / shouldn't be configured
> > > > together. Retries become moot, as you already mention. I think
> > > > that's going to be surprising to anyone wanting to use
> > > > max.message.delivery.wait.ms. We probably need
> > > > max.message.delivery.wait.ms > linger.ms or something like that.
> > > >
> > > > 2. If the clock starts when a batch is created and expires when
> > > > max.message.delivery.wait.ms is over in the accumulator, the last
> > > > few messages in the expiring batch may not have lived long enough.
> > > > As the config seems to suggest a per-message timeout, it's incorrect
> > > > to expire messages prematurely. On the other hand, if the clock
> > > > starts after the batch is closed (which also implies that linger.ms
> > > > is not covered by the max.message.delivery.wait.ms config), no
> > > > message would be expired too soon. Yeah, expiration may be a little
> > > > bit too late, but hey, this ain't a real-time service.
> > > >
> > > > 3. I agree that steps #3, #4, (and #5) are complex to implement. On
> > > > the other hand, batch.expiry.ms is next to trivial to implement. We
> > > > just pass the config all the way down to ProducerBatch.maybeExpire
> > > > and are done with it.
> > > >
> > > > 4. Do you think the effect of max.message.delivery.wait.ms can be
> > > > simulated with the future.get(timeout) method? Copying an excerpt
> > > > from KIP-91: An end-to-end timeout may be partially emulated using
> > > > future.get(timeout). The timeout must be greater than
> > > > (batch.expiry.ms + nRetries * (request.timeout.ms +
> > > > retry.backoff.ms)). Note that when the future times out, the Sender
> > > > may continue to send the records in the background. To avoid that,
> > > > implementing a cancellable future is a possibility.
> > > >
> > > > For simplicity, we could just implement a trivial method in the
> > > > producer, ProducerConfigs.maxMessageDeliveryWaitMs(), that returns a
> > > > number based on this formula. Users of future.get could use this
> > > > timeout value; a minimal sketch follows below.
> > > >
> > > > Thoughts?
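> > > >
> > > > (A sketch of the emulation under the assumptions above. The helper
> > > > stands in for the hypothetical
> > > > ProducerConfigs.maxMessageDeliveryWaitMs() and is not part of the
> > > > producer API today; batch.expiry.ms is the config proposed in this
> > > > KIP, and the numeric values are illustrative only:)
> > > >
> > > >   import java.util.concurrent.ExecutionException;
> > > >   import java.util.concurrent.Future;
> > > >   import java.util.concurrent.TimeUnit;
> > > >   import java.util.concurrent.TimeoutException;
> > > >   import org.apache.kafka.clients.producer.KafkaProducer;
> > > >   import org.apache.kafka.clients.producer.ProducerRecord;
> > > >   import org.apache.kafka.clients.producer.RecordMetadata;
> > > >
> > > >   public class EndToEndTimeoutSketch {
> > > >     // Hypothetical helper implementing the KIP-91 formula:
> > > >     // batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)
> > > >     static long maxMessageDeliveryWaitMs(long batchExpiryMs, int nRetries,
> > > >                                          long requestTimeoutMs,
> > > >                                          long retryBackoffMs) {
> > > >       return batchExpiryMs + nRetries * (requestTimeoutMs + retryBackoffMs);
> > > >     }
> > > >
> > > >     static void sendWithDeadline(KafkaProducer<String, String> producer,
> > > >                                  ProducerRecord<String, String> record)
> > > >         throws InterruptedException, ExecutionException {
> > > >       // Illustrative values: batch.expiry.ms=120000, retries=3,
> > > >       // request.timeout.ms=30000, retry.backoff.ms=100.
> > > >       long timeoutMs = maxMessageDeliveryWaitMs(120_000L, 3, 30_000L, 100L);
> > > >       Future<RecordMetadata> future = producer.send(record);
> > > >       try {
> > > >         RecordMetadata md = future.get(timeoutMs, TimeUnit.MILLISECONDS);
> > > >         System.out.println("acked: " + md);
> > > >       } catch (TimeoutException e) {
> > > >         // Caveat from the KIP: the Sender may still deliver the record
> > > >         // in the background; only a cancellable future would truly
> > > >         // abandon it.
> > > >       }
> > > >     }
> > > >   }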
> > > >
> > > > Regards,
> > > > Sumant
> > > >
> > > >
> > > > On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:
> > > >
> > > > >> Thanks for the KIP. Nice documentation on all current issues with
> > > > >> the timeout.
> > > > >
> > > > > For the KIP writeup, all credit goes to Joel Koshy.
> > > > >
> > > > > I'll follow up on your comments a little later.
> > > > >
> > > > >
> > > > >> You also brought up a good use case for timing out a message. For
> > > > >> applications that collect and send sensor data to Kafka, if the
> > > > >> data can't be sent to Kafka for some reason, the application may
> > > > >> prefer to buffer the more recent data in the accumulator. Without
> > > > >> a timeout, the accumulator will be filled with old records and new
> > > > >> records can't be added.
> > > > >>
> > > > >> Your proposal makes sense for a developer who is familiar with how
> > > > >> the producer works. I am not sure if this is very intuitive to the
> > > > >> users, since it may not be very easy for them to figure out how to
> > > > >> configure the new knob to bound the time by which a message is
> > > > >> completed.
> > > > >>
> > > > >> From the users' perspective, Apurva's suggestion of
> > > > >> max.message.delivery.wait.ms (which bounds the time from when a
> > > > >> message enters the accumulator to when the callback is called)
> > > > >> seems more intuitive. You listed this in the rejected section
> > > > >> since it requires additional logic to rebatch when a produce
> > > > >> request expires. However, this may not be too bad. The following
> > > > >> are the things that we have to do.
> > > > >>
> > > > >> 1. The clock starts when a batch is created.
> > > > >> 2. If the batch can't be drained within
> > > > >> max.message.delivery.wait.ms, all messages in the batch will fail
> > > > >> and the callback will be called.
> > > > >> 3. When sending a produce request, we calculate an expireTime for
> > > > >> the request that equals the remaining expiration time of the
> > > > >> oldest batch in the request.
> > > > >> 4. We set the minimum expireTime of all inflight requests as the
> > > > >> timeout in the selector poll call (so that the selector can wake
> > > > >> up before the expiration time).
> > > > >> 5. If the produce response can't be received within the
> > > > >> expireTime, we expire all batches in the produce request whose
> > > > >> expiration time has been reached. For the rest of the batches, we
> > > > >> resend them in a new produce request.
> > > > >> 6. If the produce response has a retriable error, we just back off
> > > > >> a bit and then retry the produce request as today. The number of
> > > > >> retries doesn't really matter now. We just keep retrying until the
> > > > >> expiration time is reached. It's possible that a produce request
> > > > >> is never retried due to expiration. However, this seems to be the
> > > > >> right thing to do, since the users want to time out the message at
> > > > >> this time.
> > > > >>
> > > > >> Implementation wise, there will be a bit more complexity in steps
> > > > >> 3 and 4 (a sketch follows below), but probably not too bad. The
> > > > >> benefit is that this is more intuitive to the end user.
> > > > >>
> > > > >> Does that sound reasonable to you?
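> > > > >>
> > > > >> (A minimal sketch of the bookkeeping in steps 3 and 4. All the
> > > > >> types and names below are hypothetical, not the actual
> > > > >> Sender/NetworkClient code:)
> > > > >>
> > > > >>   import java.util.Collection;
> > > > >>   import java.util.List;
> > > > >>
> > > > >>   class Batch {
> > > > >>     final long createdMs;              // step 1: clock starts here
> > > > >>     Batch(long createdMs) { this.createdMs = createdMs; }
> > > > >>     long deadlineMs(long maxWaitMs) { return createdMs + maxWaitMs; }
> > > > >>   }
> > > > >>
> > > > >>   class InflightProduceRequest {
> > > > >>     final long expireTimeMs;           // step 3: oldest batch's deadline
> > > > >>     InflightProduceRequest(List<Batch> batches, long maxWaitMs) {
> > > > >>       long min = Long.MAX_VALUE;
> > > > >>       for (Batch b : batches)
> > > > >>         min = Math.min(min, b.deadlineMs(maxWaitMs));
> > > > >>       this.expireTimeMs = min;
> > > > >>     }
> > > > >>   }
> > > > >>
> > > > >>   class PollTimeout {
> > > > >>     // Step 4: cap the selector poll timeout with the earliest
> > > > >>     // expireTime so the sender wakes up in time to expire and
> > > > >>     // rebatch (step 5).
> > > > >>     static long pollTimeoutMs(Collection<InflightProduceRequest> inflight,
> > > > >>                               long nowMs) {
> > > > >>       long earliest = Long.MAX_VALUE;
> > > > >>       for (InflightProduceRequest r : inflight)
> > > > >>         earliest = Math.min(earliest, r.expireTimeMs);
> > > > >>       return earliest == Long.MAX_VALUE
> > > > >>           ? Long.MAX_VALUE
> > > > >>           : Math.max(0L, earliest - nowMs);
> > > > >>     }
> > > > >>   }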
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > > > There seems to be no relationship with cluster metadata
> > > > >> > > > > availability or staleness. Expiry is just based on the time
> > > > >> > > > > since the batch has been ready. Please correct me if I am
> > > > >> > > > > wrong.
> > > > >> > > >
> > > > >> > > > I was not very specific about where we do expiration. I
> > > > >> > > > glossed over some details because (again) we have other
> > > > >> > > > mechanisms to detect non-progress. The condition
> > > > >> > > > (!muted.contains(tp) && (isMetadataStale ||
> > > > >> > > > cluster.leaderFor(tp) == null)) is used in
> > > > >> > > > RecordAccumulator.expiredBatches:
> > > > >> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> > > > >> > > >
> > > > >> > > > Effectively, we expire in all of the following cases:
> > > > >> > > > 1) The producer is partitioned from the brokers: the metadata
> > > > >> > > > age grows beyond 3x its max value. It's safe to say that
> > > > >> > > > we're not talking to the brokers at all. Report.
> > > > >> > > > 2) Fresh metadata && the leader for a partition is not known
> > > > >> > > > && a batch has been sitting there for longer than
> > > > >> > > > request.timeout.ms. This is one case we would like to improve
> > > > >> > > > by using batch.expiry.ms, because request.timeout.ms is too
> > > > >> > > > small.
> > > > >> > > > 3) Fresh metadata && the leader for a partition is known && a
> > > > >> > > > batch has been sitting there for longer than batch.expiry.ms.
> > > > >> > > > This is a new case that is different from #2. This is the
> > > > >> > > > catch-up mode case: things are moving too slowly, and
> > > > >> > > > pipeline SLAs are broken. Report and shut down kmm.
> > > > >> > > >
> > > > >> > > > The second and the third cases are useful to a real-time app
> > > > >> > > > for a completely different reason. Report, forget about the
> > > > >> > > > batch, and just move on (without shutting down).
> > > > >> > >
> > > > >> > > If I understand correctly, you are talking about a fork of
> > > > >> > > Apache Kafka which has these additional conditions? Because
> > > > >> > > that check doesn't exist on trunk today.
> > > > >> >
> > > > >> > Right. It is our internal release at LinkedIn.
> > > > >> >
> > > > >> > > Or are you proposing to change the behavior of expiry to
> > > > >> > > account for stale metadata and partitioned producers as part
> > > > >> > > of this KIP?
> > > > >> >
> > > > >> > No. It's our temporary solution in the absence of KIP-91. Note
> > > > >> > that we don't like increasing request.timeout.ms. Without our
> > > > >> > extra conditions, our batches expire too soon--a problem in kmm
> > > > >> > catch-up mode.
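> > > > >> >
> > > > >> > (For reference, a rough sketch of the three expiry cases
> > > > >> > described above. This is a paraphrase, not the exact internal
> > > > >> > patch; the per-case thresholds and the isMetadataStale flag are
> > > > >> > simplified:)
> > > > >> >
> > > > >> >   import java.util.Set;
> > > > >> >   import org.apache.kafka.common.Cluster;
> > > > >> >   import org.apache.kafka.common.TopicPartition;
> > > > >> >
> > > > >> >   class ExpirySketch {
> > > > >> >     // waitedMs: how long the batch has sat in the accumulator.
> > > > >> >     static boolean shouldExpire(TopicPartition tp, Cluster cluster,
> > > > >> >                                 boolean isMetadataStale,
> > > > >> >                                 Set<TopicPartition> muted,
> > > > >> >                                 long waitedMs, long requestTimeoutMs,
> > > > >> >                                 long batchExpiryMs) {
> > > > >> >       if (muted.contains(tp))
> > > > >> >         return false;                       // batch in flight; keep ordering
> > > > >> >       if (isMetadataStale)
> > > > >> >         return true;                        // case 1: partitioned from brokers
> > > > >> >       if (cluster.leaderFor(tp) == null)
> > > > >> >         return waitedMs > requestTimeoutMs; // case 2 (batch.expiry.ms under KIP-91)
> > > > >> >       return waitedMs > batchExpiryMs;      // case 3: catch-up mode, SLA broken
> > > > >> >     }
> > > > >> >   }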
> > > > >> >
> > > > >> > If we get batch.expiry.ms, we will configure it to 20 mins.
> > > > >> > maybeExpire will use the config instead of request.timeout.ms.
> > > > >> > The extra conditions will be unnecessary. All three cases shall
> > > > >> > be covered via the batch.expiry timeout.
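> > > > >> >
> > > > >> > (For concreteness, that configuration would look something like
> > > > >> > the sketch below, assuming batch.expiry.ms lands as proposed in
> > > > >> > KIP-91; it is not a real producer config today, and the
> > > > >> > request.timeout.ms value is illustrative:)
> > > > >> >
> > > > >> >   import java.util.Properties;
> > > > >> >
> > > > >> >   class BatchExpiryConfigSketch {
> > > > >> >     static Properties producerProps() {
> > > > >> >       Properties props = new Properties();
> > > > >> >       // Proposed KIP-91 config: expire accumulator batches
> > > > >> >       // after 20 minutes (1,200,000 ms).
> > > > >> >       props.put("batch.expiry.ms", "1200000");
> > > > >> >       // request.timeout.ms can stay small; it no longer does
> > > > >> >       // double duty as the accumulator timeout once maybeExpire
> > > > >> >       // uses batch.expiry.ms.
> > > > >> >       props.put("request.timeout.ms", "30000");
> > > > >> >       return props;
> > > > >> >     }
> > > > >> >   }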