OK. Looks like starting the clock after closing the batch has quite a few pitfalls. I can't think of a way to work around them without adding yet another config, so I won't discuss that further here. Anyone? As I said earlier, I'm not hung up on super-accurate notification times.
If we are going down the max.message.delivery.wait.ms route, what would be the default? There seem to be a few options.

1. max.message.delivery.wait.ms=null. Nothing changes for those who don't set it, i.e., batches expire after request.timeout.ms in the accumulator. If they are past the accumulator stage, they time out after retries * (request.timeout.ms + retry.backoff.ms).

2. max.message.delivery.wait.ms=request.timeout.ms. No observable behavioral change at the accumulator level, as the timeout value is the same as before. Retries will be attempted as long as the batch is under max.message.delivery.wait.ms. However, a batch can expire after just one try. That's OK IMO because request.timeout.ms tends to be large (default 30000).

3. max.message.delivery.wait.ms=2*request.timeout.ms. Gives an opportunity for two retries, but warn that retries may not happen at all in some rare cases and a batch could expire before any attempt.

4. max.message.delivery.wait.ms=something else (a constant?)

Thoughts?
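To make the comparison concrete, here is roughly how I picture the effective end-to-end bound falling out of each option (a strawman sketch only; resolveDeliveryBoundMs and the nullable config type are mine, not actual producer code):

    // Strawman: the effective delivery bound under options 1-3.
    // A null max.message.delivery.wait.ms (option 1) means "keep today's behaviour".
    class DeliveryBoundSketch {
        static long resolveDeliveryBoundMs(Long maxMessageDeliveryWaitMs, // null for option 1
                                           long requestTimeoutMs,         // default 30000
                                           long retryBackoffMs,
                                           int retries) {
            if (maxMessageDeliveryWaitMs == null) {
                // Option 1: unchanged. Expire in the accumulator after request.timeout.ms;
                // once past the accumulator, roughly retries * (request.timeout.ms + backoff).
                return requestTimeoutMs + retries * (requestTimeoutMs + retryBackoffMs);
            }
            // Options 2/3/4: the config itself is the bound. Retries happen only while the
            // batch is still under max.message.delivery.wait.ms, so a batch may get just one
            // try (option 2) or, in rare cases, none at all (option 3).
            return maxMessageDeliveryWaitMs;
        }
    }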
On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks Becket, that seems reasonable. Sumant, would you be willing to update the KIP based on the discussion or are you still not convinced?
>
> Ismael
>
> On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > In general max.message.delivery.wait.ms is a cleaner approach. That would make the guarantee clearer. That said, there seem to be subtleties in some scenarios:
> >
> > 1. I agree with Sumant that it is a little weird that a message could be expired immediately if it happens to enter a batch that is about to be expired. But as Jun said, as long as we have multiple messages in a batch, there isn't a cheap way to achieve a precise timeout. So the question actually becomes whether it is more user-friendly to expire early (based on the batch creation time) or expire late (based on the batch close time). I think both are acceptable. Personally I think most users do not really care about expiring a little late as long as the batch eventually expires. So I would use the batch close time as long as there is a bound on that. But it looks like we do not really have a bound on when we will close a batch. So expiration based on batch create time may be the only option if we don't want to introduce complexity.
> >
> > 2. If we time out a batch in a request while it is still in flight, the end result of that batch is unclear to the users. It would be weird for users to receive an exception saying those messages are expired while they actually have been sent successfully. Also, if idempotence is set to true, what would the next sequence ID be after the expired batch? Reusing the same sequence ID may result in data loss, and incrementing the sequence ID may cause OutOfOrderSequenceException. Besides, extracting an expired batch from a request also introduces some complexity. Again, personally I think it is fine to expire a little bit late. So maybe we don't need to expire a batch that is already in flight. In the worst case we will expire it with a delay of request.timeout.ms.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> >> Hi all,
> >>
> >> The discussion has been going on for a while, would it help to have a call to discuss this? I'd like to start a vote soonish so that we can include this in 1.0.0. I personally prefer max.message.delivery.wait.ms. It seems like Jun, Apurva and Jason also prefer that. Sumant, it seems like you still prefer batch.expiry.ms, is that right? What are your thoughts, Joel and Becket?
> >>
> >> Ismael
> >>
> >> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:
> >>
> >>> Hi, Sumant,
> >>>
> >>> The semantics of linger.ms is a bit subtle. The reasoning for the current implementation is the following. Let's say one sets linger.ms to 0 (our current default value). Creating a batch for every message will be bad for throughput. Instead, the current implementation only forms a batch when the batch is sendable (i.e., the broker is available, the inflight request limit is not exceeded, etc.). That way, the producer has more chance for batching. The implication is that a batch could be closed later than linger.ms.
> >>>
> >>> Now, on your concern about not having a precise way to control delay in the accumulator: it seems the batch.expiry.ms approach will have the same issue. If you start the clock when a batch is initialized, you may expire some messages in the same batch earlier than batch.expiry.ms. If you start the clock when the batch is closed, the expiration time could be unbounded because of the linger.ms implementation described above. Starting the expiration clock on batch initialization will at least guarantee that the time to expire the first message is precise, which is probably good enough.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
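(Inline aside from me, just to pin down the arithmetic behind the two clock choices Jun describes above. The field names are made up; this is not the real ProducerBatch.)

    // Sketch of the two places the expiration clock could start.
    class ClockStartSketch {
        long createdMs;  // when the batch was created in the accumulator
        long closedMs;   // when the batch was closed, i.e. became sendable

        long deadlineFromCreate(long expiryMs) {
            // Precise for the first message, but records appended later may expire "early".
            return createdMs + expiryMs;
        }

        long deadlineFromClose(long expiryMs) {
            // No early expiry, but unbounded today: a batch is only closed once its
            // destination is sendable, which can be much later than linger.ms.
            return closedMs + expiryMs;
        }
    }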
> >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:
> >>>
> >>> > Question about "the closing of a batch can be delayed longer than linger.ms": Is it possible to cause an indefinite delay? At some point the bytes limit might kick in. Also, why is the closing of a batch coupled with the availability of its destination? In this approach a batch chosen for eviction due to delay needs to "close" anyway, right (without regard to destination availability)?
> >>> >
> >>> > I'm not too worried about notifying at the super-exact time specified in the configs. But expiring before the full wait-span has elapsed sounds a little weird. So the expiration time has a +/- spread. It works more like a hint than a max. So why not message.delivery.wait.hint.ms?
> >>> >
> >>> > Yeah, a cancellable future will be similar in complexity.
> >>> >
> >>> > I'm unsure if max.message.delivery.wait.ms will be the final nail for producer timeouts. We still won't have a precise way to control delay in just the accumulator segment. batch.expiry.ms does not try to abstract. It's very specific.
> >>> >
> >>> > My biggest concern at the moment is implementation complexity.
> >>> >
> >>> > At this stage, I would like to encourage other independent opinions.
> >>> >
> >>> > Regards,
> >>> > Sumant
> >>> >
> >>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
> >>> >
> >>> > > Hi, Sumant,
> >>> > >
> >>> > > 1. Yes, it's probably reasonable to require max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we can set the default retries to infinite or just ignore it. Then the latency will be bounded by max.message.delivery.wait.ms. request.timeout.ms is the max time the request will be spending on the server. The client can expire an inflight request early if needed.
> >>> > >
> >>> > > 2. Well, since max.message.delivery.wait.ms specifies the max, calling the callback a bit early may be ok? Note that max.message.delivery.wait.ms only comes into play in the rare error case. So, I am not sure if we need to be very precise. The issue with starting the clock on closing a batch is that currently, if the leader is not available, the closing of a batch can be delayed longer than linger.ms.
> >>> > >
> >>> > > 4. As you said, future.get(timeout) itself doesn't solve the problem since you still need a way to expire the record in the sender. The amount of work to implement a cancellable future is probably the same?
> >>> > >
> >>> > > Overall, my concern with patch work is that we have iterated on the produce request timeout multiple times and new issues keep coming back. Ideally, this time, we want to have a solution that covers all cases, even though that requires a bit more work.
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > Jun
> >>> > >
> >>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com> wrote:
> >>> > >
> >>> > > > Hi Jun,
> >>> > > >
> >>> > > > Thanks for looking into it.
> >>> > > >
> >>> > > > Yes, we did consider this message-level timeout approach and expiring batches selectively in a request, but rejected it due to the added complexity without a strong benefit to counter-weigh it. Your proposal is a slight variation so I'll mention some issues here.
> >>> > > >
> >>> > > > 1. It sounds like max.message.delivery.wait.ms will overlap with "time segments" of both linger.ms and retries * (request.timeout.ms + retry.backoff.ms). In that case, which config set takes precedence? It would not make sense to configure configs from both sets. Especially, we discussed exhaustively internally that retries and max.message.delivery.wait.ms can't / shouldn't be configured together. Retries become moot, as you already mention. I think that's going to be surprising to anyone wanting to use max.message.delivery.wait.ms. We probably need max.message.delivery.wait.ms > linger.ms or something like that.
> >>> > > >
> >>> > > > 2. If the clock starts when a batch is created and it expires when max.message.delivery.wait.ms is over in the accumulator, the last few messages in the expiring batch may not have lived long enough. As the config seems to suggest a per-message timeout, it's incorrect to expire messages prematurely. On the other hand, if the clock starts after the batch is closed (which also implies that linger.ms is not covered by the max.message.delivery.wait.ms config), no message would be expired too soon. Yeah, expiration may be a little bit too late, but hey, this ain't a real-time service.
> >>> > > >
> >>> > > > 3. I agree that steps #3, #4, (and #5) are complex to implement. On the other hand, batch.expiry.ms is next to trivial to implement. We just pass the config all the way down to ProducerBatch.maybeExpire and be done with it.
> >>> > > >
> >>> > > > 4. Do you think the effect of max.message.delivery.wait.ms can be simulated with the future.get(timeout) method? Copying an excerpt from KIP-91: An end-to-end timeout may be partially emulated using the future.get(timeout). The timeout must be greater than (batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)). Note that when the future times out, the Sender may continue to send the records in the background. To avoid that, implementing a cancellable future is a possibility.
> >>> > > >
> >>> > > > For simplicity, we could just implement a trivial method in the producer, ProducerConfigs.maxMessageDeliveryWaitMs(), and return a number based on this formula? Users of future.get can use this timeout value.
> >>> > > >
> >>> > > > Thoughts?
> >>> > > >
> >>> > > > Regards,
> >>> > > > Sumant
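(Inline note to make my point 4 concrete. Sketch only: the constants are illustrative, and a real caller would read them from its producer configs rather than hard-coding them.)

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    class EndToEndTimeoutSketch {
        static final long BATCH_EXPIRY_MS = 180_000L;   // proposed batch.expiry.ms (illustrative value)
        static final int RETRIES = 3;
        static final long REQUEST_TIMEOUT_MS = 30_000L; // request.timeout.ms default
        static final long RETRY_BACKOFF_MS = 100L;      // retry.backoff.ms default

        static RecordMetadata sendWithDeadline(Producer<String, String> producer,
                                               ProducerRecord<String, String> record) throws Exception {
            // The wait must exceed batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms),
            // otherwise the future can time out while the Sender is still legitimately retrying.
            long endToEndMs = BATCH_EXPIRY_MS + RETRIES * (REQUEST_TIMEOUT_MS + RETRY_BACKOFF_MS);
            Future<RecordMetadata> future = producer.send(record);
            try {
                return future.get(endToEndMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The Sender may still deliver the record in the background; a cancellable
                // future would be needed to actually abandon it.
                return null;
            }
        }
    }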
> >>> > > >
> >>> > > > On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:
> >>> > > >
> >>> > > > >> Thanks for the KIP. Nice documentation on all current issues with the timeout.
> >>> > > > >
> >>> > > > > For the KIP writeup, all credit goes to Joel Koshy.
> >>> > > > >
> >>> > > > > I'll follow up on your comments a little later.
> >>> > > > >
> >>> > > > >> You also brought up a good use case for timing out a message. For applications that collect and send sensor data to Kafka, if the data can't be sent to Kafka for some reason, the application may prefer to buffer the more recent data in the accumulator. Without a timeout, the accumulator will be filled with old records and new records can't be added.
> >>> > > > >>
> >>> > > > >> Your proposal makes sense for a developer who is familiar with how the producer works. I am not sure if this is very intuitive to the users, since it may not be very easy for them to figure out how to configure the new knob to bound the amount of time before a message is completed.
> >>> > > > >>
> >>> > > > >> From the users' perspective, Apurva's suggestion of max.message.delivery.wait.ms (which bounds the time from when a message is in the accumulator to the time when the callback is called) seems more intuitive. You listed this in the rejected section since it requires additional logic to rebatch when a produce request expires. However, this may not be too bad. The following are the things that we have to do.
> >>> > > > >>
> >>> > > > >> 1. The clock starts when a batch is created.
> >>> > > > >> 2. If the batch can't be drained within max.message.delivery.wait.ms, all messages in the batch will fail and the callback will be called.
> >>> > > > >> 3. When sending a produce request, we calculate an expireTime for the request that equals the remaining expiration time for the oldest batch in the request.
> >>> > > > >> 4. We set the minimum of the expireTimes of all inflight requests as the timeout in the selector poll call (so that the selector can wake up before the expiration time).
> >>> > > > >> 5. If the produce response can't be received within the expireTime, we expire all batches in the produce request whose expiration time has been reached. For the rest of the batches, we resend them in a new produce request.
> >>> > > > >> 6. If the produce response has a retriable error, we just back off a bit and then retry the produce request as today. The number of retries doesn't really matter now. We just keep retrying until the expiration time is reached. It's possible that a produce request is never retried due to expiration. However, this seems the right thing to do since the users want to time out the message at this time.
> >>> > > > >>
> >>> > > > >> Implementation wise, there will be a bit more complexity in steps 3 and 4, but probably not too bad. The benefit is that this is more intuitive to the end user.
> >>> > > > >>
> >>> > > > >> Does that sound reasonable to you?
> >>> > > > >>
> >>> > > > >> Thanks,
> >>> > > > >>
> >>> > > > >> Jun
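(Inline note: my reading of steps 3 and 4 above, as a sketch only. ProduceRequestState and BatchState are made-up stand-ins, not the real Sender/NetworkClient types.)

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    // Step 3: a request's deadline comes from the oldest batch it carries.
    // Step 4: the selector poll timeout is the time until the earliest inflight deadline.
    class BatchState {                      // stand-in for ProducerBatch
        long createdMs;                     // step 1: clock starts at batch creation
    }

    class ProduceRequestState {             // stand-in for an inflight produce request
        final List<BatchState> batches = new ArrayList<>();
        long expireTimeMs;                  // absolute deadline computed below
    }

    class ExpirySketch {
        static long requestExpireTime(ProduceRequestState req, long maxDeliveryWaitMs) {
            long earliestCreated = Long.MAX_VALUE;  // assumes the request has at least one batch
            for (BatchState b : req.batches)
                earliestCreated = Math.min(earliestCreated, b.createdMs);
            return earliestCreated + maxDeliveryWaitMs;
        }

        static long pollTimeoutMs(Collection<ProduceRequestState> inflight, long nowMs) {
            long earliestDeadline = Long.MAX_VALUE;
            for (ProduceRequestState req : inflight)
                earliestDeadline = Math.min(earliestDeadline, req.expireTimeMs);
            return Math.max(0L, earliestDeadline - nowMs);
        }
    }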
> >>> > > > >>
> >>> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:
> >>> > > > >>
> >>> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> >>> > > > >> >
> >>> > > > >> > > > > There seems to be no relationship with cluster metadata availability or staleness. Expiry is just based on the time since the batch has been ready. Please correct me if I am wrong.
> >>> > > > >> > > >
> >>> > > > >> > > > I was not very specific about where we do expiration. I glossed over some details because (again) we've got other mechanisms to detect non-progress. The condition (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) is used in RecordAccumulator.expiredBatches: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> >>> > > > >> > > >
> >>> > > > >> > > > Effectively, we expire in all the following cases:
> >>> > > > >> > > > 1) The producer is partitioned from the brokers, i.e., when the metadata age grows beyond 3x its max value. It's safe to say that we're not talking to the brokers at all. Report.
> >>> > > > >> > > > 2) Fresh metadata && the leader for a partition is not known && a batch is sitting there for longer than request.timeout.ms. This is one case we would like to improve by using batch.expiry.ms, because request.timeout.ms is too small.
> >>> > > > >> > > > 3) Fresh metadata && the leader for a partition is known && a batch is sitting there for longer than batch.expiry.ms. This is a new case that is different from #2. This is the catch-up mode case. Things are moving too slowly. Pipeline SLAs are broken. Report and shut down kmm.
> >>> > > > >> > > >
> >>> > > > >> > > > The second and the third cases are useful to a real-time app for a completely different reason. Report, forget about the batch, and just move on (without shutting down).
> >>> > > > >> > >
> >>> > > > >> > > If I understand correctly, you are talking about a fork of Apache Kafka which has these additional conditions? Because that check doesn't exist on trunk today.
> >>> > > > >> >
> >>> > > > >> > Right. It is our internal release at LinkedIn.
> >>> > > > >> >
> >>> > > > >> > > Or are you proposing to change the behavior of expiry to account for stale metadata and partitioned producers as part of this KIP?
> >>> > > > >> >
> >>> > > > >> > No. It's our temporary solution in the absence of KIP-91. Note that we don't like increasing request.timeout.ms. Without our extra conditions our batches expire too soon--a problem in kmm catch-up mode.
> >>> > > > >> >
> >>> > > > >> > If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire will use the config instead of r.t.ms. The extra conditions will be unnecessary. All three cases shall be covered via the batch.expiry timeout.
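P.S. For anyone skimming the tail of this thread: the way I think about it, once batch.expiry.ms exists, the three cases above collapse into a single age check (sketch only; shouldExpire and its parameters are illustrative, not the actual RecordAccumulator code):

    class BatchExpirySketch {
        // Cases 1-3 (partitioned producer, leader unknown, slow catch-up) all surface as a
        // batch sitting in the accumulator past batch.expiry.ms, e.g. 20 minutes for kmm.
        static boolean shouldExpire(long batchCreatedMs, long nowMs,
                                    long batchExpiryMs, boolean partitionMuted) {
            return !partitionMuted && (nowMs - batchCreatedMs) > batchExpiryMs;
        }
    }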