[
https://issues.apache.org/jira/browse/KAFKA-19902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
RivenSun updated KAFKA-19902:
-----------------------------
Description:
h2. Summary
When a partition leader changes and the consumer commits offsets with a stale
epoch, and the log segments containing that epoch are subsequently deleted by
the retention policy, the consumer encounters an OFFSET_OUT_OF_RANGE error and
resets to earliest (if auto.offset.reset=earliest), causing massive message
reprocessing.
The root cause is that SubscriptionState.allConsumed() uses
position.offsetEpoch instead of position.currentLeader.epoch when constructing
OffsetAndMetadata for commit, and offsetEpoch can become stale when leader
changes occur.
----
h2. Environment
Cluster Configuration:
* Kafka Server Version: 3.9.0
* Kafka Client Version: 3.9.0
* Topic: 200 partitions, 7-day retention, no tiered storage
* Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
* No broker/controller restarts occurred
* High throughput producer continuously writing messages
Consumer Configuration:
{code:java}
auto.offset.reset=earliest
enable.auto.commit=true {code}
Consumer Code:
* Registered ConsumerRebalanceListener
* Calls kafkaConsumer.commitSync() in onPartitionsRevoked() method
----
h2. Problem Description
In a scenario where the consumer group has no lag, consumers suddenly consumed
a massive amount of messages, far exceeding the recent few minutes of producer
writes. Investigation revealed that multiple partitions reset to the earliest
offset and reprocessed up to 7 days of historical data.
----
h2. Observed Symptoms (Timeline)
# Consumer group rebalance occurred (triggered by normal consumer group
management)
# Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
# Consumer reset to earliest offset due to auto.offset.reset=earliest
configuration
# Kafka broker logged NotLeaderOrFollowerException around the same timeframe,
indicating partition leader changes
# Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG
level and not visible in production logs)
The images below use the partition
asyncmq_local_us_us_marketplace-ssl-a_aws_us-east-1_2a7e053c-9d90-4efd-af2d-3a8bf9564715-153
as an example to trace the problem log chain.
!image-2025-11-21-18-03-24-592.png!
!image-2025-11-21-18-04-35-845.png!
!image-2025-11-21-18-11-24-316.png!
----
h2. Root Cause Analysis
h3. The Problem Chain
1. Leader change occurs (epoch changes from N to N+1)
↓
2. Consumer continues processing old batches (epoch=N)
↓
3. Consumer commits offset during/after rebalance
├─ Committed offset: 1000
└─ Committed epoch: N (using position.offsetEpoch from old batch)
↓
4. High throughput + retention policy causes old segments (epoch=N) to be
deleted
↓
5. Consumer restarts/rebalances and fetches committed offset
├─ Tries to validate offset 1000 with epoch=N
└─ Broker cannot find epoch=N (segments deleted)
↓
6. Broker returns OFFSET_OUT_OF_RANGE
↓
7. Consumer resets to earliest offset
h3. Code Analysis
The problematic code in SubscriptionState.allConsumed():
{code:java}
// kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    partitionState.position.offsetEpoch, // Problem: uses offsetEpoch from consumed batch
                    ""));
    });
    return allConsumed;
}
{code}
Why this is problematic: The FetchPosition class contains two different epoch
values:
* offsetEpoch: The epoch from the last consumed record's batch
* currentLeader.epoch: The current partition leader's epoch from metadata
When committing offsets, we should use currentLeader.epoch instead of
offsetEpoch because:
# offsetEpoch represents the epoch of already consumed data (historical)
# currentLeader.epoch represents the current partition leader (up-to-date)
h3. Scenarios Where These Epochs Diverge
Scenario A: Leader changes while consumer is processing old batches
* T1: Consumer fetches batch with epoch=5
* T2: Leader changes to epoch=6
* T3: Metadata updates with new leader epoch=6
* T4: Consumer commits offset
* offsetEpoch = 5 (from batch being processed)
* currentLeader.epoch = 6 (from updated metadata)
* Problem: Commits epoch=5, which may soon be deleted
Scenario B: Recovery from committed offset after leader change
* Consumer commits offset with old epoch=N
* Leader changes to epoch=N+1
* Old segments (epoch=N) are deleted by retention policy
* Consumer rebalances and tries to restore from committed offset
* offsetEpoch = N (from committed offset)
* currentLeader.epoch = N+1 (from current metadata)
* Problem: Validation fails because epoch=N no longer exists
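The divergence in Scenario A can be reproduced with a small stand-alone model. This is only a sketch: PositionSketch and ScenarioADemo are hypothetical names, not Kafka classes, and the model keeps just the two epoch fields discussed above.

```java
import java.util.Optional;

// Simplified stand-in for SubscriptionState.FetchPosition (hypothetical, not the Kafka class).
final class PositionSketch {
    final long offset;
    final Optional<Integer> offsetEpoch;        // epoch of the last consumed batch
    final Optional<Integer> currentLeaderEpoch; // leader epoch last seen in metadata

    PositionSketch(long offset, Optional<Integer> offsetEpoch, Optional<Integer> currentLeaderEpoch) {
        this.offset = offset;
        this.offsetEpoch = offsetEpoch;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    // Mirrors what this report says allConsumed() commits today: the batch epoch.
    Optional<Integer> epochCommittedToday() {
        return offsetEpoch;
    }

    // True when both epochs are known and disagree, as at T4 in Scenario A.
    boolean epochsDiverge() {
        return offsetEpoch.isPresent()
                && currentLeaderEpoch.isPresent()
                && !offsetEpoch.equals(currentLeaderEpoch);
    }
}

final class ScenarioADemo {
    public static void main(String[] args) {
        // T1: batch fetched while the leader epoch is 5.
        PositionSketch t1 = new PositionSketch(1000L, Optional.of(5), Optional.of(5));
        // T2/T3: the leader moves to epoch 6 and the metadata reflects it.
        PositionSketch t4 = new PositionSketch(t1.offset, t1.offsetEpoch, Optional.of(6));
        // T4: the commit would still carry the batch epoch.
        System.out.println("diverged=" + t4.epochsDiverge()
                + " committedEpoch=" + t4.epochCommittedToday());
    }
}
```

In this model the committed epoch stays at the batch epoch even though the metadata already knows a newer leader epoch, which is the state described at T4.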
----
h2. Steps to Reproduce
This is a timing-sensitive edge case. The following conditions increase the
likelihood:
# Setup:
* High-throughput topic (to trigger faster log rotation)
* Relatively short retention period (e.g., 7 days)
* Consumer group with rebalance listener calling commitSync()
* enable.auto.commit=true (or any manual commit)
# Trigger:
* Trigger a partition leader change (broker restart, controller election, etc.)
* Simultaneously or shortly after, trigger a consumer group rebalance
* Wait for retention policy to delete old log segments
# Expected Result:
Consumer should resume from committed offset
# Actual Result:
Consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
----
h2. Impact
* Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
* Service Degradation: Sudden spike in consumer throughput can overwhelm
downstream systems
* Resource Waste: Unnecessary CPU, memory, and network usage
* Duplicate Processing: With auto.offset.reset=earliest, a reset re-delivers
messages that were already processed
----
h2. Proposed Fix
h3. Root Cause Analysis
The issue is more fundamental than a simple field selection problem. The core
issue is that both epoch values in FetchPosition can be stale at commit time:
# offsetEpoch: Contains the epoch from the last consumed record's batch. If a
leader change occurs after consumption but before commit, this epoch becomes
stale and may reference log segments that have been deleted.
# currentLeader.epoch: Inherited from the previous position during normal
consumption and only updated when:
* NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH errors are detected
* Position is restored from committed offsets (fetches from metadata)
* Explicit validation is triggered via maybeValidatePositionForCurrentLeader()
During normal, error-free consumption, currentLeader is never updated and can
also become stale.
h3. Problem with Current Code
Location: org.apache.kafka.clients.consumer.internals.FetchCollector
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(), // offsetEpoch: from consumed batch
            position.currentLeader);     // ❌ currentLeader: inherited, NOT updated!
    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
            position, nextPosition, tp, partRecords.size());
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
}
{code}
The inherited currentLeader means it can be as stale as offsetEpoch in certain
scenarios.
----
h3. Recommended Solution: Proactively Update currentLeader During Position
Updates
Option 1: Update currentLeader when advancing position (Primary Recommendation)
Modify FetchCollector to fetch the latest leader information
from metadata every time the position is updated:
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    // Fetch the latest leader information from metadata
    Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(),
            currentLeaderAndEpoch); // ✅ Use fresh leader info from metadata
    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
            position, nextPosition, tp, partRecords.size());
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
}
{code}
With currentLeader kept fresh, also modify SubscriptionState.allConsumed() to
{color:#de350b}use currentLeader.epoch instead of offsetEpoch{color}:
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    partitionState.position.currentLeader.epoch, // ✅ Use current leader epoch
                    ""));
    });
    return allConsumed;
}
{code}
Advantages:
* Keeps currentLeader as fresh as the client's metadata cache
* Makes allConsumed() safe to use *currentLeader.epoch* for commits
* Minimal performance impact (metadata lookup is O(1) from local cache)
* Aligns with the existing pattern in refreshCommittedOffsets()
Potential Concerns:
* Adds one metadata lookup per position update
* If metadata is stale, currentLeader.epoch could still lag slightly, but this
is the same risk as today
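The difference between inheriting currentLeader and refreshing it on every position update can be illustrated with a toy model. This is a sketch under stated assumptions: AdvanceSketch, its Position class, and the map standing in for the client's metadata cache are all hypothetical, and partitions are identified by plain strings instead of TopicPartition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of position advancement; all names are hypothetical, not Kafka classes.
final class AdvanceSketch {
    // Stand-in for the metadata cache: partition name -> latest known leader epoch.
    final Map<String, Integer> metadataLeaderEpoch = new HashMap<>();

    static final class Position {
        final long offset;
        final Optional<Integer> offsetEpoch;
        final Optional<Integer> currentLeaderEpoch;

        Position(long offset, Optional<Integer> offsetEpoch, Optional<Integer> currentLeaderEpoch) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
            this.currentLeaderEpoch = currentLeaderEpoch;
        }
    }

    // Behaviour described above for FetchCollector: the leader epoch is carried over unchanged.
    Position advanceInheriting(Position old, long nextOffset, int batchEpoch) {
        return new Position(nextOffset, Optional.of(batchEpoch), old.currentLeaderEpoch);
    }

    // Option 1: re-read the leader epoch from the metadata cache on every advance.
    Position advanceRefreshing(String partition, long nextOffset, int batchEpoch) {
        return new Position(nextOffset, Optional.of(batchEpoch),
                Optional.ofNullable(metadataLeaderEpoch.get(partition)));
    }
}
```

If the cache already holds epoch 6 while the old position still carries epoch 5, advanceInheriting keeps 5 and advanceRefreshing picks up 6. The refreshed value is still only as current as the cache itself, which matches the concern listed above.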
----
h3. Alternative Solutions
Option 2: Fetch fresh leader info during commit
Modify allConsumed() to fetch the latest leader information at commit time:
{code:java}
// Note: This would require passing metadata reference to allConsumed()
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed(ConsumerMetadata metadata) {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition()) {
            // Fetch the latest leader epoch from metadata at commit time
            Metadata.LeaderAndEpoch latestLeader = metadata.currentLeader(topicPartition);
            Optional<Integer> epochToCommit = latestLeader.epoch.isPresent()
                    ? latestLeader.epoch
                    : partitionState.position.offsetEpoch; // Fallback to offsetEpoch
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    epochToCommit,
                    ""));
        }
    });
    return allConsumed;
}
{code}
Advantages:
* Only impacts commit path, not consumption hot path
* Directly addresses the commit-time staleness issue
Disadvantages:
* Requires changing the signature of allConsumed() (API change)
* May still have a race condition if leader changes between metadata fetch and
commit
* Metadata could be stale if update hasn't been processed yet
----
Option 3: Use the maximum epoch value
Use the larger of the two epoch values, assuming newer epochs have higher values:
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition()) {
            Optional<Integer> epochToCommit;
            if (partitionState.position.offsetEpoch.isPresent() &&
                    partitionState.position.currentLeader.epoch.isPresent()) {
                // Use the maximum of the two epochs
                int maxEpoch = Math.max(
                        partitionState.position.offsetEpoch.get(),
                        partitionState.position.currentLeader.epoch.get());
                epochToCommit = Optional.of(maxEpoch);
            } else {
                // Fallback to whichever is present
                epochToCommit = partitionState.position.currentLeader.epoch.isPresent()
                        ? partitionState.position.currentLeader.epoch
                        : partitionState.position.offsetEpoch;
            }
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    epochToCommit,
                    ""));
        }
    });
    return allConsumed;
}
{code}
Advantages:
* No API changes required
* Simple to implement
* Provides better protection than using only one epoch
Disadvantages:
* Heuristic-based; assumes epochs are monotonically increasing
* Could still use a stale epoch if both values are old
* Doesn't solve the root cause of stale currentLeader
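The epoch selection in Option 3 can be isolated into a small helper, which makes the fallback behaviour easy to check on its own. A minimal sketch, assuming a hypothetical EpochChoice class that is not part of Kafka:

```java
import java.util.Optional;

// Hypothetical helper isolating the Option 3 selection rule; not part of Kafka.
final class EpochChoice {
    private EpochChoice() { }

    // Returns the larger epoch when both are present,
    // otherwise whichever one is present (possibly empty).
    static Optional<Integer> maxEpoch(Optional<Integer> offsetEpoch, Optional<Integer> leaderEpoch) {
        if (offsetEpoch.isPresent() && leaderEpoch.isPresent()) {
            return Optional.of(Math.max(offsetEpoch.get(), leaderEpoch.get()));
        }
        return leaderEpoch.isPresent() ? leaderEpoch : offsetEpoch;
    }
}
```

For example, maxEpoch(Optional.of(5), Optional.of(6)) yields epoch 6, and maxEpoch(Optional.of(5), Optional.empty()) falls back to epoch 5. As the disadvantages above note, the result is only as fresh as the newer of the two inputs.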
----
h3. Recommendation
Primary recommendation: Implement Option 1 (Update currentLeader during position updates)
This is the most robust solution because:
# It ensures currentLeader is always fresh
# It fixes the root cause rather than working around symptoms
# It has minimal performance impact
# It makes the codebase more consistent and maintainable
Secondary recommendation: Implement Option 3 as a defense-in-depth measure
Even with Option 1, using max(offsetEpoch, currentLeader.epoch) in allConsumed()
provides additional safety against any edge cases where one epoch might be more
up-to-date than the other.
Combined approach (strongest protection):
{code:java}
// In FetchCollector.java
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(),
            currentLeaderAndEpoch); // ✅ Keep currentLeader fresh
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
}

// In SubscriptionState.java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition()) {
            // Use the maximum epoch as defense-in-depth
            Optional<Integer> epochToCommit;
            if (partitionState.position.offsetEpoch.isPresent() &&
                    partitionState.position.currentLeader.epoch.isPresent()) {
                epochToCommit = Optional.of(Math.max(
                        partitionState.position.offsetEpoch.get(),
                        partitionState.position.currentLeader.epoch.get()));
            } else {
                epochToCommit = partitionState.position.currentLeader.epoch.isPresent()
                        ? partitionState.position.currentLeader.epoch
                        : partitionState.position.offsetEpoch;
            }
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    epochToCommit,
                    ""));
        }
    });
    return allConsumed;
}
{code}
This combined approach provides:
* Prevention: Keep currentLeader fresh during normal operation
* Defense: Use the best available epoch value at commit time
* Resilience: Minimize the window where a stale epoch can cause issues
----
h2. Additional Notes
Why consumers don't log NOT_LEADER_OR_FOLLOWER errors:
All consumer-side handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG level logging:
{code:java}
// FetchCollector.java line 325
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
// AbstractFetch.java line 207
log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition, partitionError, ...);
// OffsetsForLeaderEpochUtils.java line 102
LOG.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", ...);
{code}
This makes the issue difficult to diagnose in production environments.
----
h2. Workarounds (Until Fixed)
# Increase retention period to reduce likelihood of epoch deletion
# Monitor consumer lag to ensure it stays low
# Reduce rebalance frequency (increase max.poll.interval.ms,
session.timeout.ms)
# Use cooperative rebalance strategy to minimize rebalance impact
# Consider using auto.offset.reset=latest if reprocessing is more costly than
data loss (application-dependent)
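Workarounds 3 and 4 translate into consumer configuration overrides. The sketch below builds them with plain java.util.Properties so it stays self-contained. The configuration keys and the CooperativeStickyAssignor class name are standard Kafka consumer settings, but the WorkaroundConfigSketch class is hypothetical and the timeout values are illustrative assumptions, not tuned recommendations.

```java
import java.util.Properties;

// Hypothetical holder for the overrides; merge the result into the real consumer configuration.
final class WorkaroundConfigSketch {
    private WorkaroundConfigSketch() { }

    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Workaround 4: cooperative rebalancing revokes only the partitions that move.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Workaround 3: longer timeouts make rebalances less frequent (illustrative values).
        props.setProperty("max.poll.interval.ms", "600000");
        props.setProperty("session.timeout.ms", "60000");
        return props;
    }
}
```

The session timeout has to stay within the range the brokers allow, and longer timeouts also delay detection of consumers that have genuinely failed, so the values need to be chosen per application.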
----
h2. Related Code References
h3. 1. The problematic method: SubscriptionState.allConsumed()
Location: org.apache.kafka.clients.consumer.internals.SubscriptionState
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            // Uses offsetEpoch instead of currentLeader.epoch
            allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset,
                    partitionState.position.offsetEpoch, ""));
    });
    return allConsumed;
}
{code}
h3. 2. How FetchPosition is updated during normal consumption
Location: org.apache.kafka.clients.consumer.internals.FetchCollector
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(), // offsetEpoch: from consumed batch
            position.currentLeader);     // currentLeader: inherited from old position, NOT updated!
    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
            position, nextPosition, tp, partRecords.size());
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
}
{code}
Key Issue: The currentLeader field is inherited from the previous position and
not automatically updated during normal consumption. It only gets updated when
leader change errors are detected.
h3. 3. How committed offsets are restored after rebalance
Location:
org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
{code:java}
public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata,
                                           final ConsumerMetadata metadata,
                                           final SubscriptionState subscriptions) {
    for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
        final TopicPartition tp = entry.getKey();
        final OffsetAndMetadata offsetAndMetadata = entry.getValue();
        if (offsetAndMetadata != null) {
            // first update the epoch if necessary
            entry.getValue().leaderEpoch().ifPresent(epoch ->
                    metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
            // it's possible that the partition is no longer assigned when the response is received,
            // so we need to ignore seeking if that's the case
            if (subscriptions.isAssigned(tp)) {
                final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
                final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
                        offsetAndMetadata.offset(),
                        offsetAndMetadata.leaderEpoch(), // offsetEpoch from committed offset (may be old)
                        leaderAndEpoch);                 // currentLeader from current metadata (may be new)
                subscriptions.seekUnvalidated(tp, position);
                log.info("Setting offset for partition {} to the committed offset {}", tp, position);
            }
        }
    }
}
{code}
The Divergence Point: When restoring from committed offsets, offsetEpoch comes
from the stored offset (potentially old), while currentLeader comes from fresh
metadata (potentially new after leader change).
h3. 4. How OffsetsForLeaderEpoch validation request is constructed
Location:
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
{code:java}
static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
        Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
    OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(requestData.size());
    requestData.forEach((topicPartition, fetchPosition) ->
        fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
            OffsetForLeaderTopic topic = topics.find(topicPartition.topic());
            if (topic == null) {
                topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic());
                topics.add(topic);
            }
            topic.partitions().add(new OffsetForLeaderPartition()
                    .setPartition(topicPartition.partition())
                    .setLeaderEpoch(fetchEpoch) // Uses offsetEpoch for validation
                    .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
                            .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
            );
        })
    );
    return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
}
{code}
The Validation Problem: The validation request uses fetchEpoch (which is
offsetEpoch) to validate against the broker. If this epoch no longer exists in
the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
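Which of the two epochs ends up in which request field can be shown with a reduced model of the per-partition entry. This is a sketch only: ValidationEntrySketch is a hypothetical class, the sentinel value -1 is assumed to match RecordBatch.NO_PARTITION_LEADER_EPOCH, and the broker's side of the validation is not modelled.

```java
import java.util.Optional;

// Hypothetical reduction of the per-partition validation entry; not a Kafka class.
final class ValidationEntrySketch {
    // Assumed to equal RecordBatch.NO_PARTITION_LEADER_EPOCH.
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    final int leaderEpoch;        // the epoch being validated: the position's offsetEpoch
    final int currentLeaderEpoch; // the leader epoch the client believes is current

    private ValidationEntrySketch(int leaderEpoch, int currentLeaderEpoch) {
        this.leaderEpoch = leaderEpoch;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    // Like prepareRequest() above, positions without an offsetEpoch produce no entry.
    static Optional<ValidationEntrySketch> forPosition(Optional<Integer> offsetEpoch,
                                                       Optional<Integer> currentLeaderEpoch) {
        return offsetEpoch.map(epoch -> new ValidationEntrySketch(
                epoch, currentLeaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH)));
    }
}
```

With offsetEpoch=5 and currentLeader.epoch=6, the entry asks the broker about epoch 5 while reporting 6 as the current leader epoch, so the epoch under validation is always the one that was committed.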
h3. 5. FetchPosition class definition
Location:
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition
{code:java}
/**
 * Represents the position of a partition subscription.
 *
 * This includes the offset and epoch from the last record in
 * the batch from a FetchResponse. It also includes the leader epoch at the
 * time the batch was consumed.
 */
public static class FetchPosition {
    public final long offset;
    final Optional<Integer> offsetEpoch;         // Epoch from last consumed record's batch
    final Metadata.LeaderAndEpoch currentLeader; // Current partition leader info from metadata

    FetchPosition(long offset) {
        this(offset, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
    }

    public FetchPosition(long offset, Optional<Integer> offsetEpoch,
                         Metadata.LeaderAndEpoch currentLeader) {
        this.offset = offset;
        this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
        this.currentLeader = Objects.requireNonNull(currentLeader);
    }

    @Override
    public String toString() {
        return "FetchPosition{" +
                "offset=" + offset +
                ", offsetEpoch=" + offsetEpoch +
                ", currentLeader=" + currentLeader +
                '}';
    }
}
{code}
Class Design: The class contains both offsetEpoch (historical data epoch) and
currentLeader.epoch (current metadata epoch), but allConsumed() only uses the
former when committing.
was:
h2. Summary
When a partition leader changes and the consumer commits offsets with a stale
epoch, if the log segments containing that epoch are subsequently deleted due
to retention policy, the consumer will encounter OFFSET_OUT_OF_RANGE error and
reset to earliest (if auto.offset.reset=earliest), causing massive message
reprocessing.The root cause is that SubscriptionState.allConsumed() uses
position.offsetEpoch instead of position.currentLeader.epoch when constructing
OffsetAndMetadata for commit, which can become stale when leader changes occur.
----
h2. Environment
Cluster Configuration:
* Kafka Server Version: 3.9.0
* Kafka Client Version: 3.9.0
* Topic: 200 partitions, 7-day retention, no tiered storage
* Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
* No broker/controller restarts occurred
* High throughput producer continuously writing messages
Consumer Configuration:
{code:java}
auto.offset.reset=earliest
enable.auto.commit=true {code}
Consumer Code:
* Registered ConsumerRebalanceListener
* Calls kafkaConsumer.commitSync() in onPartitionsRevoked() method
----
h2. Problem Description
In a scenario where the consumer group has no lag, consumers suddenly consumed
a massive amount of messages, far exceeding the recent few minutes of producer
writes. Investigation revealed that multiple partitions reset to the earliest
offset and reprocessed up to 7 days of historical data.
----
h2. Observed Symptoms (Timeline)
# Consumer group rebalance occurred (triggered by normal consumer group
management)
# Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
# Consumer reset to earliest offset due to auto.offset.reset=earliest
configuration
# Kafka broker logged NotLeaderOrFollowerException around the same timeframe,
indicating partition leader changes
# Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG
level and not visible in production logs)
!image-2025-11-21-18-03-24-592.png!!image-2025-11-21-18-04-35-845.png!
!image-2025-11-21-18-11-24-316.png!
----
h2. Root Cause Analysis
h3. The Problem Chain
1. Leader change occurs (epoch changes from N to N+1)
↓
2. Consumer continues processing old batches (epoch=N)
↓
3. Consumer commits offset during/after rebalance
├─ Committed offset: 1000
└─ Committed epoch: N (using position.offsetEpoch from old batch)
↓
4. High throughput + retention policy causes old segments (epoch=N) to be
deleted
↓
5. Consumer restarts/rebalances and fetches committed offset
├─ Tries to validate offset 1000 with epoch=N
└─ Broker cannot find epoch=N (segments deleted)
↓
6. Broker returns OFFSET_OUT_OF_RANGE
↓
7. Consumer resets to earliest offset
h3. Code Analysis
The problematic code in SubscriptionState.allConsumed():
{code:java}
//
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
assignment.forEach((topicPartition, partitionState) -> {
if (partitionState.hasValidPosition())
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
partitionState.position.offsetEpoch, // Problem: uses
offsetEpoch from consumed batch
""));
});
return allConsumed;
} {code}
Why this is problematic:The FetchPosition class contains two different epoch
values:
* offsetEpoch: The epoch from the last consumed record's batch
* currentLeader.epoch: The current partition leader's epoch from metadata
When committing offsets, we should use currentLeader.epoch instead of
offsetEpoch because:
# offsetEpoch represents the epoch of already consumed data (historical)
# currentLeader.epoch represents the current partition leader (up-to-date)
h3. Scenarios Where These Epochs Diverge
Scenario A: Leader changes while consumer is processing old batches
* T1: Consumer fetches batch with epoch=5
* T2: Leader changes to epoch=6
* T3: Metadata updates with new leader epoch=6
* T4: Consumer commits offset
* offsetEpoch = 5 (from batch being processed)
* currentLeader.epoch = 6 (from updated metadata)
* Problem: Commits epoch=5, which may soon be deleted
Scenario B: Recovery from committed offset after leader change
* Consumer commits offset with old epoch=N
* Leader changes to epoch=N+1
* Old segments (epoch=N) are deleted by retention policy
* Consumer rebalances and tries to restore from committed offset
* offsetEpoch = N (from committed offset)
* currentLeader.epoch = N+1 (from current metadata)
* Problem: Validation fails because epoch=N no longer exists
----
h2. Steps to Reproduce
This is a timing-sensitive edge case. The following conditions increase the
likelihood:
# Setup:
* High-throughput topic (to trigger faster log rotation)
* Relatively short retention period (e.g., 7 days)
* Consumer group with rebalance listener calling commitSync()
* enable.auto.commit=true (or any manual commit)
# Trigger:
* Trigger a partition leader change (broker restart, controller election, etc.)
* Simultaneously or shortly after, trigger a consumer group rebalance
* Wait for retention policy to delete old log segments
# Expected Result:
Consumer should resume from committed offset
# Actual Result:
Consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
----
h2. Impact
* Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
* Service Degradation: Sudden spike in consumer throughput can overwhelm
downstream systems
* Resource Waste: Unnecessary CPU, memory, and network usage
* Potential Duplicates: If using auto.offset.reset=earliest, duplicate message
processing is guaranteed
----
h2. Proposed Fix
h3. Root Cause Analysis
The issue is more fundamental than a simple field selection problem. The core
issue is that both epoch values in FetchPosition can be stale at commit time:
# offsetEpoch: Contains the epoch from the last consumed record's batch. If a
leader change occurs after consumption but before commit, this epoch becomes
stale and may reference log segments that have been deleted.
# currentLeader.epoch: Inherited from the previous position during normal
consumption and only updated when:
* NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH errors are detected
* Position is restored from committed offsets (fetches from metadata)
* Explicit validation is triggered via maybeValidatePositionForCurrentLeader()
During normal, error-free consumption, currentLeader is never updated and can
also become stale.
h3. Problem with Current Code
Location: org.apache.kafka.clients.consumer.internals.FetchCollector
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new
SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(), // offsetEpoch: from consumed batch
position.currentLeader); // ❌ currentLeader: inherited, NOT
updated!
log.trace("Updating fetch position from {} to {} for partition {} and
returning {} records from `poll()`",
position, nextPosition, tp, partRecords.size());
subscriptions.position(tp, nextPosition);
positionAdvanced = true;
}
{code}
The inherited currentLeader means it can be as stale as offsetEpoch in certain
scenarios.
----
h3. Recommended Solution: Proactively Update currentLeader During Position
Updates
Option 1: Update currentLeader when advancing position (Primary
Recommendation)Modify FetchCollector to fetch the latest leader information
from metadata every time the position is updated:
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
// Fetch the latest leader information from metadata
Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
SubscriptionState.FetchPosition nextPosition = new
SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
currentLeaderAndEpoch); // ✅ Use fresh leader info from metadata
log.trace("Updating fetch position from {} to {} for partition {} and
returning {} records from `poll()`",
position, nextPosition, tp, partRecords.size());
subscriptions.position(tp, nextPosition);
positionAdvanced = true;
} {code}
Advantages:
* Ensures currentLeader is always up-to-date
* Makes allConsumed() safe to use *currentLeader.epoch* for commits
Modify SubscriptionState.allConsumed() to {color:#de350b}use
currentLeader.epoch instead of offsetEpoch{color}:
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
assignment.forEach((topicPartition, partitionState) -> {
if (partitionState.hasValidPosition())
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
partitionState.position.currentLeader.epoch, // ✅ Use current
leader epoch
""));
});
return allConsumed;
} {code}
* Minimal performance impact (metadata lookup is O(1) from local cache)
* Aligns with the existing pattern in refreshCommittedOffsets()
Potential Concerns:
* Adds one metadata lookup per position update
* If metadata is stale, currentLeader.epoch could still lag slightly, but this
is the same risk as today
----
h3. Alternative Solutions
Option 2: Fetch fresh leader info during commitModify allConsumed() to fetch
the latest leader information at commit time:
{code:java}
// Note: This would require passing metadata reference to allConsumed()
public synchronized Map<TopicPartition, OffsetAndMetadata>
allConsumed(ConsumerMetadata metadata) {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
assignment.forEach((topicPartition, partitionState) -> {
if (partitionState.hasValidPosition()) {
// Fetch the latest leader epoch from metadata at commit time
Metadata.LeaderAndEpoch latestLeader =
metadata.currentLeader(topicPartition);
Optional<Integer> epochToCommit = latestLeader.epoch.isPresent()
? latestLeader.epoch
: partitionState.position.offsetEpoch; // Fallback to
offsetEpoch
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
epochToCommit,
""));
}
});
return allConsumed;
} {code}
Advantages:
* Only impacts commit path, not consumption hot path
* Directly addresses the commit-time staleness issue
Disadvantages:
* Requires changing the signature of allConsumed() (API change)
* May still have a race condition if leader changes between metadata fetch and
commit
* Metadata could be stale if update hasn't been processed yet
----
Option 3: Use the maximum epoch valueUse the larger of the two epoch values,
assuming newer epochs have higher values:
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition()) {
            Optional<Integer> epochToCommit;
            if (partitionState.position.offsetEpoch.isPresent() &&
                    partitionState.position.currentLeader.epoch.isPresent()) {
                // Use the maximum of the two epochs
                int maxEpoch = Math.max(
                        partitionState.position.offsetEpoch.get(),
                        partitionState.position.currentLeader.epoch.get());
                epochToCommit = Optional.of(maxEpoch);
            } else {
                // Fallback to whichever is present
                epochToCommit = partitionState.position.currentLeader.epoch.isPresent()
                        ? partitionState.position.currentLeader.epoch
                        : partitionState.position.offsetEpoch;
            }
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    epochToCommit,
                    ""));
        }
    });
    return allConsumed;
} {code}
Advantages:
* No API changes required
* Simple to implement
* Provides better protection than using only one epoch
Disadvantages:
* Heuristic-based; assumes epochs are monotonically increasing
* Could still use a stale epoch if both values are old
* Doesn't solve the root cause of stale currentLeader
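The selection rule in Option 3 can be condensed into a small helper. The sketch below is a standalone illustration, not Kafka code: EpochSelection and maxEpoch are hypothetical names, and the two epochs are modeled as plain Optional<Integer> values. The last call shows the limitation listed above: if both inputs are stale, the result is stale as well.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of the Kafka client. */
final class EpochSelection {
    private EpochSelection() { }

    /** Returns the larger epoch when both are present, otherwise whichever one is present. */
    static Optional<Integer> maxEpoch(Optional<Integer> offsetEpoch,
                                      Optional<Integer> currentLeaderEpoch) {
        if (offsetEpoch.isPresent() && currentLeaderEpoch.isPresent()) {
            return Optional.of(Math.max(offsetEpoch.get(), currentLeaderEpoch.get()));
        }
        // Fall back to whichever epoch is present; the result is empty if neither is.
        return currentLeaderEpoch.isPresent() ? currentLeaderEpoch : offsetEpoch;
    }

    public static void main(String[] args) {
        // The leader epoch is newer than the batch epoch: the newer value wins.
        System.out.println(maxEpoch(Optional.of(5), Optional.of(6)));
        // Only the batch epoch is known: fall back to it.
        System.out.println(maxEpoch(Optional.of(5), Optional.empty()));
        // Both values are equally old: the helper cannot do better than 5,
        // even if the real leader epoch has moved on.
        System.out.println(maxEpoch(Optional.of(5), Optional.of(5)));
    }
}
```

Keeping the rule in one place would also let the fallback behavior be unit-tested without constructing a full SubscriptionState.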
----
h3. Recommendation
Primary recommendation: Implement Option 1 (Update currentLeader during
position updates)
This is the most robust solution because:
# It keeps currentLeader in step with the locally cached metadata on every
position update
# It fixes the root cause rather than working around symptoms
# It has minimal performance impact
# It makes the codebase more consistent and maintainable
Secondary recommendation: Implement Option 3 as a defense-in-depth measure
Even with Option 1, using max(offsetEpoch, currentLeader.epoch) in
allConsumed() provides additional safety in edge cases where one epoch is more
up-to-date than the other.
Combined approach (strongest protection):
{code:java}
// In FetchCollector.java
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(),
            currentLeaderAndEpoch); // ✅ Keep currentLeader fresh
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
}

// In SubscriptionState.java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition()) {
            // Use the maximum epoch as defense-in-depth
            Optional<Integer> epochToCommit;
            if (partitionState.position.offsetEpoch.isPresent() &&
                    partitionState.position.currentLeader.epoch.isPresent()) {
                epochToCommit = Optional.of(Math.max(
                        partitionState.position.offsetEpoch.get(),
                        partitionState.position.currentLeader.epoch.get()));
            } else {
                epochToCommit = partitionState.position.currentLeader.epoch.isPresent()
                        ? partitionState.position.currentLeader.epoch
                        : partitionState.position.offsetEpoch;
            }
            allConsumed.put(topicPartition, new OffsetAndMetadata(
                    partitionState.position.offset,
                    epochToCommit,
                    ""));
        }
    });
    return allConsumed;
} {code}
This combined approach provides:
* Prevention: Keep currentLeader fresh during normal operation
* Defense: Use the best available epoch value at commit time
* Resilience: Minimize the window where a stale epoch can cause issues
----
h2. Additional Notes
Why consumers don't log NOT_LEADER_OR_FOLLOWER errors:
All consumer-side handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG level
logging:
{code:java}
// FetchCollector.java line 325
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());

// AbstractFetch.java line 207
log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition, partitionError, ...);

// OffsetsForLeaderEpochUtils.java line 102
LOG.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", ...); {code}
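Because DEBUG messages are not visible in production logs, the leader change
leaves no trace on the consumer side, which makes the issue difficult to
diagnose in production environments.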
----
h2. Workarounds (Until Fixed)
# Increase retention period to reduce likelihood of epoch deletion
# Monitor consumer lag to ensure it stays low
# Reduce rebalance frequency (increase max.poll.interval.ms,
session.timeout.ms)
# Use cooperative rebalance strategy to minimize rebalance impact
# Consider using auto.offset.reset=latest if reprocessing is more costly than
data loss (application-dependent)
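As a rough illustration of workarounds 3 to 5, the consumer-side settings could be assembled as below. This is a sketch under assumptions: WorkaroundConsumerConfig is a hypothetical class name, and the timeout values are placeholders rather than recommendations. The property keys and the CooperativeStickyAssignor class name are standard consumer configuration. Retention (workaround 1) is a topic or broker setting and is not shown.

```java
import java.util.Properties;

/** Hypothetical configuration holder for illustration only. */
final class WorkaroundConsumerConfig {
    private WorkaroundConsumerConfig() { }

    static Properties build() {
        Properties props = new Properties();
        // Workaround 3: allow more time between polls and between heartbeats so
        // that slow processing is less likely to trigger a rebalance.
        // Placeholder values; tune them for the application.
        props.setProperty("max.poll.interval.ms", "600000");
        props.setProperty("session.timeout.ms", "60000");
        // Workaround 4: cooperative (incremental) rebalancing.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Workaround 5 (optional): trades reprocessing for possible data loss.
        // props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties would be merged with the application's existing settings (bootstrap servers, group id, deserializers) before constructing the KafkaConsumer.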
----
h2. Related Code References
h3. 1. The problematic method: SubscriptionState.allConsumed()
Location: org.apache.kafka.clients.consumer.internals.SubscriptionState
{code:java}
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
    Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
    assignment.forEach((topicPartition, partitionState) -> {
        if (partitionState.hasValidPosition())
            allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset,
                    partitionState.position.offsetEpoch, "")); // Uses offsetEpoch instead of currentLeader.epoch
    });
    return allConsumed;
} {code}
h3. 2. How FetchPosition is updated during normal consumption
Location: org.apache.kafka.clients.consumer.internals.FetchCollector
{code:java}
if (nextInLineFetch.nextFetchOffset() > position.offset) {
    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
            nextInLineFetch.nextFetchOffset(),
            nextInLineFetch.lastEpoch(), // offsetEpoch: from consumed batch
            position.currentLeader);     // currentLeader: inherited from old position, NOT updated!
    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
            position, nextPosition, tp, partRecords.size());
    subscriptions.position(tp, nextPosition);
    positionAdvanced = true;
} {code}
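Key Issue: The currentLeader field is inherited from the previous position and
is not refreshed during normal, error-free consumption. It is only updated when
a leader-change error (NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH) is
detected, when the position is restored from committed offsets, or when
validation is triggered via maybeValidatePositionForCurrentLeader().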
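The effect of inheriting currentLeader can be shown with a toy model. The sketch below does not use Kafka classes: StaleLeaderDemo, Position, advanceInherited and advanceRefreshed are hypothetical names, and the metadata cache is reduced to a single integer. It contrasts the current behavior (leader epoch copied from the old position) with Option 1 (leader epoch re-read on every advance).

```java
import java.util.Optional;

/** Toy model for illustration; all names here are hypothetical. */
final class StaleLeaderDemo {
    /** Simplified stand-in for FetchPosition. */
    static final class Position {
        final long offset;
        final Optional<Integer> offsetEpoch;        // epoch of the last consumed batch
        final Optional<Integer> currentLeaderEpoch; // leader epoch cached in the position

        Position(long offset, Optional<Integer> offsetEpoch, Optional<Integer> currentLeaderEpoch) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
            this.currentLeaderEpoch = currentLeaderEpoch;
        }

        @Override
        public String toString() {
            return "Position{offset=" + offset + ", offsetEpoch=" + offsetEpoch
                    + ", currentLeaderEpoch=" + currentLeaderEpoch + '}';
        }
    }

    /** Current behavior: the cached leader epoch is copied from the old position. */
    static Position advanceInherited(Position old, long nextOffset, int batchEpoch) {
        return new Position(nextOffset, Optional.of(batchEpoch), old.currentLeaderEpoch);
    }

    /** Option 1: the leader epoch is re-read from (simulated) metadata on every advance. */
    static Position advanceRefreshed(long nextOffset, int batchEpoch, int metadataLeaderEpoch) {
        return new Position(nextOffset, Optional.of(batchEpoch), Optional.of(metadataLeaderEpoch));
    }

    public static void main(String[] args) {
        Position start = new Position(100L, Optional.of(5), Optional.of(5));
        int metadataLeaderEpoch = 6; // leader changed; the local metadata already knows epoch 6

        // The consumer drains a batch that was written under epoch 5.
        System.out.println(advanceInherited(start, 200L, 5));
        System.out.println(advanceRefreshed(200L, 5, metadataLeaderEpoch));
    }
}
```

In the inherited case the position still carries leader epoch 5 after the advance, so neither epoch in the position reflects the leader change until an error or validation forces an update.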
h3. 3. How committed offsets are restored after rebalance
Location: org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
{code:java}
public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata,
                                           final ConsumerMetadata metadata,
                                           final SubscriptionState subscriptions) {
    for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
        final TopicPartition tp = entry.getKey();
        final OffsetAndMetadata offsetAndMetadata = entry.getValue();
        if (offsetAndMetadata != null) {
            // first update the epoch if necessary
            entry.getValue().leaderEpoch().ifPresent(epoch ->
                    metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));

            // it's possible that the partition is no longer assigned when the response is received,
            // so we need to ignore seeking if that's the case
            if (subscriptions.isAssigned(tp)) {
                final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
                final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
                        offsetAndMetadata.offset(),
                        offsetAndMetadata.leaderEpoch(), // offsetEpoch from committed offset (may be old)
                        leaderAndEpoch);                 // currentLeader from current metadata (may be new)

                subscriptions.seekUnvalidated(tp, position);

                log.info("Setting offset for partition {} to the committed offset {}", tp, position);
            }
        }
    }
} {code}
The Divergence Point: When restoring from committed offsets, offsetEpoch comes
from the stored offset (potentially old), while currentLeader comes from fresh
metadata (potentially new after leader change).
h3. 4. How OffsetsForLeaderEpoch validation request is constructed
Location: org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
{code:java}
static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
        Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
    OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(requestData.size());
    requestData.forEach((topicPartition, fetchPosition) ->
        fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
            OffsetForLeaderTopic topic = topics.find(topicPartition.topic());
            if (topic == null) {
                topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic());
                topics.add(topic);
            }
            topic.partitions().add(new OffsetForLeaderPartition()
                .setPartition(topicPartition.partition())
                .setLeaderEpoch(fetchEpoch) // Uses offsetEpoch for validation
                .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
                    .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
            );
        })
    );
    return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
} {code}
The Validation Problem: The validation request uses fetchEpoch (which is
offsetEpoch) to validate against the broker. If this epoch no longer exists in
the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
h3. 5. FetchPosition class definition
Location: org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition
{code:java}
/**
 * Represents the position of a partition subscription.
 *
 * This includes the offset and epoch from the last record in
 * the batch from a FetchResponse. It also includes the leader epoch at the time the batch was consumed.
 */
public static class FetchPosition {
    public final long offset;
    final Optional<Integer> offsetEpoch;         // Epoch from last consumed record's batch
    final Metadata.LeaderAndEpoch currentLeader; // Current partition leader info from metadata

    FetchPosition(long offset) {
        this(offset, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
    }

    public FetchPosition(long offset, Optional<Integer> offsetEpoch,
                         Metadata.LeaderAndEpoch currentLeader) {
        this.offset = offset;
        this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
        this.currentLeader = Objects.requireNonNull(currentLeader);
    }

    @Override
    public String toString() {
        return "FetchPosition{" +
                "offset=" + offset +
                ", offsetEpoch=" + offsetEpoch +
                ", currentLeader=" + currentLeader +
                '}';
    }
} {code}
Class Design: The class contains both offsetEpoch (historical data epoch) and
currentLeader.epoch (current metadata epoch), but allConsumed() only uses the
former when committing.
> Consumer triggers OFFSET_OUT_OF_RANGE and resets to earliest when committed
> offset's epoch has been outdated after leader change
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19902
> URL: https://issues.apache.org/jira/browse/KAFKA-19902
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.9.0
> Reporter: RivenSun
> Priority: Major
> Attachments: image-2025-11-21-18-03-24-592.png,
> image-2025-11-21-18-04-35-845.png, image-2025-11-21-18-05-56-528.png,
> image-2025-11-21-18-11-24-316.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)