ableegoldman commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r629768563
########## File path: clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java ########## @@ -265,4 +287,14 @@ public UUIDSerde() { static public Serde<Void> Void() { return new VoidSerde(); } + + /* + * A serde for {@code List} type + */ + static public <L extends List<Inner>, Inner> Serde<List<Inner>> Review comment: Same here, --> `public static`. Can you also leave it on one line? I know it's super long, but that's just the style we use in Kafka ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public class ListDeserializer<Inner> implements Deserializer<List<Inner>> { + + private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap( + mkEntry(ShortDeserializer.class, Short.BYTES), + mkEntry(IntegerDeserializer.class, Integer.BYTES), + mkEntry(FloatDeserializer.class, Float.BYTES), + mkEntry(LongDeserializer.class, Long.BYTES), + mkEntry(DoubleDeserializer.class, Double.BYTES), + mkEntry(UUIDDeserializer.class, 36) + ); + + private Deserializer<Inner> inner; + private Class<?> listClass; + private Integer primitiveSize; + + public ListDeserializer() {} + + public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) { + if (listClass == null || inner == null) { + throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization"); + } + this.listClass = listClass; + this.inner = inner; + this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } + + public Deserializer<Inner> getInnerDeserializer() { Review comment: nit: Kafka coding style doesn't use the `get` prefix in getters, ie this should be named `innerDeserializer` (same applies for any other getters in this PR, I won't bug you by commenting on every single one of them) ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; + +public class ListSerializer<Inner> implements Serializer<List<Inner>> { + + private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList( + ShortSerializer.class, + IntegerSerializer.class, + FloatSerializer.class, + LongSerializer.class, + DoubleSerializer.class, + UUIDSerializer.class); + + private Serializer<Inner> inner; + private SerializationStrategy serStrategy; + private boolean isFixedLength; + + public ListSerializer() {} + + public ListSerializer(Serializer<Inner> inner) { + if (inner == null) { + throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization"); + } + this.inner = inner; + this.isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + this.serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } + + public Serializer<Inner> getInnerSerializer() { + return inner; + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (inner != null) { + throw new ConfigException("List serializer was already initialized using a non-default constructor"); + } + final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer(); + } else { + throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found."); + } + } + + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) + .boxed().collect(Collectors.toList()); + out.writeInt(nullIndexList.size()); + for (int i : nullIndexList) out.writeInt(i); + } + + @Override + public byte[] serialize(String topic, List<Inner> data) { + if (data == null) { + return null; + } + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + out.writeByte(serStrategy.ordinal()); // write serialization strategy flag + if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { + serializeNullIndexList(out, data); + } + final int size = data.size(); + out.writeInt(size); + for (Inner entry : data) { + if (entry == null) { + if (serStrategy == SerializationStrategy.VARIABLE_SIZE) { + out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE); + } + } else { + final byte[] bytes = inner.serialize(topic, entry); + if (!isFixedLength || serStrategy == SerializationStrategy.VARIABLE_SIZE) { Review comment: Also similar to a comment in ListDeserializer: it should not be possible for only one of these to be true, so let's just check one or the other here. In fact maybe we can get rid of the `isFixedLength` flag entirely now, since `SerializationStrategy.VARIABLE_SIZE` means exactly the same thing (or rather, the opposite of it) ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public class ListDeserializer<Inner> implements Deserializer<List<Inner>> { + + private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap( + mkEntry(ShortDeserializer.class, Short.BYTES), + mkEntry(IntegerDeserializer.class, Integer.BYTES), + mkEntry(FloatDeserializer.class, Float.BYTES), + mkEntry(LongDeserializer.class, Long.BYTES), + mkEntry(DoubleDeserializer.class, Double.BYTES), + mkEntry(UUIDDeserializer.class, 36) + ); + + private Deserializer<Inner> inner; + private Class<?> listClass; + private Integer primitiveSize; + + public ListDeserializer() {} + + public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) { + if (listClass == null || inner == null) { + throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization"); + } + this.listClass = listClass; + this.inner = inner; + this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } + + public Deserializer<Inner> getInnerDeserializer() { + return inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (listClass != null || inner != null) { + throw new ConfigException("List deserializer was already initialized using a non-default constructor"); + } + configureListClass(configs, isKey); + configureInnerSerde(configs, isKey); + } + + private void configureListClass(Map<String, ?> configs, boolean isKey) { + String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS; + final Object listClassOrName = configs.get(listTypePropertyName); + if (listClassOrName == null) { + throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config."); + } + try { + if (listClassOrName instanceof String) { + listClass = Utils.loadClass((String) listClassOrName, Object.class); + } else if (listClassOrName instanceof Class) { + listClass = (Class<?>) listClassOrName; + } else { + throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property."); + } + } catch (final ClassNotFoundException e) { + throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found."); + } + } + + @SuppressWarnings("unchecked") + private void configureInnerSerde(Map<String, ?> configs, boolean isKey) { + String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer(); + } else { + throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found."); + } + } + + + @SuppressWarnings("unchecked") + private List<Inner> getListInstance(int listSize) { + try { + Constructor<List<Inner>> listConstructor; + try { + listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(Integer.TYPE); + return listConstructor.newInstance(listSize); + } catch (NoSuchMethodException e) { + listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(); + return listConstructor.newInstance(); + } + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | + IllegalArgumentException | InvocationTargetException e) { + throw new KafkaException("Could not construct a list instance of \"" + listClass.getCanonicalName() + "\"", e); + } + } + + private SerializationStrategy parseSerializationStrategyFlag(final int serializationStrategyFlag) throws IOException { + if (serializationStrategyFlag < 0 || serializationStrategyFlag >= SerializationStrategy.VALUES.length) { + throw new SerializationException("Invalid serialization strategy flag value"); + } + return SerializationStrategy.VALUES[serializationStrategyFlag]; + } + + private List<Integer> deserializeNullIndexList(final DataInputStream dis) throws IOException { + int nullIndexListSize = dis.readInt(); + List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize); + while (nullIndexListSize != 0) { + nullIndexList.add(dis.readInt()); + nullIndexListSize--; + } + return nullIndexList; + } + + @Override + public List<Inner> deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) { + SerializationStrategy serStrategy = parseSerializationStrategyFlag(dis.readByte()); + List<Integer> nullIndexList = null; + if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { + nullIndexList = deserializeNullIndexList(dis); + } + final int size = dis.readInt(); + List<Inner> deserializedList = getListInstance(size); + for (int i = 0; i < size; i++) { + if (serStrategy == SerializationStrategy.CONSTANT_SIZE + && nullIndexList.contains(i)) { + deserializedList.add(null); + continue; + } + int entrySize = (primitiveSize == null || serStrategy == SerializationStrategy.VARIABLE_SIZE) ? dis.readInt() : primitiveSize; Review comment: Since we no longer expose the SerializationStrategy or let users explicitly select it, these two equality checks should have both be true or both be false, right? Might read a bit easier if we only check `serStrategy == SerializationStrategy.VARIABLE_SIZE` here, and then just verify that `primitiveSize` is not null when we parse the serialization strategy flag at the top. WDYT? ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; + +public class ListSerializer<Inner> implements Serializer<List<Inner>> { + + private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList( + ShortSerializer.class, + IntegerSerializer.class, + FloatSerializer.class, + LongSerializer.class, + DoubleSerializer.class, + UUIDSerializer.class); + + private Serializer<Inner> inner; + private SerializationStrategy serStrategy; + private boolean isFixedLength; + + public ListSerializer() {} + + public ListSerializer(Serializer<Inner> inner) { + if (inner == null) { + throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization"); + } + this.inner = inner; + this.isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + this.serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } + + public Serializer<Inner> getInnerSerializer() { + return inner; + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (inner != null) { + throw new ConfigException("List serializer was already initialized using a non-default constructor"); + } + final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer(); + } else { + throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found."); + } + } + + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) Review comment: Since we don't know what the underlying list structure is, using `get(index)` like this could be pretty costly -- for example with a LinkedList this will be O(N), which makes it O(N^2) overall. Might be safer to just iterate through the list with a plain `for int i` loop and take note of the nulls that way ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; + +public class ListSerializer<Inner> implements Serializer<List<Inner>> { + + private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList( + ShortSerializer.class, + IntegerSerializer.class, + FloatSerializer.class, + LongSerializer.class, + DoubleSerializer.class, + UUIDSerializer.class); + + private Serializer<Inner> inner; + private SerializationStrategy serStrategy; + private boolean isFixedLength; + + public ListSerializer() {} + + public ListSerializer(Serializer<Inner> inner) { + if (inner == null) { + throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization"); + } + this.inner = inner; + this.isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + this.serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } + + public Serializer<Inner> getInnerSerializer() { + return inner; + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (inner != null) { + throw new ConfigException("List serializer was already initialized using a non-default constructor"); + } + final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer(); + } else { + throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found."); + } + } + + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) + .boxed().collect(Collectors.toList()); + out.writeInt(nullIndexList.size()); + for (int i : nullIndexList) out.writeInt(i); Review comment: nit: put the `out.writeInt` on its own line ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java ########## @@ -125,6 +126,27 @@ public UUIDSerde() { } } + static public final class ListSerde<Inner> extends WrapperSerde<List<Inner>> { Review comment: super nit: we put the modifier first, ie use `public static` ordering. ########## File path: clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java ########## @@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() { } } + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnEmptyCollection() { + List<Integer> testData = Arrays.asList(); + Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get empty collection after serialization and deserialization on an empty list"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnNull() { + List<Integer> testData = null; + Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get null after serialization and deserialization on an empty list"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripIntPrimitiveInput() { + List<Integer> testData = Arrays.asList(1, 2, 3); + Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of integer primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() { + List<Integer> testData = Arrays.asList(1, 2, 3); + Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(21, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 21 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripShortPrimitiveInput() { + List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3); + Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of short primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() { + List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3); + Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short()); + assertEquals(15, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 15 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripFloatPrimitiveInput() { + List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3); + Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of float primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() { + List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3); + Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float()); + assertEquals(21, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 21 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripLongPrimitiveInput() { + List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3); + Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of long primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() { + List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3); + Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long()); + assertEquals(33, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 33 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripDoublePrimitiveInput() { + List<Double> testData = Arrays.asList((double) 1, (double) 2, (double) 3); + Serde<List<Double>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Double()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of double primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForDoublePrimitiveInput() { + List<Double> testData = Arrays.asList((double) 1, (double) 2, (double) 3); + Serde<List<Double>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Double()); + assertEquals(33, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 33 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripUUIDInput() { + List<UUID> testData = Arrays.asList(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()); + Serde<List<UUID>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.UUID()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of UUID after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForUUIDInput() { + List<UUID> testData = Arrays.asList(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()); + Serde<List<UUID>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.UUID()); + assertEquals(117, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 117 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripNonPrimitiveInput() { + List<String> testData = Arrays.asList("A", "B", "C"); + Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of strings list after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripPrimitiveInputWithNullEntries() { + List<Integer> testData = Arrays.asList(1, null, 3); + Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of integer primitives with null entries " + + "after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripNonPrimitiveInputWithNullEntries() { + List<String> testData = Arrays.asList("A", null, "C"); + Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of strings list with null entries " + + "after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnLinkedList() { + List<Integer> testData = new LinkedList<>(); + Serde<List<Integer>> listSerde = Serdes.ListSerde(LinkedList.class, Serdes.Integer()); + assertTrue(listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)) + instanceof LinkedList, "Should return List instance of type LinkedList"); + } + + @SuppressWarnings("unchecked") Review comment: Is the unchecked warning coming from something in the test itself, or just from using the serde? It should be possible to just use the Serde without getting a warning. I don't see anything in the test that looks suspicious so I'm guessing we need another suppression somewhere in the Serde implementation? ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; + +public class ListSerializer<Inner> implements Serializer<List<Inner>> { + + private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList( + ShortSerializer.class, + IntegerSerializer.class, + FloatSerializer.class, + LongSerializer.class, + DoubleSerializer.class, + UUIDSerializer.class); + + private Serializer<Inner> inner; + private SerializationStrategy serStrategy; + private boolean isFixedLength; + + public ListSerializer() {} + + public ListSerializer(Serializer<Inner> inner) { + if (inner == null) { + throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization"); + } + this.inner = inner; + this.isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + this.serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } + + public Serializer<Inner> getInnerSerializer() { + return inner; + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (inner != null) { + throw new ConfigException("List serializer was already initialized using a non-default constructor"); + } + final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer(); + } else { + throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()); + serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found."); + } + } + + private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { + List<Integer> nullIndexList = IntStream.range(0, data.size()) + .filter(i -> data.get(i) == null) + .boxed().collect(Collectors.toList()); + out.writeInt(nullIndexList.size()); + for (int i : nullIndexList) out.writeInt(i); + } + + @Override + public byte[] serialize(String topic, List<Inner> data) { + if (data == null) { + return null; + } + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + out.writeByte(serStrategy.ordinal()); // write serialization strategy flag + if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { Review comment: Same as my suggestion in ListDeserializer, can you add a quick comment here or above the `serializeNullIndexList` method explaining what this is doing (like you have above with `// write serialization strategy flag`) ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public class ListDeserializer<Inner> implements Deserializer<List<Inner>> { + + private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap( + mkEntry(ShortDeserializer.class, Short.BYTES), + mkEntry(IntegerDeserializer.class, Integer.BYTES), + mkEntry(FloatDeserializer.class, Float.BYTES), + mkEntry(LongDeserializer.class, Long.BYTES), + mkEntry(DoubleDeserializer.class, Double.BYTES), + mkEntry(UUIDDeserializer.class, 36) + ); + + private Deserializer<Inner> inner; + private Class<?> listClass; + private Integer primitiveSize; + + public ListDeserializer() {} + + public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) { + if (listClass == null || inner == null) { + throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization"); + } + this.listClass = listClass; + this.inner = inner; + this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } + + public Deserializer<Inner> getInnerDeserializer() { + return inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (listClass != null || inner != null) { + throw new ConfigException("List deserializer was already initialized using a non-default constructor"); + } + configureListClass(configs, isKey); + configureInnerSerde(configs, isKey); + } + + private void configureListClass(Map<String, ?> configs, boolean isKey) { + String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS; + final Object listClassOrName = configs.get(listTypePropertyName); + if (listClassOrName == null) { + throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config."); + } + try { + if (listClassOrName instanceof String) { + listClass = Utils.loadClass((String) listClassOrName, Object.class); + } else if (listClassOrName instanceof Class) { + listClass = (Class<?>) listClassOrName; + } else { + throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property."); + } + } catch (final ClassNotFoundException e) { + throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found."); + } + } + + @SuppressWarnings("unchecked") + private void configureInnerSerde(Map<String, ?> configs, boolean isKey) { + String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer(); + } else { + throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found."); + } + } + Review comment: super nit: extra blank line ########## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public class ListDeserializer<Inner> implements Deserializer<List<Inner>> { + + private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap( + mkEntry(ShortDeserializer.class, Short.BYTES), + mkEntry(IntegerDeserializer.class, Integer.BYTES), + mkEntry(FloatDeserializer.class, Float.BYTES), + mkEntry(LongDeserializer.class, Long.BYTES), + mkEntry(DoubleDeserializer.class, Double.BYTES), + mkEntry(UUIDDeserializer.class, 36) + ); + + private Deserializer<Inner> inner; + private Class<?> listClass; + private Integer primitiveSize; + + public ListDeserializer() {} + + public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) { + if (listClass == null || inner == null) { + throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization"); + } + this.listClass = listClass; + this.inner = inner; + this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } + + public Deserializer<Inner> getInnerDeserializer() { + return inner; + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + if (listClass != null || inner != null) { + throw new ConfigException("List deserializer was already initialized using a non-default constructor"); + } + configureListClass(configs, isKey); + configureInnerSerde(configs, isKey); + } + + private void configureListClass(Map<String, ?> configs, boolean isKey) { + String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS; + final Object listClassOrName = configs.get(listTypePropertyName); + if (listClassOrName == null) { + throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config."); + } + try { + if (listClassOrName instanceof String) { + listClass = Utils.loadClass((String) listClassOrName, Object.class); + } else if (listClassOrName instanceof Class) { + listClass = (Class<?>) listClassOrName; + } else { + throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property."); + } + } catch (final ClassNotFoundException e) { + throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found."); + } + } + + @SuppressWarnings("unchecked") + private void configureInnerSerde(Map<String, ?> configs, boolean isKey) { + String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer(); + } else { + throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found."); + } + } + + + @SuppressWarnings("unchecked") + private List<Inner> getListInstance(int listSize) { + try { + Constructor<List<Inner>> listConstructor; + try { + listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(Integer.TYPE); + return listConstructor.newInstance(listSize); + } catch (NoSuchMethodException e) { + listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(); + return listConstructor.newInstance(); + } + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | + IllegalArgumentException | InvocationTargetException e) { + throw new KafkaException("Could not construct a list instance of \"" + listClass.getCanonicalName() + "\"", e); + } + } + + private SerializationStrategy parseSerializationStrategyFlag(final int serializationStrategyFlag) throws IOException { + if (serializationStrategyFlag < 0 || serializationStrategyFlag >= SerializationStrategy.VALUES.length) { + throw new SerializationException("Invalid serialization strategy flag value"); + } + return SerializationStrategy.VALUES[serializationStrategyFlag]; + } + + private List<Integer> deserializeNullIndexList(final DataInputStream dis) throws IOException { Review comment: I found it a bit difficult to understand what was going on here since I'm reading this first, before the serialize implementation, but I take it we just encode the indices of any null values at the beginning of the serialized list? Can you leave a comment pointing that out, either here on the method itself or else down below where the method is used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org