jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214569080
##########
core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala:
##########
@@ -131,26 +132,22 @@ class PartitionWriterImpl[T](
         s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
     }
 
-    val appendResults = replicaManager.appendToLocalLog(
+    var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+    replicaManager.appendRecords(
+      timeout = 0L,
+      requiredAcks = 1,
       internalTopicsAllowed = true,
       origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> recordsBuilder.build()),
-      requiredAcks = 1,
-      requestLocal = RequestLocal.NoCaching
+      responseCallback = results => appendResults = results,
+      actionQueueAdd = item => item() // Immediately complete the action queue item.
     )
 
     val partitionResult = appendResults.getOrElse(tp,
-      throw new IllegalStateException("Append status %s should only have one partition %s"
-        .format(appendResults, tp)))
-
-    // Complete delayed operations.
-    replicaManager.maybeCompletePurgatories(
-      tp,
-      partitionResult.info.leaderHwChange
-    )
+      throw new IllegalStateException(s"Append status $appendResults should only have one partition $tp"))

Review Comment:
nit: this error message confused me because I thought that it was saying there was more than one tp and that's why it failed. I think maybe the message should say something like `s"Append status $appendResults should have partition $tp"` since the fact that it doesn't would be the reason to throw the error.
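To illustrate the point about the message, here is a minimal sketch of the lookup in isolation. It assumes hypothetical `TopicPartition` and `PartitionResponse` case classes standing in for Kafka's real types, and a hypothetical helper `resultFor`; none of these are the PR's actual code. Because the default argument of `Map.getOrElse` is evaluated only when the key is missing, the exception fires when `tp` is absent from the map, not when the map holds more than one partition.

```scala
// Hypothetical stand-ins for illustration only; not Kafka's real classes.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionResponse(errorCode: Int)

object AppendStatusExample {
  // Mirrors the lookup in the diff. The default argument of getOrElse is
  // passed by name, so the throw runs only when `tp` is absent from the map.
  def resultFor(
      appendResults: Map[TopicPartition, PartitionResponse],
      tp: TopicPartition
  ): PartitionResponse =
    appendResults.getOrElse(tp,
      throw new IllegalStateException(
        s"Append status $appendResults should have partition $tp"))

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("__consumer_offsets", 0)
    val other = TopicPartition("__consumer_offsets", 1)

    // Key present: no exception, even though the map has two partitions.
    val ok = resultFor(
      Map(tp -> PartitionResponse(0), other -> PartitionResponse(0)), tp)
    println(ok)

    // Key absent: this is the only case in which the exception is thrown.
    try resultFor(Map(other -> PartitionResponse(0)), tp)
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

Under these assumptions, the wording "should have partition" describes the failing condition directly, whereas "should only have one partition" suggests a size check that the code never performs.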