yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r615021158
########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) { } } + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) + .boxed().collect(Collectors.toList()); + out.writeInt(nullIndexList.size()); + for (int i : nullIndexList) out.writeInt(i); + } + @Override public byte[] serialize(String topic, List<Inner> data) { if (data == null) { return null; } - final int size = data.size(); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { + out.writeByte(serStrategy.ordinal()); // write serialization strategy flag + if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) { + serializeNullIndexList(out, data); + } + final int size = data.size(); out.writeInt(size); for (Inner entry : data) { - final byte[] bytes = inner.serialize(topic, entry); - if (!isFixedLength) { - out.writeInt(bytes.length); + if (entry == null) { + if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) { + out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE); + } + } else { + final byte[] bytes = inner.serialize(topic, entry); + if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) { + out.writeInt(bytes.length); Review comment: As per flag names, totally agreed. Changing them to `VARIABLE_SIZE` and `CONSTANT_SIZE`. ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) { } } + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) + .boxed().collect(Collectors.toList()); + out.writeInt(nullIndexList.size()); + for (int i : nullIndexList) out.writeInt(i); + } + @Override public byte[] serialize(String topic, List<Inner> data) { if (data == null) { return null; } - final int size = data.size(); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { + out.writeByte(serStrategy.ordinal()); // write serialization strategy flag + if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) { + serializeNullIndexList(out, data); + } + final int size = data.size(); out.writeInt(size); for (Inner entry : data) { - final byte[] bytes = inner.serialize(topic, entry); - if (!isFixedLength) { - out.writeInt(bytes.length); + if (entry == null) { + if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) { + out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE); + } + } else { + final byte[] bytes = inner.serialize(topic, entry); + if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) { + out.writeInt(bytes.length); Review comment: As per flag names, totally agree. Changing them to `VARIABLE_SIZE` and `CONSTANT_SIZE`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org