guozhangwang commented on code in PR #12667:
URL: https://github.com/apache/kafka/pull/12667#discussion_r975630163
##########
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##########
@@ -216,12 +216,12 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset:
Long): Unit = {
if (producerEpoch != updatedEntry.producerEpoch) {
- if (appendFirstSeq != 0) {
- if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
- throw new OutOfOrderSequenceException(s"Invalid sequence number for
new epoch of producer $producerId " +
- s"at offset $offset in partition $topicPartition: $producerEpoch
(request epoch), $appendFirstSeq (seq. number), " +
- s"${updatedEntry.producerEpoch} (current producer epoch)")
- }
+ if (appendFirstSeq != 0 && updatedEntry.producerEpoch !=
RecordBatch.NO_PRODUCER_EPOCH) {
Review Comment:
Consolidating the nested `if` statements into a single condition.
##########
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##########
@@ -233,10 +233,16 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
// If there is no current producer epoch (possibly because all producer
records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence
number
- if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH ||
inSequence(currentLastSeq, appendFirstSeq))) {
+ if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH &&
!inSequence(currentLastSeq, appendFirstSeq)) {
Review Comment:
Unfolding the negated condition: `!(a || b)` is rewritten as `!a && !b` (De Morgan's law).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]