[ 
https://issues.apache.org/jira/browse/KAFKA-19902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041444#comment-18041444
 ] 

Nicolae Marasoiu edited comment on KAFKA-19902 at 12/5/25 8:19 AM:
-------------------------------------------------------------------

Hi RivenSun and Lianet,

Here are my thoughts and a proposal.

First, to be clear: the bug report is valid and the impact is real. Unnecessary 
reprocessing costs time and money, and that matters.

I also want to note that Kafka's current behavior is safe in the sense that 
matters most: with auto.offset.reset=earliest, you don't lose data. But "safe" 
isn't the same as "optimal," and there's room to do better. Also, if the reset 
strategy is "latest", it means losing data rather than merely reprocessing it.

Where I respectfully disagree is with the proposed fix (changing which epoch 
gets committed). The way commit works now is the right approach: the consumer 
should record the epoch of the data it actually consumed, because that is 
accurate provenance. The problem isn't the commit logic; it's how the consumer 
recovers when that committed epoch can no longer be validated against the new 
leader.

The Problem With Current Recovery

When epoch validation fails (the UNDEFINED_EPOCH_OFFSET case that KAFKA-7415 
partially addressed), the consumer currently has two choices: reset to earliest 
(reprocess everything) or reset to latest (lose everything since commit). Both 
are worse than necessary.

What We Actually Want

The consumer already knows exactly what it consumed: (epoch=N, offset=5000). 
When it can't validate this against the new leader, it shouldn't forget this 
knowledge. It should ask: "What's the first message in the log that I haven't 
processed?"

The Insight

There's a natural total ordering on (epoch, offset) pairs — compare epoch 
first, then offset within epoch. The consumer's committed position represents a 
point in this ordering. Recovery should resume from the first position in the 
current log that's greater than the committed position in this ordering.
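
As a minimal sketch of that ordering (the `EpochOffset` record below is purely 
illustrative, not an existing Kafka type):

```java
import java.util.Comparator;

// Illustrative only: a pair type and comparator expressing the proposed total ordering.
record EpochOffset(int epoch, long offset) {}

class EpochOffsetOrdering {
    // Compare epoch first, then offset within the epoch.
    static final Comparator<EpochOffset> BY_EPOCH_THEN_OFFSET =
            Comparator.comparingInt(EpochOffset::epoch)
                      .thenComparingLong(EpochOffset::offset);

    public static void main(String[] args) {
        // (epoch=5, offset=9000) sorts before (epoch=6, offset=0): anything written
        // under a later epoch is "after" everything consumed under epoch 5.
        System.out.println(BY_EPOCH_THEN_OFFSET.compare(
                new EpochOffset(5, 9000), new EpochOffset(6, 0)) < 0); // prints true
    }
}
```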

What This Achieves

- Messages rewritten under new epochs get consumed (they're genuinely new)
- Already-processed messages get skipped
- Data loss equals exactly what was lost in the leader transition (the unavoidable minimum)
- No unnecessary reprocessing

Questions for Maintainers

When UNDEFINED_EPOCH_OFFSET is returned, does the broker provide enough info to 
locate the next valid (epoch, offset)?

Does this align with the original intent of KIP-320/KIP-101?

Gist: the purpose is to keep data loss at the same level as a full reset while 
optimizing safely. We do not want to lose offset ranges that were potentially 
rewritten by subsequent leaders, so we rewind to those, but not to the start of 
the partition. We still skip the already-processed (epoch, offset) "range", 
where the range is defined over the totally ordered set of (epoch, offset) 
pairs, not as a naive offset range.

Please review the (epoch, offset) total-ordering proposal.
---

**Related Issues**

This appears related to prior JIRAs dealing with epoch validation edge cases:

| JIRA | Summary | Status | Relevance |
|------|---------|--------|-----------|
| KAFKA-7415 | OffsetsForLeaderEpoch may incorrectly respond with `UNDEFINED_EPOCH` | Fixed 2.0.1 | Partial fix: handles `requestedEpoch < earliestCachedEpoch` but NOT `requestedEpoch > latestCachedEpoch` |
| KAFKA-9840 | Consumer should not use OffsetForLeaderEpoch without current epoch validation | Open | Same symptom: consumer gets `UNDEFINED` response and resets |

---

**Critical Finding**

Looking at the KAFKA-7415 fix in `LeaderEpochFileCache.endOffsetFor()`:

```java
if (higherEntry == null) {
    // Comment: "This case should never be hit"
    return UNDEFINED_EPOCH_OFFSET;  // <-- But it IS being hit!
}
```

The `requestedEpoch > latestCachedEpoch` case occurs when a consumer commits an 
offset under an epoch that the new leader doesn't have. This happens because 
the consumer correctly records the epoch of the data it consumed, but that 
epoch may have existed only briefly on the old leader before failover.

This isn't a bug in the consumer's commit logic — it's accurate provenance. The 
problem is how the system responds when that provenance can't be validated.
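
To illustrate the condition with a simplified model (a plain sorted map standing 
in for the epoch cache; this is not the actual LeaderEpochFileCache code):

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified analogy of the leader-epoch cache: epoch -> start offset, as known
// to the new leader after failover.
class HigherEntryIllustration {
    public static void main(String[] args) {
        TreeMap<Integer, Long> cache = new TreeMap<>();
        cache.put(4, 0L);
        cache.put(6, 4200L);     // highest epoch this leader has cached

        int requestedEpoch = 7;  // consumer committed under an epoch this leader doesn't have
        Map.Entry<Integer, Long> higherEntry = cache.higherEntry(requestedEpoch);
        // null -> the "should never be hit" branch returns UNDEFINED_EPOCH_OFFSET
        System.out.println(higherEntry);
    }
}
```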

---

**The Core Issue**

The consumer already commits (epoch, offset) pairs — that's correct. The 
problem is on the **recovery path** when the committed epoch can no longer be 
validated.

Currently:
1. Consumer committed (epoch=N, offset=5000)
2. Leader changed, new leader has epoch=N+1
3. New leader can't validate epoch=N → returns `UNDEFINED_EPOCH_OFFSET`
4. Consumer rewinds to earliest and reprocesses everything

---

**Proposal: resume consumption equivalently to a full reset, but efficiently 
skip already-processed (epoch, offset) pairs**

There is a natural total ordering over (epoch, offset) pairs. When validation 
fails, the consumer shouldn't panic and rewind to the beginning. It should ask: 
"What's the first (epoch, offset) in the log that comes *after* my committed 
position in this ordering?"

The recovery logic should:
1. Keep the committed (epoch=N, offset=5000) as ground truth for what was 
consumed
2. When validation fails, query the broker for the log's current epoch 
boundaries
3. Find the first (epoch, offset) that is greater than (N, 5000) in the total 
ordering
4. Resume from there (a rough sketch of this resolution follows below)

This means:
- If the log was rewritten under a new epoch, those are new messages — they get 
consumed
- Already-consumed messages under the old epoch get skipped (they're gone 
anyway)
- No unnecessary reprocessing of data that still exists and was already consumed
- Data loss is bounded to exactly what was lost in the leader transition — the 
unavoidable minimum
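
To make steps 2-4 above concrete, here is a rough sketch, assuming the consumer 
can obtain the new leader's epoch chain as a sorted map of epoch to start offset 
(whether the broker exposes enough information for this is exactly question 2 
below); none of the names here are existing consumer APIs:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Sketch only; assumes the new leader's epoch chain is available as a sorted map
// of epoch -> start offset. No such consumer-side lookup exists today.
class ResumeAfterStaleEpoch {

    // First position in the current log strictly greater than the committed
    // (epoch, offset) under the epoch-then-offset ordering.
    static Optional<Long> resumeOffset(int committedEpoch,
                                       long committedOffset,
                                       TreeMap<Integer, Long> epochStartOffsets) {
        if (epochStartOffsets.containsKey(committedEpoch)) {
            // The committed epoch is still in the chain: normal validation applies and
            // the committed offset itself is the next position to fetch.
            return Optional.of(committedOffset);
        }
        // The committed epoch is unknown to the new leader: everything written under a
        // strictly newer epoch is "after" the committed position, so resume where the
        // first such epoch begins instead of rewinding to the start of the partition.
        // (Clamping to the log start offset after retention is omitted here.)
        Map.Entry<Integer, Long> firstNewerEpoch = epochStartOffsets.higherEntry(committedEpoch);
        return firstNewerEpoch == null
                ? Optional.empty()                         // nothing newer known yet
                : Optional.of(firstNewerEpoch.getValue()); // start offset of that epoch
    }
}
```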

---

**Request**

Before discussing implementation, I'd like to confirm with maintainers:

1. Does this total ordering model align with the original intent of epoch-based 
truncation detection (KIP-320, KIP-101)?
2. When `UNDEFINED_EPOCH_OFFSET` is returned, does the broker provide enough 
information to locate the next valid (epoch, offset) in the log?

Best regards,
Nicolae Marasoiu



> Consumer triggers OFFSET_OUT_OF_RANGE when committed offset uses stale epoch 
> after leader change
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19902
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.9.0
>            Reporter: RivenSun
>            Priority: Major
>         Attachments: image-2025-11-21-18-03-24-592.png, 
> image-2025-11-21-18-11-24-316.png, image-2025-11-21-20-00-10-862.png
>
>
> h2. Summary
> When a partition leader changes and the consumer commits offsets with a stale 
> epoch, if the log segments containing that epoch are subsequently deleted due 
> to retention policy, the consumer will encounter an OFFSET_OUT_OF_RANGE error 
> and reset to earliest (if auto.offset.reset=earliest), causing massive 
> message reprocessing. The root cause is that SubscriptionState.allConsumed() 
> uses position.offsetEpoch instead of position.currentLeader.epoch when 
> constructing OffsetAndMetadata for commit, which can become stale when leader 
> changes occur.
> If `auto.offset.reset=latest`, *the consequences will be more severe, 
> resulting in message loss.*
> ----
> h2. Environment
> Cluster Configuration:
>  * Kafka Server Version: 3.9.0
>  * Kafka Client Version: 3.9.0
>  * Topic: 200 partitions, 7-day retention, no tiered storage
>  * Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
>  * No broker/controller restarts occurred
>  * High throughput producer continuously writing messages
> Consumer Configuration:
> {code:java}
> auto.offset.reset=earliest
> enable.auto.commit=true {code}
>  
> Consumer Code:
>  * Registered ConsumerRebalanceListener
>  * Calls kafkaConsumer.commitSync() in onPartitionsRevoked() method
> ----
> h2. Problem Description
> In a scenario where the consumer group has no lag, consumers suddenly 
> consumed a massive amount of messages, far exceeding the recent few minutes 
> of producer writes. Investigation revealed that multiple partitions reset to 
> the earliest offset and reprocessed up to 7 days of historical data.
> ----
> h2. Observed Symptoms (Timeline)
>  # Consumer group rebalance occurred (triggered by normal consumer group 
> management)
>  # Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
>  # Consumer reset to earliest offset due to auto.offset.reset=earliest 
> configuration
>  # Kafka broker logged NotLeaderOrFollowerException around the same 
> timeframe, indicating partition leader changes
>  # Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG 
> level and not visible in production logs)
> The image below uses the partition 
> asyncmq_local_us_us_marketplace-ssl-a_aws_us-east-1_2a7e053c-9d90-4efd-af2d-3a8bf9564715-153
>  as an example to trace the problem log chain. {*}See attached log screenshot 
> at the bottom{*}.
> ----
> h2. Root Cause Analysis
> h3. The Problem Chain
> 1. Leader change occurs (epoch changes from N to N+1)
>    ↓
> 2. Consumer continues processing old batches (epoch=N)
>    ↓
> 3. Consumer commits offset during/after rebalance
>    ├─ Committed offset: 1000
>    └─ Committed epoch: N (using position.offsetEpoch from old batch)
>    ↓
> 4. High throughput + retention policy causes old segments (epoch=N) to be 
> deleted
>    ↓
> 5. Consumer restarts/rebalances and fetches committed offset
>    ├─ Tries to validate offset 1000 with epoch=N
>    └─ Broker cannot find epoch=N ({*}segments deleted or tp leader change{*})
>    ↓
> 6. Broker returns OFFSET_OUT_OF_RANGE
>    ↓
> 7. Consumer resets to earliest offset
> h3. Code Analysis
> The problematic code in SubscriptionState.allConsumed():
> {code:java}
> // 
> kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 partitionState.position.offsetEpoch,  // Problem: uses 
> offsetEpoch from consumed batch
>                 ""));
>     });
>     return allConsumed;
> } {code}
>  
> Why this is problematic: The FetchPosition class contains two different epoch 
> values:
>  * offsetEpoch: The epoch from the last consumed record's batch
>  * currentLeader.epoch: The current partition leader's epoch from metadata
> When committing offsets, we should use currentLeader.epoch instead of 
> offsetEpoch because:
>  # offsetEpoch represents the epoch of already consumed data (historical)
>  # currentLeader.epoch represents the current partition leader (up-to-date)
> h3. Scenarios Where These Epochs Diverge
> Scenario A: Leader changes while consumer is processing old batches
>  * T1: Consumer fetches batch with epoch=5
>  * T2: Leader changes to epoch=6
>  * T3: Metadata updates with new leader epoch=6
>  * T4: Consumer commits offset
>  * offsetEpoch = 5 (from batch being processed)
>  * currentLeader.epoch = 6 (from updated metadata)
>  * Problem: Commits epoch=5, which may soon be deleted
> Scenario B: Recovery from committed offset after leader change
>  * Consumer commits offset with old epoch=N
>  * Leader changes to epoch=N+1
>  * Old segments (epoch=N) are deleted by retention policy
>  * Consumer rebalances and tries to restore from committed offset
>  * offsetEpoch = N (from committed offset)
>  * currentLeader.epoch = N+1 (from current metadata)
>  * Problem: Validation fails because epoch=N no longer exists
> ----
> h2. Steps to Reproduce
> This is a timing-sensitive edge case. The following conditions increase the 
> likelihood:
>  # Setup:
>  * High-throughput topic (to trigger faster log rotation)
>  * Relatively short retention period (e.g., 7 days)
>  * Consumer group with rebalance listener calling commitSync()
>  * enable.auto.commit=true (or any manual commit)
>  # Trigger:
>  * Trigger a partition leader change (broker restart, controller election, 
> etc.)
>  * Simultaneously or shortly after, trigger a consumer group rebalance
>  * Wait for retention policy to delete old log segments
>  # Expected Result:
> Consumer should resume from committed offset
>  # Actual Result:
> Consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
> ----
> h2. Impact
>  * Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
>  * Service Degradation: Sudden spike in consumer throughput can overwhelm 
> downstream systems
>  * Resource Waste: Unnecessary CPU, memory, and network usage
>  * Potential Duplicates: If using auto.offset.reset=earliest, duplicate 
> message processing is guaranteed
>  * If `auto.offset.reset=latest`, *the consequences will be more severe, 
> resulting in message loss.*
> ----
> h2. Proposed Fix
> h3. Root Cause Analysis
> The issue is more fundamental than a simple field-selection problem: both 
> epoch values in FetchPosition can be stale at commit time:
>  # offsetEpoch: Contains the epoch from the last consumed record's batch. If 
> a leader change occurs after consumption but before commit, this epoch 
> becomes stale and may reference log segments that have been deleted.
>  # currentLeader.epoch: Inherited from the previous position during normal 
> consumption and only updated when:
>  * NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH errors are detected
>  * Position is restored from committed offsets (fetches from metadata)
>  * Explicit validation is triggered via 
> maybeValidatePositionForCurrentLeader()
> During normal, error-free consumption, currentLeader is never updated and can 
> also become stale.
> h3. Problem with Current Code
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector 
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     SubscriptionState.FetchPosition nextPosition = new 
> SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),    // offsetEpoch: from consumed 
> batch
>             position.currentLeader);         // ❌ currentLeader: inherited, 
> NOT updated!
>     log.trace("Updating fetch position from {} to {} for partition {} and 
> returning {} records from `poll()`",
>             position, nextPosition, tp, partRecords.size());
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> }
> {code}
> Because currentLeader is inherited rather than refreshed, it can be as stale 
> as offsetEpoch in certain scenarios.
> ----
> h3. Recommended Solution: Proactively Update currentLeader During Position 
> Updates
> Option 1: Update currentLeader when advancing position (Primary Recommendation)
> Modify FetchCollector to fetch the latest leader information from metadata 
> every time the position is updated:
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     // Fetch the latest leader information from metadata
>     Metadata.LeaderAndEpoch currentLeaderAndEpoch = 
> metadata.currentLeader(tp);
>     
>     SubscriptionState.FetchPosition nextPosition = new 
> SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),
>             currentLeaderAndEpoch);  // ✅ Use fresh leader info from metadata
>     
>     log.trace("Updating fetch position from {} to {} for partition {} and 
> returning {} records from `poll()`",
>             position, nextPosition, tp, partRecords.size());
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> } {code}
> Advantages:
>  * Ensures currentLeader is always up-to-date
>  * Makes allConsumed() safe to use *currentLeader.epoch* for commits
> Modify SubscriptionState.allConsumed() to {color:#de350b}use 
> currentLeader.epoch instead of offsetEpoch{color}:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 partitionState.position.currentLeader.epoch,  // ✅ Use 
> current leader epoch
>                 ""));
>     });
>     return allConsumed;
> } {code}
>  
>  * Minimal performance impact (metadata lookup is O(1) from local cache)
>  * Aligns with the existing pattern in refreshCommittedOffsets()
> Potential Concerns:
>  * Adds one metadata lookup per position update
>  * If metadata is stale, currentLeader.epoch could still lag slightly, but 
> this is the same risk as today
> ----
> h3. Alternative Solutions
> Option 2: Fetch fresh leader info during commit
> Modify allConsumed() to fetch the latest leader information at commit time:
> {code:java}
> // Note: This would require passing metadata reference to allConsumed()
> public synchronized Map<TopicPartition, OffsetAndMetadata> 
> allConsumed(ConsumerMetadata metadata) {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition()) {
>             // Fetch the latest leader epoch from metadata at commit time
>             Metadata.LeaderAndEpoch latestLeader = 
> metadata.currentLeader(topicPartition);
>             Optional<Integer> epochToCommit = latestLeader.epoch.isPresent() 
>                 ? latestLeader.epoch 
>                 : partitionState.position.offsetEpoch;  // Fallback to 
> offsetEpoch
>             
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 epochToCommit,
>                 ""));
>         }
>     });
>     return allConsumed;
> } {code}
> Advantages:
>  * Only impacts commit path, not consumption hot path
>  * Directly addresses the commit-time staleness issue
> Disadvantages:
>  * Requires changing the signature of allConsumed() (API change)
>  * May still have a race condition if leader changes between metadata fetch 
> and commit
>  * Metadata could be stale if update hasn't been processed yet
> ----
> Option 3: Use the maximum epoch value
> Use the larger of the two epoch values, assuming newer epochs have higher values:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition()) {
>             Optional<Integer> epochToCommit;
>             
>             if (partitionState.position.offsetEpoch.isPresent() && 
>                 partitionState.position.currentLeader.epoch.isPresent()) {
>                 // Use the maximum of the two epochs
>                 int maxEpoch = Math.max(
>                     partitionState.position.offsetEpoch.get(),
>                     partitionState.position.currentLeader.epoch.get());
>                 epochToCommit = Optional.of(maxEpoch);
>             } else {
>                 // Fallback to whichever is present
>                 epochToCommit = 
> partitionState.position.currentLeader.epoch.isPresent()
>                     ? partitionState.position.currentLeader.epoch
>                     : partitionState.position.offsetEpoch;
>             }
>             
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 epochToCommit,
>                 ""));
>         }
>     });
>     return allConsumed;
> } {code}
> Advantages:
>  * No API changes required
>  * Simple to implement
>  * Provides better protection than using only one epoch
> Disadvantages:
>  * Heuristic-based; assumes epochs are monotonically increasing
>  * Could still use a stale epoch if both values are old
>  * Doesn't solve the root cause of stale currentLeader
> ----
> h3. Recommendation
> Primary recommendation: Implement Option 1 (Update currentLeader during 
> position updates)
> This is the most robust solution because:
>  # It ensures currentLeader is always fresh
>  # It fixes the root cause rather than working around symptoms
>  # It has minimal performance impact
>  # It makes the codebase more consistent and maintainable
> Secondary recommendation: Implement Option 3 as a defense-in-depth measure
> Even with Option 1, using max(offsetEpoch, currentLeader.epoch) in 
> allConsumed() provides additional safety against any edge cases where one 
> epoch might be more up-to-date than the other.
> Combined approach (strongest protection):
> {code:java}
> // In FetchCollector.java
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     Metadata.LeaderAndEpoch currentLeaderAndEpoch = 
> metadata.currentLeader(tp);
>     SubscriptionState.FetchPosition nextPosition = new 
> SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),
>             currentLeaderAndEpoch);  // ✅ Keep currentLeader fresh
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> }
>  
> // In SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition()) {
>             // Use the maximum epoch as defense-in-depth
>             Optional<Integer> epochToCommit;
>             if (partitionState.position.offsetEpoch.isPresent() && 
>                 partitionState.position.currentLeader.epoch.isPresent()) {
>                 epochToCommit = Optional.of(Math.max(
>                     partitionState.position.offsetEpoch.get(),
>                     partitionState.position.currentLeader.epoch.get()));
>             } else {
>                 epochToCommit = 
> partitionState.position.currentLeader.epoch.isPresent()
>                     ? partitionState.position.currentLeader.epoch
>                     : partitionState.position.offsetEpoch;
>             }
>             
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 epochToCommit,
>                 ""));
>         }
>     });
>     return allConsumed;
> } {code}
> This combined approach provides:
>  * Prevention: Keep currentLeader fresh during normal operation
>  * Defense: Use the best available epoch value at commit time
>  * Resilience: Minimize the window where a stale epoch can cause issues
> ----
> h2. Additional Notes
> Why consumers don't log NOT_LEADER_OR_FOLLOWER errors: All consumer-side 
> handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG level logging:
> {code:java}
> // FetchCollector.java line 325
> log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
>  
> // AbstractFetch.java line 207
> log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition, 
> partitionError, ...);
>  
> // OffsetsForLeaderEpochUtils.java line 102
> LOG.debug("Attempt to fetch offsets for partition {} failed due to {}, 
> retrying.", ...); {code}
>  
> This makes the issue difficult to diagnose in production environments.
> ----
> h2. Workarounds (Until Fixed)
>  # Increase retention period to reduce likelihood of epoch deletion
>  # Monitor consumer lag to ensure it stays low
>  # Reduce rebalance frequency (increase max.poll.interval.ms, 
> session.timeout.ms)
>  # Use cooperative rebalance strategy to minimize rebalance impact
>  # Consider using auto.offset.reset=latest if reprocessing is more costly 
> than data loss (application-dependent)
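> For illustration only, a consumer configuration combining workarounds 3 and 4 
> might look like this (values are examples to tune per workload, not 
> recommendations):
> {code:java}
> # Illustrative values only
> max.poll.interval.ms=600000
> session.timeout.ms=60000
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> # Retention (workaround 1) is a topic/broker-level setting (retention.ms), not a consumer property.
> {code}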
> ----
> h2. Related Code References
> h3. 1. The problematic method: SubscriptionState.allConsumed()
> Location: org.apache.kafka.clients.consumer.internals.SubscriptionState 
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new 
> OffsetAndMetadata(partitionState.position.offset,
>                     partitionState.position.offsetEpoch, ""));  // Uses 
> offsetEpoch instead of currentLeader.epoch
>     });
>     return allConsumed;
> } {code}
> h3. 2. How FetchPosition is updated during normal consumption
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector 
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     SubscriptionState.FetchPosition nextPosition = new 
> SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),    // offsetEpoch: from consumed 
> batch
>             position.currentLeader);         // currentLeader: inherited from 
> old position, NOT updated!
>     log.trace("Updating fetch position from {} to {} for partition {} and 
> returning {} records from `poll()`",
>             position, nextPosition, tp, partRecords.size());
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> } {code}
> Key Issue: The currentLeader field is inherited from the previous position 
> and not automatically updated during normal consumption. It only gets updated 
> when leader change errors are detected.
> h3. 3. How committed offsets are restored after rebalance
> Location: 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
> {code:java}
> public static void refreshCommittedOffsets(final Map<TopicPartition, 
> OffsetAndMetadata> offsetsAndMetadata,
>                                            final ConsumerMetadata metadata,
>                                            final SubscriptionState 
> subscriptions) {
>     for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
> offsetsAndMetadata.entrySet()) {
>         final TopicPartition tp = entry.getKey();
>         final OffsetAndMetadata offsetAndMetadata = entry.getValue();
>         if (offsetAndMetadata != null) {
>             // first update the epoch if necessary
>             entry.getValue().leaderEpoch().ifPresent(epoch -> 
> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
>  
>             // it's possible that the partition is no longer assigned when 
> the response is received,
>             // so we need to ignore seeking if that's the case
>             if (subscriptions.isAssigned(tp)) {
>                 final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = 
> metadata.currentLeader(tp);
>                 final SubscriptionState.FetchPosition position = new 
> SubscriptionState.FetchPosition(
>                         offsetAndMetadata.offset(), 
>                         offsetAndMetadata.leaderEpoch(),  // offsetEpoch from 
> committed offset (may be old)
>                         leaderAndEpoch);                   // currentLeader 
> from current metadata (may be new)
>  
>                 subscriptions.seekUnvalidated(tp, position);
>  
>                 log.info("Setting offset for partition {} to the committed 
> offset {}", tp, position);
>             }
>         }
>     }
> } {code}
> The Divergence Point: When restoring from committed offsets, offsetEpoch 
> comes from the stored offset (potentially old), while currentLeader comes 
> from fresh metadata (potentially new after leader change).
> h3. 4. How OffsetsForLeaderEpoch validation request is constructed
> Location: 
> org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
> {code:java}
> static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
>         Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
>     OffsetForLeaderTopicCollection topics = new 
> OffsetForLeaderTopicCollection(requestData.size());
>     requestData.forEach((topicPartition, fetchPosition) ->
>             fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
>                 OffsetForLeaderTopic topic = 
> topics.find(topicPartition.topic());
>                 if (topic == null) {
>                     topic = new 
> OffsetForLeaderTopic().setTopic(topicPartition.topic());
>                     topics.add(topic);
>                 }
>                 topic.partitions().add(new OffsetForLeaderPartition()
>                         .setPartition(topicPartition.partition())
>                         .setLeaderEpoch(fetchEpoch)              // Uses 
> offsetEpoch for validation
>                         
> .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
>                                 
> .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
>                 );
>             })
>     );
>     return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
> } {code}
> The Validation Problem: The validation request uses fetchEpoch (which is 
> offsetEpoch) to validate against the broker. If this epoch no longer exists 
> in the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
> h3. 5. FetchPosition class definition
> Location: 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition 
>  
> {code:java}
> /**
>  * Represents the position of a partition subscription.
>  *
>  * This includes the offset and epoch from the last record in
>  * the batch from a FetchResponse. It also includes the leader epoch at the 
> time the batch was consumed.
>  */
> public static class FetchPosition {
>     public final long offset;
>     final Optional<Integer> offsetEpoch;        // Epoch from last consumed 
> record's batch
>     final Metadata.LeaderAndEpoch currentLeader; // Current partition leader 
> info from metadata
>  
>     FetchPosition(long offset) {
>         this(offset, Optional.empty(), 
> Metadata.LeaderAndEpoch.noLeaderOrEpoch());
>     }
>  
>     public FetchPosition(long offset, Optional<Integer> offsetEpoch, 
> Metadata.LeaderAndEpoch currentLeader) {
>         this.offset = offset;
>         this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
>         this.currentLeader = Objects.requireNonNull(currentLeader);
>     }
>  
>     @Override
>     public String toString() {
>         return "FetchPosition{" +
>                 "offset=" + offset +
>                 ", offsetEpoch=" + offsetEpoch +
>                 ", currentLeader=" + currentLeader +
>                 '}';
>     }
> }{code}
> Class Design: The class contains both offsetEpoch (historical data epoch) and 
> currentLeader.epoch (current metadata epoch), but allConsumed() only uses the 
> former when committing.
> h1. Attachment logs:
> h2. group rebalance log
> !image-2025-11-21-18-03-24-592.png!
> h2. consumer client log
> !image-2025-11-21-20-00-10-862.png!
> h2. broker server log
> !image-2025-11-21-18-11-24-316.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
