jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1057809723


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be 
overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the 
same as v5.
+    //   - If v1 and explicit retention time is provided we calculate 
expiration timestamp based on that

Review Comment:
   Hmm. I'm not sure we made this comment much clearer.
   
   I think the main flaws are that it says that we can only override retention 
time in v2 (matches the json spec) but the first two bullets mention "explicit 
retention time". I'm not really sure what that means.
   
   The second thing is enumerating the versions. I think it's just clearer to 
say that some versions have the option to explicitly set retention time. v5 and 
any version without it set ignores the expireTimestamp field.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to