dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1423030220


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2361,25 +2361,27 @@ class KafkaApis(val requestChannel: RequestChannel,
       * request, so there could be multiple appends of markers to the log. The 
final response will be sent only
       * after all appends have returned.
       */
-    def maybeSendResponseCallback(producerId: Long, result: 
TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): 
Unit = {
-      trace(s"End transaction marker append for producer id $producerId 
completed with status: $responseStatus")
-      val currentErrors = new ConcurrentHashMap[TopicPartition, 
Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava)
+    def maybeSendResponseCallback(producerId: Long, result: TransactionResult, 
currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {

Review Comment:
   This is where the largest changes are.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2361,25 +2361,27 @@ class KafkaApis(val requestChannel: RequestChannel,
       * request, so there could be multiple appends of markers to the log. The 
final response will be sent only
       * after all appends have returned.
       */
-    def maybeSendResponseCallback(producerId: Long, result: 
TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): 
Unit = {
-      trace(s"End transaction marker append for producer id $producerId 
completed with status: $responseStatus")
-      val currentErrors = new ConcurrentHashMap[TopicPartition, 
Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava)
+    def maybeSendResponseCallback(producerId: Long, result: TransactionResult, 
currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
+      trace(s"End transaction marker append for producer id $producerId 
completed with status: $currentErrors")
       updateErrors(producerId, currentErrors)
-      val successfulOffsetsPartitions = responseStatus.filter { case 
(topicPartition, partitionResponse) =>
-        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && 
partitionResponse.error == Errors.NONE
-      }.keys
 
-      if (successfulOffsetsPartitions.nonEmpty) {
-        // as soon as the end transaction marker has been written for a 
transactional offset commit,
-        // call to the group coordinator to materialize the offsets into the 
cache
-        try {
-          groupCoordinator.onTransactionCompleted(producerId, 
successfulOffsetsPartitions.asJava, result)
-        } catch {
-          case e: Exception =>
-            error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", e)
-            val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-            successfulOffsetsPartitions.foreach(updatedErrors.put(_, 
Errors.UNKNOWN_SERVER_ERROR))
-            updateErrors(producerId, updatedErrors)
+      if (!config.isNewGroupCoordinatorEnabled) {

Review Comment:
   This is the logic used by the old coordinator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to