> > There seems to be no relationship with cluster metadata availability or
> > staleness. Expiry is just based on the time since the batch has been
> > ready.
> > Please correct me if I am wrong.
> >
>
> I was not very specific about where we do expiration. I glossed over some
> details because (again) we have other mechanisms to detect non-progress.
> The condition (!muted.contains(tp) && (isMetadataStale ||
> cluster.leaderFor(tp) == null)) is used in RecordAccumulator.expiredBatches:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
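>
> To illustrate, a rough paraphrase of that check (not necessarily the code
> on trunk; isMetadataStale is shorthand for "metadata age has grown well
> beyond metadata.max.age.ms"):
>
>     // Only unmuted partitions with stale metadata or an unknown leader
>     // are considered for expiry here.
>     boolean considerForExpiry =
>             !muted.contains(tp)
>             && (isMetadataStale || cluster.leaderFor(tp) == null);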
>
>
> Effectively, we expire in all of the following cases:
> 1) The producer is partitioned from the brokers: metadata age has grown
> beyond 3x its maximum value, so it's safe to say we're not talking to the
> brokers at all. Report.
> 2) Fresh metadata && the leader for a partition is not known && a batch has
> been sitting there for longer than request.timeout.ms. This is one case we
> would like to improve by using batch.expiry.ms, because request.timeout.ms
> is too small.
> 3) Fresh metadata && the leader for a partition is known && a batch has
> been sitting there for longer than batch.expiry.ms. This is a new case,
> different from #2: the catch-up mode case. Things are moving too slowly and
> pipeline SLAs are broken. Report and shut down kmm.
>
> The second and third cases are useful to a real-time app for a completely
> different reason: report, forget about the batch, and just move on (without
> shutting down).
>
>
If I understand correctly, you are talking about a fork of Apache Kafka
that has these additional conditions, because that check doesn't exist on
trunk today? Or are you proposing to change the expiry behavior to account
for stale metadata and partitioned producers as part of this KIP?
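
For concreteness, here is a rough sketch of the expiry decision the three
cases quoted above seem to describe. This is only an illustration of my
reading of the proposal, not existing trunk behaviour; names such as
waitedTimeMs, metadataAgeMs, requestTimeoutMs and batchExpiryMs are
assumptions from this thread, and leaderKnown stands in for
cluster.leaderFor(tp) != null.

    // Illustrative sketch only (not trunk code). waitedTimeMs is the time
    // the batch has been sitting ready; other parameters come from the
    // producer configs discussed in this thread.
    static boolean shouldExpire(long waitedTimeMs,
                                long metadataAgeMs,
                                long metadataMaxAgeMs,
                                boolean leaderKnown,
                                long requestTimeoutMs,
                                long batchExpiryMs) {
        if (metadataAgeMs > 3 * metadataMaxAgeMs) {
            // Case 1: the producer looks partitioned from the brokers.
            return true;
        }
        if (!leaderKnown) {
            // Case 2: fresh metadata but no known leader; today bounded by
            // request.timeout.ms, proposed to be bounded by batch.expiry.ms.
            return waitedTimeMs > requestTimeoutMs;
        }
        // Case 3: leader known but the batch still hasn't drained
        // (catch-up mode); bounded by batch.expiry.ms.
        return waitedTimeMs > batchExpiryMs;
    }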