yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r615020981



##########
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean 
isKey) {
         }
     }
 
+    private void serializeNullIndexList(final DataOutputStream out, 
List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+                .filter(i -> data.get(i) == null)
+                .boxed().collect(Collectors.toList());
+        out.writeInt(nullIndexList.size());
+        for (int i : nullIndexList) out.writeInt(i);
+    }
+
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
         if (data == null) {
             return null;
         }
-        final int size = data.size();
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeByte(serStrategy.ordinal()); // write serialization 
strategy flag
+            if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+                serializeNullIndexList(out, data);
+            }
+            final int size = data.size();
             out.writeInt(size);
             for (Inner entry : data) {
-                final byte[] bytes = inner.serialize(topic, entry);
-                if (!isFixedLength) {
-                    out.writeInt(bytes.length);
+                if (entry == null) {
+                    if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+                    }
+                } else {
+                    final byte[] bytes = inner.serialize(topic, entry);
+                    if (!isFixedLength || serStrategy == 
SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(bytes.length);

Review comment:
       Hey, no worries.
   
   For:
   > How about: if it's a primitive type, and we can detect this (I think we 
should be able to), then we never encode the size info. If a user opts to do 
so, just log a warning and ignore it.
   
   I think I am already doing that in the constructors:
   ```
       public ListSerializer(Serializer<Inner> serializer) {
           this.inner = serializer;
           this.isFixedLength = serializer != null && 
fixedLengthSerializers.contains(serializer.getClass());
           this.serStrategy = this.isFixedLength ? 
SerializationStrategy.NULL_INDEX_LIST : SerializationStrategy.NEGATIVE_SIZE;
       }
   
       public ListSerializer(Serializer<Inner> serializer, 
SerializationStrategy serStrategy) {
           this(serializer);
           this.serStrategy = serStrategy;
       }
   ```
   
   If a user doesn't pass `serStrategy` flag, we pick the best one for her 
based on passed serializer.
   If a user passes her own `serStrategy` flag, we simply obey to it. 
   However, we don't print any warning logs, since I assumed if the user passes 
the flag, then she probably knows what she is doing.
   
   What do you think? I could add a warning log otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to