jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054814682


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   This comment is a little confusing.
   I think I understand the v1 semantics: we use the commit timestamp, if provided, as "now".
   
   For v2 and beyond, I'm a bit confused by the last two bullets. They make it seem like there is no difference between v2-v4 and v5+, but I think the difference is that the retention can no longer be overridden in v5+. That part is unclear in the last bullet, which says "partition expiration" while the field name is "partition retention".
   
   This is my understanding based on the code
   
   ```
   version:  (can define commit time aka "now"), (can define retention time)
      1                 yes                                no
      2                 no                                 yes
      3                 no                                 yes
      4                 no                                 yes
     5+                 no                                 no
   ```
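
   The same table could be written as two predicates. This is only a sketch of my reading of the code: `OffsetCommitVersionRules` and both method names are hypothetical and do not exist in the PR.

   ```scala
   // Hypothetical helper, not part of the PR: it only encodes the table above.
   object OffsetCommitVersionRules {
     // Assumption: only v1 lets the client supply the commit time (aka "now").
     def canDefineCommitTime(version: Int): Boolean =
       version == 1

     // Assumption: only v2 through v4 let the client override the retention time.
     def canDefineRetentionTime(version: Int): Boolean =
       version >= 2 && version <= 4
   }
   ```

   If the comment spelled out these two conditions per version range, the difference between v2-v4 and v5+ would be explicit.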
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

