abbccdda commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r468735528
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4068,6 +4093,58 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsPartialResponse() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + List<Node> nodes = Arrays.asList(node0, node1); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.<String>emptySet(), + Collections.<String>emptySet(), + node0); + + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + ListOffsetTopicResponse t0 = prepareListOffsetTopicResponse(tp0, Errors.NONE, -2L, 123L, 456); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); + env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(data), node0); + + Map<TopicPartition, OffsetSpec> partitions = new HashMap<>(); + partitions.put(tp0, OffsetSpec.latest()); + partitions.put(tp1, OffsetSpec.latest()); + ListOffsetsResult result = env.adminClient().listOffsets(partitions); + assertNotNull(result.partitionResult(tp0).get()); + TestUtils.assertFutureThrows(result.partitionResult(tp1), ApiException.class); + TestUtils.assertFutureThrows(result.all(), ApiException.class); + } + } + + private static ListOffsetTopicResponse 
prepareListOffsetTopicResponse(TopicPartition tp, Errors error, long timestamp, long offset, int epoch) { Review comment: This could move to `ListOffsetResponse` as a helper, and could be named `singletonListOffsetTopicResponse` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3973,25 +3977,38 @@ void handleResponse(AbstractResponse abstractResponse) { ListOffsetResponse response = (ListOffsetResponse) abstractResponse; Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>(); - for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) { - TopicPartition tp = result.getKey(); - PartitionData partitionData = result.getValue(); - - KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); - Errors error = partitionData.error; - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + for (ListOffsetTopicResponse topic : response.topics()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { Review comment: Why are we downgrading this fatal scenario to a warning? 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -307,32 +169,16 @@ public static ListOffsetRequest parse(ByteBuffer buffer, short version) { @Override protected Struct toStruct() { - short version = version(); - Struct struct = new Struct(ApiKeys.LIST_OFFSETS.requestSchema(version)); - Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupPartitionDataByTopic(partitionTimestamps); - - struct.set(REPLICA_ID, replicaId); - struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id()); + return data.toStruct(version()); + } - List<Struct> topicArray = new ArrayList<>(); - for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { - Struct topicData = struct.instance(TOPICS); - topicData.set(TOPIC_NAME, topicEntry.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) { - PartitionData offsetPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS); - partitionData.set(PARTITION_ID, partitionEntry.getKey()); - partitionData.set(TIMESTAMP, offsetPartitionData.timestamp); - partitionData.setIfExists(MAX_NUM_OFFSETS, offsetPartitionData.maxNumOffsets); - RequestUtils.setLeaderEpochIfExists(partitionData, CURRENT_LEADER_EPOCH, - offsetPartitionData.currentLeaderEpoch); - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS, partitionArray.toArray()); - topicArray.add(topicData); + public static List<ListOffsetTopic> toListOffsetTopics(Map<TopicPartition, ListOffsetPartition> timestampsToSearch) { Review comment: Could we add a unit test for this function? 
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ########## @@ -2149,20 +2163,29 @@ private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> partitionOffsets, Map<TopicPartition, Errors> partitionErrors) { - Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); + Map<String, ListOffsetTopicResponse> responses = new HashMap<>(); for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) { - partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(Errors.NONE, - ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionOffset.getValue(), - Optional.empty())); + TopicPartition tp = partitionOffset.getKey(); + ListOffsetTopicResponse topic = responses.computeIfAbsent(tp.topic(), k -> new ListOffsetTopicResponse().setName(tp.topic())); + topic.partitions().add(new ListOffsetPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(partitionOffset.getValue())); } for (Map.Entry<TopicPartition, Errors> partitionError : partitionErrors.entrySet()) { - partitionData.put(partitionError.getKey(), new ListOffsetResponse.PartitionData( - partitionError.getValue(), ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())); + TopicPartition tp = partitionError.getKey(); + ListOffsetTopicResponse topic = responses.computeIfAbsent(tp.topic(), k -> new ListOffsetTopicResponse().setName(tp.topic())); + topic.partitions().add(new ListOffsetPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(partitionError.getValue().code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)); } - - return new ListOffsetResponse(partitionData); + ListOffsetResponseData data = new 
ListOffsetResponseData() + .setTopics(new ArrayList<ListOffsetTopicResponse>(responses.values())); Review comment: `new ArrayList<>` is sufficient ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -3546,12 +3594,54 @@ private void testGetOffsetsForTimesWithUnknownOffset() { MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); client.updateMetadata(initialMetadataUpdate); - Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); - partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE, - ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET))))); + + client.prepareResponseFrom(new ListOffsetResponse(data), + metadata.fetch().leaderFor(tp0)); + + Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, 0L); + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = + fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE)); - client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), + assertTrue(offsetAndTimestampMap.containsKey(tp0)); + assertNull(offsetAndTimestampMap.get(tp0)); + } + + @Test + public void testGetOffsetsForTimesWithUnknownOffsetV0() { Review comment: Is this expected before the change in this PR? 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -910,136 +913,162 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - 
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderOrFollowerException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - 
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava) + ) - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val 
offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderOrFollowerException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) case e: 
Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + def buildErrorResponse(e: Errors, partition: ListOffsetPartition): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + buildErrorResponse(Errors.INVALID_REQUEST, partition) + } else { + Review comment: nit: line seems not necessary. 
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3732,41 +3734,43 @@ public void testListOffsets() throws Exception { Collections.<String>emptySet(), node0); - final TopicPartition tp1 = new TopicPartition("foo", 0); - final TopicPartition tp2 = new TopicPartition("bar", 0); - final TopicPartition tp3 = new TopicPartition("baz", 0); + final TopicPartition tp0 = new TopicPartition("foo", 0); Review comment: I think we could reduce the size of this PR by reverting the numbering change, which seems unnecessary. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ########## @@ -584,14 +590,22 @@ public void testFetchProgressWithMissingPartitionPosition() { consumer.seekToEnd(singleton(tp0)); consumer.seekToBeginning(singleton(tp1)); - client.prepareResponse( - body -> { - ListOffsetRequest request = (ListOffsetRequest) body; - Map<TopicPartition, ListOffsetRequest.PartitionData> timestamps = request.partitionTimestamps(); - return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP && - timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP; - }, listOffsetsResponse(Collections.singletonMap(tp0, 50L), - Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER))); + client.prepareResponse(body -> { + ListOffsetRequest request = (ListOffsetRequest) body; + List<ListOffsetPartition> partitions = request.topics().stream().flatMap(topic -> { + if (topic.name().equals(tp0.topic())) Review comment: This looks weird: since we are only filtering for tp0 partitions, how can we check both tp0 and tp1 later? 
########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1221,28 +1226,39 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { private ListOffsetRequest createListOffsetRequest(int version) { if (version == 0) { - Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap( - new TopicPartition("test", 0), - new ListOffsetRequest.PartitionData(1000000L, 10)); + ListOffsetTopic topic = new ListOffsetTopic() Review comment: Well, the purpose is more about encapsulation to reduce the import paths in this test class. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -3546,12 +3594,54 @@ private void testGetOffsetsForTimesWithUnknownOffset() { MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); client.updateMetadata(initialMetadataUpdate); - Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); - partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE, - ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET))))); + + client.prepareResponseFrom(new ListOffsetResponse(data), + metadata.fetch().leaderFor(tp0)); + + Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, 0L); + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = + fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE)); 
- client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), + assertTrue(offsetAndTimestampMap.containsKey(tp0)); + assertNull(offsetAndTimestampMap.get(tp0)); + } + + @Test + public void testGetOffsetsForTimesWithUnknownOffsetV0() { + buildFetcher(); + // Empty map + assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty()); + // Unknown Offset + client.reset(); + // Ensure metadata has both partition. Review comment: both partitions ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) 
- } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - 
None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should 
get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = 
topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.INVALID_REQUEST.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } else { + + def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + try { + val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID + val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(offsetRequest.isolationLevel) + else + None + + val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), Review comment: @mimaison WDYT? I'm neutral. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +public class ListOffsetRequestTest { Review comment: Probably ok to skip. 
########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1251,15 +1267,28 @@ private ListOffsetRequest createListOffsetRequest(int version) { private ListOffsetResponse createListOffsetResponse(int version) { if (version == 0) { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), - new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L))); - return new ListOffsetResponse(responseData); + ListOffsetResponseData data = new ListOffsetResponseData() Review comment: As my previous comment suggests, for the sake of encapsulation and reusability. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -910,136 +913,162 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - 
DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderOrFollowerException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) 
= partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava) + ) - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = 
offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages Review comment: special cases? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org