Hi Ryan,

It seems to me the by_duration policy is already well suited to handling the long-downtime scenario (Case 2a).
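To make that concrete, here is a toy sketch of the by_duration semantics as I understand them (hypothetical helper and made-up data, not the actual client implementation):

```python
from datetime import datetime, timedelta, timezone

def resolve_by_duration(record_timestamps, duration, now):
    # Toy model: record_timestamps is a list of record timestamps
    # indexed by offset. Return the first offset whose timestamp is
    # within `duration` of `now`; if none qualify, return the log end
    # (equivalent to 'latest').
    target = now - duration
    for offset, ts in enumerate(record_timestamps):
        if ts >= target:
            return offset
    return len(record_timestamps)

# A consumer that was offline for hours: offsets 0-2 are stale,
# offsets 3-4 arrived within the last ten minutes.
now = datetime(2026, 3, 16, 12, 0, tzinfo=timezone.utc)
records = [now - timedelta(hours=6), now - timedelta(hours=5),
           now - timedelta(minutes=30), now - timedelta(minutes=8),
           now - timedelta(minutes=1)]

# A ten-minute window skips the stale backlog but keeps recent records.
assert resolve_by_duration(records, timedelta(minutes=10), now) == 3
```

The exact configuration syntax for the duration is beside the point here; what matters is that the reset anchor moves with the wall clock, so a long outage never replays the whole backlog.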
If a user wants to avoid a massive backlog after being offline for a long time, they can configure something like by_duration=10m. This lets them skip the backlog of older records while still capturing the most recent data (including anything in newly expanded partitions). Since by_duration and latest already serve users who want to skip data, shouldn't by_start_time remain strictly focused on users whose primary goal is data completeness (even if that means processing a backlog)?

Best,
Chia-Ping

Ryan Leslie (BLOOMBERG/ NEW YORK) <[email protected]> wrote on Mon, Mar 16, 2026 at 4:57 AM:

> Hey Chia,
>
> That's an interesting point that there is another subcase, 2c, caused by the user having deleted records. While the current behavior of 'latest' would not be changed by adding something like 'auto.no.offset.reset' mentioned earlier, the current suggestion of by_start_time would address not only the case of newly added partitions, but also this additional case where records are deleted and the offset is invalid. It may be worth explicitly highlighting this as another intended use case in the KIP.
>
> Also, have you given any more thought to the case where a consumer crashes during the partition expansion and its timestamp is lost or incorrect when it restarts? If we decided to do something where the timestamp is just stored by the broker as the creation time of the consumer group, then after a short while (a typical retention period) it may just degenerate into 'earliest' anyway. In that case the behavior of 2a is also implicitly changed to 'earliest', which again the user may not want, since they intentionally had the consumer down. This is why I feel it can get kind of confusing to try to cover all reset cases in a single tunable, auto.offset.reset. It's because there are several different cases.
>
> From: [email protected] At: 03/15/26 10:44:06 UTC-4:00 To: [email protected]
> Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition expansion for dynamically added partitions
>
> Hi Ryan,
>
> Thanks for the detailed breakdown! Categorizing the scenarios into 1a, 1b, and 2a is incredibly helpful. We are completely aligned that Case 1b (a newly added partition) is the true issue and the primary motivation for this KIP.
>
> Regarding the "out of range" scenario (Case 2a), the situation you described (a consumer being offline for a long time) is very real. However, I'd like to offer another common operational perspective:
>
> What if a user explicitly uses AdminClient.deleteRecords to advance the start offset just to skip a batch of corrupted/poison-pill records?
>
> In this scenario, the consumer hasn't been hanging or offline for long; it simply hits an OffsetOutOfRange error. If we force the policy to jump to latest, the consumer will unexpectedly skip all the remaining healthy records up to the High Watermark. This would cause massive, unintended data loss for an operation that merely meant to skip a few bad records.
>
> Therefore, to make the semantic guarantee accurate and predictable: if a user explicitly opts into policy=by_start_time (meaning "I want every record since I started"), our contract should be to let them process as much surviving data as possible. Falling back to earliest during an out-of-range event fulfills this promise safely.
>
> On 2026/03/13 23:07:04 "Ryan Leslie (BLOOMBERG/ NEW YORK)" wrote:
> > Hi Chia,
> >
> > I have a similar thought that knowing if the consumer group is new is helpful. It may be possible to avoid having to know the age of the partition. Thinking out loud a bit here, but:
> >
> > The two cases for auto.offset.reset are:
> >
> > 1. no offset
> > 2. offset is out of range
> >
> > These have subcases such as:
> >
> > 1. no offset
> >    a. group is new
> >    b. partition is new
> >    c. offset previously existed but was explicitly deleted
> >
> > 2. offset is out of range
> >    a. group has stopped consuming or had no consumers for a while; the offset is stale
> >    b. invalid offset was explicitly committed
> >
> > Users of auto.offset.reset LATEST are perhaps mostly interested in not processing a backlog of messages when they first start up. But if there is a new partition they don't want to lose data. A typical user might want:
> >
> > 1a - latest
> >
> > 1b - earliest
> >
> > 1c - Unclear, but this may be an advanced user. Since they may have explicitly used AdminClient to remove the offset, they can explicitly use it to configure a new one
> >
> > 2a - latest may be appropriate - the group has most likely already lost messages
> >
> > 2b - Similar to 1c, this may be an advanced user already in control over offsets with AdminClient
> >
> > If we worry most about cases 1a, 1b, and 2a, only 1b is the exception (which is why this KIP exists). What we really want to know is whether the partition is new, but perhaps as long as we ensure that 1a and 2a are more explicitly configurable, then we may get support for 1b without a partition timestamp. For example, if the following new config existed:
> >
> > Applies if the group already exists and no offset is committed for a partition. Group existence is defined as the group already having some offsets committed for partitions. Otherwise, auto.offset.reset applies. Note: we could possibly make this not applicable to static partition assignment cases.
> >
> > auto.no.offset.reset = earliest
> >
> > Handles all other cases as it does today. auto.no.offset.reset takes precedence.
> >
> > auto.offset.reset = latest
> >
> > The question is whether the consumer can reliably determine if the group is new (has offsets or not), since there may be multiple consumers all committing at the same time around startup.
> > Otherwise we would have to rely on the fact that the new consumer protocol centralizes things through the coordinator, but this may require more significant changes as you mentioned...
> >
> > I agree with Jiunn that we could consider supporting the auto.offset.reset timestamp separately, but I'm still concerned about the case where a consumer crashes during the partition expansion and its timestamp is lost or incorrect when it restarts. A feature with a false sense of security can be problematic.
> >
> > From: [email protected] At: 03/10/26 04:02:58 UTC-4:00 To: [email protected]
> > Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition expansion for dynamically added partitions
> >
> > Hi Ryan,
> >
> > Thanks for sharing your thoughts! We initially tried to leave the RPCs as they were, but you're absolutely right that the current KIP feels a bit like a kludge.
> >
> > If we are open to touching the protocol (and can accept the potential time skew between nodes), I have another idea: what if we add a creation timestamp to both the partition and the group?
> >
> > This information could be returned to the consumer via the new Heartbeat RPC. The async consumer could then seamlessly leverage these timestamps to make a deterministic decision between using "latest" or "earliest."
> >
> > This approach would only work for the AsyncConsumer using subscribe() (since it relies on the group state), which actually serves as another great incentive for users to migrate to the new consumer!
> >
> > Thoughts on this direction?
> >
> > Best,
> >
> > Chia-Ping
> >
> > Ryan Leslie (BLOOMBERG/ NEW YORK) <[email protected]> wrote on Tue, Mar 10, 2026 at 6:38 AM:
> > > Hi Chia and Jiunn,
> > >
> > > Thanks for the response.
> > > I agree that the explicit timestamp gives enough flexibility for the user to avoid the issue I mentioned with the implicit timestamp at startup not matching the time the group instance started.
> > >
> > > One potential downside is that the user may have to store this timestamp somewhere in between restarts. For the group instance id, that is not always the case, since sometimes it can be derived from the environment, such as the hostname, or hardcoded in an environment variable where it typically doesn't need to be updated.
> > >
> > > Also, since static instances may be long-lived, preserving just the initial timestamp of the first instance might feel a bit awkward, since you may end up with static instances restarting and passing timestamps that could be old, like two months ago. The user could instead store something like the last time of restart (and subtract metadata max age from it to be safe), but it can be considered a burden and may fail if shutdown was not graceful, i.e. a crash.
> > >
> > > I agree that this KIP provides a workable solution to avoid data loss without protocol or broker changes, so I'm +1. But it does still feel a little like a kludge, since what the user really needs is an easy, almost implicit, way to not lose data when a recently added partition is discovered, and currently there is no metadata for the creation time of a partition. The user may not even want the same policy applied to older partitions for which their offset was deleted.
> > >
> > > Even for a consumer group not using static membership, suppose partitions are added by a producer and new messages are published. If at the same time there is a consumer group, e.g. with 1 consumer only, and it has crashed, when it comes back up it may lose messages unless it knows what timestamp to pass.
> > >
> > > Thanks,
> > >
> > > Ryan
> > >
> > > From: [email protected] At: 03/07/26 02:46:28 UTC-5:00 To: [email protected]
> > > Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition expansion for dynamically added partitions
> > >
> > > Hello Sikka,
> > >
> > > > If consumer restarts (app crash, bounce etc.) after dynamically adding partitions it would consume unread messages from last committed offset for existing partitions but would still miss the messages from new partition.
> > >
> > > For dynamic consumers, a restart inherently means leaving and rejoining the group as a new member, so recalculating startupTimestamp = now() is semantically correct — the consumer is genuinely starting fresh.
> > >
> > > The gap you described only applies to static membership, where the consumer can restart without triggering a rebalance, yet the local timestamp still gets reset. For this scenario, as Chia suggested, we could extend the configuration to accept an explicit timestamp. This would allow users to pin a fixed reference point across restarts, effectively closing the gap for static membership. For dynamic consumers, the default by_start_time without an explicit timestamp already provides the correct behavior and a significant improvement over latest, which would miss data even without a restart.
> > >
> > > > If the offsets are deleted as mentioned in Scenario 2 (log truncation), how would this solution address that scenario?
> > >
> > > For the log truncation scenario, when segments are deleted and the consumer's committed offset becomes out of range, auto.offset.reset is triggered. With latest, the consumer simply jumps to the end of the partition, skipping all remaining available data.
> > > With by_start_time, the consumer looks up the position based on the startup timestamp rather than relying on offsets. Since the lookup is timestamp-based, it is not affected by offset invalidation due to truncation. Any data with timestamps at or after the startup time will still be found and consumed.
> > >
> > > > Do we need to care about clock skew or system-time issues on the consumer client side? Should we use a timestamp on the server/broker side?
> > >
> > > Clock skew is a fair concern, but using a server-side timestamp does not necessarily make things safer. It would mean comparing the Group Coordinator's time against the Partition Leader's time, and these are often different nodes. Without strict clock synchronization across the Kafka cluster, this "happens-before" relationship remains fundamentally unpredictable. On the other hand, auto.offset.reset is strictly a client-level configuration — consumers within the same group can intentionally use different policies. Tying the timestamp to a global server-side state would be a semantic mismatch. A local timestamp aligns much better with the client-level nature of this config.
> > >
> > > > Do you plan to have any metrics or observability when the consumer resets offsets by_start_time?
> > >
> > > That's a great suggestion. I plan to expose the startup timestamp used by by_start_time as a client-level metric, so users can easily verify which reference point the consumer is using during debugging.
> > >
> > > Best Regards,
> > > Jiunn-Yang
> > >
> > > > Chia-Ping Tsai <[email protected]> wrote on Mar 6, 2026 at 10:44 PM:
> > > >
> > > > Hi Ryan,
> > > >
> > > > That is a fantastic point. A static member restarting and capturing a newer local timestamp is definitely a critical edge case.
> > > >
> > > > Since users already need to inject a unique group.instance.id into the configuration for static members, my idea is to allow the auto.offset.reset policy to carry a dedicated timestamp to explicitly "lock" the startup time (for example, using a format like auto.offset.reset=startup:<timestamp>).
> > > >
> > > > This means that if users want to leverage this new policy with static membership, their deployment configuration would simply include these two specific injected values (the static ID and the locked timestamp).
> > > >
> > > > This approach elegantly maintains the configuration semantics at the member level, and most importantly, it avoids any need to update the RPC protocol.
> > > >
> > > > What do you think of this approach for the static membership scenario?
> > > >
> > > > On 2026/03/05 19:45:13 "Ryan Leslie (BLOOMBERG/ NEW YORK)" wrote:
> > > >> Hey Jiunn,
> > > >>
> > > >> Glad to see some progress around this issue.
> > > >>
> > > >> I had a similar thought to David: if the time is only known client side, there are still edge cases for data loss. One case is static membership, where from the perspective of a client they are free to restart their consumer task without actually having left or meaningfully affected the group. However, I think with the proposed implementation the timestamp is still reset here. So if the restart happens just after a partition is added and published to, but before the consumer metadata is refreshed, the group still runs the risk of data loss.
> > > >>
> > > >> It could be argued that keeping the group 'stable' is a requirement for this feature to work, but sometimes that's not possible to accomplish.
> > >>
> > >> From: [email protected] At: 03/05/26 14:32:20 UTC-5:00 To: [email protected]
> > >> Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition expansion for dynamically added partitions
> > >>
> > >> Hi Jiunn,
> > >>
> > >> Thanks for the KIP!
> > >>
> > >> I was also considering this solution while we discussed it in the Jira. It seems to work in most of the cases, but not in all. For instance, let's imagine a partition created just before a new consumer joins or rejoins the group, and this consumer gets the new partition. In this case, the consumer will have a start time which is older than the partition creation time. This could also happen with the truncation case. It makes the behavior kind of unpredictable again.
> > >>
> > >> Instead of relying on a local timestamp, one idea would be to rely on a timestamp provided by the server. For instance, we could define the time since the group became non-empty. This would define the subscription time for the consumer group. The downside is that it only works if the consumer is part of a group.
> > >>
> > >> In your missing semantics section, I don't fully understand how the 4th point is improved by the KIP. It says start from earliest, but with the change it would start from application start time. Could you elaborate?
> > >>
> > >> Best,
> > >> David
> > >>
> > >> On Thu, Mar 5, 2026 at 12:47, 黃竣陽 <[email protected]> wrote:
> > >>
> > >>> Hello Chia,
> > >>>
> > >>> Thanks for your feedback, I have updated the KIP.
> > >>>
> > >>> Best Regards,
> > >>> Jiunn-Yang
> > >>>
> > >>>> Chia-Ping Tsai <[email protected]> wrote on Mar 5, 2026 at 7:24 PM:
> > >>>>
> > >>>> Hi Jiunn,
> > >>>>
> > >>>> chia_00: Would you mind mentioning KAFKA-19236 in the KIP?
> > >>>> It would be helpful to let readers know that "Dynamically at partition discovery" is being tracked as a separate issue.
> > >>>>
> > >>>> Best,
> > >>>> Chia-Ping
> > >>>>
> > >>>> On 2026/03/05 11:14:31 黃竣陽 wrote:
> > >>>>> Hello everyone,
> > >>>>>
> > >>>>> I would like to start a discussion on KIP-1282: Prevent data loss during partition expansion for dynamically added partitions
> > >>>>> <https://cwiki.apache.org/confluence/x/mIY8G>
> > >>>>>
> > >>>>> This proposal aims to introduce a new auto.offset.reset policy, by_start_time, anchoring the offset reset to the consumer's startup timestamp rather than partition discovery time, to prevent silent data loss during partition expansion.
> > >>>>>
> > >>>>> Best regards,
> > >>>>> Jiunn-Yang
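P.S. For readers skimming this long thread, here is a toy model contrasting where each policy would land in the two contested cases. The mapping is my reading of the positions in the discussion (in particular, by_start_time falling back to earliest on an out-of-range offset, per the 03/15 message); it is not settled KIP text, and the function is purely illustrative:

```python
def reset_position(policy, case, log_start, log_end, start_time_offset):
    # Toy model of the reset cases discussed in this thread.
    # case: "new_partition" (1b, no committed offset) or
    #       "out_of_range" (2a, committed offset no longer exists).
    # start_time_offset: result of a timestamp lookup for the consumer's
    # startup time, or None if no record is that recent.
    if policy == "latest":
        return log_end            # 1b: silently loses the new records
    if policy == "earliest":
        return log_start          # safe, but replays the whole backlog
    if policy == "by_start_time":
        if case == "out_of_range":
            return log_start      # proposed: keep all surviving data
        if start_time_offset is not None:
            return start_time_offset  # records at/after startup time
        return log_end
    raise ValueError(f"unknown policy: {policy}")

# Case 1b: a new partition whose 100 records were all published after
# the consumer started.
assert reset_position("latest", "new_partition", 0, 100, 0) == 100  # data loss
assert reset_position("by_start_time", "new_partition", 0, 100, 0) == 0
```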
