feyman2016 commented on a change in pull request #9739:
URL: https://github.com/apache/kafka/pull/9739#discussion_r561587550
##########
File path:
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -180,7 +179,7 @@ private void startNewBatch() {
nextOffset,
time.milliseconds(),
false,
- RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ epoch,
Review comment:
Let me check
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1057,7 +1057,8 @@ class Log(@volatile private var _dir: File,
leaderEpoch: Int,
origin: AppendOrigin = AppendOrigin.Client,
interBrokerProtocolVersion: ApiVersion =
ApiVersion.latestVersion): LogAppendInfo = {
- append(records, origin, interBrokerProtocolVersion, assignOffsets = true,
leaderEpoch, ignoreRecordSize = false)
+ val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
+ append(records, origin, interBrokerProtocolVersion,
validateAndAssignOffsets, leaderEpoch, ignoreRecordSize = false)
Review comment:
Make sense to me, it is also a little bit odd to me, but I put it here
because I think `assignOffsets`==true for `appendAsLeader` and ==false for
`appendAsFollower`, which means normally `assignOffsets` is determined by the
caller, the `RaftLeader` is just a special case for `appendAsLeader`, if we
move the logic in `analyzeAndValidateRecords`, that means it need to determine
whether to `assignOffsets` without caller info, does that doable?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]