yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r453892850
##########
File path:
clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -131,20 +132,53 @@ private void configureInnerSerde(Map<String, ?> configs,
boolean isKey) {
}
}
+ private SerializationStrategy parseSerializationStrategyFlag(final int
serializationStrategyFlag) throws IOException {
+ if (serializationStrategyFlag < 0 || serializationStrategyFlag >=
SerializationStrategy.VALUES.length) {
+ throw new SerializationException("Invalid serialization strategy
flag value");
+ }
+ return SerializationStrategy.VALUES[serializationStrategyFlag];
+ }
+
+ private List<Integer> deserializeNullIndexList(final DataInputStream dis)
throws IOException {
+ int nullIndexListSize = dis.readInt();
+ List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize);
+ while (nullIndexListSize != 0) {
+ nullIndexList.add(dis.readInt());
+ nullIndexListSize--;
+ }
+ return nullIndexList;
+ }
+
@Override
public List<Inner> deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try (final DataInputStream dis = new DataInputStream(new
ByteArrayInputStream(data))) {
+ SerializationStrategy serStrategy =
parseSerializationStrategyFlag(dis.readByte());
+ List<Integer> nullIndexList = null;
+ if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+ nullIndexList = deserializeNullIndexList(dis);
+ }
final int size = dis.readInt();
List<Inner> deserializedList = getListInstance(size);
for (int i = 0; i < size; i++) {
- byte[] payload = new byte[primitiveSize == null ?
dis.readInt() : primitiveSize];
- if (dis.read(payload) == -1) {
- throw new SerializationException("End of the stream was
reached prematurely");
+ if (serStrategy == SerializationStrategy.NULL_INDEX_LIST
+ && nullIndexList.contains(i)) {
+ deserializedList.add(null);
+ continue;
+ }
+ int entrySize = primitiveSize == null || serStrategy ==
SerializationStrategy.NEGATIVE_SIZE ? dis.readInt() : primitiveSize;
Review comment:
It's probably better to rewrite this ternary expression as an `if/else` construct instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org