@Becket Good point about unnecessarily resetting the PID in cases where we know the request has failed. Might be worth opening a JIRA to try and improve this.
> So if we expire the batch prematurely and resend all the other batches in the same request, chances are there will be duplicates. If we wait for the response instead, it is less likely to introduce duplicates, and we may not need to reset the PID.

Not sure I follow this. Are you assuming that we change the batch PID/sequence of the retried batches after resetting the PID? I think we probably need to ensure that when we retry a batch, we always use the same PID/sequence.

By the way, as far as naming, `max.message.delivery.wait.ms` is quite a mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`?

-Jason

On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

If the TCP timeout is longer than request.timeout.ms, the producer will always hit request.timeout.ms before hitting the TCP timeout, right? That is why we added request.timeout.ms in the first place.

You are right. Currently we reset the PID and resend the batches to avoid OutOfOrderSequenceException when the expired batches are in retry.

This does not distinguish the reasons that caused the retry. There are two cases:
1. If the batch was in retry because it received an error response (e.g. NotLeaderForPartition), we actually don't need to reset the PID, because we know that the broker did not accept it.
2. If the batch was in retry because it hit a timeout earlier, then we should reset the PID (or optimistically send and only reset the PID when we receive OutOfOrderSequenceException?)

Case 1 is probably the most common case, so it looks like we are resetting the PID more often than necessary. But because in case 1 the broker does not have the batch, there isn't much impact from resetting the PID and resending other than the additional round trip.

Now we are introducing another case:
3. A batch is in retry because we expired an in-flight request before it hit request.timeout.ms.

The difference between 2 and 3 is that in case 3 the broker has likely appended the messages. So if we expire the batch prematurely and resend all the other batches in the same request, chances are there will be duplicates. If we wait for the response instead, it is less likely to introduce duplicates, and we may not need to reset the PID.

That said, given that batch expiration is probably already rare enough, it may not be necessary to optimize for that.

Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Becket,

If a message expires while it's in an in-flight produce request, the producer will get a new PID if idempotence is enabled. This will prevent subsequent messages from hitting OutOfOrderSequenceException. The issue with not expiring an in-flight request is that if a broker goes down hard (e.g. power outage), the time it takes for the client to detect the socket-level error (something like 8+ minutes with the default TCP settings) is much longer than the default request.timeout.ms.

Hi, Sumant,

We can probably just default max.message.delivery.wait.ms to 30 secs, the current default for request.timeout.ms.

Thanks,

Jun

On Wed, Aug 23, 2017 at 3:38 PM, Sumant Tambe <suta...@gmail.com> wrote:

OK. Looks like starting the clock after closing the batch has quite a few pitfalls. I can't think of a way to work around it without adding yet another config.
So I won't discuss that here. Anyone? As I said earlier, I'm not hung up on super-accurate notification times.

If we are going down the max.message.delivery.wait.ms route, what would be the default? There seem to be a few options.

1. max.message.delivery.wait.ms=null. Nothing changes for those who don't set it. I.e., batches expire after request.timeout.ms in the accumulator. If they are past the accumulator stage, they time out after retries*(request.timeout.ms+backoff).

2. max.message.delivery.wait.ms=request.timeout.ms. No observable behavioral change at the accumulator level, as the timeout value is the same as before. Retries will be done as long as the batch is under max.message.delivery.wait.ms. However, a batch can expire after just one try. That's ok IMO because request.timeout.ms tends to be large (default 30000).

3. max.message.delivery.wait.ms=2*request.timeout.ms. Gives the opportunity for two retries, but warn that retries may not happen at all in some rare cases and a batch could expire before any attempt.

4. max.message.delivery.wait.ms=something else (a constant?)

Thoughts?

On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk> wrote:

Thanks Becket, that seems reasonable. Sumant, would you be willing to update the KIP based on the discussion, or are you still not convinced?

Ismael

On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <becket....@gmail.com> wrote:

In general max.message.delivery.wait.ms is a cleaner approach. That would make the guarantee clearer. That said, there seem to be subtleties in some scenarios:

1. I agree with Sumant that it is a little weird that a message could be expired immediately if it happens to enter a batch that is about to be expired. But as Jun said, as long as we have multiple messages in a batch, there isn't a cheap way to achieve a precise timeout. So the question actually becomes whether it is more user-friendly to expire early (based on the batch creation time) or expire late (based on the batch close time). I think both are acceptable. Personally I think most users do not really care about expiring a little late as long as it eventually expires. So I would use the batch close time as long as there is a bound on that. But it looks like we do not really have a bound on when we will close a batch. So expiration based on batch creation time may be the only option if we don't want to introduce complexity.

2. If we time out a batch in a request when it is still in flight, the end result of that batch is unclear to the users. It would be weird for users to receive an exception saying those messages expired when they actually have been sent successfully. Also, if idempotence is set to true, what would the next sequence ID be after the expired batch? Reusing the same sequence ID may result in data loss, and incrementing the sequence ID may cause OutOfOrderSequenceException.
Besides, extracting an expired batch from a request also introduces some complexity. Again, personally I think it is fine to expire a little bit late. So maybe we don't need to expire a batch that is already in flight. In the worst case we will expire it with a delay of request.timeout.ms.

Thanks,

Jiangjie (Becket) Qin

On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi all,

The discussion has been going on for a while; would it help to have a call to discuss this? I'd like to start a vote soonish so that we can include this in 1.0.0. I personally prefer max.message.delivery.wait.ms. It seems like Jun, Apurva and Jason also prefer that. Sumant, it seems like you still prefer batch.expiry.ms, is that right? What are your thoughts, Joel and Becket?

Ismael

On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Sumant,

The semantics of linger.ms is a bit subtle. The reasoning for the current implementation is the following. Let's say one sets linger.ms to 0 (our current default value). Creating a batch for every message would be bad for throughput. Instead, the current implementation only forms a batch when the batch is sendable (i.e., the broker is available, the in-flight request limit is not exceeded, etc). That way, the producer has more chance for batching. The implication is that the closing of a batch could be delayed longer than linger.ms.

Now, on your concern about not having a precise way to control delay in the accumulator: it seems the batch.expiry.ms approach will have the same issue. If you start the clock when a batch is initialized, you may expire some messages in the same batch earlier than batch.expiry.ms. If you start the clock when the batch is closed, the expiration time could be unbounded because of the linger.ms implementation described above. Starting the expiration clock on batch initialization will at least guarantee that the time to expire the first message is precise, which is probably good enough.

Thanks,

Jun

On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:

Question about "the closing of a batch can be delayed longer than linger.ms": Is it possible to cause an indefinite delay? At some point the bytes limit might kick in. Also, why is the closing of a batch coupled with the availability of its destination? In this approach, a batch chosen for eviction due to delay needs to "close" anyway, right (without regard to destination availability)?
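To make the clock-start trade-off discussed above concrete, here is a minimal sketch of expiry measured from batch creation time, as Jun suggests; the class and method names are illustrative assumptions, not the actual producer internals:

final class BatchExpiryCheck {
    // Illustrative only: the first record appended gets the full timeout, while records
    // appended to the same batch later may be failed earlier than their own age suggests.
    static boolean shouldExpire(long nowMs, long batchCreatedMs, long batchExpiryMs) {
        return nowMs - batchCreatedMs >= batchExpiryMs;
    }

    public static void main(String[] args) {
        long createdMs = System.currentTimeMillis() - 25_000L; // batch created 25s ago
        System.out.println(shouldExpire(System.currentTimeMillis(), createdMs, 20_000L)); // true
    }
}

Starting the clock at batch close instead would avoid expiring late-appended records early, but, as noted above, the close time itself is unbounded when the leader is unavailable.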
I'm not too worried about notifying at the super-exact time specified in the configs. But expiring before the full wait-span has elapsed sounds a little weird. So the expiration time has a +/- spread. It works more like a hint than a max. So why not message.delivery.wait.hint.ms?

Yeah, a cancellable future will be similar in complexity.

I'm unsure if max.message.delivery.wait.ms will be the final nail for producer timeouts. We still won't have a precise way to control delay in just the accumulator segment. batch.expiry.ms does not try to abstract. It's very specific.

My biggest concern at the moment is implementation complexity.

At this stage, I would like to encourage other independent opinions.

Regards,
Sumant

On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:

Hi, Sumant,

1. Yes, it's probably reasonable to require max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we can set the default retries to infinite or just ignore it. Then the latency will be bounded by max.message.delivery.wait.ms. request.timeout.ms is the max time the request will spend on the server. The client can expire an in-flight request early if needed.

2. Well, since max.message.delivery.wait.ms specifies the max, calling the callback a bit early may be ok? Note that max.message.delivery.wait.ms only comes into play in the rare error case. So, I am not sure if we need to be very precise. The issue with starting the clock on closing a batch is that currently, if the leader is not available, the closing of a batch can be delayed longer than linger.ms.

4. As you said, future.get(timeout) itself doesn't solve the problem since you still need a way to expire the record in the sender. The amount of work to implement a cancellable future is probably the same?

Overall, my concern with patchwork is that we have iterated on the produce request timeout multiple times and new issues keep coming back. Ideally, this time we want to have a solution that covers all cases, even though that requires a bit more work.

Thanks,

Jun

On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com> wrote:

Hi Jun,

Thanks for looking into it.
Yes, we did consider this message-level timeout approach and expiring batches selectively in a request, but rejected it because of the added complexity without a strong benefit to counterweigh it. Your proposal is a slight variation, so I'll mention some issues here.

1. It sounds like max.message.delivery.wait.ms will overlap with "time segments" of both linger.ms and retries * (request.timeout.ms + retry.backoff.ms). In that case, which config set takes precedence? It would not make sense to configure configs from both sets. In particular, we discussed exhaustively internally that retries and max.message.delivery.wait.ms can't / shouldn't be configured together. Retries become moot, as you already mention. I think that's going to be surprising to anyone wanting to use max.message.delivery.wait.ms. We probably need max.message.delivery.wait.ms > linger.ms or something like that.

2. If the clock starts when a batch is created and it expires when max.message.delivery.wait.ms is over in the accumulator, the last few messages in the expiring batch may not have lived long enough. As the config seems to suggest a per-message timeout, it's incorrect to expire messages prematurely. On the other hand, if the clock starts after the batch is closed (which also implies that linger.ms is not covered by the max.message.delivery.wait.ms config), no message would be expired too soon. Yeah, expiration may be a little bit too late, but hey, this ain't a real-time service.

3. I agree that steps #3, #4 (and #5) are complex to implement. On the other hand, batch.expiry.ms is next to trivial to implement. We just pass the config all the way down to ProducerBatch.maybeExpire and are done with it.

4. Do you think the effect of max.message.delivery.wait.ms can be simulated with the future.get(timeout) method? Copying an excerpt from KIP-91: An end-to-end timeout may be partially emulated using future.get(timeout). The timeout must be greater than (batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)). Note that when the future times out, the Sender may continue to send the records in the background. To avoid that, implementing a cancellable future is a possibility.
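A small sketch of the future.get(timeout) emulation described in point 4, with the caller-side bound computed from the quoted formula; the broker address, topic name, and config values below are illustrative assumptions, and batch.expiry.ms is the KIP-91 proposal rather than an existing producer setting, so it only enters the arithmetic:

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class EndToEndTimeoutEmulation {
    public static void main(String[] args) throws Exception {
        // Illustrative values only. batch.expiry.ms is proposed in KIP-91, so it is not
        // set on the producer here; it only feeds into the formula below.
        long batchExpiryMs = 20_000L;
        int nRetries = 3;
        long requestTimeoutMs = 30_000L;
        long retryBackoffMs = 100L;

        // Per the excerpt above, the caller-side timeout must exceed
        // batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms).
        long endToEndTimeoutMs = batchExpiryMs + nRetries * (requestTimeoutMs + retryBackoffMs) + 1_000L;

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", Integer.toString(nRetries));
        props.put("request.timeout.ms", Long.toString(requestTimeoutMs));
        props.put("retry.backoff.ms", Long.toString(retryBackoffMs));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            try {
                RecordMetadata metadata = future.get(endToEndTimeoutMs, TimeUnit.MILLISECONDS);
                System.out.println("Acknowledged at offset " + metadata.offset());
            } catch (TimeoutException e) {
                // As noted above, this only bounds how long the caller waits; the Sender
                // may still deliver the record in the background unless the future is
                // made cancellable.
                System.out.println("Gave up waiting after " + endToEndTimeoutMs + " ms");
            }
        }
    }
}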
For simplicity, we could just implement a trivial method in the producer, ProducerConfigs.maxMessageDeliveryWaitMs(), that returns a number based on this formula? Users of future.get can use this timeout value.

Thoughts?

Regards,
Sumant

On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:

> Thanks for the KIP. Nice documentation on all current issues with the timeout.

For the KIP writeup, all credit goes to Joel Koshy.

I'll follow up on your comments a little later.

> You also brought up a good use case for timing out a message. For applications that collect and send sensor data to Kafka, if the data can't be sent to Kafka for some reason, the application may prefer to buffer the more recent data in the accumulator. Without a timeout, the accumulator will be filled with old records and new records can't be added.
>
> Your proposal makes sense for a developer who is familiar with how the producer works. I am not sure if this is very intuitive to the users, since it may not be very easy for them to figure out how to configure the new knob to bound the amount of time until a message is completed.
>
> From the users' perspective, Apurva's suggestion of max.message.delivery.wait.ms (which bounds the time from when a message is in the accumulator to the time when the callback is called) seems more intuitive. You listed this in the rejected section since it requires additional logic to rebatch when a produce request expires. However, this may not be too bad. The following are the things that we would have to do.
>
> 1. The clock starts when a batch is created.
> 2. If the batch can't be drained within max.message.delivery.wait.ms, all messages in the batch will fail and the callback will be called.
> 3. When sending a produce request, we calculate an expireTime for the request that equals the remaining expiration time for the oldest batch in the request.
> 4. We set the minimum of the expireTime of all in-flight requests as the timeout in the selector poll call (so that the selector can wake up before the expiration time).
> 5. If the produce response can't be received within the expireTime, we expire all batches in the produce request whose expiration time has been reached. For the rest of the batches, we resend them in a new produce request.
> 6. If the produce response has a retriable error, we just back off a bit and then retry the produce request as today. The number of retries doesn't really matter now. We just keep retrying until the expiration time is reached. It's possible that a produce request is never retried due to expiration. However, this seems the right thing to do since the users want to time out the message at this time.
>
> Implementation wise, there will be a bit more complexity in steps 3 and 4, but probably not too bad. The benefit is that this is more intuitive to the end user.
>
> Does that sound reasonable to you?
>
> Thanks,
>
> Jun
>
> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:

On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:

> > > There seems to be no relationship with cluster metadata availability or staleness. Expiry is just based on the time since the batch has been ready. Please correct me if I am wrong.
> >
> > I was not very specific about where we do expiration. I glossed over some details because (again) we've other mechanisms to detect non-progress.
> > The condition (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) is used in RecordAccumulator.expiredBatches: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> >
> > Effectively, we expire in all the following cases:
> > 1) The producer is partitioned from the brokers. When metadata age grows beyond 3x its max value, it's safe to say that we're not talking to the brokers at all. Report.
> > 2) Fresh metadata && leader for a partition is not known && a batch is sitting there for longer than request.timeout.ms. This is one case we would like to improve and use batch.expiry.ms for, because request.timeout.ms is too small.
> > 3) Fresh metadata && leader for a partition is known && batch is sitting there for longer than batch.expiry.ms. This is a new case that is different from #2. This is the catch-up mode case. Things are moving too slowly. Pipeline SLAs are broken. Report and shut down kmm.
> >
> > The second and the third cases are useful to a real-time app for a completely different reason. Report, forget about the batch, and just move on (without shutting down).
>
> If I understand correctly, you are talking about a fork of apache kafka which has these additional conditions? Because that check doesn't exist on trunk today.

Right. It is our internal release in LinkedIn.

> Or are you proposing to change the behavior of expiry to account for stale metadata and partitioned producers as part of this KIP?

No. It's our temporary solution in the absence of KIP-91. Note that we don't like increasing request.timeout.ms.
Without our extra conditions, our batches expire too soon--a problem in kmm catch-up mode.

If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire will use the config instead of r.t.ms. The extra conditions will be unnecessary. All three cases shall be covered via the batch.expiry timeout.