XComp commented on code in PR #23490: URL: https://github.com/apache/flink/pull/23490#discussion_r1364075854
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; Review Comment: ```suggestion Preconditions.checkArgument((argIndexMapping == null) == (arrayFieldInitializers == null)); this.recordConstructor = recordConstructor; ``` Could we add a Precondition to reflect the invariant in the code and fail early in case of misuse? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; Review Comment: ```suggestion @Nullable private final int[] paramIndexMapping; ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. */ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, Review Comment: `migrating` seems to be obsolete. We can mark `arrayFieldInitializers` as `@Nullable` and document the semantics of the `null` value properly (maybe through a method `migratingEnabled()`?). ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. */ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; Review Comment: ```suggestion @Nullable private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { Review Comment: ```suggestion public static <T> JavaRecordSerializationHelper<T> create(Class<T> clazz, Field[] fields) { ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { Review Comment: ```suggestion } catch (IllegalArgumentException e) { throw e; } catch (Exception e) { ``` nit: Maybe, pass over the `IllegalArgumentException` as a valid `RuntimeException` rather than wrapping it in another one? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 Review Comment: ```suggestion // Java 13 and lower ``` nit: just to keep the comments as generic as possible ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + .filter(f -> f != null) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + argIndexMapping[i] = + field == null ? -1 : componentNames.indexOf(fields[i].getName()); + } + + // We have to initialize newly added primitive fields to their correct default value + LinkedList<Consumer<Object[]>> arrayFieldInitializers = new LinkedList<>(); Review Comment: Is there a reason to use `LinkedList` here? :thinking: `ArrayList` would have a better memory footprint. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. */ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { Review Comment: ```suggestion private JavaRecordSerializationHelper(Constructor<T> recordConstructor) { ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + .filter(f -> f != null) Review Comment: On the other note: Wouldn't that be an invalid state? ...having a field that's null? It might be better to throw an `IllegalArgumentException` rather than filtering those cases. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. 
Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. */ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + 
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + .filter(f -> f != null) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + argIndexMapping[i] = + field == null ? 
-1 : componentNames.indexOf(fields[i].getName()); + } + + // We have to initialize newly added primitive fields to their correct default value + LinkedList<Consumer<Object[]>> arrayFieldInitializers = new LinkedList<>(); + for (int i = 0; i < componentNames.size(); i++) { + Class<?> fieldType = componentTypes[i]; + if (fieldType.isPrimitive() + && !previousFields.contains(componentNames.get(i))) { + final int index = i; + arrayFieldInitializers.add( + args -> { + Object defaultValue = Defaults.defaultValue(fieldType); + args[index] = defaultValue; + }); + } + } + return new JavaRecordSerializationHelper( + recordConstructor, true, argIndexMapping, arrayFieldInitializers); + } else { + return new JavaRecordSerializationHelper(recordConstructor); Review Comment: ```suggestion return new JavaRecordSerializationHelper<T>(recordConstructor); ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -118,6 +122,9 @@ public PojoSerializer( createRegisteredSubclassSerializers(registeredSubclasses, executionConfig); this.subclassSerializerCache = new HashMap<>(); + if (this.isRecord = TypeExtractor.isRecord(clazz)) { Review Comment: I'm not sure about this syntactic sugar: This might trigger PR proposals in the future where people think that a equals sign is missing even though it's correct. I would suggest using the more explicit way where `this.isRecord` is set in the if block. We could even argue that `isRecord` is an obsolete field because `recordHelper` is actually `@Nullable`. The `isRecord` field could be replaced by a method `isRecord()` that returns `recordHelper != null` to make the semantics of `recordHelper` being `null` reflected in the code. WDYT? 
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java: ########## @@ -2057,6 +2057,7 @@ protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo( GENERIC_TYPE_DOC_HINT); return new GenericTypeInfo<>(clazz); } + boolean isRecord = isRecord(clazz); Review Comment: ```suggestion boolean isRecord = isRecord(clazz); ``` nit: we could move this local variable closer to the block where it is used. Or is there a reason why it's kept separated from the `fields` for loop? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. */ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch 
(Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + .filter(f -> f != null) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + argIndexMapping[i] = + field == null ? 
-1 : componentNames.indexOf(fields[i].getName()); + } + + // We have to initialize newly added primitive fields to their correct default value + LinkedList<Consumer<Object[]>> arrayFieldInitializers = new LinkedList<>(); + for (int i = 0; i < componentNames.size(); i++) { + Class<?> fieldType = componentTypes[i]; + if (fieldType.isPrimitive() + && !previousFields.contains(componentNames.get(i))) { + final int index = i; + arrayFieldInitializers.add( + args -> { + Object defaultValue = Defaults.defaultValue(fieldType); + args[index] = defaultValue; + }); + } + } + return new JavaRecordSerializationHelper( Review Comment: ```suggestion return new JavaRecordSerializationHelper<T>( ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { Review Comment: Could we add some unit tests for this class to have some more specific test cases as well? But I checked that both code paths (migration and non-migration) are covered by the newly added {{Java17PojoRecordSerializerUpgradeTest`. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private transient ClassLoader cl; + private final boolean isRecord; + + private transient JavaRecordSerializationHelper<T> recordHelper; + /** Constructor to create a new {@link PojoSerializer}. */ @SuppressWarnings("unchecked") public PojoSerializer( Review Comment: I wasn't sure where to put this comment. So it's not really related to the marked line. The documentation for the Pojo serialization happens to be in the JavaDoc of `PojoTypeInfo` and Types.POJO`. Should we add additional information about the Java Record support there as well? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { Review Comment: ```suggestion public RecordBuilder createRecordBuilder() { ``` There is a common pattern in how we use the `initParams`, `setParams` and `instantiateRecord` methods here. The `Object[]` that's returned by `initParams` is passed to the other two methods but not used anywhere else. Essentially, what we have is a Builder pattern here. The Builder can be created by the `JavaRecordSerializationHelper` using a factory method `createRecordBuilder()` that replaces `initParams()`. Its state contains the field mapping, the initialization routines and the record constructor. `instantiateRecord` becomes a `.build()` method which doesn't require any parameters because the record can be created using the Builder's internal state. WDYT? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java: ########## @@ -2174,6 +2175,11 @@ protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo( return pojoType; } + public static boolean isRecord(Class<?> clazz) { Review Comment: Should we add JavaDoc considering that it's a `@Public` class? It might make sense to label it as `@PublicEvolving` analogously to the other static utitlity methods for now to give it some room to evolve? 
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java: ########## @@ -2075,7 +2076,7 @@ protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo( List<PojoField> pojoFields = new ArrayList<>(); for (Field field : fields) { Type fieldType = field.getGenericType(); - if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) { + if (!isRecord && !isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) { Review Comment: What about adding the following code block before the default constructor extraction in line 2143 (essentially right after validating the methods and before retrieving the default constructor for that class): ```java if (isRecord) { // no default constructor extraction needs to be applied for Java records return pojoType; } ``` ...to be more explicit on that specific use case instead of hiding the logic in the default constructor extraction logic. WDYT? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; Review Comment: After having gone through the code (keep in mind that I haven't worked with this part of the code, yet), I feel like "migrating" describes the scenario in which the parameter mapping is used. But it's kind of misleading when looking at this class and not having knowledge of the scenario it is used in. I would suggest to remove "migration" as a concept from this specific class. The purpose of this class is to hold the `recordConstructor` and optional field mapping functionality. That should be good enough to grasp the purpose of this class. WDYT? ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. 
*/ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assert (serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected boolean allowNullInstances(TypeSerializer serializer) { + return true; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserClass> getTypeClass() { + return TestUserClass.class; + } + + @Override + protected TestUserClass[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClass[] { + new TestUserClass( + rnd.nextInt(), + "foo", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[] {10, 11, 12})), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + null, + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})), + new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})) + }; + } + + // User code class for testing the serializer + public record TestUserClass( + int dumm1, String dumm2, double dumm3, Date dumm5, NestedTestUserClass nestedClass) {} + + public static class NestedTestUserClass { + public int dumm1; + public String dumm2; + public double dumm3; + public int[] dumm4; + + public NestedTestUserClass() {} + + public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + this.dumm3 = dumm3; + this.dumm4 = dumm4; + } + + @Override 
+ public int hashCode() { + return Objects.hash(dumm1, dumm2, dumm3, dumm4); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NestedTestUserClass)) { + return false; + } + NestedTestUserClass otherTUC = (NestedTestUserClass) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + if (dumm4.length != otherTUC.dumm4.length) { + return false; + } + for (int i = 0; i < dumm4.length; i++) { + if (dumm4[i] != otherTUC.dumm4[i]) { + return false; + } + } + return true; + } + } + + /** This tests if the hashes returned by the pojo and tuple comparators are the same */ Review Comment: ```suggestion /** This tests if the hashes returned by the pojo and tuple comparators are the same. */ ``` nit: I don't know why checkstyle complains in my IDE but not in the Maven run about that one. :thinking: ########## pom.xml: ########## @@ -205,6 +205,7 @@ under the License. <!-- Can be set to any value to reproduce a specific build. --> <test.randomization.seed/> <test.unit.pattern>**/*Test.*</test.unit.pattern> + <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern> Review Comment: ```suggestion <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern> ``` That looks reasonable. I guess the other "more maven-like" way would be to move the Java 17 code into a separate module? :thinking: But that feels like overkill for a few test classes. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + .filter(f -> f != null) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + argIndexMapping[i] = + field == null ? -1 : componentNames.indexOf(fields[i].getName()); Review Comment: Here as well, shouldn't a field being `null` cause an `IllegalArgumentException`? Or is there a reason why a user should pass in `null`? ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. */ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assert (serializer instanceof PojoSerializer); Review Comment: ```suggestion assertThat(serializer).isInstanceOf(PojoSerializer.class); ``` I'm not too comfortable with the JDK assert because we forgot to enable it in the past in CI. And I don't see a reason to not use assertj in test code. WDYT? 
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ Review Comment: Adding more information on what this class does would help understanding its purpose (covering the field mapping). ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. 
*/ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( Review Comment: ```suggestion private JavaRecordSerializationHelper( ``` ########## pom.xml: ########## @@ -1102,6 +1103,12 @@ under the License. <profile> <id>java17-target</id> + + <!-- Include Java 17 specific tests (by not excluding them) --> + <properties> Review Comment: Shouldn't we move that into the `java17` profile? :thinking: that's the one that's when JDK 17 is selected for compilation and which enable us to parse the Java record code :thinking: ########## tools/maven/suppressions-core.xml: ########## @@ -121,4 +121,7 @@ under the License. <suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- suppress check java 17 tests --> Review Comment: ```suggestion <!-- Suppress for Java17-specific test classes where Java records are used. --> <!-- Checkstyle applies MethodNameCheck on those Java records in the current configuration. --> ``` we should add more context to the comment. The current comment only describes the line below which makes the comment kind of obsolete. ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. 
*/ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assert (serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected boolean allowNullInstances(TypeSerializer serializer) { Review Comment: ```suggestion protected boolean allowNullInstances(TypeSerializer<TestUserClass> serializer) { ``` nit ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. */ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); Review Comment: ```suggestion private final TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); ``` nit ########## tools/maven/suppressions-core.xml: ########## @@ -121,4 +121,7 @@ under the License. 
<suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- suppress check java 17 tests --> + <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/> Review Comment: I'm wondering whether we should create a follow-up task for checkstyle Java17 support in Flink an reference this rule. You added this because of the method name rule error that checkstyle raises, right? ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. 
*/ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assert (serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected boolean allowNullInstances(TypeSerializer serializer) { + return true; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserClass> getTypeClass() { + return TestUserClass.class; + } + + @Override + protected TestUserClass[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClass[] { + new TestUserClass( + rnd.nextInt(), + "foo", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[] {10, 11, 12})), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + null, + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})), + new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})) + }; + } + + // User code class for testing the serializer + public record TestUserClass( + int dumm1, String dumm2, double dumm3, Date dumm5, NestedTestUserClass nestedClass) {} + + public static class NestedTestUserClass { + public int dumm1; + public String dumm2; + public double dumm3; + public int[] dumm4; + + public NestedTestUserClass() {} + + public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + this.dumm3 = dumm3; + this.dumm4 = dumm4; + } + + @Override 
+ public int hashCode() { + return Objects.hash(dumm1, dumm2, dumm3, dumm4); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NestedTestUserClass)) { + return false; + } + NestedTestUserClass otherTUC = (NestedTestUserClass) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + if (dumm4.length != otherTUC.dumm4.length) { + return false; + } + for (int i = 0; i < dumm4.length; i++) { + if (dumm4[i] != otherTUC.dumm4[i]) { + return false; + } + } + return true; + } + } + + /** This tests if the hashes returned by the pojo and tuple comparators are the same */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + void testTuplePojoTestEquality() throws IncompatibleKeysException { + + // test with a simple, string-key first. + PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type; + List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>(); + pType.getFlatFields("nestedClass.dumm2", 0, result); + int[] fields = new int[1]; // see below + fields[0] = result.get(0).getPosition(); + TypeComparator<TestUserClass> pojoComp = + pType.createComparator(fields, new boolean[] {true}, 0, new ExecutionConfig()); + + TestUserClass pojoTestRecord = + new TestUserClass( + 0, + "abc", + 3d, + new Date(), + new NestedTestUserClass(1, "haha", 4d, new int[] {5, 4, 3})); + int pHash = pojoComp.hash(pojoTestRecord); + + Tuple1<String> tupleTest = new Tuple1<String>("haha"); + TupleTypeInfo<Tuple1<String>> tType = + (TupleTypeInfo<Tuple1<String>>) TypeExtractor.getForObject(tupleTest); + TypeComparator<Tuple1<String>> tupleComp = + tType.createComparator( + new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig()); + + int tHash = tupleComp.hash(tupleTest); + + assertThat(tHash) + .isEqualTo(pHash) + .withFailMessage( + "The hashing for tuples and pojos must be the same, so that they are mixable"); + 
+ Tuple3<Integer, String, Double> multiTupleTest = + new Tuple3<Integer, String, Double>( Review Comment: ```suggestion new Tuple3<>( ``` nit ########## tools/maven/suppressions-core.xml: ########## @@ -121,4 +121,7 @@ under the License. <suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- suppress check java 17 tests --> + <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/> Review Comment: ```suggestion <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks="MethodNameCheck"/> ``` We should make the rule as specific as possible to allow the checkstyle to be applied in any other file. We could even select the two files specificly. WDYT? ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. 
*/ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assert (serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected boolean allowNullInstances(TypeSerializer serializer) { + return true; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserClass> getTypeClass() { + return TestUserClass.class; + } + + @Override + protected TestUserClass[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClass[] { + new TestUserClass( + rnd.nextInt(), + "foo", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[] {10, 11, 12})), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + null, + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})), + new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})) + }; + } + + // User code class for testing the serializer + public record TestUserClass( + int dumm1, String dumm2, double dumm3, Date dumm5, NestedTestUserClass nestedClass) {} + + public static class NestedTestUserClass { + public int dumm1; + public String dumm2; + public double dumm3; + public int[] dumm4; + + public NestedTestUserClass() {} + + public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + this.dumm3 = dumm3; + this.dumm4 = dumm4; + } + + @Override 
+ public int hashCode() { + return Objects.hash(dumm1, dumm2, dumm3, dumm4); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NestedTestUserClass)) { + return false; + } + NestedTestUserClass otherTUC = (NestedTestUserClass) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + if (dumm4.length != otherTUC.dumm4.length) { + return false; + } + for (int i = 0; i < dumm4.length; i++) { + if (dumm4[i] != otherTUC.dumm4[i]) { + return false; + } + } + return true; + } + } + + /** This tests if the hashes returned by the pojo and tuple comparators are the same */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + void testTuplePojoTestEquality() throws IncompatibleKeysException { + + // test with a simple, string-key first. + PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type; + List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>(); + pType.getFlatFields("nestedClass.dumm2", 0, result); + int[] fields = new int[1]; // see below + fields[0] = result.get(0).getPosition(); + TypeComparator<TestUserClass> pojoComp = + pType.createComparator(fields, new boolean[] {true}, 0, new ExecutionConfig()); + + TestUserClass pojoTestRecord = + new TestUserClass( + 0, + "abc", + 3d, + new Date(), + new NestedTestUserClass(1, "haha", 4d, new int[] {5, 4, 3})); + int pHash = pojoComp.hash(pojoTestRecord); + + Tuple1<String> tupleTest = new Tuple1<String>("haha"); Review Comment: ```suggestion Tuple1<String> tupleTest = new Tuple1<>("haha"); ``` nit ########## tools/maven/suppressions-core.xml: ########## @@ -121,4 +121,7 @@ under the License. 
<suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- suppress check java 17 tests --> + <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/> Review Comment: When looking for the reason for this weird checkstyle rule, I found https://github.com/checkstyle/checkstyle/issues/8023. I verified that this is actual an issue in our current setup. It's not an issue for this PR because we don't need this feature of records. But I'm wondering whether we should start collecting reasons like that to initiate a discussion on upgrading the checkstyle version. 🤔 ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +public class JavaRecordSerializationHelper<T> { + + /** Record canonical constructor. Null for non-record classes. */ + private final Constructor<T> recordConstructor; + + private final boolean migrating; + + /** Record constructor parameter index mapping for record migration. */ + private final int[] paramIndexMapping; + /** Record constructor arg initializer, null when not needed. */ + private final LinkedList<Consumer<Object[]>> arrayFieldInitializers; + + public JavaRecordSerializationHelper(Constructor<T> recordConstructor) { + this(recordConstructor, false, null, null); + } + + public JavaRecordSerializationHelper( + Constructor<T> recordConstructor, + boolean migrating, + int[] argIndexMapping, + LinkedList<Consumer<Object[]>> arrayFieldInitializers) { + this.recordConstructor = recordConstructor; + this.migrating = migrating; + this.paramIndexMapping = argIndexMapping; + this.arrayFieldInitializers = arrayFieldInitializers; + } + + public void setParam(Object[] params, int i, Object field) { + if (migrating) { + params[paramIndexMapping[i]] = field; + } else { + params[i] = field; + } + } + + public Object[] initParams() { + Object[] args = new Object[recordConstructor.getParameterCount()]; + if (migrating) { + arrayFieldInitializers.forEach(c -> c.accept(args)); + } + return args; + } + + public T instantiateRecord(Object[] args) { + try { + return recordConstructor.newInstance(args); + } catch 
(Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + public static <T> JavaRecordSerializationHelper create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // Java 11 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + .filter(f -> f != null) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + argIndexMapping[i] = + field == null ? 
-1 : componentNames.indexOf(fields[i].getName()); + } + + // We have to initialize newly added primitive fields to their correct default value + LinkedList<Consumer<Object[]>> arrayFieldInitializers = new LinkedList<>(); Review Comment: After another iteration over the code: What about using `Object[]` instead of a List of functions: ```java final Object[] defaultValues = new Object[componentNames.size()]; for (int i = 0; i < componentNames.size(); i++) { final Class<?> fieldType = componentTypes[i]; defaultValues[i] = fieldType.isPrimitive() && !previousFields.contains(componentNames.get(i)) ? Defaults.defaultValue(fieldType) : null; } ``` That way we could just copy the array when initializing the parameter array. No fancy for loop is necessary: ```java public Object[] initParams() { return Arrays.copyOf(defaultParams, defaultParams.length); } ``` To me it feels like it's easier to read. And it's more consistent with what we do for the field mapping ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. 
*/ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assert (serializer instanceof PojoSerializer); + return serializer; + } + + @Override + protected boolean allowNullInstances(TypeSerializer serializer) { + return true; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserClass> getTypeClass() { + return TestUserClass.class; + } + + @Override + protected TestUserClass[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClass[] { + new TestUserClass( + rnd.nextInt(), + "foo", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[] {10, 11, 12})), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + null, + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})), + new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})) + }; + } + + // User code class for testing the serializer + public record TestUserClass( + int dumm1, String dumm2, double dumm3, Date dumm5, NestedTestUserClass nestedClass) {} + + public static class NestedTestUserClass { + public int dumm1; + public String dumm2; + public double dumm3; + public int[] dumm4; + + public NestedTestUserClass() {} + + public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + this.dumm3 = dumm3; + this.dumm4 = dumm4; + } + + @Override 
+ public int hashCode() { + return Objects.hash(dumm1, dumm2, dumm3, dumm4); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NestedTestUserClass)) { + return false; + } + NestedTestUserClass otherTUC = (NestedTestUserClass) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + if (dumm4.length != otherTUC.dumm4.length) { + return false; + } + for (int i = 0; i < dumm4.length; i++) { + if (dumm4[i] != otherTUC.dumm4[i]) { + return false; + } + } + return true; + } + } + + /** This tests if the hashes returned by the pojo and tuple comparators are the same */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + void testTuplePojoTestEquality() throws IncompatibleKeysException { + + // test with a simple, string-key first. + PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type; + List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>(); Review Comment: ```suggestion List<FlatFieldDescriptor> result = new ArrayList<>(); ``` nit ########## tools/maven/suppressions-core.xml: ########## @@ -121,4 +121,7 @@ under the License. <suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- suppress check java 17 tests --> + <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/> Review Comment: When looking for the reason for this weird checkstyle rule, I found https://github.com/checkstyle/checkstyle/issues/8023. I verified that this is actually an issue in our current setup. It's not an issue for this PR because we don't need this feature of records. But I'm wondering whether we should start collecting reasons like that to initiate a discussion on upgrading the checkstyle version. 🤔 -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org