dajac commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r449603684
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion, int replicaId, IsolationLevel isolationLevel) { super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; + data = new ListOffsetRequestData() + .setIsolationLevel(isolationLevel.id()) + .setReplicaId(replicaId); } - public Builder setTargetTimes(Map<TopicPartition, PartitionData> partitionTimestamps) { - this.partitionTimestamps = partitionTimestamps; + public Builder setTargetTimes(List<ListOffsetTopic> topics) { + data.setTopics(topics); return this; } @Override public ListOffsetRequest build(short version) { - return new ListOffsetRequest(replicaId, partitionTimestamps, isolationLevel, version); + return new ListOffsetRequest(version, data); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ListOffsetRequest") - .append(", replicaId=").append(replicaId); - if (partitionTimestamps != null) { - bld.append(", partitionTimestamps=").append(partitionTimestamps); - } - bld.append(", isolationLevel=").append(isolationLevel); - bld.append(")"); - return bld.toString(); - } - } - - public static final class PartitionData { - public final long timestamp; - public final int maxNumOffsets; // only supported in v0 - public final Optional<Integer> currentLeaderEpoch; - - private PartitionData(long timestamp, int maxNumOffsets, Optional<Integer> currentLeaderEpoch) { - this.timestamp = timestamp; - this.maxNumOffsets = maxNumOffsets; - this.currentLeaderEpoch = currentLeaderEpoch; - } - - // For V0 - public PartitionData(long timestamp, int maxNumOffsets) { - this(timestamp, maxNumOffsets, Optional.empty()); - } - - public PartitionData(long timestamp, Optional<Integer> currentLeaderEpoch) { - this(timestamp, 1, currentLeaderEpoch); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PartitionData)) return false; - PartitionData other = (PartitionData) obj; - return this.timestamp == other.timestamp && - this.currentLeaderEpoch.equals(other.currentLeaderEpoch); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, currentLeaderEpoch); - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("{timestamp: ").append(timestamp). - append(", maxNumOffsets: ").append(maxNumOffsets). - append(", currentLeaderEpoch: ").append(currentLeaderEpoch). - append("}"); - return bld.toString(); + return data.toString(); } } /** * Private constructor with a specified version. */ - private ListOffsetRequest(int replicaId, - Map<TopicPartition, PartitionData> targetTimes, - IsolationLevel isolationLevel, - short version) { + private ListOffsetRequest(short version, ListOffsetRequestData data) { super(ApiKeys.LIST_OFFSETS, version); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; - this.partitionTimestamps = targetTimes; + this.data = data; this.duplicatePartitions = Collections.emptySet(); } public ListOffsetRequest(Struct struct, short version) { super(ApiKeys.LIST_OFFSETS, version); - Set<TopicPartition> duplicatePartitions = new HashSet<>(); - replicaId = struct.get(REPLICA_ID); - isolationLevel = struct.hasField(ISOLATION_LEVEL) ? - IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) : - IsolationLevel.READ_UNCOMMITTED; - partitionTimestamps = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - long timestamp = partitionResponse.get(TIMESTAMP); - TopicPartition tp = new TopicPartition(topic, partition); - - int maxNumOffsets = partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1); - Optional<Integer> currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH); - PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets, currentLeaderEpoch); - if (partitionTimestamps.put(tp, partitionData) != null) + data = new ListOffsetRequestData(struct, version); + duplicatePartitions = new HashSet<>(); + Set<TopicPartition> partitions = new HashSet<>(); + for (ListOffsetTopic topic : data.topics()) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + if (!partitions.add(tp)) { duplicatePartitions.add(tp); + } } } - this.duplicatePartitions = duplicatePartitions; } @Override - @SuppressWarnings("deprecation") public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Review comment: It would be great if we could add unit tests for this method and perhaps others as well. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3883,21 +3886,24 @@ void handleResponse(AbstractResponse abstractResponse) { ListOffsetResponse response = (ListOffsetResponse) abstractResponse; Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>(); - for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) { - TopicPartition tp = result.getKey(); - PartitionData partitionData = result.getValue(); - - KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); - Errors error = partitionData.error; - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + for (ListOffsetTopicResponse topic : response.responseData()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); + } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { + retryTopicPartitionOffsets.put(tp, offsetRequestSpec); + } else if (error == Errors.NONE) { + Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH) + ? Optional.empty() + : Optional.of(partition.leaderEpoch()); + future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); + } } } Review comment: I just noticed that we don't ensure that all futures of the current broker are completed. It would be great to ensure it by using `completeUnrealizedFutures` method if `retryTopicPartitionOffsets` is empty. We already do this in `alterReplicaLogDirs()` if you want to see an example. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> f * value of each partition may be null only for v0. In v1 and later the ListOffset API would not * return a null timestamp (-1 is returned instead when necessary). */ - private void handleListOffsetResponse(Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch, + private void handleListOffsetResponse(Map<TopicPartition, ListOffsetPartition> timestampsToSearch, ListOffsetResponse listOffsetResponse, RequestFuture<ListOffsetResult> future) { Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>(); Set<TopicPartition> partitionsToRetry = new HashSet<>(); Set<String> unauthorizedTopics = new HashSet<>(); - for (Map.Entry<TopicPartition, ListOffsetRequest.PartitionData> entry : timestampsToSearch.entrySet()) { + Map<TopicPartition, ListOffsetPartitionResponse> partitionsData = byTopicPartitions(listOffsetResponse.responseData()); Review comment: I wonder if grouping by `TopicPartition` is really necessary here. We iterate over `timestampsToSearch` to get the `ListOffsetPartitionResponse` for the current `TopicPartition` but we could also iterate over the response set directly and thus avoid grouping. Moreover, we always assume that the result set contains the `TopicPartition` that we are interested in so it would not change the semantic. Am I missing something? What do you think? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ########## @@ -58,239 +47,54 @@ public class ListOffsetResponse extends AbstractResponse { public static final long UNKNOWN_TIMESTAMP = -1L; public static final long UNKNOWN_OFFSET = -1L; + public static final int UNKNOWN_EPOCH = RecordBatch.NO_PARTITION_LEADER_EPOCH; - // top level fields - private static final Field.ComplexArray TOPICS = new Field.ComplexArray("responses", - "The listed offsets by topic"); - - // topic level fields - private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partition_responses", - "The listed offsets by partition"); - - // partition level fields - // This key is only used by ListOffsetResponse v0 - @Deprecated - private static final Field.Array OFFSETS = new Field.Array("offsets", INT64, "A list of offsets."); - private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp", - "The timestamp associated with the returned offset"); - private static final Field.Int64 OFFSET = new Field.Int64("offset", - "The offset found"); - - private static final Field PARTITIONS_V0 = PARTITIONS.withFields( - PARTITION_ID, - ERROR_CODE, - OFFSETS); - - private static final Field TOPICS_V0 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V0); - - private static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema( - TOPICS_V0); - - // V1 bumped for the removal of the offsets array - private static final Field PARTITIONS_V1 = PARTITIONS.withFields( - PARTITION_ID, - ERROR_CODE, - TIMESTAMP, - OFFSET); - - private static final Field TOPICS_V1 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V1); - - private static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema( - TOPICS_V1); - - // V2 bumped for the addition of the throttle time - private static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema( - THROTTLE_TIME_MS, - TOPICS_V1); - - // V3 bumped to indicate that on quota violation brokers send out responses before throttling. - private static final Schema LIST_OFFSET_RESPONSE_V3 = LIST_OFFSET_RESPONSE_V2; - - // V4 bumped for the addition of the current leader epoch in the request schema and the - // leader epoch in the response partition data - private static final Field PARTITIONS_V4 = PARTITIONS.withFields( - PARTITION_ID, - ERROR_CODE, - TIMESTAMP, - OFFSET, - LEADER_EPOCH); + private final ListOffsetResponseData data; - private static final Field TOPICS_V4 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V4); - - private static final Schema LIST_OFFSET_RESPONSE_V4 = new Schema( - THROTTLE_TIME_MS, - TOPICS_V4); - - private static final Schema LIST_OFFSET_RESPONSE_V5 = LIST_OFFSET_RESPONSE_V4; - - public static Schema[] schemaVersions() { - return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2, - LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4, LIST_OFFSET_RESPONSE_V5}; - } - - public static final class PartitionData { - public final Errors error; - // The offsets list is only used in ListOffsetResponse v0. - public final List<Long> offsets; - public final Long timestamp; - public final Long offset; - public final Optional<Integer> leaderEpoch; - - /** - * Constructor for ListOffsetResponse v0 - */ - public PartitionData(Errors error, List<Long> offsets) { - this.error = error; - this.offsets = offsets; - this.timestamp = null; - this.offset = null; - this.leaderEpoch = Optional.empty(); - } - - /** - * Constructor for ListOffsetResponse v1 - */ - public PartitionData(Errors error, long timestamp, long offset, Optional<Integer> leaderEpoch) { - this.error = error; - this.timestamp = timestamp; - this.offset = offset; - this.offsets = null; - this.leaderEpoch = leaderEpoch; - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("PartitionData("). - append("errorCode: ").append(error.code()); - - if (offsets == null) { - bld.append(", timestamp: ").append(timestamp). - append(", offset: ").append(offset). - append(", leaderEpoch: ").append(leaderEpoch); - } else { - bld.append(", offsets: "). - append("["). - append(Utils.join(this.offsets, ",")). - append("]"); - } - bld.append(")"); - return bld.toString(); - } + public ListOffsetResponse(ListOffsetResponseData data) { + this.data = data; } - private final int throttleTimeMs; - private final Map<TopicPartition, PartitionData> responseData; - - /** - * Constructor for all versions without throttle time - */ - public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) { - this(DEFAULT_THROTTLE_TIME, responseData); - } - - public ListOffsetResponse(int throttleTimeMs, Map<TopicPartition, PartitionData> responseData) { - this.throttleTimeMs = throttleTimeMs; - this.responseData = responseData; - } - - public ListOffsetResponse(Struct struct) { - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - responseData = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE)); - PartitionData partitionData; - if (partitionResponse.hasField(OFFSETS)) { - Object[] offsets = partitionResponse.get(OFFSETS); - List<Long> offsetsList = new ArrayList<>(); - for (Object offset : offsets) - offsetsList.add((Long) offset); - partitionData = new PartitionData(error, offsetsList); - } else { - long timestamp = partitionResponse.get(TIMESTAMP); - long offset = partitionResponse.get(OFFSET); - Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, LEADER_EPOCH); - partitionData = new PartitionData(error, timestamp, offset, leaderEpoch); - } - responseData.put(new TopicPartition(topic, partition), partitionData); - } - } + public ListOffsetResponse(Struct struct, short version) { + data = new ListOffsetResponseData(struct, version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } - public Map<TopicPartition, PartitionData> responseData() { - return responseData; + public ListOffsetResponseData data() { + return data; + } + + public List<ListOffsetTopicResponse> responseData() { Review comment: I would call this one `topics()` as you did already in the request. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) Review comment: Same comment as above. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) Review comment: I think `oldStyleOffsets` is an empty array by default. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3883,21 +3886,24 @@ void handleResponse(AbstractResponse abstractResponse) { ListOffsetResponse response = (ListOffsetResponse) abstractResponse; Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>(); - for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) { - TopicPartition tp = result.getKey(); - PartitionData partitionData = result.getValue(); - - KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); - Errors error = partitionData.error; - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + for (ListOffsetTopicResponse topic : response.responseData()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); Review comment: This is not related to your PR at all. It seems that if `offsetRequestSpec` is `null` here, `future` will be `null` as well cause `futures` is initialised based on `topicPartitionOffsets`. If it turns out to be correct, it may be better to just log a warning here like we do in `createTopics()`. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -965,11 +994,11 @@ public void onFailure(RuntimeException e) { * @return A response which can be polled to obtain the corresponding timestamps and offsets. */ private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node, - final Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch, + final Map<TopicPartition, ListOffsetPartition> timestampsToSearch, boolean requireTimestamp) { ListOffsetRequest.Builder builder = ListOffsetRequest.Builder .forConsumer(requireTimestamp, isolationLevel) - .setTargetTimes(timestampsToSearch); + .setTargetTimes(toListOffsetTopics(timestampsToSearch)); Review comment: This conversion is a bit unfortunate as we have to traverse all the partitions again to build the `List<ListOffsetTopic>`. Instead, we could compute it directly within `groupListOffsetRequests` and could receive `Map<Node, List<ListOffsetTopic>` directly here. That seems doable but I may have missed something. ########## File path: clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ########## @@ -1581,8 +1582,19 @@ SaslClient createSaslClient() { @Test public void testConvertListOffsetResponseToSaslHandshakeResponse() { - ListOffsetResponse response = new ListOffsetResponse(0, Collections.singletonMap(new TopicPartition("topic", 0), - new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, Optional.empty()))); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName("topic") + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setErrorCode(Errors.NONE.code()) + .setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH) + .setPartitionIndex(0) + .setOffset(0) + .setTimestamp(0))))); + ListOffsetResponse response = new ListOffsetResponse(data); +// ListOffsetResponse response = new ListOffsetResponse(0, Collections.singletonMap(new TopicPartition("topic", 0), +// new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, Optional.empty()))); Review comment: Can we remove these two? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) Review comment: nit: Could we split the line as before? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() Review comment: nit: The indentation looks weird. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3220,12 +3286,30 @@ public void testListOffsetsMetadataRetriableErrors() throws Exception { env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 - Map<TopicPartition, PartitionData> responseData = new HashMap<>(); - responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543))); + ListOffsetTopicResponse t0 = new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(-1L) + .setOffset(345L) + .setLeaderEpoch(543))); Review comment: nit: What about creating a small helper to create a `ListOffsetTopicResponse` for a given `TopicPartition` & co? That would reduce the boilerplate code. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) Review comment: nit: Could we split the line? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.INVALID_REQUEST.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } else { + + def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + try { + val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID + val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(offsetRequest.isolationLevel) + else + None + + val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), + fetchOnlyFromLeader) + + val response = foundOpt match { + case Some(found) => { + val partitionResponse = new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setTimestamp(found.timestamp) + .setOffset(found.offset) + if (found.leaderEpoch.isPresent) + partitionResponse.setLeaderEpoch(found.leaderEpoch.get) + partitionResponse + } + case None => + new ListOffsetPartitionResponse() Review comment: Replace by `buildErrorResponse`. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.INVALID_REQUEST.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } else { + + def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + try { + val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID + val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(offsetRequest.isolationLevel) + else + None + + val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), Review comment: We could use a scala `Option` now. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.INVALID_REQUEST.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } else { + + def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } Review comment: I suggest to move this one up and ensure it is used everywhere. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + new ListOffsetPartitionResponse() Review comment: Replace by `buildErrorResponse`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org