dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1226632779


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {

Review Comment:
   My understanding is that we don't retry when `commitOffsetsAsync` is used. Is that correct? If so, it may be better to split the test in two; it is really misleading otherwise.
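
   A minimal sketch of how the split could look, reusing the fixtures from the quoted test (`prepareOffsetCommitRequest`, `rebalanceConfig`, `ti1p`) and assuming, as the comment suggests, that the async path does not retry; names and structure are illustrative only:

   ```java
   @Test
   public void testCommitOffsetSyncUnknownTopicId() {
       client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
       coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

       // The sync path retries, so queue enough UNKNOWN_TOPIC_ID responses to
       // cover every attempt made before the timeout expires.
       int offsetCommitCalls = 5;
       long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
       IntStream.range(0, offsetCommitCalls).forEach(__ ->
           prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));

       Map<TopicPartition, OffsetAndMetadata> offsets =
           singletonMap(ti1p.topicPartition(), new OffsetAndMetadata(100L, "metadata"));
       assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
   }

   @Test
   public void testCommitOffsetAsyncUnknownTopicId() {
       client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
       coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

       // The async path does not retry, so a single failing response is enough.
       prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID);

       Map<TopicPartition, OffsetAndMetadata> offsets =
           singletonMap(ti1p.topicPartition(), new OffsetAndMetadata(100L, "metadata"));
       AtomicBoolean callbackInvoked = new AtomicBoolean();
       coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
           assertSame(inputOffsets, offsets);
           assertEquals(RetriableCommitFailedException.class, exception.getClass());
           assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
           callbackInvoked.set(true);
       });
       coordinator.invokeCompletedOffsetCommitCallbacks();
       assertTrue(callbackInvoked.get());
   }
   ```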



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -46,6 +49,7 @@
  *   - {@link Errors#INVALID_COMMIT_OFFSET_SIZE}
  *   - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
  *   - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ *   - {@link Errors#STALE_MEMBER_EPOCH}

Review Comment:
   Should we remove this one for now as it is not implemented yet?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> 
testRetryCommitWithUnknownTopicIdSetup() {

Review Comment:
   nit: It may be better to name this one `prepare....`.
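   For example (name purely illustrative): `private Map<TopicPartition, OffsetAndMetadata> prepareRetryCommitWithUnknownTopicId()`.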



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> 
testRetryCommitWithUnknownTopicIdSetup() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+        return singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+    }
+
+    @Test
+    public void testRetryCommitAsyncUnknownTopicId() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
testRetryCommitWithUnknownTopicIdSetup();
+
+        AtomicBoolean callbackInvoked = new AtomicBoolean();
+        coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+            // Unlike the commit offset sync API, the async API does not retry.
+            assertSame(inputOffsets, offsets);
+            assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+            assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+            callbackInvoked.set(true);
+        });
+
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+        assertTrue(callbackInvoked.get());
+    }
+
+    @Test
+    public void testRetryCommitSyncUnknownTopicId() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
testRetryCommitWithUnknownTopicIdSetup();
+
+        assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata(Map<TopicIdPartition, Long> offsets) {
+        return offsets.entrySet().stream()
+            .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new 
OffsetAndMetadata(e.getValue(), "metadata")))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Test
+    public void testTopicIdsArePopulatedByTheConsumerCoordinatorInV9() {
+        Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>();
+        byTopicIdOffsets.put(ti1p, 100L);
+        byTopicIdOffsets.put(ti2p, 200L);
+
+        TopicIdPartition unknownTopicIdPartition =
+            new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);

Review Comment:
   Is this used anywhere?



##########
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java:
##########
@@ -85,19 +96,186 @@ public void testParse() {
             ))
             .setThrottleTimeMs(throttleTimeMs);
 
-        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
-            ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
-            OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, 
version);
-            assertEquals(expectedErrorCounts, response.errorCounts());
+        ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
+        OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, 
version);
+        assertEquals(expectedErrorCounts, response.errorCounts());
 
-            if (version >= 3) {
-                assertEquals(throttleTimeMs, response.throttleTimeMs());
-            } else {
-                assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
-            }
+        if (version >= 3) {
+            assertEquals(throttleTimeMs, response.throttleTimeMs());
+        } else {
+            assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
+        }
+
+        assertEquals(version >= 4, response.shouldClientThrottle(version));
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testOffsetCommitResponseBuilder(short version) {
+        NameAndId topic3 = new NameAndId("topic3");
+        NameAndId topic4 = new NameAndId("topic4");
+        NameAndId topic5 = new NameAndId("topic5");
+        NameAndId topic6 = new NameAndId("topic6");
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topicOne, topic1Id);
+        asList(topic3, topic4, topic5, topic6).forEach(nai -> 
topicIds.put(nai.name, nai.id));
+
+        OffsetCommitResponse.Builder builder = new 
OffsetCommitResponse.Builder()
+            .addPartition(topicOne, topic1Id, partitionOne, Errors.NONE)
+            .addPartition(topicOne, topic1Id, partitionTwo, Errors.NONE)
+            .addPartitions(topic6.name, topic6.id, asList(11, 12), identity(), 
Errors.NONE);
+
+        List<OffsetCommitResponseTopic> expectedTopics = new ArrayList<>();
+
+        if (version < 9) {
+            builder.addPartition(topicTwo, Uuid.ZERO_UUID, 3, Errors.NONE)
+                .addPartition(topicTwo, Uuid.ZERO_UUID, 4, Errors.NONE)
+                .addPartition(topic3.name, Uuid.ZERO_UUID, 5, Errors.NONE)
+                .addPartition(topic3.name, Uuid.ZERO_UUID, 6, Errors.NONE);
+
+            expectedTopics.addAll(asList(
+                createResponseTopic(topicOne, topic1Id, partitionOne, 
partitionTwo, Errors.NONE),
+                createResponseTopic(topic6.name, topic6.id, 11, 12, 
Errors.NONE),
+                createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, 4, 
Errors.NONE),
+                createResponseTopic(topic3.name, Uuid.ZERO_UUID, 5, 6, 
Errors.NONE)
+            ));
 
-            assertEquals(version >= 4, response.shouldClientThrottle(version));
+        } else {
+            builder.addPartition(topic4.name, topic4.id, 7, Errors.NONE)
+                .addPartition(topic4.name, topic4.id, 8, Errors.NONE)
+                .addPartition(topic5.name, topic5.id, 9, Errors.NONE)
+                .addPartition(topic5.name, topic5.id, 10, Errors.NONE)
+                .addPartition(topicTwo, Uuid.ZERO_UUID, 3, Errors.NONE);
+
+            expectedTopics.addAll(asList(
+                createResponseTopic(topicOne, topic1Id, partitionOne, 
partitionTwo, Errors.NONE),
+                createResponseTopic(topic6.name, topic6.id, 11, 12, 
Errors.NONE),
+                createResponseTopic(topic4.name, topic4.id, 7, 8, Errors.NONE),
+                createResponseTopic(topic5.name, topic5.id, 9, 10, 
Errors.NONE),
+                createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, Errors.NONE)
+            ));
         }
+
+        assertEquals(new OffsetCommitResponseData().setTopics(expectedTopics), 
builder.build().data());
+    }
+
+    @Test
+    public void testAddPartitionRequiresAValidTopicName() {
+        assertThrows(IllegalArgumentException.class,
+            () -> new OffsetCommitResponse.Builder()
+                .addPartition("", Uuid.randomUuid(), 0, Errors.NONE));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> new OffsetCommitResponse.Builder()
+                .addPartition(null, Uuid.randomUuid(), 0, Errors.NONE));
+    }
+
+    @Test
+    public void testMergeOffsetCommitRequestData() {
+        NameAndId topic3 = new NameAndId("topic3");
+        NameAndId topic4 = new NameAndId("topic4");
+        NameAndId topic5 = new NameAndId("topic5");
+        NameAndId topic6 = new NameAndId("topic6");
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topicOne, topic1Id);
+        asList(topic3, topic4, topic5, topic6).forEach(nai -> 
topicIds.put(nai.name, nai.id));
+
+        OffsetCommitResponse.Builder builder = new 
OffsetCommitResponse.Builder()
+            .addPartition(topicOne, topic1Id, partitionOne, Errors.NONE)
+            .addPartition(topicOne, topic1Id, partitionTwo, Errors.NONE)
+            .addPartitions(topic6.name, topic6.id, asList(11, 12), identity(), 
Errors.NONE);
+
+        OffsetCommitResponseData coordinatorResults = new 
OffsetCommitResponseData()
+            .setTopics(Arrays.asList(
+                createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, 4, 
Errors.NONE),
+                createResponseTopic(topic3.name, topic3.id, 5, 6, Errors.NONE),
+                createResponseTopic(topic4.name, topic4.id, 7, 8, Errors.NONE),
+                createResponseTopic(topic5.name, topic5.id, 9, 10, Errors.NONE)
+            ));
+
+        List<OffsetCommitResponseTopic> expectedTopics = new ArrayList<>();
+        expectedTopics.addAll(asList(
+            createResponseTopic(topicOne, topic1Id, partitionOne, 
partitionTwo, Errors.NONE),
+            createResponseTopic(topic6.name, topic6.id, 11, 12, Errors.NONE),
+            createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, 4, Errors.NONE),
+            createResponseTopic(topic3.name, topic3.id, 5, 6, Errors.NONE),
+            createResponseTopic(topic4.name, topic4.id, 7, 8, Errors.NONE),
+            createResponseTopic(topic5.name, topic5.id, 9, 10, Errors.NONE)
+        ));
+
+        OffsetCommitResponse response = 
builder.merge(coordinatorResults).build();
+        assertEquals(new OffsetCommitResponseData().setTopics(expectedTopics), 
response.data());
+    }
+
+    private static OffsetCommitResponseTopic createResponseTopic(
+        String topicName,
+        Uuid topicId,
+        int partition,
+        Errors error
+    ) {
+        return new OffsetCommitResponseTopic()
+            .setTopicId(topicId)
+            .setName(topicName)
+            .setPartitions(new ArrayList<>(asList(
+                new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partition)
+                    .setErrorCode(error.code())
+            )));
+    }
+
+    private static OffsetCommitResponseTopic createResponseTopic(
+        String topicName,
+        Uuid topicId,
+        int firstPartition,
+        int secondPartition,
+        Errors error
+    ) {
+        OffsetCommitResponseTopic topic = createResponseTopic(topicName, 
topicId, firstPartition, error);
+        topic.partitions().add(new OffsetCommitResponsePartition()
+            .setPartitionIndex(secondPartition)
+            .setErrorCode(error.code()));
+
+        return topic;
     }
 
+    public static final class NameAndId {

Review Comment:
   It is a bit weird to have this class defined here, but I cannot think of a better place for now. Thoughts?



##########
clients/src/main/resources/common/message/OffsetCommitResponse.json:
##########
@@ -28,15 +28,20 @@
   // Version 7 offsetCommitRequest supports a new field called groupInstanceId 
to indicate member identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 adds TopicId field and can return STALE_MEMBER_EPOCH, 
UNKNOWN_MEMBER_ID
+  // and UNKNOWN_TOPIC_ID errors (KIP-848).

Review Comment:
   Should we remove STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID for now?
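   If so, the version 9 note could read something like (illustrative only):
   // Version 9 adds the TopicId field and can return UNKNOWN_TOPIC_ID errors (KIP-848).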



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> 
testRetryCommitWithUnknownTopicIdSetup() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+        return singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+    }
+
+    @Test
+    public void testRetryCommitAsyncUnknownTopicId() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
testRetryCommitWithUnknownTopicIdSetup();
+
+        AtomicBoolean callbackInvoked = new AtomicBoolean();
+        coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+            // Unlike the commit offset sync API, the async API does not retry.
+            assertSame(inputOffsets, offsets);
+            assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+            assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+            callbackInvoked.set(true);
+        });
+
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+        assertTrue(callbackInvoked.get());
+    }
+
+    @Test
+    public void testRetryCommitSyncUnknownTopicId() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
testRetryCommitWithUnknownTopicIdSetup();
+
+        assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata(Map<TopicIdPartition, Long> offsets) {
+        return offsets.entrySet().stream()
+            .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new 
OffsetAndMetadata(e.getValue(), "metadata")))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Test
+    public void testTopicIdsArePopulatedByTheConsumerCoordinatorInV9() {
+        Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>();
+        byTopicIdOffsets.put(ti1p, 100L);
+        byTopicIdOffsets.put(ti2p, 200L);
+
+        TopicIdPartition unknownTopicIdPartition =
+            new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);

Review Comment:
   Is this used anywhere?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                // Unlike the commit offset sync API, the async API does not 
retry.
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    static Stream<Arguments> commitOffsetTestArgs() {
+        Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>();
+        byTopicIdOffsets.put(ti1p, 100L);
+        byTopicIdOffsets.put(ti2p, 200L);
+
+        TopicIdPartition unknownTopicIdPartition =
+            new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);
+
+        OffsetCommitRequestData byTopicIdData = new OffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setGenerationId(OffsetCommitRequest.DEFAULT_GENERATION_ID)
+            .setTopics(Arrays.asList(
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti1p.topicId())
+                    .setName(topic1)
+                    .setPartitions(singletonList(new 
OffsetCommitRequestPartition()
+                        .setPartitionIndex(t1p.partition())
+                        .setCommittedOffset(100L)
+                        .setCommittedMetadata("metadata"))),
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti2p.topicId())
+                    .setName(topic2)
+                    .setPartitions(singletonList(new 
OffsetCommitRequestPartition()
+                        .setPartitionIndex(t2p.partition())
+                        .setCommittedOffset(200L)
+                        .setCommittedMetadata("metadata")))
+            ));
+
+        OffsetCommitRequestData byTopicNameData = byTopicIdData.duplicate();
+        byTopicNameData.topics().add(new OffsetCommitRequestTopic()
+            .setName(unknownTopicIdPartition.topic())
+            .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                .setPartitionIndex(5)
+                .setCommittedOffset(300L)
+                .setCommittedMetadata("metadata")))
+        );
+
+        return Stream.of(
+            Arguments.of(true, byTopicIdOffsets, byTopicIdData, (short) 9),
+            Arguments.of(false, byTopicIdOffsets, byTopicIdData, (short) 9),
+            Arguments.of(true, byTopicNameOffsets, byTopicNameData, (short) 8),
+            Arguments.of(false, byTopicNameOffsets, byTopicNameData, (short) 8)
+        );
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata(Map<TopicIdPartition, Long> offsets) {
+        return offsets.entrySet().stream()
+            .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new 
OffsetAndMetadata(e.getValue(), "metadata")))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @ParameterizedTest
+    @MethodSource("commitOffsetTestArgs")
+    public void testTopicIdsArePopulatedByTheConsumerCoordinator(
+            boolean commitSync,
+            Map<TopicIdPartition, Long> offsets,
+            OffsetCommitRequestData expectedRequestData,
+            short expectedRequestVersion) {
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        OffsetCommitRequestCaptor captor = new OffsetCommitRequestCaptor();
+        prepareOffsetCommitRequest(offsets, Errors.NONE, false, captor);
+
+        Map<TopicPartition, OffsetAndMetadata> input = 
offsetAndMetadata(offsets);
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(input, 
time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(input, (inputOffsets, exception) -> 
{
+                // Notes:
+                // 1) The offsets passed to the callback are the same object 
provided to the offset commit method.
+                //    The validation on the offsets is not required but 
defensive.
+                // 2) We validate that the commit was successful, which is the 
case if the exception is null.
+                // 3) We validate this callback was invoked, which is not 
necessary but defensive.
+                assertSame(inputOffsets, input);
+                assertNull(exception);
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+
+        // The consumer does not provide a guarantee on the order of 
occurrence of topics and partitions in the
+        // OffsetCommit request, since a map of offsets is provided to the 
consumer API. Here, both requests
+        // are asserted to be identical irrespective of the order in which 
topic and partitions appear in the requests.
+        assertRequestEquals(
+            new OffsetCommitRequest(expectedRequestData, 
expectedRequestVersion),
+            captor.request
+        );
+    }
+
+    @ParameterizedTest
+    @NullSource
+    @ValueSource(strings = { "", "test1" })
+    public void 
testInvalidTopicIdReturnedByBrokerWhenCommittingOffsetSync(String topicName) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new 
OffsetAndMetadata(100L, "m"));
+
+        // The following offset commit response is valid and the authorization 
failure results in failing the
+        // offset commit invocation.
+        client.prepareResponse(offsetCommitResponse(topicName, ti1p.topicId(), 
Errors.GROUP_AUTHORIZATION_FAILED));
+        assertThrows(GroupAuthorizationException.class,
+            () -> coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+
+        // The following offset commit response defines a topic incorrectly. 
The coordinator ignores the topic,
+        // and the group authorization failure is therefore not propagated.
+        client.prepareResponse(offsetCommitResponse(topicName, Uuid.ZERO_UUID, 
Errors.GROUP_AUTHORIZATION_FAILED));
+        assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));

Review Comment:
   I think that the method should return false if there is any mismatched topic id. If I commit foo-topic-id and bar-topic-id, the method should not succeed if we don't get a response for either of them, right?
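
   A rough sketch of the suggested check, written as a standalone helper rather than the actual coordinator code (the real response handler is not shown in this hunk, so the shape of the method is an assumption): a commit should only count as successful if every requested partition is acknowledged in the response.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   import org.apache.kafka.common.TopicIdPartition;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.message.OffsetCommitResponseData;

   final class OffsetCommitResponseCheck {
       // Hypothetical helper: true only if the response covers every TopicIdPartition
       // that was part of the commit request. A mismatched or zero topic id in the
       // response leaves the requested partition unmatched, so the commit fails.
       static boolean allRequestedPartitionsAcknowledged(
           Set<TopicIdPartition> requested,
           OffsetCommitResponseData response
       ) {
           Set<TopicIdPartition> acknowledged = new HashSet<>();
           response.topics().forEach(topic ->
               topic.partitions().forEach(partition ->
                   acknowledged.add(new TopicIdPartition(
                       topic.topicId(),
                       new TopicPartition(topic.name(), partition.partitionIndex())))));
           return acknowledged.containsAll(requested);
       }
   }
   ```

   Whether the comparison should be keyed on the topic id, the topic name, or both likely depends on the request version, so the real check would need to be version-aware.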



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> 
testRetryCommitWithUnknownTopicIdSetup() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+        return singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );

Review Comment:
   nit: I would inline this in the respective tests because it does not seem related to what this method does.
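
   For instance, the sync variant with the setup inlined might look like this (illustrative only, reusing the fixtures from the quoted test):

   ```java
   @Test
   public void testRetryCommitSyncUnknownTopicId() {
       client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
       coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

       // First attempt fails with a retriable error, the retry then succeeds.
       client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
       client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));

       Map<TopicPartition, OffsetAndMetadata> offsets =
           singletonMap(ti1p.topicPartition(), new OffsetAndMetadata(100L, "metadata"));
       assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
   }
   ```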



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> 
testRetryCommitWithUnknownTopicIdSetup() {

Review Comment:
   nit: It may be better to name this one `prepare....`.



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2052,9 +2052,10 @@ private OffsetCommitRequest 
createOffsetCommitRequest(short version) {
                                                 .setCommittedOffset(200)
                                                 
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
                                                 .setCommittedMetadata(null)
-                                ))
-                ))
-        ).build(version);
+                                )))
+            ),
+                true

Review Comment:
   nit: this seems to be misaligned.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2734,287 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
-        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(ti1p.topicPartition(),
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> 
testRetryCommitWithUnknownTopicIdSetup() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+        return singletonMap(
+            ti1p.topicPartition(),
+            new OffsetAndMetadata(100L, "metadata")
+        );
+    }
+
+    @Test
+    public void testRetryCommitAsyncUnknownTopicId() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
testRetryCommitWithUnknownTopicIdSetup();
+
+        AtomicBoolean callbackInvoked = new AtomicBoolean();
+        coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+            // Unlike the commit offset sync API, the async API does not retry.
+            assertSame(inputOffsets, offsets);
+            assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+            assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+            callbackInvoked.set(true);
+        });
+
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+        assertTrue(callbackInvoked.get());
+    }
+
+    @Test
+    public void testRetryCommitSyncUnknownTopicId() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
testRetryCommitWithUnknownTopicIdSetup();
+
+        assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata(Map<TopicIdPartition, Long> offsets) {
+        return offsets.entrySet().stream()
+            .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new 
OffsetAndMetadata(e.getValue(), "metadata")))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Test
+    public void testTopicIdsArePopulatedByTheConsumerCoordinatorInV9() {
+        Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>();
+        byTopicIdOffsets.put(ti1p, 100L);
+        byTopicIdOffsets.put(ti2p, 200L);
+
+        TopicIdPartition unknownTopicIdPartition =
+            new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);
+
+        OffsetCommitRequestData byTopicIdData = new OffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setGenerationId(OffsetCommitRequest.DEFAULT_GENERATION_ID)
+            .setTopics(Arrays.asList(
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti1p.topicId())
+                    .setName(topic1)
+                    .setPartitions(singletonList(new 
OffsetCommitRequestPartition()
+                        .setPartitionIndex(ti1p.partition())
+                        .setCommittedOffset(100L)
+                        .setCommittedMetadata("metadata"))),
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti2p.topicId())
+                    .setName(topic2)
+                    .setPartitions(singletonList(new 
OffsetCommitRequestPartition()
+                        .setPartitionIndex(ti2p.partition())
+                        .setCommittedOffset(200L)
+                        .setCommittedMetadata("metadata")))
+        ));
+
+        OffsetCommitRequestData byTopicNameData = byTopicIdData.duplicate();
+        byTopicNameData.topics().add(new OffsetCommitRequestTopic()
+                .setName(unknownTopicIdPartition.topic())
+                .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                        .setPartitionIndex(5)
+                        .setCommittedOffset(300L)
+                        .setCommittedMetadata("metadata")))
+        );
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        MockClient.RequestMatcher requestMatcher = request -> 
assertRequestEquals(
+            new OffsetCommitRequest(byTopicIdData, (short) 9),
+            (OffsetCommitRequest) request
+        );
+
+        prepareOffsetCommitRequest(byTopicIdOffsets, Errors.NONE, false, 
requestMatcher);
+
+        Map<TopicPartition, OffsetAndMetadata> input = 
offsetAndMetadata(byTopicIdOffsets);
+        assertTrue(coordinator.commitOffsetsSync(input, 
time.timer(Long.MAX_VALUE)));
+    }
+
+    @Test
+    public void testUseOffsetCommitRequestV8IfATopicIdIsMissing() {
+        TopicIdPartition unknownTopicIdPartition =
+                new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);
+
+        OffsetCommitRequestData byTopicNameData = new OffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setGenerationId(OffsetCommitRequest.DEFAULT_GENERATION_ID)
+            .setTopics(Arrays.asList(
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti1p.topicId())
+                    .setName(topic1)
+                    .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                        .setPartitionIndex(ti1p.partition())
+                        .setCommittedOffset(100L)
+                        .setCommittedMetadata("metadata"))),
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti2p.topicId())
+                    .setName(topic2)
+                    .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                        .setPartitionIndex(ti2p.partition())
+                        .setCommittedOffset(200L)
+                        .setCommittedMetadata("metadata"))),
+                new OffsetCommitRequestTopic()
+                    .setName(unknownTopicIdPartition.topic())
+                    .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                        .setPartitionIndex(5)
+                        .setCommittedOffset(300L)
+                        .setCommittedMetadata("metadata")))
+            ));
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        MockClient.RequestMatcher requestMatcher = request -> assertRequestEquals(
+            new OffsetCommitRequest(byTopicNameData, (short) 8),
+            (OffsetCommitRequest) request
+        );
+
+        prepareOffsetCommitRequest(byTopicNameOffsets, Errors.NONE, false, requestMatcher);
+
+        Map<TopicPartition, OffsetAndMetadata> input = offsetAndMetadata(byTopicNameOffsets);
+        assertTrue(coordinator.commitOffsetsSync(input, time.timer(Long.MAX_VALUE)));
+    }
+
+    @ParameterizedTest
+    @NullSource
+    @ValueSource(strings = { "", "test1" })
+    public void testInvalidTopicIdReturnedByBrokerWhenCommittingOffsetSync(String topicName) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(ti1p.topicPartition(), new OffsetAndMetadata(100L, "m"));
+
+        // The following offset commit response is valid and the authorization failure results in failing the
+        // offset commit invocation.
+        client.prepareResponse(offsetCommitResponse(topicName, ti1p.topicId(), Errors.GROUP_AUTHORIZATION_FAILED));
+        assertThrows(GroupAuthorizationException.class,
+            () -> coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        // The following offset commit response defines a topic incorrectly. The coordinator ignores the topic,
+        // and the group authorization failure is therefore not propagated.
+        client.prepareResponse(offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED));
+        assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        client.prepareResponse(offsetCommitResponse(topicName, Uuid.randomUuid(), Errors.GROUP_AUTHORIZATION_FAILED));
+        assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+    }
+
+    @ParameterizedTest
+    @NullSource
+    @ValueSource(strings = { "", "test1" })
+    public void testInvalidTopicIdReturnedByBrokerWhenCommittingOffsetAsync(String topicName) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(ti1p.topicPartition(), new OffsetAndMetadata(100L, "metadata1"));
+        offsets.put(ti2p.topicPartition(), new OffsetAndMetadata(200L, "metadata2"));
+
+        // Response data which forms the common part of the responses exercised in the use cases below.
+        OffsetCommitResponseData commonData = new OffsetCommitResponseData()
+            .setTopics(Arrays.asList(
+                new OffsetCommitResponseTopic()
+                    .setName(null)
+                    .setTopicId(ti2p.topicId())
+                    .setPartitions(singletonList(new OffsetCommitResponsePartition().setPartitionIndex(0)))
+            ));
+
+        BiConsumer<OffsetCommitResponse, Class<? extends Exception>> asserter = (response, exceptionType) -> {
+            OffsetCommitResponseTopic topic = response.data().topics().get(0);
+            OffsetCommitResponseData data = commonData.duplicate();
+            data.topics().add(topic);
+            client.prepareResponse(new OffsetCommitResponse(data));
+
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (__, exception) -> {
+                if (exceptionType == null)
+                    assertNull(exception);
+                else
+                    assertEquals(exceptionType, exception.getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        };
+
+        // The following offset commit response is valid and the authorization failure results in failing the
+        // offset commit invocation.
+        asserter.accept(
+            offsetCommitResponse(topicName, ti1p.topicId(), Errors.GROUP_AUTHORIZATION_FAILED),
+            GroupAuthorizationException.class);
+
+        // The following offset commit response defines a topic incorrectly. The coordinator ignores the topic,
+        // and the group authorization failure is therefore not propagated.
+        asserter.accept(
+            offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED),
+            null);

Review Comment:
   This case is also not correct, in my opinion. The caller should get an exception in this case.
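   For illustration, a minimal sketch of what the async expectation could look like if the coordinator propagated the error even when the response carries an unexpected topic id. It reuses the `asserter` and `offsetCommitResponse` helpers from the test above and describes the suggested behaviour, not what the PR currently does:

       // Hypothetical expectation: the callback would still receive the
       // GroupAuthorizationException even though the response topic id is zero.
       asserter.accept(
           offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED),
           GroupAuthorizationException.class);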



##########
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java:
##########
@@ -85,19 +96,186 @@ public void testParse() {
             ))
             .setThrottleTimeMs(throttleTimeMs);
 
-        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
-            ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
-            OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, version);
-            assertEquals(expectedErrorCounts, response.errorCounts());
+        ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
+        OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, version);
+        assertEquals(expectedErrorCounts, response.errorCounts());
 
-            if (version >= 3) {
-                assertEquals(throttleTimeMs, response.throttleTimeMs());
-            } else {
-                assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
-            }
+        if (version >= 3) {
+            assertEquals(throttleTimeMs, response.throttleTimeMs());
+        } else {
+            assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
+        }
+
+        assertEquals(version >= 4, response.shouldClientThrottle(version));
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testOffsetCommitResponseBuilder(short version) {
+        NameAndId topic3 = new NameAndId("topic3");
+        NameAndId topic4 = new NameAndId("topic4");
+        NameAndId topic5 = new NameAndId("topic5");
+        NameAndId topic6 = new NameAndId("topic6");
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topicOne, topic1Id);
+        asList(topic3, topic4, topic5, topic6).forEach(nai -> topicIds.put(nai.name, nai.id));
+
+        OffsetCommitResponse.Builder builder = new OffsetCommitResponse.Builder()
+            .addPartition(topicOne, topic1Id, partitionOne, Errors.NONE)
+            .addPartition(topicOne, topic1Id, partitionTwo, Errors.NONE)
+            .addPartitions(topic6.name, topic6.id, asList(11, 12), identity(), Errors.NONE);
+
+        List<OffsetCommitResponseTopic> expectedTopics = new ArrayList<>();
+
+        if (version < 9) {
+            builder.addPartition(topicTwo, Uuid.ZERO_UUID, 3, Errors.NONE)
+                .addPartition(topicTwo, Uuid.ZERO_UUID, 4, Errors.NONE)
+                .addPartition(topic3.name, Uuid.ZERO_UUID, 5, Errors.NONE)
+                .addPartition(topic3.name, Uuid.ZERO_UUID, 6, Errors.NONE);
+
+            expectedTopics.addAll(asList(
+                createResponseTopic(topicOne, topic1Id, partitionOne, partitionTwo, Errors.NONE),
+                createResponseTopic(topic6.name, topic6.id, 11, 12, Errors.NONE),
+                createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, 4, Errors.NONE),
+                createResponseTopic(topic3.name, Uuid.ZERO_UUID, 5, 6, Errors.NONE)
+            ));
 
-            assertEquals(version >= 4, response.shouldClientThrottle(version));
+        } else {
+            builder.addPartition(topic4.name, topic4.id, 7, Errors.NONE)
+                .addPartition(topic4.name, topic4.id, 8, Errors.NONE)
+                .addPartition(topic5.name, topic5.id, 9, Errors.NONE)
+                .addPartition(topic5.name, topic5.id, 10, Errors.NONE)
+                .addPartition(topicTwo, Uuid.ZERO_UUID, 3, Errors.NONE);
+
+            expectedTopics.addAll(asList(
+                createResponseTopic(topicOne, topic1Id, partitionOne, partitionTwo, Errors.NONE),
+                createResponseTopic(topic6.name, topic6.id, 11, 12, Errors.NONE),
+                createResponseTopic(topic4.name, topic4.id, 7, 8, Errors.NONE),
+                createResponseTopic(topic5.name, topic5.id, 9, 10, Errors.NONE),
+                createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, Errors.NONE)
+            ));
         }
+
+        assertEquals(new OffsetCommitResponseData().setTopics(expectedTopics), builder.build().data());
+    }
+
+    @Test
+    public void testAddPartitionRequiresAValidTopicName() {
+        assertThrows(IllegalArgumentException.class,
+            () -> new OffsetCommitResponse.Builder()
+                .addPartition("", Uuid.randomUuid(), 0, Errors.NONE));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> new OffsetCommitResponse.Builder()
+                .addPartition(null, Uuid.randomUuid(), 0, Errors.NONE));
+    }
+
+    @Test
+    public void testMergeOffsetCommitRequestData() {
+        NameAndId topic3 = new NameAndId("topic3");
+        NameAndId topic4 = new NameAndId("topic4");
+        NameAndId topic5 = new NameAndId("topic5");
+        NameAndId topic6 = new NameAndId("topic6");
+
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put(topicOne, topic1Id);
+        asList(topic3, topic4, topic5, topic6).forEach(nai -> topicIds.put(nai.name, nai.id));
+
+        OffsetCommitResponse.Builder builder = new OffsetCommitResponse.Builder()
+            .addPartition(topicOne, topic1Id, partitionOne, Errors.NONE)
+            .addPartition(topicOne, topic1Id, partitionTwo, Errors.NONE)
+            .addPartitions(topic6.name, topic6.id, asList(11, 12), identity(), Errors.NONE);
+
+        OffsetCommitResponseData coordinatorResults = new OffsetCommitResponseData()
+            .setTopics(Arrays.asList(
+                createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, 4, Errors.NONE),
+                createResponseTopic(topic3.name, topic3.id, 5, 6, Errors.NONE),
+                createResponseTopic(topic4.name, topic4.id, 7, 8, Errors.NONE),
+                createResponseTopic(topic5.name, topic5.id, 9, 10, Errors.NONE)
+            ));
+
+        List<OffsetCommitResponseTopic> expectedTopics = new ArrayList<>();
+        expectedTopics.addAll(asList(
+            createResponseTopic(topicOne, topic1Id, partitionOne, partitionTwo, Errors.NONE),
+            createResponseTopic(topic6.name, topic6.id, 11, 12, Errors.NONE),
+            createResponseTopic(topicTwo, Uuid.ZERO_UUID, 3, 4, Errors.NONE),
+            createResponseTopic(topic3.name, topic3.id, 5, 6, Errors.NONE),
+            createResponseTopic(topic4.name, topic4.id, 7, 8, Errors.NONE),
+            createResponseTopic(topic5.name, topic5.id, 9, 10, Errors.NONE)
+        ));
+
+        OffsetCommitResponse response = builder.merge(coordinatorResults).build();
+        assertEquals(new OffsetCommitResponseData().setTopics(expectedTopics), response.data());
+    }
+
+    private static OffsetCommitResponseTopic createResponseTopic(
+        String topicName,
+        Uuid topicId,
+        int partition,
+        Errors error
+    ) {
+        return new OffsetCommitResponseTopic()
+            .setTopicId(topicId)
+            .setName(topicName)
+            .setPartitions(new ArrayList<>(asList(
+                new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partition)
+                    .setErrorCode(error.code())
+            )));
+    }
+
+    private static OffsetCommitResponseTopic createResponseTopic(
+        String topicName,
+        Uuid topicId,
+        int firstPartition,
+        int secondPartition,
+        Errors error
+    ) {
+        OffsetCommitResponseTopic topic = createResponseTopic(topicName, topicId, firstPartition, error);
+        topic.partitions().add(new OffsetCommitResponsePartition()
+            .setPartitionIndex(secondPartition)
+            .setErrorCode(error.code()));
+
+        return topic;
     }
 
+    public static final class NameAndId {

Review Comment:
   nit: TopicNameAndId?
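   For example, something along these lines (a sketch only; the field and constructor shapes are inferred from how `NameAndId` is used in the tests above, and the one-argument constructor generating a random id is an assumption):

       // Hypothetical rename of NameAndId; body inferred from usage in the tests.
       public static final class TopicNameAndId {
           public final String name;
           public final Uuid id;

           public TopicNameAndId(String name) {
               this(name, Uuid.randomUuid());
           }

           public TopicNameAndId(String name, Uuid id) {
               this.name = name;
               this.id = id;
           }
       }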


