yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r615020981
##########
File path:
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) {
}
}
+ private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException {
+ List<Integer> nullIndexList = IntStream.range(0, data.size())
+ .filter(i -> data.get(i) == null)
+ .boxed().collect(Collectors.toList());
+ out.writeInt(nullIndexList.size());
+ for (int i : nullIndexList) out.writeInt(i);
+ }
+
@Override
public byte[] serialize(String topic, List<Inner> data) {
if (data == null) {
return null;
}
- final int size = data.size();
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeByte(serStrategy.ordinal()); // write serialization strategy flag
+ if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+ serializeNullIndexList(out, data);
+ }
+ final int size = data.size();
out.writeInt(size);
for (Inner entry : data) {
- final byte[] bytes = inner.serialize(topic, entry);
- if (!isFixedLength) {
- out.writeInt(bytes.length);
+ if (entry == null) {
+ if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+ out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+ }
+ } else {
+ final byte[] bytes = inner.serialize(topic, entry);
+ if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+ out.writeInt(bytes.length);
Review comment:
Hey, no worries.
For:
> How about: if it's a primitive type, and we can detect this (I think we should be able to), then we never encode the size info. If a user opts to do so, just log a warning and ignore it.
I think I am already doing that in the constructors:
```
public ListSerializer(Serializer<Inner> serializer) {
    this.inner = serializer;
    this.isFixedLength = serializer != null && fixedLengthSerializers.contains(serializer.getClass());
    this.serStrategy = this.isFixedLength ? SerializationStrategy.NULL_INDEX_LIST : SerializationStrategy.NEGATIVE_SIZE;
}

public ListSerializer(Serializer<Inner> serializer, SerializationStrategy serStrategy) {
    this(serializer);
    this.serStrategy = serStrategy;
}
```
If a user doesn't pass the `serStrategy` flag, we pick the best one for her
based on the serializer she passed in.
If a user passes her own `serStrategy` flag, we simply obey it.
However, we don't log any warning in that case, since I assumed that a user
who passes the flag probably knows what she is doing.
What do you think? Alternatively, I could add a warning log.
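For reference, here is a rough sketch of what the warning variant could look like. It is not the PR's code: the class `StrategySelectionSketch`, the `resolve` and `defaultStrategy` helpers, and the use of element types in place of the `fixedLengthSerializers` set are all assumptions made so the example runs on its own, and it uses `java.util.logging` purely for self-containment. Unlike the quoted suggestion, it still obeys the user's choice and only logs.

```java
import java.util.Set;
import java.util.logging.Logger;

public class StrategySelectionSketch {

    // Mirrors the two strategy names used in the diff above.
    public enum SerializationStrategy { NULL_INDEX_LIST, NEGATIVE_SIZE }

    private static final Logger LOG =
        Logger.getLogger(StrategySelectionSketch.class.getName());

    // Assumption: element types stand in for the PR's set of
    // fixed-length serializer classes, to avoid a Kafka dependency.
    private static final Set<Class<?>> FIXED_LENGTH_TYPES =
        Set.of(Short.class, Integer.class, Long.class, Float.class, Double.class);

    // Same rule as the one-argument constructor: fixed-length elements
    // default to NULL_INDEX_LIST, everything else to NEGATIVE_SIZE.
    public static SerializationStrategy defaultStrategy(Class<?> innerType) {
        return FIXED_LENGTH_TYPES.contains(innerType)
            ? SerializationStrategy.NULL_INDEX_LIST
            : SerializationStrategy.NEGATIVE_SIZE;
    }

    // Hypothetical helper: obey an explicit choice, but warn when it
    // differs from what would have been picked automatically.
    public static SerializationStrategy resolve(Class<?> innerType,
                                                SerializationStrategy requested) {
        final SerializationStrategy preferred = defaultStrategy(innerType);
        if (requested == null) {
            return preferred; // no flag passed: pick for the user
        }
        if (requested != preferred) {
            LOG.warning(String.format(
                "Strategy %s was requested for %s elements; %s would be the default.",
                requested, innerType.getSimpleName(), preferred));
        }
        return requested; // flag passed: obey it
    }

    public static void main(String[] args) {
        System.out.println(resolve(Integer.class, null)); // NULL_INDEX_LIST
        System.out.println(resolve(String.class, null));  // NEGATIVE_SIZE
        // Explicit override: logs a warning, then prints NEGATIVE_SIZE.
        System.out.println(resolve(Integer.class, SerializationStrategy.NEGATIVE_SIZE));
    }
}
```

The only behavioral difference from the constructors above is the `LOG.warning` call on a mismatch, so the change would be small if we decide the warning is worth having.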