yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r615020981
########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) { } } + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) + .boxed().collect(Collectors.toList()); + out.writeInt(nullIndexList.size()); + for (int i : nullIndexList) out.writeInt(i); + } + @Override public byte[] serialize(String topic, List<Inner> data) { if (data == null) { return null; } - final int size = data.size(); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { + out.writeByte(serStrategy.ordinal()); // write serialization strategy flag + if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) { + serializeNullIndexList(out, data); + } + final int size = data.size(); out.writeInt(size); for (Inner entry : data) { - final byte[] bytes = inner.serialize(topic, entry); - if (!isFixedLength) { - out.writeInt(bytes.length); + if (entry == null) { + if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) { + out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE); + } + } else { + final byte[] bytes = inner.serialize(topic, entry); + if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) { + out.writeInt(bytes.length); Review comment: Hey, no worries. For: > How about: if it's a primitive type, and we can detect this (I think we should be able to), then we never encode the size info. If a user opts to do so, just log a warning and ignore it. I think I am already doing that in the constructors: ``` public ListSerializer(Serializer<Inner> serializer) { this.inner = serializer; this.isFixedLength = serializer != null && fixedLengthSerializers.contains(serializer.getClass()); this.serStrategy = this.isFixedLength ? SerializationStrategy.NULL_INDEX_LIST : SerializationStrategy.NEGATIVE_SIZE; } public ListSerializer(Serializer<Inner> serializer, SerializationStrategy serStrategy) { this(serializer); this.serStrategy = serStrategy; } ``` If a user doesn't pass `serStrategy` flag, we pick the best one for her based on passed serializer. If a user passes her own `serStrategy` flag, we simply obey to it. However, we don't print any warning logs, since I assumed if the user passes the flag, then she probably knows what she is doing. What do you think? I could add a warning log otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org