dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1423030220
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2361,25 +2361,27 @@ class KafkaApis(val requestChannel: RequestChannel, * request, so there could be multiple appends of markers to the log. The final response will be sent only * after all appends have returned. */ - def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { - trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus") - val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava) + def maybeSendResponseCallback(producerId: Long, result: TransactionResult, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = { Review Comment: This is where the largest changes are. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2361,25 +2361,27 @@ class KafkaApis(val requestChannel: RequestChannel, * request, so there could be multiple appends of markers to the log. The final response will be sent only * after all appends have returned. */ - def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { - trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus") - val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava) + def maybeSendResponseCallback(producerId: Long, result: TransactionResult, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = { + trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors") updateErrors(producerId, currentErrors) - val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) => - topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE - }.keys - if (successfulOffsetsPartitions.nonEmpty) { - // as soon as the end transaction marker has been written for a transactional offset commit, - // call to the group coordinator to materialize the offsets into the cache - try { - groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result) - } catch { - case e: Exception => - error(s"Received an exception while trying to update the offsets cache on transaction marker append", e) - val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]() - successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR)) - updateErrors(producerId, updatedErrors) + if (!config.isNewGroupCoordinatorEnabled) { Review Comment: This is the logic used by the old coordinator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org