ijuma commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r584172530
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +123,70 @@ public int sessionId() {
* @param partIterator The partition iterator.
* @return The response size in bytes.
*/
- public static <T extends BaseRecords> int sizeOf(short version,
-
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+ public static int sizeOf(short version,
+ Iterator<Map.Entry<TopicPartition,
FetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and
fixed, we can
// use arbitrary values here without affecting the result.
- FetchResponseData data = toMessage(0, Errors.NONE, partIterator,
INVALID_SESSION_ID);
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data =
new LinkedHashMap<>();
+ partIterator.forEachRemaining(entry -> data.put(entry.getKey(),
entry.getValue()));
ObjectSerializationCache cache = new ObjectSerializationCache();
- return 4 + data.size(cache, version);
+ return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID,
data).data.size(cache, version);
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 8;
}
-}
+
+ public static Optional<FetchResponseData.EpochEndOffset>
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+ return partitionResponse.divergingEpoch().epoch() < 0 ?
Optional.empty()
+ : Optional.of(partitionResponse.divergingEpoch());
+ }
+
+ public static boolean isDivergingEpoch(FetchResponseData.PartitionData
partitionResponse) {
+ return partitionResponse.divergingEpoch().epoch() >= 0;
+ }
+
+ public static Optional<Integer>
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+ return partitionResponse.preferredReadReplica() ==
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+ : Optional.of(partitionResponse.preferredReadReplica());
+ }
+
+ public static boolean isPreferredReplica(FetchResponseData.PartitionData
partitionResponse) {
+ return partitionResponse.preferredReadReplica() !=
INVALID_PREFERRED_REPLICA_ID;
+ }
+
+ public static FetchResponseData.PartitionData partitionResponse(int
partition, Errors error) {
+ return new FetchResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+ .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+ .setAbortedTransactions(null)
+ .setRecords(MemoryRecords.EMPTY);
+ }
+
+ /**
+ * cast the BaseRecords of PartitionData to Records. This is used to
eliminate duplicate code of type casting.
Review comment:
Does this ever fail? If so, it would be good to explain under which
conditions it can fail. Also "This is used to eliminate duplicate code of type
casting." seems a bit redundant.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -304,58 +108,12 @@ public int sessionId() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, error());
- responseDataMap.values().forEach(response ->
- updateErrorCounts(errorCounts, response.error())
- );
+ responseData().values().forEach(response ->
updateErrorCounts(errorCounts, Errors.forCode(response.errorCode())));
Review comment:
Can we update this not to use `responseData`? Then we at least have the
right behavior for the broker and we can fix the clients in the subsequent PR.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2377,14 +2378,19 @@ private ListOffsetsResponse
listOffsetsResponse(Map<TopicPartition, Long> partit
builder.append(0L, ("key-" + i).getBytes(), ("value-" +
i).getBytes());
records = builder.build();
}
- tpResponses.put(partition, new FetchResponse.PartitionData<>(
- Errors.NONE, highWatermark,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
- logStartOffset, null, records));
+ tpResponses.put(partition,
+ new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(highWatermark)
+
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(logStartOffset)
+ .setAbortedTransactions(null)
+ .setRecords(records));
Review comment:
Are some of these redundant? (eg `setAbortedTransactions(null)`)
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +123,70 @@ public int sessionId() {
* @param partIterator The partition iterator.
* @return The response size in bytes.
*/
- public static <T extends BaseRecords> int sizeOf(short version,
-
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+ public static int sizeOf(short version,
+ Iterator<Map.Entry<TopicPartition,
FetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and
fixed, we can
// use arbitrary values here without affecting the result.
- FetchResponseData data = toMessage(0, Errors.NONE, partIterator,
INVALID_SESSION_ID);
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data =
new LinkedHashMap<>();
+ partIterator.forEachRemaining(entry -> data.put(entry.getKey(),
entry.getValue()));
ObjectSerializationCache cache = new ObjectSerializationCache();
- return 4 + data.size(cache, version);
+ return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID,
data).data.size(cache, version);
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 8;
}
-}
+
+ public static Optional<FetchResponseData.EpochEndOffset>
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+ return partitionResponse.divergingEpoch().epoch() < 0 ?
Optional.empty()
+ : Optional.of(partitionResponse.divergingEpoch());
+ }
+
+ public static boolean isDivergingEpoch(FetchResponseData.PartitionData
partitionResponse) {
+ return partitionResponse.divergingEpoch().epoch() >= 0;
+ }
+
+ public static Optional<Integer>
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+ return partitionResponse.preferredReadReplica() ==
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+ : Optional.of(partitionResponse.preferredReadReplica());
+ }
+
+ public static boolean isPreferredReplica(FetchResponseData.PartitionData
partitionResponse) {
+ return partitionResponse.preferredReadReplica() !=
INVALID_PREFERRED_REPLICA_ID;
+ }
+
+ public static FetchResponseData.PartitionData partitionResponse(int
partition, Errors error) {
+ return new FetchResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+ .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+ .setAbortedTransactions(null)
+ .setRecords(MemoryRecords.EMPTY);
Review comment:
Aren't many of these set automatically by the generated classes?
##########
File path:
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -1270,13 +1271,24 @@ public void testFetchPositionAfterException() {
assertEquals(1, fetcher.sendFetches());
- Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>>
partitions = new LinkedHashMap<>();
- partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.NONE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, records));
- partitions.put(tp0, new
FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
- client.prepareResponse(new FetchResponse<>(Errors.NONE, new
LinkedHashMap<>(partitions),
- 0, INVALID_SESSION_ID));
+
+ Map<TopicPartition, FetchResponseData.PartitionData> partitions = new
LinkedHashMap<>();
+ partitions.put(tp1, new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(100)
+ .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+ .setAbortedTransactions(null)
+ .setRecords(records));
+ partitions.put(tp0, new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
+ .setHighWatermark(100)
+ .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+ .setAbortedTransactions(null)
+ .setRecords(MemoryRecords.EMPTY));
Review comment:
Are some of these redundant? (eg `setAbortedTransactions(null)`). Other
examples in the same file.
##########
File path:
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
##########
@@ -174,8 +173,14 @@ public int sizeInBytes() {
return null;
}
};
- initialFetched.put(tp, new
FetchResponse.PartitionData<>(Errors.NONE, 0, 0, 0,
- new LinkedList<>(), fetched));
+ initialFetched.put(tp, new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(0)
+ .setLastStableOffset(0)
+ .setLogStartOffset(0)
Review comment:
We can remove some redundant setters?
##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -963,9 +966,13 @@ class ReplicaFetcherThreadTest {
val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
-
- val partitionData: thread.FetchData = new
FetchResponse.PartitionData[Records](
- Errors.NONE, 0, 0, 0, Optional.empty(), Collections.emptyList(), records)
+ val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code)
+ .setHighWatermark(0)
+ .setLastStableOffset(0)
+ .setLogStartOffset(0)
Review comment:
We can remove some redundant setters?
##########
File path: core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
##########
@@ -403,25 +401,24 @@ private class ReplicaFetcher(name: String, sourceBroker:
Node, topicPartitions:
debug("Issuing fetch request ")
- var fetchResponse: FetchResponse[MemoryRecords] = null
+ var fetchResponse: FetchResponse = null
try {
val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder)
- fetchResponse =
clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]]
+ fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
} catch {
case t: Throwable =>
if (!isRunning)
throw t
}
if (fetchResponse != null) {
- fetchResponse.responseData.forEach { (tp, partitionData) =>
- replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData)
- }
+ fetchResponse.data.responses().forEach(topicResponse =>
+ topicResponse.partitions().forEach(partitionResponse =>
Review comment:
Nit: remove `()` twice.
##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -155,13 +155,28 @@ class FetchSessionTest {
assertEquals(Optional.of(1), epochs1(tp1))
assertEquals(Optional.of(2), epochs1(tp2))
- val response = new util.LinkedHashMap[TopicPartition,
FetchResponse.PartitionData[Records]]
- response.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, 100,
- 100, null, null))
- response.put(tp1, new FetchResponse.PartitionData(
- Errors.NONE, 10, 10, 10, null, null))
- response.put(tp2, new FetchResponse.PartitionData(
- Errors.NONE, 5, 5, 5, null, null))
+ val response = new util.LinkedHashMap[TopicPartition,
FetchResponseData.PartitionData]
+ response.put(tp0, new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code)
+ .setHighWatermark(100)
+ .setLastStableOffset(100)
+ .setLogStartOffset(100)
+ .setAbortedTransactions(null)
+ .setRecords(null))
Review comment:
We can remove some redundant setters?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
// For fetch requests from clients, check if down-conversion is
disabled for the particular partition
if (!fetchRequest.isFromFollower &&
!logConfig.forall(_.messageDownConversionEnable)) {
trace(s"Conversion to message format ${downConvertMagic.get} is
disabled for partition $tp. Sending unsupported version response to $clientId.")
- errorResponse(Errors.UNSUPPORTED_VERSION)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_VERSION)
} else {
try {
trace(s"Down converting records from partition $tp to message
format version $magic for fetch request from $clientId")
// Because down-conversion is extremely memory intensive, we
want to try and delay the down-conversion as much
// as possible. With KIP-283, we have the ability to lazily
down-convert in a chunked manner. The lazy, chunked
// down-conversion always guarantees that at least one batch
of messages is down-converted and sent out to the
// client.
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
partitionData.highWatermark,
- partitionData.lastStableOffset, partitionData.logStartOffset,
- partitionData.preferredReadReplica,
partitionData.abortedTransactions,
- new LazyDownConversionRecords(tp, unconvertedRecords, magic,
fetchContext.getFetchOffset(tp).get, time))
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(new LazyDownConversionRecords(tp,
unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+
.setPreferredReadReplica(partitionData.preferredReadReplica())
Review comment:
Do we have to copy like this or can we mutate the response?
##########
File path:
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -150,22 +150,23 @@ private static void assertListEquals(List<TopicPartition>
expected, List<TopicPa
private static final class RespEntry {
final TopicPartition part;
- final FetchResponse.PartitionData<MemoryRecords> data;
+ final FetchResponseData.PartitionData data;
RespEntry(String topic, int partition, long highWatermark, long
lastStableOffset) {
this.part = new TopicPartition(topic, partition);
- this.data = new FetchResponse.PartitionData<>(
- Errors.NONE,
- highWatermark,
- lastStableOffset,
- 0,
- null,
- null);
+
+ this.data = new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code())
Review comment:
Nit: indenting seems excessive.
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -364,7 +361,7 @@ abstract class AbstractFetcherThread(name: String,
}
}
if (isTruncationOnFetchSupported) {
- partitionData.divergingEpoch.ifPresent { divergingEpoch
=>
+ FetchResponse.divergingEpoch(partitionData).ifPresent {
divergingEpoch =>
Review comment:
Nit: indenting seems wrong.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -150,22 +150,23 @@ private static void assertListEquals(List<TopicPartition>
expected, List<TopicPa
private static final class RespEntry {
final TopicPartition part;
- final FetchResponse.PartitionData<MemoryRecords> data;
+ final FetchResponseData.PartitionData data;
RespEntry(String topic, int partition, long highWatermark, long
lastStableOffset) {
this.part = new TopicPartition(topic, partition);
- this.data = new FetchResponse.PartitionData<>(
- Errors.NONE,
- highWatermark,
- lastStableOffset,
- 0,
- null,
- null);
+
+ this.data = new FetchResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(highWatermark)
+ .setLastStableOffset(lastStableOffset)
+ .setLogStartOffset(0)
+ .setAbortedTransactions(null)
+ .setRecords(null);
Review comment:
Are some of these redundant? (eg setAbortedTransactions(null))
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -734,7 +730,7 @@ abstract class AbstractFetcherThread(name: String,
Option(partitionStates.stateValue(topicPartition))
}
- protected def toMemoryRecords(records: Records): MemoryRecords = {
+ protected def toMemoryRecords(records: BaseRecords): MemoryRecords = {
Review comment:
Do we have to update the matching inside the method to handle other
potential records types? Or do we want to avoid changing this method signature
instead?
##########
File path:
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
##########
@@ -78,19 +78,25 @@ public void setup() {
for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
String topic = UUID.randomUUID().toString();
for (int partitionId = 0; partitionId < partitionCount;
partitionId++) {
- FetchResponse.PartitionData<MemoryRecords> partitionData = new
FetchResponse.PartitionData<>(
- Errors.NONE, 0, 0, 0, Optional.empty(),
Collections.emptyList(), records);
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setPartitionIndex(partitionId)
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(0)
+ .setLastStableOffset(0)
+ .setLogStartOffset(0)
Review comment:
We can remove some redundant setters?
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -340,7 +336,8 @@ abstract class AbstractFetcherThread(name: String,
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset
== currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
- partitionData.error match {
+ val partitionError = Errors.forCode(partitionData.errorCode)
Review comment:
Not clear why we need this val. Seems like we can introduce a variable
in the `case _` instead.
##########
File path:
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
##########
@@ -70,24 +70,25 @@ public void setUp() {
handler = new FetchSessionHandler(LOG_CONTEXT, 1);
FetchSessionHandler.Builder builder = handler.newBuilder();
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> respMap = new LinkedHashMap<>();
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> respMap
= new LinkedHashMap<>();
for (int i = 0; i < partitionCount; i++) {
TopicPartition tp = new TopicPartition("foo", i);
FetchRequest.PartitionData partitionData = new
FetchRequest.PartitionData(0, 0, 200,
Optional.empty());
fetches.put(tp, partitionData);
builder.add(tp, partitionData);
- respMap.put(tp, new FetchResponse.PartitionData<>(
- Errors.NONE,
- 0L,
- 0L,
- 0,
- null,
- null));
+ respMap.put(tp, new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setHighWatermark(0)
+ .setLastStableOffset(0)
+ .setLogStartOffset(0)
+ .setAbortedTransactions(null)
+ .setRecords(null));
Review comment:
We can remove some redundant set calls?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
// For fetch requests from clients, check if down-conversion is
disabled for the particular partition
if (!fetchRequest.isFromFollower &&
!logConfig.forall(_.messageDownConversionEnable)) {
trace(s"Conversion to message format ${downConvertMagic.get} is
disabled for partition $tp. Sending unsupported version response to $clientId.")
- errorResponse(Errors.UNSUPPORTED_VERSION)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_VERSION)
} else {
try {
trace(s"Down converting records from partition $tp to message
format version $magic for fetch request from $clientId")
// Because down-conversion is extremely memory intensive, we
want to try and delay the down-conversion as much
// as possible. With KIP-283, we have the ability to lazily
down-convert in a chunked manner. The lazy, chunked
// down-conversion always guarantees that at least one batch
of messages is down-converted and sent out to the
// client.
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
partitionData.highWatermark,
- partitionData.lastStableOffset, partitionData.logStartOffset,
- partitionData.preferredReadReplica,
partitionData.abortedTransactions,
- new LazyDownConversionRecords(tp, unconvertedRecords, magic,
fetchContext.getFetchOffset(tp).get, time))
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(new LazyDownConversionRecords(tp,
unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+
.setPreferredReadReplica(partitionData.preferredReadReplica())
} catch {
case e: UnsupportedCompressionTypeException =>
trace("Received unsupported compression type error during
down-conversion", e)
- errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_COMPRESSION_TYPE)
}
}
case None =>
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
- partitionData.highWatermark,
- partitionData.lastStableOffset,
- partitionData.logStartOffset,
- partitionData.preferredReadReplica,
- partitionData.abortedTransactions,
- partitionData.divergingEpoch,
- unconvertedRecords)
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(unconvertedRecords)
+ .setPreferredReadReplica(partitionData.preferredReadReplica)
+ .setDivergingEpoch(partitionData.divergingEpoch)
Review comment:
Similar question, is the copy required?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
// For fetch requests from clients, check if down-conversion is
disabled for the particular partition
if (!fetchRequest.isFromFollower &&
!logConfig.forall(_.messageDownConversionEnable)) {
trace(s"Conversion to message format ${downConvertMagic.get} is
disabled for partition $tp. Sending unsupported version response to $clientId.")
- errorResponse(Errors.UNSUPPORTED_VERSION)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_VERSION)
} else {
try {
trace(s"Down converting records from partition $tp to message
format version $magic for fetch request from $clientId")
// Because down-conversion is extremely memory intensive, we
want to try and delay the down-conversion as much
// as possible. With KIP-283, we have the ability to lazily
down-convert in a chunked manner. The lazy, chunked
// down-conversion always guarantees that at least one batch
of messages is down-converted and sent out to the
// client.
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
partitionData.highWatermark,
- partitionData.lastStableOffset, partitionData.logStartOffset,
- partitionData.preferredReadReplica,
partitionData.abortedTransactions,
- new LazyDownConversionRecords(tp, unconvertedRecords, magic,
fetchContext.getFetchOffset(tp).get, time))
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(new LazyDownConversionRecords(tp,
unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+
.setPreferredReadReplica(partitionData.preferredReadReplica())
} catch {
case e: UnsupportedCompressionTypeException =>
trace("Received unsupported compression type error during
down-conversion", e)
- errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_COMPRESSION_TYPE)
}
}
case None =>
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
- partitionData.highWatermark,
- partitionData.lastStableOffset,
- partitionData.logStartOffset,
- partitionData.preferredReadReplica,
- partitionData.abortedTransactions,
- partitionData.divergingEpoch,
- unconvertedRecords)
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(unconvertedRecords)
+ .setPreferredReadReplica(partitionData.preferredReadReplica)
+ .setDivergingEpoch(partitionData.divergingEpoch)
}
}
}
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition,
FetchPartitionData)]): Unit = {
- val partitions = new util.LinkedHashMap[TopicPartition,
FetchResponse.PartitionData[Records]]
+ val partitions = new util.LinkedHashMap[TopicPartition,
FetchResponseData.PartitionData]
val reassigningPartitions = mutable.Set[TopicPartition]()
responsePartitionData.foreach { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset =
data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
- if (data.isReassignmentFetch)
- reassigningPartitions.add(tp)
- val error = maybeDownConvertStorageError(data.error)
- partitions.put(tp, new FetchResponse.PartitionData(
- error,
- data.highWatermark,
- lastStableOffset,
- data.logStartOffset,
- data.preferredReadReplica.map(int2Integer).asJava,
- abortedTransactions,
- data.divergingEpoch.asJava,
- data.records))
+ if (data.isReassignmentFetch) reassigningPartitions.add(tp)
+ partitions.put(tp, new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(maybeDownConvertStorageError(data.error).code)
+ .setHighWatermark(data.highWatermark)
+ .setLastStableOffset(lastStableOffset)
+ .setLogStartOffset(data.logStartOffset)
+ .setAbortedTransactions(abortedTransactions)
+ .setRecords(data.records)
+
.setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+ .setDivergingEpoch(data.divergingEpoch.getOrElse(new
FetchResponseData.EpochEndOffset)))
Review comment:
Similar question, is the copy required?
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -304,58 +115,12 @@ public int sessionId() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, error());
- responseDataMap.values().forEach(response ->
- updateErrorCounts(errorCounts, response.error())
- );
+ dataByTopicPartition.values().forEach(response ->
updateErrorCounts(errorCounts, Errors.forCode(response.errorCode())));
return errorCounts;
}
- public static FetchResponse<MemoryRecords> parse(ByteBuffer buffer, short
version) {
- return new FetchResponse<>(new FetchResponseData(new
ByteBufferAccessor(buffer), version));
- }
-
- @SuppressWarnings("unchecked")
- private static <T extends BaseRecords> LinkedHashMap<TopicPartition,
PartitionData<T>> toResponseDataMap(
- FetchResponseData message) {
- LinkedHashMap<TopicPartition, PartitionData<T>> responseMap = new
LinkedHashMap<>();
- message.responses().forEach(topicResponse -> {
- topicResponse.partitionResponses().forEach(partitionResponse -> {
- TopicPartition tp = new TopicPartition(topicResponse.topic(),
partitionResponse.partition());
- PartitionData<T> partitionData = new
PartitionData<>(partitionResponse);
- responseMap.put(tp, partitionData);
- });
- });
- return responseMap;
- }
-
- private static <T extends BaseRecords> FetchResponseData toMessage(int
throttleTimeMs, Errors error,
Review comment:
Can you clarify what you mean here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]