lianetm commented on code in PR #19461:
URL: https://github.com/apache/kafka/pull/19461#discussion_r2045463288
##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -45,20 +45,24 @@ public static class Builder extends
AbstractRequest.Builder<OffsetCommitRequest>
private final OffsetCommitRequestData data;
- public Builder(OffsetCommitRequestData data, boolean
enableUnstableLastVersion) {
- super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion);
+ private Builder(OffsetCommitRequestData data, short
oldestAllowedVersion, short latestAllowedVersion) {
+ super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion,
latestAllowedVersion);
this.data = data;
}
- public Builder(OffsetCommitRequestData data) {
- this(data, false);
+ public static Builder forTopicIdsAndNames(OffsetCommitRequestData
data, boolean enableUnstableLastVersion) {
Review Comment:
nit: should this be `forTopicIdsOrNames`?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -271,40 +271,62 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val offsetCommitRequest = request.body[OffsetCommitRequest]
- // Reject the request if not authorized to the group
+ // Reject the request if not authorized to the group.
if (!authHelper.authorize(request.context, READ, GROUP,
offsetCommitRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request,
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
+ val useTopicIds =
OffsetCommitResponse.useTopicIds(request.header.apiVersion)
+
+ if (useTopicIds) {
+ offsetCommitRequest.data.topics.forEach { topic =>
+ if (topic.topicId != Uuid.ZERO_UUID) {
+ metadataCache.getTopicName(topic.topicId).ifPresent(name =>
topic.setName(name))
+ }
+ }
+ }
+
val authorizedTopics = authHelper.filterByAuthorized(
request.context,
READ,
TOPIC,
offsetCommitRequest.data.topics.asScala
)(_.name)
- val responseBuilder = new OffsetCommitResponse.Builder()
+ val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds)
val authorizedTopicsRequest = new
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
offsetCommitRequest.data.topics.forEach { topic =>
- if (!authorizedTopics.contains(topic.name)) {
+ if (useTopicIds && topic.name.isEmpty) {
+ // If the topic name is undefined, it means that the topic id is
unknown so we add
+ // the topic and all its partitions to the response with
UNKNOWN_TOPIC_ID.
+
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+ topic.topicId, topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_ID)
+ } else if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its
partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
- topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ topic.topicId, topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
Review Comment:
If we're on version >= 10 and include `topic.name` here, wouldn't we end up
leaking a topic name in the response for which the user is not authorized (and
that wasn't included in the request)?
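
To make the concern concrete, here is a minimal, self-contained sketch of one way to avoid echoing a broker-resolved name. It is not the PR's code: `UnauthorizedTopicEcho`, `ResponseTopic`, and `unauthorizedEntry` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the version threshold of 10 is taken from the comment above. Whether the real response serialization already omits the name for version 10 is not something this sketch assumes either way.

```java
import java.util.UUID;

public class UnauthorizedTopicEcho {
    // Assumption from the review comment: topic ids are used from version 10 on.
    public static final short FIRST_TOPIC_ID_VERSION = 10;

    // Hypothetical, simplified stand-in for a response topic entry.
    public static final class ResponseTopic {
        public final UUID topicId;
        public final String name;

        public ResponseTopic(UUID topicId, String name) {
            this.topicId = topicId;
            this.name = name;
        }
    }

    // Builds the error entry for an unauthorized topic, echoing back only the
    // identifier that the client itself sent for the given version.
    public static ResponseTopic unauthorizedEntry(short apiVersion, UUID topicId, String resolvedName) {
        if (apiVersion >= FIRST_TOPIC_ID_VERSION) {
            // The client sent only the id, so the broker-resolved name is dropped.
            return new ResponseTopic(topicId, "");
        }
        // Older versions: the client sent the name, so echoing it reveals nothing new.
        return new ResponseTopic(topicId, resolvedName);
    }
}
```

With this shape, a version 10 caller that is not authorized for the topic gets back the id it already knew and an empty name.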
##########
clients/src/main/resources/common/message/OffsetCommitResponse.json:
##########
@@ -34,7 +34,9 @@
// Version 9 is the first version that can be used with the new consumer
group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new
consumer group protocol is used and
// GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
- "validVersions": "2-9",
+ //
+ // Version 10 adds support for topic ids (KIP-848).
Review Comment:
ditto
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1308,6 +1308,75 @@ public void testConsumerGroupOffsetCommit() {
);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithTopicIds() {
+ Uuid topicId = Uuid.randomUuid();
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ // Add member.
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()
+ );
+
+ CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result
= context.commitOffset(
+ new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(10)
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setTopicId(topicId)
+ .setName("bar")
Review Comment:
Just to check: in practice, I would expect that having both the topic id and
the name set never happens with the current approach (the request will contain
either the name if v < 10, or the topic id if v >= 10). Is my understanding correct?
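
The expectation above can be written down as a small sketch. Everything here is illustrative rather than taken from the PR: `TopicIdentifierByVersion`, `RequestTopic`, and `forVersion` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the all-zero UUID plays the role of `Uuid.ZERO_UUID`. Only the rule itself (name if v < 10, topic id if v >= 10) comes from the comment.

```java
import java.util.UUID;

public class TopicIdentifierByVersion {
    // Stand-in for Uuid.ZERO_UUID, meaning "no topic id set".
    public static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical, simplified stand-in for a request topic entry.
    public static final class RequestTopic {
        public final UUID topicId;
        public final String name;

        public RequestTopic(UUID topicId, String name) {
            this.topicId = topicId;
            this.name = name;
        }

        // True only if both identifiers are populated at the same time.
        public boolean hasBoth() {
            return !topicId.equals(ZERO_UUID) && !name.isEmpty();
        }
    }

    // Assumed rule from the comment: versions >= 10 identify topics by id.
    public static boolean useTopicIds(short apiVersion) {
        return apiVersion >= 10;
    }

    // Keeps exactly one identifier, depending on the request version.
    public static RequestTopic forVersion(short apiVersion, UUID topicId, String name) {
        if (useTopicIds(apiVersion)) {
            return new RequestTopic(topicId, "");
        }
        return new RequestTopic(ZERO_UUID, name);
    }
}
```

Under this rule `hasBoth()` is false for every version, which is the property the question is asking about; a test that sets both fields would then be exercising a state that a real request of either version should not reach.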
##########
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##########
@@ -36,8 +36,11 @@
//
// Version 9 is the first version that can be used with the new consumer
group protocol (KIP-848). The
// request is the same as version 8.
- "validVersions": "2-9",
+ //
+ // Version 10 adds support for topic ids (KIP-848).
Review Comment:
Should we clarify that it also removes support for topic names?