Hi Ryan,

It seems to me the by_duration policy is already perfectly suited to tackle
the long downtime scenario (Case 2a).

If a user wants to avoid a massive backlog after being offline for a long
time, they can configure something like by_duration=10m.

This allows them to skip the massive backlog of older records, while still
capturing the most recent data (including anything in newly expanded
partitions).

Since by_duration and latest already serve the users who want to skip data,
shouldn't by_start_time remain strictly focused on users whose primary goal
is data completeness (even if it means processing a backlog)?
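
For what it's worth, here is a rough Python sketch of how I picture the
four policies choosing a starting offset when the committed offset is
missing or out of range. The function name and the (offset, timestamp)
record model are purely illustrative, not the actual client code:

```python
from datetime import datetime, timedelta

def resolve_start_offset(policy, records, now,
                         startup_time=None, duration=None):
    """Illustrative sketch (not real client code) of where each
    auto.offset.reset policy would begin consuming.

    records: list of (offset, timestamp) pairs in offset order.
    """
    if policy == "earliest":
        return records[0][0]                 # replay the full backlog
    if policy == "latest":
        return records[-1][0] + 1            # skip everything, wait for new data
    if policy == "by_duration":              # keep only the last `duration`
        cutoff = now - duration
        return next((off for off, ts in records if ts >= cutoff),
                    records[-1][0] + 1)
    if policy == "by_start_time":            # everything since consumer startup
        return next((off for off, ts in records if ts >= startup_time),
                    records[-1][0] + 1)
    raise ValueError(f"unknown policy: {policy}")

# One record per minute for the past hour (offsets 0..59).
now = datetime(2026, 3, 16, 12, 0)
records = [(off, now - timedelta(minutes=60 - off)) for off in range(60)]

print(resolve_start_offset("by_duration", records, now,
                           duration=timedelta(minutes=10)))  # 50
```

So with by_duration=10m only the most recent ten minutes of backlog are
replayed, while by_start_time keeps everything from the consumer's
startup time onward, which is exactly the completeness-versus-skipping
split described above.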

Best,
Chia-Ping

Ryan Leslie (BLOOMBERG/ NEW YORK) <[email protected]> wrote on Monday,
March 16, 2026 at 4:57 AM:

> Hey Chia,
>
> That's an interesting point that there is another subcase, 2c, caused by
> the user having deleted records. While the current behavior of 'latest'
> would not be changed by adding something like 'auto.no.offset.reset'
> mentioned earlier, the current suggestion of by_start_time would address
> not only the case of newly added partitions, but this additional case when
> records are deleted and the offset is invalid. It may be worth explicitly
> highlighting this as another intended use case in the KIP.
>
> Also, have you given any more thought to the case where a consumer crashes
> during the partition expansion and its timestamp is lost or incorrect when
> it restarts? If we decided to do something where the timestamp is just
> stored by the broker as the creation time of the consumer group, then after
> a short while (a typical retention period) it may just degenerate into
> 'earliest' anyway. In that case the behavior of 2a is also implicitly
> changed to 'earliest' which again the user may not want since they
> intentionally had the consumer down. This is why I feel it can get kind
> of confusing to try to cover all reset cases in a single tunable,
> auto.offset.reset: there are simply several different cases.
>
> From: [email protected] At: 03/15/26 10:44:06 UTC-4:00To:
> [email protected]
> Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition
> expansion for dynamically added partitions
>
> Hi Ryan,
>
> Thanks for the detailed breakdown! Categorizing the scenarios into 1a, 1b,
> and
> 2a is incredibly helpful. We are completely aligned that Case 1b (a newly
> added
> partition) is the true issue and the primary motivation for this KIP.
>
> Regarding the "out of range" scenario (Case 2a), the situation you
> described (a
> consumer being offline for a long time) is very real. However, I'd like to
> offer another common operational perspective:
>
> What if a user explicitly uses AdminClient.deleteRecords to advance the
> start
> offset just to skip a batch of corrupted/poison-pill records?
>
> In this scenario, the consumer hasn't been hanging or offline for long; it
> simply hits an OffsetOutOfRange error. If we force the policy to jump to
> latest, the consumer will unexpectedly skip all the remaining healthy
> records
> up to the High Watermark. This would cause massive, unintended data loss
> for an
> operation that merely meant to skip a few bad records.
>
> Therefore, to make the semantic guarantee accurate and predictable: if a
> user
> explicitly opts into policy=by_start_time (meaning "I want every record
> since I
> started"), our contract should be to let them process as much surviving
> data as
> possible. Falling back to earliest during an out-of-range event fulfills
> this
> promise safely.
>
> On 2026/03/13 23:07:04 "Ryan Leslie (BLOOMBERG/ NEW YORK)" wrote:
> > Hi Chia,
> >
> > I have a similar thought that knowing if the consumer group is new is
> helpful. It may be possible to avoid having to know the age of the
> partition.
> Thinking out loud a bit here, but:
> >
> > The two cases for auto.offset.reset are:
> >
> > 1. no offset
> > 2. offset is out of range
> >
> > These have subcases such as:
> >
> > 1. no offset
> >   a. group is new
> >   b. partition is new
> >   c. offset previously existed but was explicitly deleted
> >
> > 2. offset is out of range
> >   a. group has stopped consuming or had no consumers for a while,
> >      offset is stale
> >   b. invalid offset was explicitly committed
> >
> > Users of auto.offset.reset LATEST are perhaps mostly interested in not
> > processing a backlog of messages when they first start up. But if there
> > is a new partition they don't want to lose data. A typical user might
> > want:
> >
> > 1a - latest
> >
> > 1b - earliest
> >
> > 1c - Unclear, but this may be an advanced user. Since they may have
> explicitly used AdminClient to remove the offset, they can explicitly use
> it to
> configure a new one
> >
> > 2a - latest may be appropriate - the group has most likely already lost
> messages
> >
> > 2b - Similar to 1c, this may be an advanced user already in control
> > over offsets with AdminClient
> >
> > If we worry most about cases 1a, 1b, and 2a, only 1b is the exception
> > (which is why this KIP exists). What we really want to know is whether
> > the partition is new, but perhaps as long as we ensure that 1a and 2a
> > are more explicitly configurable, then we may get support for 1b
> > without a partition timestamp. For example, if the following new config
> > existed:
> >
> > auto.no.offset.reset = earliest
> >
> > Applies if the group already exists and no offset is committed for a
> > partition. Group existence is defined as the group already having some
> > offsets committed for partitions. Otherwise, auto.offset.reset applies.
> > Note: we could possibly make this not applicable to static partition
> > assignment cases.
> >
> > auto.offset.reset = latest
> >
> > Handles all other cases as it does today. auto.no.offset.reset takes
> > precedence.
> >
> > The question is if the consumer can reliably determine if the group is
> > new (has offsets or not), since there may be multiple consumers all
> > committing at the same time around startup. Otherwise we would have to
> > rely on the fact that the new consumer protocol centralizes things
> > through the coordinator, but this may require more significant changes
> > as you mentioned...
> >
> > I agree with Jiunn that we could consider supporting the
> > auto.offset.reset timestamp separately, but I'm still concerned about
> > the case where a consumer crashes during the partition expansion and
> > its timestamp is lost or incorrect when it restarts. A feature that
> > gives a false sense of security can be problematic.
> >
> > From: [email protected] At: 03/10/26 04:02:58 UTC-4:00To:
> [email protected]
> > Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition
> expansion
> for dynamically added partitions
> >
> > Hi Ryan,
> >
> > Thanks for sharing your thoughts! We initially tried to leave the RPCs as
> > they were, but you're absolutely right that the current KIP feels a bit
> > like a kludge.
> >
> > If we are open to touching the protocol (and can accept the potential
> time
> > skew between nodes), I have another idea: what if we add a creation
> > timestamp to both the partition and the group?
> >
> > This information could be returned to the consumer via the new Heartbeat
> > RPC. The async consumer could then seamlessly leverage these timestamps
> to
> > make a deterministic decision between using "latest" or "earliest."
> >
> > This approach would only work for the AsyncConsumer using subscribe()
> > (since it relies on the group state), which actually serves as another
> > great incentive for users to migrate to the new consumer!
> >
> > Thoughts on this direction?
> >
> > Best,
> >
> > Chia-Ping
> >
> > Ryan Leslie (BLOOMBERG/ NEW YORK) <[email protected]> wrote on
> > Tuesday, March 10, 2026 at 6:38 AM:
> >
> > > Hi Chia and Jiunn,
> > >
> > > Thanks for the response. I agree that the explicit timestamp gives
> enough
> > > flexibility for the user to avoid the issue I mentioned with the
> implicit
> > > timestamp at startup not matching the time the group instance started.
> > >
> > > One potential downside is that the user may have to store this
> timestamp
> > > somewhere in between restarts. For the group instance id, that is not
> > > always the case since sometimes it can be derived from the environment
> such
> > > as the hostname, or hardcoded in an environment variable where it
> typically
> > > doesn't need to be updated.
> > >
> > > Also, since static instances may be long-lived, preserving just the
> > > initial timestamp of the first instance might feel a bit awkward,
> since you
> > > may end up with static instances restarting and passing timestamps that
> > > could be old like two months ago. The user could instead store
> something
> > > like the last time of restart (and subtract metadata max age from it
> to be
> > > safe), but it can be considered a burden and may fail if shutdown was
> not
> > > graceful, i.e. a crash.
> > >
> > > I agree that this KIP provides a workable solution to avoid data loss
> > > without protocol or broker changes, so I'm +1. But it does still feel a
> > > little like a kludge since what the user really needs is an easy,
> almost
> > > implicit, way to not lose data when a recently added partition is
> > > discovered, and currently there is no metadata for the creation time
> of a
> > > partition. The user may not want to even have the same policy applied
> to
> > > older partitions for which their offset was deleted.
> > >
> > > Even for a consumer group not using static membership, suppose
> partitions
> > > are added by a producer and new messages are published. If at the
> > > same time there is a consumer group, e.g. with 1 consumer only, and
> > > it has crashed,
> > > when it comes back up it may lose messages unless it knows what
> timestamp
> > > to pass.
> > >
> > > Thanks,
> > >
> > > Ryan
> > >
> > > From: [email protected] At: 03/07/26 02:46:28 UTC-5:00To:
> > > [email protected]
> > > Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition
> > > expansion for dynamically added partitions
> > >
> > > Hello Sikka,
> > >
> > > > If the consumer restarts (app crash, bounce, etc.) after
> > > > dynamically adding partitions, it would consume unread messages
> > > > from the last committed offset for existing partitions but would
> > > > still miss the messages from the new partition.
> > >
> > > For dynamic consumers, a restart inherently means leaving and
> > > rejoining the group as a new member, so recalculating
> > > startupTimestamp = now() is semantically correct: the consumer is
> > > genuinely starting fresh.
> > >
> > > The gap you described only applies to static membership, where the
> > > consumer can restart without triggering a rebalance, yet the local
> > > timestamp still gets reset. For this scenario, as Chia suggested, we
> > > could extend the configuration to accept an explicit timestamp. This
> > > would allow users to pin a fixed reference point across restarts,
> > > effectively closing the gap for static membership. For dynamic
> > > consumers, the default by_start_time without an explicit timestamp
> > > already provides the correct behavior and a significant improvement
> > > over latest, which would miss data even without a restart.
> > >
> > > > If the offsets are deleted as mentioned in Scenario 2 (log
> > > > truncation), how would this solution address that scenario?
> > >
> > > For the log truncation scenario, when segments are deleted and the
> > > consumer's committed offset becomes out of range, auto.offset.reset
> > > is triggered. With latest, the consumer simply jumps to the end of
> > > the partition, skipping all remaining available data. With
> > > by_start_time, the consumer looks up the position based on the
> > > startup timestamp rather than relying on offsets. Since the lookup is
> > > timestamp-based, it is not affected by offset invalidation due to
> > > truncation. Any data with timestamps at or after the startup time
> > > will still be found and consumed.
> > >
> > > > Do we need to care about clock skew or system-time issues on the
> > > > consumer client side? Should we use a timestamp on the
> > > > server/broker side?
> > >
> > > Clock skew is a fair concern, but using a server-side timestamp does
> > > not necessarily make things safer. It would mean comparing the Group
> > > Coordinator's time against the Partition Leader's time, which are
> > > often different nodes. Without strict clock synchronization across
> > > the Kafka cluster, this "happens-before" relationship remains
> > > fundamentally unpredictable. On the other hand, auto.offset.reset is
> > > strictly a client-level configuration: consumers within the same
> > > group can intentionally use different policies. Tying the timestamp
> > > to a global server-side state would be a semantic mismatch. A local
> > > timestamp aligns much better with the client-level nature of this
> > > config.
> > >
> > > > Do you plan to have any metrics or observability when the consumer
> > > > resets the offset with by_start_time?
> > >
> > > That's a great suggestion. I plan to expose the startup timestamp
> > > used by by_start_time as a client-level metric, so users can easily
> > > verify which reference point the consumer is using during debugging.
> > >
> > > Best Regards,
> > > Jiunn-Yang
> > >
> > > > Chia-Ping Tsai <[email protected]> wrote on March 6, 2026 at 10:44 PM:
> > > >
> > > > Hi Ryan,
> > > >
> > > > That is a fantastic point. A static member restarting and
> > > > capturing a newer local timestamp is definitely a critical edge
> > > > case.
> > > >
> > > > Since users already need to inject a unique group.instance.id into
> > > > the configuration for static members, my idea is to allow the
> > > > auto.offset.reset policy to carry a dedicated timestamp to
> > > > explicitly "lock" the startup time (for example, using a format
> > > > like auto.offset.reset=startup:<timestamp>).
> > > >
> > > > This means that if users want to leverage this new policy with static
> > > membership, their deployment configuration would simply include these
> two
> > > specific injected values (the static ID and the locked timestamp).
> > > >
> > > > This approach elegantly maintains the configuration semantics at the
> > > member-level, and most importantly, it avoids any need to update the
> RPC
> > > protocol.
> > > >
> > > > What do you think of this approach for the static membership
> scenario?
> > > >
> > > > On 2026/03/05 19:45:13 "Ryan Leslie (BLOOMBERG/ NEW YORK)" wrote:
> > > >> Hey Jiunn,
> > > >>
> > > >> Glad to see some progress around this issue.
> > > >>
> > > >> I had a similar thought to David, that if the time is only known
> > > >> client side, there are still edge cases for data loss. One case is
> > > >> static membership where, from the perspective of a client, they
> > > >> are free to restart their consumer task without actually having
> > > >> left or meaningfully affected the group. However, I think with the
> > > >> proposed implementation the timestamp is still reset here. So if
> > > >> the restart happens just after a partition is added and published
> > > >> to, but before the consumer metadata is refreshed, the group still
> > > >> runs the risk of data loss.
> > > >>
> > > >> It could be argued that keeping the group 'stable' is a
> > > >> requirement for this feature to work, but sometimes it's not
> > > >> possible to accomplish.
> > > >>
> > > >> From: [email protected] At: 03/05/26 14:32:20 UTC-5:00To:
> > > [email protected]
> > > >> Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition
> > > expansion for dynamically added partitions
> > > >>
> > > >> Hi Jiunn,
> > > >>
> > > >> Thanks for the KIP!
> > > >>
> > > >> I was also considering this solution while we discussed in the
> > > >> Jira. It seems to work in most of the cases but not in all. For
> > > >> instance, let’s imagine a partition created just before a new
> > > >> consumer joins or rejoins the group and this consumer gets the new
> > > >> partition. In this case, the consumer will have a start time which
> > > >> is older than the partition creation time. This could also happen
> > > >> with the truncation case. It makes the behavior kind of
> > > >> unpredictable again.
> > > >>
> > > >> Instead of relying on a local timestamp, one idea would be to
> > > >> rely on a timestamp provided by the server. For instance, we could
> > > >> define the time since the group became non-empty. This would
> > > >> define the subscription time for the consumer group. The downside
> > > >> is that it only works if the consumer is part of a group.
> > > >>
> > > >> In your missing semantic section, I don’t fully understand how the
> 4th
> > > >> point is improved by the KIP. It says start from earliest but with
> the
> > > >> change it would start from application start time. Could you
> elaborate?
> > > >>
> > > >> Best,
> > > >> David
> > > >>
> > > >> On Thu, Mar 5, 2026 at 12:47, 黃竣陽 <[email protected]> wrote:
> > > >>
> > > >>> Hello chia,
> > > >>>
> > > >>> Thanks for your feedback, I have updated the KIP.
> > > >>>
> > > >>> Best Regards,
> > > >>> Jiunn-Yang
> > > >>>
> > > >>>> Chia-Ping Tsai <[email protected]> wrote on March 5, 2026 at 7:24 PM:
> > > >>>>
> > > >>>> hi Jiunn
> > > >>>>
> > > >>>> chia_00: Would you mind mentioning KAFKA-19236 in the KIP? It
> would be
> > > >>> helpful to let readers know that "Dynamically at partition
> discovery"
> > > is
> > > >>> being tracked as a separate issue
> > > >>>>
> > > >>>> Best,
> > > >>>> Chia-Ping
> > > >>>>
> > > >>>> On 2026/03/05 11:14:31 黃竣陽 wrote:
> > > >>>>> Hello everyone,
> > > >>>>>
> > > >>>>> I would like to start a discussion on KIP-1282: Prevent data loss
> > > >>> during partition expansion for dynamically added partitions
> > > >>>>> <https://cwiki.apache.org/confluence/x/mIY8G>
> > > >>>>>
> > > >>>>> This proposal aims to introduce a new auto.offset.reset policy
> > > >>> by_start_time, anchoring the
> > > >>>>> offset reset to the consumer's startup timestamp rather than
> > > partition
> > > >>> discovery time, to prevent
> > > >>>>> silent data loss during partition expansion.
> > > >>>>>
> > > >>>>> Best regards,
> > > >>>>> Jiunn-Yang
> > > >>>
> > > >>>
> > > >>
> > > >>
> > > >>
> > >
> > >
> > >
> >
> >
> >
>
>
>
