ijuma commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r585656791
##########
File path:
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -791,15 +791,17 @@ public void produceRequestGetErrorResponseTest() {
@Test
public void fetchResponseVersionTest() {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.readableRecords(ByteBuffer.allocate(10));
- responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(
- Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- 0L, Optional.empty(), Collections.emptyList(), records));
-
- FetchResponse<MemoryRecords> v0Response = new
FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
- FetchResponse<MemoryRecords> v1Response = new
FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
+ responseData.put(new TopicPartition("test", 0),
+ new FetchResponseData.PartitionData()
Review comment:
Set partition id.
##########
File path:
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1146,38 +1160,45 @@ private FetchRequest createFetchRequest(int version) {
return FetchRequest.Builder.forConsumer(100, 100000,
fetchData).setMaxBytes(1000).build((short) version);
}
- private FetchResponse<MemoryRecords> createFetchResponse(Errors error, int
sessionId) {
- return new FetchResponse<>(error, new LinkedHashMap<>(), 25,
sessionId);
+ private FetchResponse createFetchResponse(Errors error, int sessionId) {
+ return FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>());
}
- private FetchResponse<MemoryRecords> createFetchResponse(int sessionId) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ private FetchResponse createFetchResponse(int sessionId) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
- responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), Collections.emptyList(), records));
- List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.singletonList(
- new FetchResponse.AbortedTransaction(234L, 999L));
- responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
- return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
- }
-
- private FetchResponse<MemoryRecords> createFetchResponse(boolean
includeAborted) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ responseData.put(new TopicPartition("test", 0), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
Review comment:
Set partition id.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,79 +754,85 @@ class KafkaApis(val requestChannel: RequestChannel,
// For fetch requests from clients, check if down-conversion is
disabled for the particular partition
if (!fetchRequest.isFromFollower &&
!logConfig.forall(_.messageDownConversionEnable)) {
trace(s"Conversion to message format ${downConvertMagic.get} is
disabled for partition $tp. Sending unsupported version response to $clientId.")
- errorResponse(Errors.UNSUPPORTED_VERSION)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_VERSION)
} else {
try {
trace(s"Down converting records from partition $tp to message
format version $magic for fetch request from $clientId")
// Because down-conversion is extremely memory intensive, we
want to try and delay the down-conversion as much
// as possible. With KIP-283, we have the ability to lazily
down-convert in a chunked manner. The lazy, chunked
// down-conversion always guarantees that at least one batch
of messages is down-converted and sent out to the
// client.
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
partitionData.highWatermark,
- partitionData.lastStableOffset, partitionData.logStartOffset,
- partitionData.preferredReadReplica,
partitionData.abortedTransactions,
- new LazyDownConversionRecords(tp, unconvertedRecords, magic,
fetchContext.getFetchOffset(tp).get, time))
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(new LazyDownConversionRecords(tp,
unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+
.setPreferredReadReplica(partitionData.preferredReadReplica())
} catch {
case e: UnsupportedCompressionTypeException =>
trace("Received unsupported compression type error during
down-conversion", e)
- errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+ FetchResponse.partitionResponse(tp.partition,
Errors.UNSUPPORTED_COMPRESSION_TYPE)
}
}
case None =>
- val error = maybeDownConvertStorageError(partitionData.error)
- new FetchResponse.PartitionData[BaseRecords](error,
- partitionData.highWatermark,
- partitionData.lastStableOffset,
- partitionData.logStartOffset,
- partitionData.preferredReadReplica,
- partitionData.abortedTransactions,
- partitionData.divergingEpoch,
- unconvertedRecords)
+ new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+
.setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+ .setHighWatermark(partitionData.highWatermark)
+ .setLastStableOffset(partitionData.lastStableOffset)
+ .setLogStartOffset(partitionData.logStartOffset)
+ .setAbortedTransactions(partitionData.abortedTransactions)
+ .setRecords(unconvertedRecords)
+ .setPreferredReadReplica(partitionData.preferredReadReplica)
+ .setDivergingEpoch(partitionData.divergingEpoch)
}
}
}
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition,
FetchPartitionData)]): Unit = {
- val partitions = new util.LinkedHashMap[TopicPartition,
FetchResponse.PartitionData[Records]]
+ val partitions = new util.LinkedHashMap[TopicPartition,
FetchResponseData.PartitionData]
val reassigningPartitions = mutable.Set[TopicPartition]()
responsePartitionData.foreach { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset =
data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
- if (data.isReassignmentFetch)
- reassigningPartitions.add(tp)
- val error = maybeDownConvertStorageError(data.error)
- partitions.put(tp, new FetchResponse.PartitionData(
- error,
- data.highWatermark,
- lastStableOffset,
- data.logStartOffset,
- data.preferredReadReplica.map(int2Integer).asJava,
- abortedTransactions,
- data.divergingEpoch.asJava,
- data.records))
+ if (data.isReassignmentFetch) reassigningPartitions.add(tp)
+ val partitionData = new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(maybeDownConvertStorageError(data.error).code)
+ .setHighWatermark(data.highWatermark)
+ .setLastStableOffset(lastStableOffset)
+ .setLogStartOffset(data.logStartOffset)
+ .setAbortedTransactions(abortedTransactions)
+ .setRecords(data.records)
+
.setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+ data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
+ partitions.put(tp, partitionData)
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
- var unconvertedFetchResponse: FetchResponse[Records] = null
+ var unconvertedFetchResponse: FetchResponse = null
- def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
+ def createResponse(throttleTimeMs: Int): FetchResponse = {
// Down-convert messages for each partition if required
- val convertedData = new util.LinkedHashMap[TopicPartition,
FetchResponse.PartitionData[BaseRecords]]
+ val convertedData = new util.LinkedHashMap[TopicPartition,
FetchResponseData.PartitionData]
unconvertedFetchResponse.responseData.forEach { (tp,
unconvertedPartitionData) =>
- if (unconvertedPartitionData.error != Errors.NONE)
+ val error = Errors.forCode(unconvertedPartitionData.errorCode)
+ if (error != Errors.NONE)
debug(s"Fetch request with correlation id
${request.header.correlationId} from client $clientId " +
- s"on partition $tp failed due to
${unconvertedPartitionData.error.exceptionName}")
+ s"on partition $tp failed due to ${error.exceptionName}")
convertedData.put(tp, maybeConvertFetchedData(tp,
unconvertedPartitionData))
}
// Prepare fetch response from converted data
- val response = new FetchResponse(unconvertedFetchResponse.error,
convertedData, throttleTimeMs,
- unconvertedFetchResponse.sessionId)
+ val response = FetchResponse.of(unconvertedFetchResponse.error,
throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData)
// record the bytes out metrics only when the response is being sent
response.responseData.forEach { (tp, data) =>
- brokerTopicStats.updateBytesOut(tp.topic,
fetchRequest.isFromFollower, reassigningPartitions.contains(tp),
data.records.sizeInBytes)
+ brokerTopicStats.updateBytesOut(tp.topic,
fetchRequest.isFromFollower,
+ reassigningPartitions.contains(tp), if (data.records == null) 0
else data.records.sizeInBytes)
Review comment:
We can use the utility method you added to avoid the null check.
##########
File path:
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1146,38 +1160,45 @@ private FetchRequest createFetchRequest(int version) {
return FetchRequest.Builder.forConsumer(100, 100000,
fetchData).setMaxBytes(1000).build((short) version);
}
- private FetchResponse<MemoryRecords> createFetchResponse(Errors error, int
sessionId) {
- return new FetchResponse<>(error, new LinkedHashMap<>(), 25,
sessionId);
+ private FetchResponse createFetchResponse(Errors error, int sessionId) {
+ return FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>());
}
- private FetchResponse<MemoryRecords> createFetchResponse(int sessionId) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ private FetchResponse createFetchResponse(int sessionId) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
- responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), Collections.emptyList(), records));
- List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.singletonList(
- new FetchResponse.AbortedTransaction(234L, 999L));
- responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
- return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
- }
-
- private FetchResponse<MemoryRecords> createFetchResponse(boolean
includeAborted) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ responseData.put(new TopicPartition("test", 0), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
+ .setLogStartOffset(0)
+ .setRecords(records));
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
Collections.singletonList(
+ new
FetchResponseData.AbortedTransaction().setProducerId(234L).setFirstOffset(999L));
+ responseData.put(new TopicPartition("test", 1), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
+ .setLogStartOffset(0)
+ .setAbortedTransactions(abortedTransactions));
+ return FetchResponse.of(Errors.NONE, 25, sessionId, responseData);
+ }
+
+ private FetchResponse createFetchResponse(boolean includeAborted) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
+ responseData.put(new TopicPartition("test", 0), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
Review comment:
Set partition id.
##########
File path:
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +126,98 @@ public int sessionId() {
* @param partIterator The partition iterator.
* @return The response size in bytes.
*/
- public static <T extends BaseRecords> int sizeOf(short version,
-
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+ public static int sizeOf(short version,
+ Iterator<Map.Entry<TopicPartition,
FetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and
fixed, we can
// use arbitrary values here without affecting the result.
- FetchResponseData data = toMessage(0, Errors.NONE, partIterator,
INVALID_SESSION_ID);
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data =
new LinkedHashMap<>();
+ partIterator.forEachRemaining(entry -> data.put(entry.getKey(),
entry.getValue()));
ObjectSerializationCache cache = new ObjectSerializationCache();
- return 4 + data.size(cache, version);
+ return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID,
data).data.size(cache, version);
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 8;
}
-}
+
+ public static Optional<FetchResponseData.EpochEndOffset>
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+ return partitionResponse.divergingEpoch().epoch() < 0 ?
Optional.empty()
+ : Optional.of(partitionResponse.divergingEpoch());
+ }
+
+ public static boolean isDivergingEpoch(FetchResponseData.PartitionData
partitionResponse) {
+ return partitionResponse.divergingEpoch().epoch() >= 0;
+ }
+
+ public static Optional<Integer>
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+ return partitionResponse.preferredReadReplica() ==
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+ : Optional.of(partitionResponse.preferredReadReplica());
+ }
+
+ public static boolean isPreferredReplica(FetchResponseData.PartitionData
partitionResponse) {
+ return partitionResponse.preferredReadReplica() !=
INVALID_PREFERRED_REPLICA_ID;
+ }
+
+ public static FetchResponseData.PartitionData partitionResponse(int
partition, Errors error) {
+ return new FetchResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+ }
+
+ /**
+ * Returns `partition.records` as `Records` (instead of `BaseRecords`). If
`records` is `null`, returns `MemoryRecords.EMPTY`.
+ *
+ * If this response was deserialized after a fetch, this method should
never fail. An example where this would
+ * fail is a down-converted response (e.g. LazyDownConversionRecords) on
the broker (before it's serialized and
+ * sent on the wire).
+ *
+ * @param partition partition data
+ * @return Records or empty record if the records in PartitionData is null.
+ */
+ public static Records records(FetchResponseData.PartitionData partition) {
+ if (partition.records() == null) return MemoryRecords.EMPTY;
+ else if (partition.records() instanceof Records) return (Records)
partition.records();
+ else throw new IllegalStateException("the record type is " +
partition.records().getClass().getSimpleName() +
Review comment:
No `else` needed since we used `return` for both other cases. For the
exception, I think we can just throw `ClassCastException` since
`IllegalStateException` doesn't fit very well for this case. I would also make
the message a bit more generic to avoid it going stale when we add more
`Records` subtypes. For example:
```java
"The record type is " + partition.records().getClass().getSimpleName() + ", which is not a subtype of " +
    Records.class.getSimpleName() + ". This method is only safe to call if the `FetchResponse` was deserialized from bytes."
```
##########
File path:
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1146,38 +1160,45 @@ private FetchRequest createFetchRequest(int version) {
return FetchRequest.Builder.forConsumer(100, 100000,
fetchData).setMaxBytes(1000).build((short) version);
}
- private FetchResponse<MemoryRecords> createFetchResponse(Errors error, int
sessionId) {
- return new FetchResponse<>(error, new LinkedHashMap<>(), 25,
sessionId);
+ private FetchResponse createFetchResponse(Errors error, int sessionId) {
+ return FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>());
}
- private FetchResponse<MemoryRecords> createFetchResponse(int sessionId) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ private FetchResponse createFetchResponse(int sessionId) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
- responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), Collections.emptyList(), records));
- List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.singletonList(
- new FetchResponse.AbortedTransaction(234L, 999L));
- responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
- return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
- }
-
- private FetchResponse<MemoryRecords> createFetchResponse(boolean
includeAborted) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ responseData.put(new TopicPartition("test", 0), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
+ .setLogStartOffset(0)
+ .setRecords(records));
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
Collections.singletonList(
+ new
FetchResponseData.AbortedTransaction().setProducerId(234L).setFirstOffset(999L));
+ responseData.put(new TopicPartition("test", 1), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
Review comment:
Set partition id.
##########
File path:
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1146,38 +1160,45 @@ private FetchRequest createFetchRequest(int version) {
return FetchRequest.Builder.forConsumer(100, 100000,
fetchData).setMaxBytes(1000).build((short) version);
}
- private FetchResponse<MemoryRecords> createFetchResponse(Errors error, int
sessionId) {
- return new FetchResponse<>(error, new LinkedHashMap<>(), 25,
sessionId);
+ private FetchResponse createFetchResponse(Errors error, int sessionId) {
+ return FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>());
}
- private FetchResponse<MemoryRecords> createFetchResponse(int sessionId) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ private FetchResponse createFetchResponse(int sessionId) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
- responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), Collections.emptyList(), records));
- List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.singletonList(
- new FetchResponse.AbortedTransaction(234L, 999L));
- responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
- return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
- }
-
- private FetchResponse<MemoryRecords> createFetchResponse(boolean
includeAborted) {
- LinkedHashMap<TopicPartition,
FetchResponse.PartitionData<MemoryRecords>> responseData = new
LinkedHashMap<>();
+ responseData.put(new TopicPartition("test", 0), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
+ .setLogStartOffset(0)
+ .setRecords(records));
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
Collections.singletonList(
+ new
FetchResponseData.AbortedTransaction().setProducerId(234L).setFirstOffset(999L));
+ responseData.put(new TopicPartition("test", 1), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
+ .setLogStartOffset(0)
+ .setAbortedTransactions(abortedTransactions));
+ return FetchResponse.of(Errors.NONE, 25, sessionId, responseData);
+ }
+
+ private FetchResponse createFetchResponse(boolean includeAborted) {
+ LinkedHashMap<TopicPartition, FetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
MemoryRecords records =
MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord("blah".getBytes()));
+ responseData.put(new TopicPartition("test", 0), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
+ .setLogStartOffset(0)
+ .setRecords(records));
- responseData.put(new TopicPartition("test", 0), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), Collections.emptyList(), records));
-
- List<FetchResponse.AbortedTransaction> abortedTransactions =
Collections.emptyList();
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
Collections.emptyList();
if (includeAborted) {
abortedTransactions = Collections.singletonList(
- new FetchResponse.AbortedTransaction(234L, 999L));
+ new
FetchResponseData.AbortedTransaction().setProducerId(234L).setFirstOffset(999L));
}
- responseData.put(new TopicPartition("test", 1), new
FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
+ responseData.put(new TopicPartition("test", 1), new
FetchResponseData.PartitionData()
+ .setHighWatermark(1000000)
Review comment:
Set partition id.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org