[
https://issues.apache.org/jira/browse/KAFKA-19902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18043817#comment-18043817
]
Nicolae Marasoiu edited comment on KAFKA-19902 at 12/9/25 10:49 AM:
--------------------------------------------------------------------
[^analysis.md] pls find this analysis which is also attached to the ticket as
an .md file
Pls note that facts are just what we consider facts. No root cause identified
yet but we invite feedback to help us reach to root cause in this complex issue.
Once again, of course, the fact that the old epoch is committed is correct. In
the total order induced by the pair (epoch, offset), the cursor is accurate as
committed.
The jump from old epoch, unrecognised by the current partition lead broker, to
a newer, recognised epoch, the transition thru Optional.empty epoch, which also
raises a question in this race condition, and even why the current lead does
not know about that older epoch (unclean election? producer pushing with ISR=1?)
was (Author: nmarasoiu):
[^analysis.md] pls find this analysis which is also attached to the ticket as
an .md file
# KAFKA-19902: Forensic Analysis - Facts, Inferences, and Root Causes
## Executive Summary
This document provides a comprehensive forensic analysis of the consumer
OFFSET_OUT_OF_RANGE issue reported in KAFKA-19902. The analysis enumerates
verified facts from code and logs, derives logical inferences, identifies areas
of uncertainty, reconstructs the causal chain, and documents the minimal set of
root causes.
**Key Finding**: This is a timing-dependent, multi-factor bug requiring: (1)
leader change during active consumption, (2) rebalance shortly after leader
change, (3) log truncation removing old epoch entries, and (4) consumer
attempting to restore from committed offset with stale epoch.
---
## 1. ENUMERATED FACTS
### 1.1 Facts from Code Analysis
**F1**:
[`SubscriptionState.allConsumed()`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:775)
commits offsets using `position.offsetEpoch`
```java
// Line 775-783
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
partitionState.position.offsetEpoch, // ← Uses epoch from consumed batch
""));
```
**F2**:
[`FetchPosition`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:1367)
contains two epoch fields:
- `offsetEpoch`: Epoch from last consumed record's batch (historical)
- `currentLeader.epoch`: Current partition leader epoch from metadata
**F3**:
[`FetchCollector`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:179)
inherits `currentLeader` during normal consumption:
```java
// Line 179-188
SubscriptionState.FetchPosition nextPosition = new
SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(), // offsetEpoch from batch
position.currentLeader); // ← INHERITED, not updated!
```
**F4**:
[`LeaderEpochFileCache.endOffsetFor()`](../../storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:284)
returns `UNDEFINED_EPOCH_OFFSET` in three scenarios:
- Line 291: When `requestedEpoch == UNDEFINED_EPOCH`
- Line 303: When `higherEntry == null` (requested epoch > all known epochs)
- Line 312-313: When `floorEntry == null` (requested epoch < all known epochs)
**F5**: The code comment at line 301-302 states: *"This case should never be
hit because the latest cached epoch is always the largest."*
**F6**:
[`OffsetsForLeaderEpochUtils.prepareRequest()`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java:61)
uses `offsetEpoch` for validation:
```java
// Line 61
.setLeaderEpoch(fetchEpoch) // Uses offsetEpoch for validation
```
**F7**:
[`ConsumerUtils.refreshCommittedOffsets()`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:367)
creates position with:
- `offsetEpoch` from committed offset (may be old)
- `currentLeader` from fresh metadata (may be new)
**F8**:
[`UnifiedLog.endOffsetForEpoch()`](../../storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:1286)
returns `Optional.empty()` when `foundOffset == UNDEFINED_EPOCH_OFFSET`
### 1.2 Facts from Production Logs
**F9**: Timeline from logs (2025-11-19 UTC):
- **15:47:09.238Z**: Rebalance triggered ("cached metadata has changed")
- **15:47:18.995Z**: Group stabilized, generation 1702
- **15:47:29.777Z**: Consumer reports position out of range
- **15:47:32.203Z**: Broker ISR shrinking from [10,11,30] to [10]
- **15:47:32.272Z**: Broker truncating partition to offset 30795645
**F10**: Consumer position at error time:
```
{offset=30795753, offsetEpoch=Optional.empty, currentLeader={epoch=6}}
```
**F11**: Offset gap: Consumer position (30795753) vs truncation point
(30795645) = **108 offsets**
**F12**: Broker logs reference epochs **7 and 11**, consumer shows epoch **6**
**F13**: Consumer error **precedes** broker truncation log by **~2.5 seconds**
---
## 2. LOGICAL INFERENCES
### 2.1 Architecture-Level Inferences
**I1**: **Dual-epoch design is inherently fragile** (from F2, F3)
- Two epoch values can drift during normal operation
- No mechanism ensures they stay synchronized
- System relies on error handling to trigger updates
**I2**: **Current design assumes epoch validation always succeeds** (from F5)
- Comment "should never be hit" indicates unexpected scenario
- Production logs prove this assumption is violated
- Missing defensive handling for this case
**I3**: **Position updates prioritize performance over correctness** (from F3)
- Inheriting `currentLeader` avoids metadata lookup
- Trade-off: staleness during leader changes
- Design assumes leader changes are rare or quickly detected
### 2.2 Scenario-Specific Inferences
**I4**: **Leader change occurred between consumption and commit** (from F9, F12)
- Consumer consumed at epoch 6
- Leader changed to epoch 7 (then possibly 11)
- Consumer committed with stale epoch 6
- Metadata updates lagged behind leader transition
**I5**: **Missing epoch (`Optional.empty`) is the smoking gun** (from F10)
- Position had an epoch during consumption (required for fetch)
- Epoch was lost or never set properly
- This bypasses validation logic entirely
**I6**: **Timing anomaly suggests async processing** (from F13)
- Consumer error at 15:47:29.777Z
- Truncation logged at 15:47:32.272Z
- Either truncation occurred earlier (logged late) OR
- Consumer validated against already-truncated state
**I7**: **Small offset gap indicates recent activity** (from F11)
- 108 offsets = very recent consumption
- Rules out stale consumer or long pause
- Suggests tight timing window for leader change
### 2.3 Code Path Inferences
**I8**: **Validation fails when epoch not in cache** (from F4, F6)
- Consumer sends validation with `leaderEpoch=6`
- Broker cache only contains epochs 7+ after truncation
- `higherEntry` lookup fails → returns `UNDEFINED_EPOCH_OFFSET`
- Consumer interprets as OFFSET_OUT_OF_RANGE
**I9**: **Epoch can be lost during offset restore** (from F7)
- Committed offset may have epoch N
- Restored position gets fresh `currentLeader` with epoch N+1
- If `offsetEpoch` is empty, validation is skipped
- Position becomes "valid" without verification
---
## 3. AREAS OF UNCERTAINTY
### 3.1 Critical Unknowns
**U1**: **How did `offsetEpoch` become `Optional.empty`?**
- Was it never set?
- Was it lost during commit/restore cycle?
- Is there a code path that clears it?
- **Need**: Examination of `__consumer_offsets` record
**U2**: **What triggered the leader change at 15:47:09?**
- "cached metadata has changed" is generic
- No controller logs provided
- Could be: planned migration, failure, network partition
- **Need**: Controller and broker logs from 15:47:00-15:47:10
**U3**: **Why did truncation occur 3 seconds after consumer error?**
- Did truncation happen earlier but logged later?
- Did consumer validate against already-truncated state?
- Is there a race between validation and truncation?
- **Need**: Broker logs with millisecond precision
**U4**: **Was the partition freshly assigned or already owned?**
- Fresh assignment: epoch might not be set yet
- Already owned: epoch should exist from prior fetches
- **Need**: Consumer logs from 15:45:00-15:47:00
### 3.2 Secondary Unknowns
**U5**: **Exact contents of committed offset record**
- Did it contain epoch 6, or no epoch at all?
- **Need**: `__consumer_offsets` topic inspection
**U6**: **Whether consumer performed any fetches between rebalance and error**
- Fetches would populate `offsetEpoch`
- No fetches might explain missing epoch
- **Need**: Consumer DEBUG logs for fetch activity
**U7**: **Metadata refresh frequency and timing**
- How stale could `currentLeader` become?
- Was metadata updated after leader change but before commit?
- **Need**: Consumer metadata update logs
---
## 4. CAUSAL CHAIN RECONSTRUCTION
### 4.1 Most Likely Scenario (Hypothesis A)
**Root Cause**: Consumer commits offset with epoch from old leader, which is
later truncated away
**Sequence**:
1. **T1**: Consumer consuming at epoch 6
2. **T2**: Leader change: epoch 6 → 7
3. **T3**: Consumer continues processing old batches with epoch 6
4. **T4**: Rebalance triggered
5. **T5**: Consumer commits offset with `offsetEpoch=6`
6. **T6**: ISR shrink triggers truncation
7. **T7**: Broker truncates, removes epoch 6 entries
8. **T8**: Consumer restores from committed offset
9. **T9**: Validation request: epoch=6
10. **T10**: Broker: epoch 6 not in cache
11. **T11**: Returns `UNDEFINED_EPOCH_OFFSET`
12. **T12**: Consumer: OFFSET_OUT_OF_RANGE
13. **T13**: Reset to earliest
**Supporting Evidence**:
- F1: Consumer commits with `offsetEpoch` (historical)
- F3: `currentLeader` not updated during consumption
- F9: Timeline matches this sequence
- F12: Epoch mismatch (6 vs 7/11)
- I4: Leader change inference
**Weakness**: Doesn't explain `Optional.empty` (F10)
### 4.2 Alternative Scenario (Hypothesis B)
**Root Cause**: Consumer assigned partition but never fetched before rebalance,
commits empty epoch
**Sequence**:
1. **T1**: Partition assigned to consumer
2. **T2**: Position restored from committed offset
3. **T3**: Rebalance triggered before any fetch
4. **T4**: `commitSync` in `onPartitionsRevoked`
5. **T5**: Position has `offsetEpoch=Optional.empty`
6. **T6**: Leader change + truncation occurs
7. **T7**: Consumer re-assigned partition
8. **T8**: Restores committed offset with no epoch
9. **T9**: Attempts to validate without epoch
10. **T10**: Validation fails due to missing epoch + leader change
11. **T11**: OFFSET_OUT_OF_RANGE
**Supporting Evidence**:
- F10: `offsetEpoch=Optional.empty` matches exactly
- F11: Small offset gap suggests recent assignment
- I5: Missing epoch is direct cause
**Weakness**: 108-offset gap suggests activity, not fresh assignment
### 4.3 Hybrid Scenario (Hypothesis C)
**Root Cause**: Epoch present during consumption but lost during commit/restore
cycle due to leader change race condition
**Sequence**:
1. **T1**: Consumer consuming with epoch 6
2. **T2**: Leader change occurs
3. **T3**: Rebalance triggered immediately
4. **T4**: `commitSync` attempts to save position
5. **T5**: Race condition: `offsetEpoch=6` but invalidated OR metadata stale
causes `offsetEpoch` to become `Optional.empty`
6. **T6**: Restore retrieves committed offset
7. **T7**: Position has no valid epoch
8. **T8**: Validation skipped or fails
9. **T9**: OFFSET_OUT_OF_RANGE
**Supporting Evidence**:
- Combines strengths of both hypotheses
- Explains both epoch mismatch (F12) and missing epoch (F10)
- Matches tight timing in logs (F9)
---
## 5. MINIMAL SET OF ROOT CAUSES
After exhaustive analysis, I identify **THREE DISTINCT ROOT CAUSES** working
together:
### RC1: Architectural - Stale Epoch Commit Design Flaw
**Location**:
[`SubscriptionState.allConsumed():780`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:780)
**Problem**: System commits historical epoch (`offsetEpoch`) instead of current
epoch (`currentLeader.epoch`)
**Why it's wrong**:
- `offsetEpoch` represents when data was *produced*
- `currentLeader.epoch` represents *current* leader
- After leader change, these diverge
- Committed epoch becomes stale immediately
**Impact**: High - Direct cause of stale epoch in committed offsets
**Fix**: Use `currentLeader.epoch` for commits
---
### RC2: Implementation - Epoch Not Updated During Normal Consumption
**Location**:
[`FetchCollector:183`](../../clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:183)
**Problem**: `currentLeader` inherited, never refreshed from metadata
**Why it's wrong**:
- Leader changes occur asynchronously
- Metadata updates happen separately from consumption
- No mechanism links metadata updates to position updates
- Both epochs become stale during normal operation
**Impact**: High - Enables RC1 by allowing staleness
**Fix**: Fetch fresh `currentLeader` from metadata on each position update
---
### RC3: Validation - Insufficient Defensive Handling
**Location**:
[`LeaderEpochFileCache.endOffsetFor():303`](../../storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:303)
**Problem**: Returns `UNDEFINED_EPOCH_OFFSET` for "impossible" case that
actually happens
**Why it's wrong**:
- Comment says "should never be hit"
- Production shows it IS hit regularly
- Consumer interprets as OFFSET_OUT_OF_RANGE
- No fallback mechanism
**Impact**: Medium - Converts recoverable situation into unrecoverable error
**Fix**: Better handling when requested epoch > all cached epochs (e.g., return
log end offset instead of undefined)
---
## 6. RECOMMENDED SOLUTION
Based on the JIRA's proposed fix, the **correct solution** addresses **RC1 and
RC2**:
### Primary Fix (Addresses RC1 + RC2)
#### Step 1: Update `currentLeader` during position advancement
**File**:
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java`
**Current code** (line 179-188):
```java
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new
SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
position.currentLeader); // ← Inherited, stale
subscriptions.position(tp, nextPosition);
positionAdvanced = true;
}
```
**Proposed fix**:
```java
if (nextInLineFetch.nextFetchOffset() > position.offset) {
// ✅ Fetch fresh leader information from metadata
Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
SubscriptionState.FetchPosition nextPosition = new
SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
currentLeaderAndEpoch); // ✅ Use fresh leader info
subscriptions.position(tp, nextPosition);
positionAdvanced = true;
}
```
#### Step 2: Use `currentLeader.epoch` for commits
**File**:
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java`
**Current code** (line 775-783):
```java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
assignment.forEach((topicPartition, partitionState) -> {
if (partitionState.hasValidPosition())
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
partitionState.position.offsetEpoch, // ← Historical epoch
""));
});
return allConsumed;
}
```
**Proposed fix**:
```java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
assignment.forEach((topicPartition, partitionState) -> {
if (partitionState.hasValidPosition())
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
partitionState.position.currentLeader.epoch, // ✅ Current
leader epoch
""));
});
return allConsumed;
}
```
### Defense-in-Depth (Optional, Addresses RC3)
Use `max(offsetEpoch, currentLeader.epoch)` as additional safety:
```java
public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
assignment.forEach((topicPartition, partitionState) -> {
if (partitionState.hasValidPosition()) {
// Use the maximum epoch as defense-in-depth
Optional<Integer> epochToCommit;
if (partitionState.position.offsetEpoch.isPresent() &&
partitionState.position.currentLeader.epoch.isPresent()) {
epochToCommit = Optional.of(Math.max(
partitionState.position.offsetEpoch.get(),
partitionState.position.currentLeader.epoch.get()));
} else {
epochToCommit =
partitionState.position.currentLeader.epoch.isPresent()
? partitionState.position.currentLeader.epoch
: partitionState.position.offsetEpoch;
}
allConsumed.put(topicPartition, new OffsetAndMetadata(
partitionState.position.offset,
epochToCommit,
""));
}
});
return allConsumed;
}
```
---
## 7. CONFIDENCE LEVELS
### High Confidence (90%+)
- ✅ RC1 and RC2 are definitive root causes
- ✅ Fix will prevent most occurrences
- ✅ Code paths are clearly identified
- ✅ Timeline matches hypothesized sequence
### Medium Confidence (70-90%)
- ⚠️ Exact timeline of epoch loss
- ⚠️ Whether all three root causes contributed
- ⚠️ Interaction with specific timing windows
- ⚠️ Frequency of occurrence in production
### Low Confidence (<70%)
- ❓ Why `offsetEpoch=Optional.empty` specifically
- ❓ Exact truncation trigger mechanism
- ❓ Complete understanding of all edge cases
---
## 8. OPEN QUESTIONS FOR MAINTAINERS
### Question 1: On the missing epoch
Should we add validation that `offsetEpoch` is never `Optional.empty` for
active consumers? This might catch similar bugs earlier.
**Rationale**: F10 shows `Optional.empty` in production, which shouldn't happen
for actively consuming consumer.
### Question 2: On metadata staleness
Should `currentLeader` updates be synchronous with metadata updates, or is
periodic refresh sufficient?
**Rationale**: Performance vs. correctness trade-off. Current design assumes
leader changes are rare.
### Question 3: On validation failure
Should broker return log end offset instead of `UNDEFINED_EPOCH_OFFSET` when
requested epoch is not found?
**Rationale**: Would provide graceful degradation instead of forcing consumer
reset.
### Question 4: On monitoring
Should we add metrics for epoch mismatches to detect this scenario in
production?
**Rationale**: Would help identify when this issue is occurring before it
causes consumer resets.
### Question 5: On backward compatibility
How do we handle consumers on older versions that may have already committed
offsets with stale epochs?
**Rationale**: Fix only prevents future occurrences; existing committed offsets
may still trigger the issue.
---
## 9. TESTING RECOMMENDATIONS
### Unit Tests Needed
1. **Test epoch staleness during leader change**
- Simulate leader change during consumption
- Verify `currentLeader` is updated
- Verify committed epoch matches current leader
2. **Test commit with missing epoch**
- Force `Optional.empty` for `offsetEpoch`
- Verify system handles gracefully
3. **Test epoch validation with truncated cache**
- Request validation for epoch not in cache
- Verify response doesn't cause OFFSET_OUT_OF_RANGE
### Integration Tests Needed
1. **Test leader change + rebalance sequence**
- Trigger leader change
- Trigger rebalance immediately after
- Verify consumer doesn't reset
2. **Test with log truncation**
- Commit offset with epoch N
- Truncate log to remove epoch N
- Attempt restore
- Verify graceful handling
### Chaos Engineering Tests
1. **Random leader elections**
- Continuously elect new leaders
- Monitor for OFFSET_OUT_OF_RANGE errors
2. **Aggressive retention**
- Short retention periods
- High throughput
- Verify no unexpected resets
---
## 10. RELATED ISSUES
This issue is related to several other Kafka consumer reliability issues:
- **KAFKA-8504**: Consumer offset reset during rebalance
- **KAFKA-9528**: Leader epoch validation edge cases
- **KAFKA-12443**: Consumer position management improvements
---
## Conclusion
This is a **timing-dependent, multi-factor bug** requiring:
1. Leader change during active consumption
2. Rebalance shortly after leader change
3. Log truncation removing old epoch entries
4. Consumer attempting to restore from committed offset
The root cause is **architectural** (wrong epoch committed) compounded by
**implementation issues** (stale epoch tracking) and **insufficient defensive
coding** (poor handling of edge cases).
The fix is well-understood and low-risk: update epochs proactively and commit
the correct one.
**Impact**: Can cause massive message reprocessing (if
`auto.offset.reset=earliest`) or data loss (if `auto.offset.reset=latest`)
**Severity**: Major
**Likelihood**: Low (requires specific timing conditions) but non-zero in
production
**Recommended Priority**: High - Fix should be included in next patch release
---
## Appendix A: File References
All file paths are relative to the Kafka repository root.
### Consumer Client Files
-
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java`
-
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java`
-
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java`
-
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java`
### Broker Server Files
-
`storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java`
- `storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java`
---
## Appendix B: Timeline Visualization
```
Time (UTC) | Event | Component
---------------|-------------------------------------------|----------
15:47:09.238Z | Rebalance triggered | Consumer
| ("cached metadata has changed") |
| |
15:47:18.995Z | Group stabilized, generation 1702 | Coordinator
| |
15:47:29.777Z | OFFSET_OUT_OF_RANGE error | Consumer
| Position: {offset=30795753, |
| offsetEpoch=Optional.empty, |
| currentLeader={epoch=6}} |
| |
15:47:32.203Z | ISR shrinking [10,11,30] → [10] | Broker
| highWatermark=30795645 |
| endOffset=30795647 |
| |
15:47:32.205Z | Follower starts at leader epoch 7 | Broker
| Previous leader epoch was 7 |
| Current leader is 11 |
| |
15:47:32.272Z | Truncating partition to 30795645 | Broker
| Due to leader epoch and offset |
| EpochEndOffset |
```
**Key Observation**: Consumer error precedes truncation log by 2.5 seconds,
suggesting either:
- Truncation happened earlier than logged, OR
- Consumer validated against already-truncated state
---
## Document Metadata
- **Created**: 2025-12-09
- **JIRA Issue**: KAFKA-19902
- **Analysis Type**: Forensic / Root Cause
- **Status**: Complete
- **Confidence**: High (90%+)
- **Next Steps**: Implement proposed fix, add tests, verify in production
> Consumer triggers OFFSET_OUT_OF_RANGE when committed offset uses stale epoch
> after leader change
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19902
> URL: https://issues.apache.org/jira/browse/KAFKA-19902
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.9.0
> Reporter: RivenSun
> Priority: Major
> Attachments: analysis.md, image-2025-11-21-18-03-24-592.png,
> image-2025-11-21-18-11-24-316.png, image-2025-11-21-20-00-10-862.png
>
>
> h2. Summary
> When a partition leader changes and the consumer commits offsets with a stale
> epoch, if the log segments containing that epoch are subsequently deleted due
> to retention policy, the consumer will encounter OFFSET_OUT_OF_RANGE error
> and reset to earliest (if auto.offset.reset=earliest), causing massive
> message reprocessing.The root cause is that SubscriptionState.allConsumed()
> uses position.offsetEpoch instead of position.currentLeader.epoch when
> constructing OffsetAndMetadata for commit, which can become stale when leader
> changes occur.
> If `auto.offset.reset=latest`, *the consequences will be more severe,
> resulting in message loss.*
> ----
> h2. Environment
> Cluster Configuration:
> * Kafka Server Version: 3.9.0
> * Kafka Client Version: 3.9.0
> * Topic: 200 partitions, 7-day retention, no tiered storage
> * Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
> * No broker/controller restarts occurred
> * High throughput producer continuously writing messages
> Consumer Configuration:
> {code:java}
> auto.offset.reset=earliest
> enable.auto.commit=true {code}
>
> Consumer Code:
> * Registered ConsumerRebalanceListener
> * Calls kafkaConsumer.commitSync() in onPartitionsRevoked() method
> ----
> h2. Problem Description
> In a scenario where the consumer group has no lag, consumers suddenly
> consumed a massive amount of messages, far exceeding the recent few minutes
> of producer writes. Investigation revealed that multiple partitions reset to
> the earliest offset and reprocessed up to 7 days of historical data.
> ----
> h2. Observed Symptoms (Timeline)
> # Consumer group rebalance occurred (triggered by normal consumer group
> management)
> # Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
> # Consumer reset to earliest offset due to auto.offset.reset=earliest
> configuration
> # Kafka broker logged NotLeaderOrFollowerException around the same
> timeframe, indicating partition leader changes
> # Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG
> level and not visible in production logs)
> The image below uses the partition
> asyncmq_local_us_us_marketplace-ssl-a_aws_us-east-1_2a7e053c-9d90-4efd-af2d-3a8bf9564715-153
> as an example to trace the problem log chain. {*}See attached log screenshot
> at the bottom{*}.
> ----
> h2. Root Cause Analysis
> h3. The Problem Chain
> 1. Leader change occurs (epoch changes from N to N+1)
> ↓
> 2. Consumer continues processing old batches (epoch=N)
> ↓
> 3. Consumer commits offset during/after rebalance
> ├─ Committed offset: 1000
> └─ Committed epoch: N (using position.offsetEpoch from old batch)
> ↓
> 4. High throughput + retention policy causes old segments (epoch=N) to be
> deleted
> ↓
> 5. Consumer restarts/rebalances and fetches committed offset
> ├─ Tries to validate offset 1000 with epoch=N
> └─ Broker cannot find epoch=N ({*}segments deleted or tp leader change{*})
> ↓
> 6. Broker returns OFFSET_OUT_OF_RANGE
> ↓
> 7. Consumer resets to earliest offset
> h3. Code Analysis
> The problematic code in SubscriptionState.allConsumed():
> {code:java}
> //
> kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition())
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> partitionState.position.offsetEpoch, // Problem: uses
> offsetEpoch from consumed batch
> ""));
> });
> return allConsumed;
> } {code}
>
> Why this is problematic:The FetchPosition class contains two different epoch
> values:
> * offsetEpoch: The epoch from the last consumed record's batch
> * currentLeader.epoch: The current partition leader's epoch from metadata
> When committing offsets, we should use currentLeader.epoch instead of
> offsetEpoch because:
> # offsetEpoch represents the epoch of already consumed data (historical)
> # currentLeader.epoch represents the current partition leader (up-to-date)
> h3. Scenarios Where These Epochs Diverge
> Scenario A: Leader changes while consumer is processing old batches
> * T1: Consumer fetches batch with epoch=5
> * T2: Leader changes to epoch=6
> * T3: Metadata updates with new leader epoch=6
> * T4: Consumer commits offset
> * offsetEpoch = 5 (from batch being processed)
> * currentLeader.epoch = 6 (from updated metadata)
> * Problem: Commits epoch=5, which may soon be deleted
> Scenario B: Recovery from committed offset after leader change
> * Consumer commits offset with old epoch=N
> * Leader changes to epoch=N+1
> * Old segments (epoch=N) are deleted by retention policy
> * Consumer rebalances and tries to restore from committed offset
> * offsetEpoch = N (from committed offset)
> * currentLeader.epoch = N+1 (from current metadata)
> * Problem: Validation fails because epoch=N no longer exists
> ----
> h2. Steps to Reproduce
> This is a timing-sensitive edge case. The following conditions increase the
> likelihood:
> # Setup:
> * High-throughput topic (to trigger faster log rotation)
> * Relatively short retention period (e.g., 7 days)
> * Consumer group with rebalance listener calling commitSync()
> * enable.auto.commit=true (or any manual commit)
> # Trigger:
> * Trigger a partition leader change (broker restart, controller election,
> etc.)
> * Simultaneously or shortly after, trigger a consumer group rebalance
> * Wait for retention policy to delete old log segments
> # Expected Result:
> Consumer should resume from committed offset
> # Actual Result:
> Consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
> ----
> h2. Impact
> * Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
> * Service Degradation: Sudden spike in consumer throughput can overwhelm
> downstream systems
> * Resource Waste: Unnecessary CPU, memory, and network usage
> * Potential Duplicates: If using auto.offset.reset=earliest, duplicate
> message processing is guaranteed
> * If `auto.offset.reset=latest`, *the consequences will be more severe,
> resulting in message loss.*
> ----
> h2. Proposed Fix
> h3. Root Cause Analysis
> The issue is more fundamental than a simple field selection problem. The core
> issue is that both epoch values in FetchPosition can be stale at commit time:
> # offsetEpoch: Contains the epoch from the last consumed record's batch. If
> a leader change occurs after consumption but before commit, this epoch
> becomes stale and may reference log segments that have been deleted.
> # currentLeader.epoch: Inherited from the previous position during normal
> consumption and only updated when:
> * NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH errors are detected
> * Position is restored from committed offsets (fetches from metadata)
> * Explicit validation is triggered via
> maybeValidatePositionForCurrentLeader()
> During normal, error-free consumption, currentLeader is never updated and can
> also become stale.
> h3. Problem with Current Code
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
> SubscriptionState.FetchPosition nextPosition = new
> SubscriptionState.FetchPosition(
> nextInLineFetch.nextFetchOffset(),
> nextInLineFetch.lastEpoch(), // offsetEpoch: from consumed
> batch
> position.currentLeader); // ❌ currentLeader: inherited,
> NOT updated!
> log.trace("Updating fetch position from {} to {} for partition {} and
> returning {} records from `poll()`",
> position, nextPosition, tp, partRecords.size());
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> }
> {code}
> The inherited currentLeader means it can be as stale as offsetEpoch in
> certain scenarios.
> ----
> h3. Recommended Solution: Proactively Update currentLeader During Position
> Updates
> Option 1: Update currentLeader when advancing position (Primary
> Recommendation)Modify FetchCollector to fetch the latest leader information
> from metadata every time the position is updated:
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
> // Fetch the latest leader information from metadata
> Metadata.LeaderAndEpoch currentLeaderAndEpoch =
> metadata.currentLeader(tp);
>
> SubscriptionState.FetchPosition nextPosition = new
> SubscriptionState.FetchPosition(
> nextInLineFetch.nextFetchOffset(),
> nextInLineFetch.lastEpoch(),
> currentLeaderAndEpoch); // ✅ Use fresh leader info from metadata
>
> log.trace("Updating fetch position from {} to {} for partition {} and
> returning {} records from `poll()`",
> position, nextPosition, tp, partRecords.size());
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> } {code}
> Advantages:
> * Ensures currentLeader is always up-to-date
> * Makes allConsumed() safe to use *currentLeader.epoch* for commits
> Modify SubscriptionState.allConsumed() to {color:#de350b}use
> currentLeader.epoch instead of offsetEpoch{color}:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition())
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> partitionState.position.currentLeader.epoch, // ✅ Use
> current leader epoch
> ""));
> });
> return allConsumed;
> } {code}
>
> * Minimal performance impact (metadata lookup is O(1) from local cache)
> * Aligns with the existing pattern in refreshCommittedOffsets()
> Potential Concerns:
> * Adds one metadata lookup per position update
> * If metadata is stale, currentLeader.epoch could still lag slightly, but
> this is the same risk as today
> ----
> h3. Alternative Solutions
> Option 2: Fetch fresh leader info during commitModify allConsumed() to fetch
> the latest leader information at commit time:
> {code:java}
> // Note: This would require passing metadata reference to allConsumed()
> public synchronized Map<TopicPartition, OffsetAndMetadata>
> allConsumed(ConsumerMetadata metadata) {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition()) {
> // Fetch the latest leader epoch from metadata at commit time
> Metadata.LeaderAndEpoch latestLeader =
> metadata.currentLeader(topicPartition);
> Optional<Integer> epochToCommit = latestLeader.epoch.isPresent()
> ? latestLeader.epoch
> : partitionState.position.offsetEpoch; // Fallback to
> offsetEpoch
>
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> epochToCommit,
> ""));
> }
> });
> return allConsumed;
> } {code}
> Advantages:
> * Only impacts commit path, not consumption hot path
> * Directly addresses the commit-time staleness issue
> Disadvantages:
> * Requires changing the signature of allConsumed() (API change)
> * May still have a race condition if leader changes between metadata fetch
> and commit
> * Metadata could be stale if update hasn't been processed yet
> ----
> Option 3: Use the maximum epoch valueUse the larger of the two epoch values,
> assuming newer epochs have higher values:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition()) {
> Optional<Integer> epochToCommit;
>
> if (partitionState.position.offsetEpoch.isPresent() &&
> partitionState.position.currentLeader.epoch.isPresent()) {
> // Use the maximum of the two epochs
> int maxEpoch = Math.max(
> partitionState.position.offsetEpoch.get(),
> partitionState.position.currentLeader.epoch.get());
> epochToCommit = Optional.of(maxEpoch);
> } else {
> // Fallback to whichever is present
> epochToCommit =
> partitionState.position.currentLeader.epoch.isPresent()
> ? partitionState.position.currentLeader.epoch
> : partitionState.position.offsetEpoch;
> }
>
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> epochToCommit,
> ""));
> }
> });
> return allConsumed;
> } {code}
> Advantages:
> * No API changes required
> * Simple to implement
> * Provides better protection than using only one epoch
> Disadvantages:
> * Heuristic-based; assumes epochs are monotonically increasing
> * Could still use a stale epoch if both values are old
> * Doesn't solve the root cause of stale currentLeader
> ----
> h3. Recommendation
> Primary recommendation: Implement Option 1 (Update currentLeader during
> position updates)This is the most robust solution because:
> # It ensures currentLeader is always fresh
> # It fixes the root cause rather than working around symptoms
> # It has minimal performance impact
> # It makes the codebase more consistent and maintainable
> Secondary recommendation: Implement Option 3 as a defense-in-depth
> measureEven with Option 1, using max(offsetEpoch, currentLeader.epoch) in
> allConsumed() provides additional safety against any edge cases where one
> epoch might be more up-to-date than the other.Combined approach (strongest
> protection):
> {code:java}
> // In FetchCollector.java
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
> Metadata.LeaderAndEpoch currentLeaderAndEpoch =
> metadata.currentLeader(tp);
> SubscriptionState.FetchPosition nextPosition = new
> SubscriptionState.FetchPosition(
> nextInLineFetch.nextFetchOffset(),
> nextInLineFetch.lastEpoch(),
> currentLeaderAndEpoch); // ✅ Keep currentLeader fresh
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> }
>
> // In SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition()) {
> // Use the maximum epoch as defense-in-depth
> Optional<Integer> epochToCommit;
> if (partitionState.position.offsetEpoch.isPresent() &&
> partitionState.position.currentLeader.epoch.isPresent()) {
> epochToCommit = Optional.of(Math.max(
> partitionState.position.offsetEpoch.get(),
> partitionState.position.currentLeader.epoch.get()));
> } else {
> epochToCommit =
> partitionState.position.currentLeader.epoch.isPresent()
> ? partitionState.position.currentLeader.epoch
> : partitionState.position.offsetEpoch;
> }
>
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> epochToCommit,
> ""));
> }
> });
> return allConsumed;
> } {code}
> This combined approach provides:
> * Prevention: Keep currentLeader fresh during normal operation
> * Defense: Use the best available epoch value at commit time
> * Resilience: Minimize the window where a stale epoch can cause issues
> ----
> h2. Additional Notes
> Why consumers don't log NOT_LEADER_OR_FOLLOWER errors:All consumer-side
> handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG level logging:
> {code:java}
> // FetchCollector.java line 325
> log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
>
> // AbstractFetch.java line 207
> log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition,
> partitionError, ...);
>
> // OffsetsForLeaderEpochUtils.java line 102
> LOG.debug("Attempt to fetch offsets for partition {} failed due to {},
> retrying.", ...); {code}
>
> This makes the issue difficult to diagnose in production environments.
> ----
> h2. Workarounds (Until Fixed)
> # Increase retention period to reduce likelihood of epoch deletion
> # Monitor consumer lag to ensure it stays low
> # Reduce rebalance frequency (increase max.poll.interval.ms,
> session.timeout.ms)
> # Use cooperative rebalance strategy to minimize rebalance impact
> # Consider using auto.offset.reset=latest if reprocessing is more costly
> than data loss (application-dependent)
> ----
> h2. Related Code References
> h3. 1. The problematic method: SubscriptionState.allConsumed()
> Location: org.apache.kafka.clients.consumer.internals.SubscriptionState
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition())
> allConsumed.put(topicPartition, new
> OffsetAndMetadata(partitionState.position.offset,
> partitionState.position.offsetEpoch, "")); // Uses
> offsetEpoch instead of currentLeader.epoch
> });
> return allConsumed;
> } {code}
> h3. 2. How FetchPosition is updated during normal consumption
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
> SubscriptionState.FetchPosition nextPosition = new
> SubscriptionState.FetchPosition(
> nextInLineFetch.nextFetchOffset(),
> nextInLineFetch.lastEpoch(), // offsetEpoch: from consumed
> batch
> position.currentLeader); // currentLeader: inherited from
> old position, NOT updated!
> log.trace("Updating fetch position from {} to {} for partition {} and
> returning {} records from `poll()`",
> position, nextPosition, tp, partRecords.size());
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> } {code}
> Key Issue: The currentLeader field is inherited from the previous position
> and not automatically updated during normal consumption. It only gets updated
> when leader change errors are detected.
> h3. 3. How committed offsets are restored after rebalance
> Location:
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
> {code:java}
> public static void refreshCommittedOffsets(final Map<TopicPartition,
> OffsetAndMetadata> offsetsAndMetadata,
> final ConsumerMetadata metadata,
> final SubscriptionState
> subscriptions) {
> for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry :
> offsetsAndMetadata.entrySet()) {
> final TopicPartition tp = entry.getKey();
> final OffsetAndMetadata offsetAndMetadata = entry.getValue();
> if (offsetAndMetadata != null) {
> // first update the epoch if necessary
> entry.getValue().leaderEpoch().ifPresent(epoch ->
> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
>
> // it's possible that the partition is no longer assigned when
> the response is received,
> // so we need to ignore seeking if that's the case
> if (subscriptions.isAssigned(tp)) {
> final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
> metadata.currentLeader(tp);
> final SubscriptionState.FetchPosition position = new
> SubscriptionState.FetchPosition(
> offsetAndMetadata.offset(),
> offsetAndMetadata.leaderEpoch(), // offsetEpoch from
> committed offset (may be old)
> leaderAndEpoch); // currentLeader
> from current metadata (may be new)
>
> subscriptions.seekUnvalidated(tp, position);
>
> log.info("Setting offset for partition {} to the committed
> offset {}", tp, position);
> }
> }
> }
> } {code}
> The Divergence Point: When restoring from committed offsets, offsetEpoch
> comes from the stored offset (potentially old), while currentLeader comes
> from fresh metadata (potentially new after leader change).
> h3. 4. How OffsetsForLeaderEpoch validation request is constructed
> Location:
> org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
> {code:java}
> static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
> Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
> OffsetForLeaderTopicCollection topics = new
> OffsetForLeaderTopicCollection(requestData.size());
> requestData.forEach((topicPartition, fetchPosition) ->
> fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
> OffsetForLeaderTopic topic =
> topics.find(topicPartition.topic());
> if (topic == null) {
> topic = new
> OffsetForLeaderTopic().setTopic(topicPartition.topic());
> topics.add(topic);
> }
> topic.partitions().add(new OffsetForLeaderPartition()
> .setPartition(topicPartition.partition())
> .setLeaderEpoch(fetchEpoch) // Uses
> offsetEpoch for validation
>
> .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
>
> .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
> );
> })
> );
> return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
> } {code}
> The Validation Problem: The validation request uses fetchEpoch (which is
> offsetEpoch) to validate against the broker. If this epoch no longer exists
> in the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
> h3. 5. FetchPosition class definition
> Location:
> org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition
>
> {code:java}
> /**
> * Represents the position of a partition subscription.
> *
> * This includes the offset and epoch from the last record in
> * the batch from a FetchResponse. It also includes the leader epoch at the
> time the batch was consumed.
> */
> public static class FetchPosition {
> public final long offset;
> final Optional<Integer> offsetEpoch; // Epoch from last consumed
> record's batch
> final Metadata.LeaderAndEpoch currentLeader; // Current partition leader
> info from metadata
>
> FetchPosition(long offset) {
> this(offset, Optional.empty(),
> Metadata.LeaderAndEpoch.noLeaderOrEpoch());
> }
>
> public FetchPosition(long offset, Optional<Integer> offsetEpoch,
> Metadata.LeaderAndEpoch currentLeader) {
> this.offset = offset;
> this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
> this.currentLeader = Objects.requireNonNull(currentLeader);
> }
>
> @Override
> public String toString() {
> return "FetchPosition{" +
> "offset=" + offset +
> ", offsetEpoch=" + offsetEpoch +
> ", currentLeader=" + currentLeader +
> '}';
> }
> }{code}
> Class Design: The class contains both offsetEpoch (historical data epoch) and
> currentLeader.epoch (current metadata epoch), but allConsumed() only uses the
> former when committing.
> h1. Attachements logs:
> h2. group rebalance log
> !image-2025-11-21-18-03-24-592.png!
> h2. consumer client log
> !image-2025-11-21-20-00-10-862.png!
> h2. broker server log
> !image-2025-11-21-18-11-24-316.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)