XComp commented on code in PR #23490:
URL: https://github.com/apache/flink/pull/23490#discussion_r1364075854


##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;

Review Comment:
   ```suggestion
           Preconditions.checkArgument((argIndexMapping == null) == 
(arrayFieldInitializers == null));
           this.recordConstructor = recordConstructor;
   ```
   Could we add a Precondition to reflect the invariant in the code and fail 
early in case of misuse?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;

Review Comment:
   ```suggestion
       @Nullable private final int[] paramIndexMapping;
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,

Review Comment:
   `migrating` seems to be obsolete. We can mark `arrayFieldInitializers` as 
`@Nullable` and document the semantics of the `null` value properly (maybe 
through a method `migratingEnabled()`?).



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;

Review Comment:
   ```suggestion
       @Nullable private final LinkedList<Consumer<Object[]>> 
arrayFieldInitializers;
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {

Review Comment:
   ```suggestion
       public static <T> JavaRecordSerializationHelper<T> create(Class<T> 
clazz, Field[] fields) {
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {

Review Comment:
   ```suggestion
           } catch (IllegalArgumentException e) {
               throw e;
           } catch (Exception e) {
   ```
   nit: Maybe, propagate the `IllegalArgumentException` directly — it is already a 
valid `RuntimeException` — rather than wrapping it in another one?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11

Review Comment:
   ```suggestion
               // Java 13 and lower
   ```
   nit: just to keep the comments as generic as possible



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            .filter(f -> f != null)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record 
class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    argIndexMapping[i] =
+                            field == null ? -1 : 
componentNames.indexOf(fields[i].getName());
+                }
+
+                // We have to initialize newly added primitive fields to their 
correct default value
+                LinkedList<Consumer<Object[]>> arrayFieldInitializers = new 
LinkedList<>();

Review Comment:
   Is there a reason to use `LinkedList` here? :thinking:  `ArrayList` would 
have a better memory footprint.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {

Review Comment:
   ```suggestion
       private JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            .filter(f -> f != null)

Review Comment:
   On another note: Wouldn't that be an invalid state? ...having a field 
that's null? It might be better to throw an `IllegalArgumentException` rather 
than filtering those cases.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            .filter(f -> f != null)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record 
class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    argIndexMapping[i] =
+                            field == null ? -1 : 
componentNames.indexOf(fields[i].getName());
+                }
+
+                // We have to initialize newly added primitive fields to their 
correct default value
+                LinkedList<Consumer<Object[]>> arrayFieldInitializers = new 
LinkedList<>();
+                for (int i = 0; i < componentNames.size(); i++) {
+                    Class<?> fieldType = componentTypes[i];
+                    if (fieldType.isPrimitive()
+                            && 
!previousFields.contains(componentNames.get(i))) {
+                        final int index = i;
+                        arrayFieldInitializers.add(
+                                args -> {
+                                    Object defaultValue = 
Defaults.defaultValue(fieldType);
+                                    args[index] = defaultValue;
+                                });
+                    }
+                }
+                return new JavaRecordSerializationHelper(
+                        recordConstructor, true, argIndexMapping, 
arrayFieldInitializers);
+            } else {
+                return new JavaRecordSerializationHelper(recordConstructor);

Review Comment:
   ```suggestion
                   return new 
JavaRecordSerializationHelper<T>(recordConstructor);
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -118,6 +122,9 @@ public PojoSerializer(
                 createRegisteredSubclassSerializers(registeredSubclasses, 
executionConfig);
 
         this.subclassSerializerCache = new HashMap<>();
+        if (this.isRecord = TypeExtractor.isRecord(clazz)) {

Review Comment:
   I'm not sure about this syntactic sugar: This might trigger PR proposals in 
the future where people think that an equals sign is missing even though it's 
correct. I would suggest using the more explicit way where `this.isRecord` is 
set in the if block.
   
   We could even argue that `isRecord` is an obsolete field because 
`recordHelper` is actually `@Nullable`. The `isRecord` field could be replaced 
by a method `isRecord()` that returns `recordHelper != null` to make the 
semantics of `recordHelper` being `null` reflected in the code. WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java:
##########
@@ -2057,6 +2057,7 @@ protected <OUT, IN1, IN2> TypeInformation<OUT> 
analyzePojo(
                     GENERIC_TYPE_DOC_HINT);
             return new GenericTypeInfo<>(clazz);
         }
+        boolean isRecord = isRecord(clazz);

Review Comment:
   ```suggestion
           boolean isRecord = isRecord(clazz);
   ```
   nit: we could move this local variable closer to the block where it is used. 
Or is there a reason why it's kept separated from the `fields` for loop?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            .filter(f -> f != null)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record 
class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    argIndexMapping[i] =
+                            field == null ? -1 : 
componentNames.indexOf(fields[i].getName());
+                }
+
+                // We have to initialize newly added primitive fields to their 
correct default value
+                LinkedList<Consumer<Object[]>> arrayFieldInitializers = new 
LinkedList<>();
+                for (int i = 0; i < componentNames.size(); i++) {
+                    Class<?> fieldType = componentTypes[i];
+                    if (fieldType.isPrimitive()
+                            && 
!previousFields.contains(componentNames.get(i))) {
+                        final int index = i;
+                        arrayFieldInitializers.add(
+                                args -> {
+                                    Object defaultValue = 
Defaults.defaultValue(fieldType);
+                                    args[index] = defaultValue;
+                                });
+                    }
+                }
+                return new JavaRecordSerializationHelper(

Review Comment:
   ```suggestion
                   return new JavaRecordSerializationHelper<T>(
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {

Review Comment:
   Could we add some unit tests for this class to have some more specific test 
cases as well? But I checked that both code paths (migration and non-migration) 
are covered by the newly added `Java17PojoRecordSerializerUpgradeTest`.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
     private transient ClassLoader cl;
 
+    private final boolean isRecord;
+
+    private transient JavaRecordSerializationHelper<T> recordHelper;
+
     /** Constructor to create a new {@link PojoSerializer}. */
     @SuppressWarnings("unchecked")
     public PojoSerializer(

Review Comment:
   I wasn't sure where to put this comment. So it's not really related to the 
marked line.
   
   The documentation for the Pojo serialization happens to be in the JavaDoc of 
`PojoTypeInfo` and `Types.POJO`. Should we add additional information about the 
Java Record support there as well?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {

Review Comment:
   ```suggestion
       public RecordBuilder createRecordBuilder() {
   ```
   There is a common pattern in how we use the `initParams`, `setParams` and 
`instantiateRecord` methods here. The `Object[]` that's returned by 
`initParams` is passed to the other two methods but not used anywhere else. 
   
   Essentially, what we have is a Builder pattern here. The Builder can be 
created by the `JavaRecordSerializationHelper` using a factory method 
`createRecordBuilder()` that replaces `initParams()`. Its state contains the 
field mapping, the initialization routines and the record constructor. 
`instantiateRecord` becomes a `.build()` method which doesn't require any 
parameters because the record can be created using the Builder's internal 
state. WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java:
##########
@@ -2174,6 +2175,11 @@ protected <OUT, IN1, IN2> TypeInformation<OUT> 
analyzePojo(
         return pojoType;
     }
 
+    public static boolean isRecord(Class<?> clazz) {

Review Comment:
   Should we add JavaDoc considering that it's a `@Public` class? It might make 
sense to label it as `@PublicEvolving` analogously to the other static utility 
methods for now to give it some room to evolve?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java:
##########
@@ -2075,7 +2076,7 @@ protected <OUT, IN1, IN2> TypeInformation<OUT> 
analyzePojo(
         List<PojoField> pojoFields = new ArrayList<>();
         for (Field field : fields) {
             Type fieldType = field.getGenericType();
-            if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != 
Row.class) {
+            if (!isRecord && !isValidPojoField(field, clazz, typeHierarchy) && 
clazz != Row.class) {

Review Comment:
   What about adding the following code block before the default constructor 
extraction in line 2143 (essentially right after validating the methods and 
before retrieving the default constructor for that class):
   ```java
   if (isRecord) {
       // no default constructor extraction needs to be applied for Java records
       return pojoType;
   }
   ```
   ...to be more explicit on that specific use case instead of hiding the logic 
in the default constructor extraction logic. WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;

Review Comment:
   After having gone through the code (keep in mind that I haven't worked with 
this part of the code, yet), I feel like "migrating" describes the scenario in 
which the parameter mapping is used. But it's kind of misleading when looking 
at this class and not having knowledge of the scenario it is used in.
   
   I would suggest to remove "migration" as a concept from this specific class. 
The purpose of this class is to hold the `recordConstructor` and optional field 
mapping functionality. That should be good enough to grasp the purpose of this 
class. WDYT?



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assert (serializer instanceof PojoSerializer);
+        return serializer;
+    }
+
+    @Override
+    protected boolean allowNullInstances(TypeSerializer serializer) {
+        return true;
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<TestUserClass> getTypeClass() {
+        return TestUserClass.class;
+    }
+
+    @Override
+    protected TestUserClass[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserClass[] {
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "foo",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "foo@boo", rnd.nextDouble(), new 
int[] {10, 11, 12})),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    null,
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22})),
+            new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, 
null),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22}))
+        };
+    }
+
+    // User code class for testing the serializer
+    public record TestUserClass(
+            int dumm1, String dumm2, double dumm3, Date dumm5, 
NestedTestUserClass nestedClass) {}
+
+    public static class NestedTestUserClass {
+        public int dumm1;
+        public String dumm2;
+        public double dumm3;
+        public int[] dumm4;
+
+        public NestedTestUserClass() {}
+
+        public NestedTestUserClass(int dumm1, String dumm2, double dumm3, 
int[] dumm4) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+            this.dumm3 = dumm3;
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dumm1, dumm2, dumm3, dumm4);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof NestedTestUserClass)) {
+                return false;
+            }
+            NestedTestUserClass otherTUC = (NestedTestUserClass) other;
+            if (dumm1 != otherTUC.dumm1) {
+                return false;
+            }
+            if (!dumm2.equals(otherTUC.dumm2)) {
+                return false;
+            }
+            if (dumm3 != otherTUC.dumm3) {
+                return false;
+            }
+            if (dumm4.length != otherTUC.dumm4.length) {
+                return false;
+            }
+            for (int i = 0; i < dumm4.length; i++) {
+                if (dumm4[i] != otherTUC.dumm4[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /** This tests if the hashes returned by the pojo and tuple comparators 
are the same */

Review Comment:
   ```suggestion
       /** This tests if the hashes returned by the pojo and tuple comparators 
are the same. */
   ```
   nit: I don't know why checkstyle complains in my IDE but not in the Maven 
run about that one. :thinking: 



##########
pom.xml:
##########
@@ -205,6 +205,7 @@ under the License.
                <!-- Can be set to any value to reproduce a specific build. -->
                <test.randomization.seed/>
                <test.unit.pattern>**/*Test.*</test.unit.pattern>
+               <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern>

Review Comment:
   ```suggestion
                <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern>
   ```
   That looks reasonable. I guess the other "more maven-like" way would be to 
move the Java 17 code into a separate module? :thinking: But that feels like 
overkill for a few test classes.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            .filter(f -> f != null)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record 
class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    argIndexMapping[i] =
+                            field == null ? -1 : 
componentNames.indexOf(fields[i].getName());

Review Comment:
   Here as well, shouldn't a field being `null` cause an 
`IllegalArgumentException`? Or is there a reason why a user should pass in 
`null`?



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assert (serializer instanceof PojoSerializer);

Review Comment:
   ```suggestion
           assertThat(serializer).isInstanceOf(PojoSerializer.class);
   ```
   I'm not too comfortable with the JDK assert because we forgot to enable it 
in the past in CI. And I don't see a reason to not use assertj in test code. 
WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/

Review Comment:
   Adding more information on what this class does would help understanding its 
purpose (covering the field mapping).



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(

Review Comment:
   ```suggestion
       private JavaRecordSerializationHelper(
   ```



##########
pom.xml:
##########
@@ -1102,6 +1103,12 @@ under the License.
 
                <profile>
                        <id>java17-target</id>
+
+                       <!-- Include Java 17 specific tests (by not excluding 
them) -->
+                       <properties>

Review Comment:
   Shouldn't we move that into the `java17` profile? :thinking: That's the one 
that's active when JDK 17 is selected for compilation, and it's what enables us 
to parse the Java record code. :thinking: 



##########
tools/maven/suppressions-core.xml:
##########
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- suppress check java 17 tests -->

Review Comment:
   ```suggestion
        <!-- Suppress for Java17-specific test classes where Java records are 
used. -->
        <!-- Checkstyle applies MethodNameCheck on those Java records in the 
current configuration. -->
   ```
   We should add more context to the comment. The current comment only 
describes the line below, which makes the comment kind of obsolete.



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assert (serializer instanceof PojoSerializer);
+        return serializer;
+    }
+
+    @Override
+    protected boolean allowNullInstances(TypeSerializer serializer) {

Review Comment:
   ```suggestion
       protected boolean allowNullInstances(TypeSerializer<TestUserClass> 
serializer) {
   ```
   nit



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);

Review Comment:
   ```suggestion
       private final TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
   ```
   nit



##########
tools/maven/suppressions-core.xml:
##########
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- suppress check java 17 tests -->
+       <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/>

Review Comment:
   I'm wondering whether we should create a follow-up task for checkstyle 
Java17 support in Flink and reference this rule there. You added this because 
of the method name rule error that checkstyle raises, right?



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assert (serializer instanceof PojoSerializer);
+        return serializer;
+    }
+
+    @Override
+    protected boolean allowNullInstances(TypeSerializer serializer) {
+        return true;
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<TestUserClass> getTypeClass() {
+        return TestUserClass.class;
+    }
+
+    @Override
+    protected TestUserClass[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserClass[] {
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "foo",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "foo@boo", rnd.nextDouble(), new 
int[] {10, 11, 12})),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    null,
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22})),
+            new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, 
null),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22}))
+        };
+    }
+
+    // User code class for testing the serializer
+    public record TestUserClass(
+            int dumm1, String dumm2, double dumm3, Date dumm5, 
NestedTestUserClass nestedClass) {}
+
+    public static class NestedTestUserClass {
+        public int dumm1;
+        public String dumm2;
+        public double dumm3;
+        public int[] dumm4;
+
+        public NestedTestUserClass() {}
+
+        public NestedTestUserClass(int dumm1, String dumm2, double dumm3, 
int[] dumm4) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+            this.dumm3 = dumm3;
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dumm1, dumm2, dumm3, dumm4);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof NestedTestUserClass)) {
+                return false;
+            }
+            NestedTestUserClass otherTUC = (NestedTestUserClass) other;
+            if (dumm1 != otherTUC.dumm1) {
+                return false;
+            }
+            if (!dumm2.equals(otherTUC.dumm2)) {
+                return false;
+            }
+            if (dumm3 != otherTUC.dumm3) {
+                return false;
+            }
+            if (dumm4.length != otherTUC.dumm4.length) {
+                return false;
+            }
+            for (int i = 0; i < dumm4.length; i++) {
+                if (dumm4[i] != otherTUC.dumm4[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /** This tests if the hashes returned by the pojo and tuple comparators 
are the same */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Test
+    void testTuplePojoTestEquality() throws IncompatibleKeysException {
+
+        // test with a simple, string-key first.
+        PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type;
+        List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();
+        pType.getFlatFields("nestedClass.dumm2", 0, result);
+        int[] fields = new int[1]; // see below
+        fields[0] = result.get(0).getPosition();
+        TypeComparator<TestUserClass> pojoComp =
+                pType.createComparator(fields, new boolean[] {true}, 0, new 
ExecutionConfig());
+
+        TestUserClass pojoTestRecord =
+                new TestUserClass(
+                        0,
+                        "abc",
+                        3d,
+                        new Date(),
+                        new NestedTestUserClass(1, "haha", 4d, new int[] {5, 
4, 3}));
+        int pHash = pojoComp.hash(pojoTestRecord);
+
+        Tuple1<String> tupleTest = new Tuple1<String>("haha");
+        TupleTypeInfo<Tuple1<String>> tType =
+                (TupleTypeInfo<Tuple1<String>>) 
TypeExtractor.getForObject(tupleTest);
+        TypeComparator<Tuple1<String>> tupleComp =
+                tType.createComparator(
+                        new int[] {0}, new boolean[] {true}, 0, new 
ExecutionConfig());
+
+        int tHash = tupleComp.hash(tupleTest);
+
+        assertThat(tHash)
+                .isEqualTo(pHash)
+                .withFailMessage(
+                        "The hashing for tuples and pojos must be the same, so 
that they are mixable");
+
+        Tuple3<Integer, String, Double> multiTupleTest =
+                new Tuple3<Integer, String, Double>(

Review Comment:
   ```suggestion
                   new Tuple3<>(
   ```
   nit



##########
tools/maven/suppressions-core.xml:
##########
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- suppress check java 17 tests -->
+       <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/>

Review Comment:
   ```suggestion
           <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" 
checks="MethodNameCheck"/>
   ```
   We should make the rule as specific as possible so that checkstyle can 
still be applied to any other file. We could even select the two files 
specifically. WDYT?



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assert (serializer instanceof PojoSerializer);
+        return serializer;
+    }
+
+    @Override
+    protected boolean allowNullInstances(TypeSerializer serializer) {
+        return true;
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<TestUserClass> getTypeClass() {
+        return TestUserClass.class;
+    }
+
+    @Override
+    protected TestUserClass[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserClass[] {
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "foo",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "foo@boo", rnd.nextDouble(), new 
int[] {10, 11, 12})),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    null,
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22})),
+            new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, 
null),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22}))
+        };
+    }
+
+    // User code class for testing the serializer
+    public record TestUserClass(
+            int dumm1, String dumm2, double dumm3, Date dumm5, 
NestedTestUserClass nestedClass) {}
+
+    public static class NestedTestUserClass {
+        public int dumm1;
+        public String dumm2;
+        public double dumm3;
+        public int[] dumm4;
+
+        public NestedTestUserClass() {}
+
+        public NestedTestUserClass(int dumm1, String dumm2, double dumm3, 
int[] dumm4) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+            this.dumm3 = dumm3;
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dumm1, dumm2, dumm3, dumm4);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof NestedTestUserClass)) {
+                return false;
+            }
+            NestedTestUserClass otherTUC = (NestedTestUserClass) other;
+            if (dumm1 != otherTUC.dumm1) {
+                return false;
+            }
+            if (!dumm2.equals(otherTUC.dumm2)) {
+                return false;
+            }
+            if (dumm3 != otherTUC.dumm3) {
+                return false;
+            }
+            if (dumm4.length != otherTUC.dumm4.length) {
+                return false;
+            }
+            for (int i = 0; i < dumm4.length; i++) {
+                if (dumm4[i] != otherTUC.dumm4[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /** This tests if the hashes returned by the pojo and tuple comparators 
are the same */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Test
+    void testTuplePojoTestEquality() throws IncompatibleKeysException {
+
+        // test with a simple, string-key first.
+        PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type;
+        List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();
+        pType.getFlatFields("nestedClass.dumm2", 0, result);
+        int[] fields = new int[1]; // see below
+        fields[0] = result.get(0).getPosition();
+        TypeComparator<TestUserClass> pojoComp =
+                pType.createComparator(fields, new boolean[] {true}, 0, new 
ExecutionConfig());
+
+        TestUserClass pojoTestRecord =
+                new TestUserClass(
+                        0,
+                        "abc",
+                        3d,
+                        new Date(),
+                        new NestedTestUserClass(1, "haha", 4d, new int[] {5, 
4, 3}));
+        int pHash = pojoComp.hash(pojoTestRecord);
+
+        Tuple1<String> tupleTest = new Tuple1<String>("haha");

Review Comment:
   ```suggestion
           Tuple1<String> tupleTest = new Tuple1<>("haha");
   ```
   nit



##########
tools/maven/suppressions-core.xml:
##########
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- suppress check java 17 tests -->
+       <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/>

Review Comment:
   When looking for the reason for this weird checkstyle rule, I found 
https://github.com/checkstyle/checkstyle/issues/8023. I verified that this is 
actually an issue in our current setup.
   
   It's not an issue for this PR because we don't need this feature of records. 
But I'm wondering whether we should start collecting reasons like that to 
initiate a discussion on upgrading the checkstyle version. 🤔



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+public class JavaRecordSerializationHelper<T> {
+
+    /** Record canonical constructor. Null for non-record classes. */
+    private final Constructor<T> recordConstructor;
+
+    private final boolean migrating;
+
+    /** Record constructor parameter index mapping for record migration. */
+    private final int[] paramIndexMapping;
+    /** Record constructor arg initializer, null when not needed. */
+    private final LinkedList<Consumer<Object[]>> arrayFieldInitializers;
+
+    public JavaRecordSerializationHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, false, null, null);
+    }
+
+    public JavaRecordSerializationHelper(
+            Constructor<T> recordConstructor,
+            boolean migrating,
+            int[] argIndexMapping,
+            LinkedList<Consumer<Object[]>> arrayFieldInitializers) {
+        this.recordConstructor = recordConstructor;
+        this.migrating = migrating;
+        this.paramIndexMapping = argIndexMapping;
+        this.arrayFieldInitializers = arrayFieldInitializers;
+    }
+
+    public void setParam(Object[] params, int i, Object field) {
+        if (migrating) {
+            params[paramIndexMapping[i]] = field;
+        } else {
+            params[i] = field;
+        }
+    }
+
+    public Object[] initParams() {
+        Object[] args = new Object[recordConstructor.getParameterCount()];
+        if (migrating) {
+            arrayFieldInitializers.forEach(c -> c.accept(args));
+        }
+        return args;
+    }
+
+    public T instantiateRecord(Object[] args) {
+        try {
+            return recordConstructor.newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate record", e);
+        }
+    }
+
+    public static <T> JavaRecordSerializationHelper create(Class<T> clazz, 
Field[] fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // Java 11
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            .filter(f -> f != null)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record 
class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    argIndexMapping[i] =
+                            field == null ? -1 : 
componentNames.indexOf(fields[i].getName());
+                }
+
+                // We have to initialize newly added primitive fields to their 
correct default value
+                LinkedList<Consumer<Object[]>> arrayFieldInitializers = new 
LinkedList<>();

Review Comment:
   After another iteration over the code: What about using `Object[]` instead 
of a List of functions:
   ```java
       final Object[] defaultValues = new Object[componentNames.size()];
       for (int i = 0; i < componentNames.size(); i++) {
           final Class<?> fieldType = componentTypes[i];
           defaultValues[i] =
                   fieldType.isPrimitive()
                                   && 
!previousFields.contains(componentNames.get(i))
                           ? Defaults.defaultValue(fieldType)
                           : null;
       }
   ```
   
   That way we could just copy the array when initializing the parameter array. 
No fancy for loop is necessary:
   ```java
       public Object[] initParams() {
           return Arrays.copyOf(defaultParams, defaultParams.length);
       }
   ```
   
   To me it feels like it's easier to read. And it's more consistent with what 
we do for the field mapping.



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assert (serializer instanceof PojoSerializer);
+        return serializer;
+    }
+
+    @Override
+    protected boolean allowNullInstances(TypeSerializer serializer) {
+        return true;
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<TestUserClass> getTypeClass() {
+        return TestUserClass.class;
+    }
+
+    @Override
+    protected TestUserClass[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserClass[] {
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "foo",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "foo@boo", rnd.nextDouble(), new 
int[] {10, 11, 12})),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    null,
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22})),
+            new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, 
null),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22}))
+        };
+    }
+
+    // User code class for testing the serializer
+    public record TestUserClass(
+            int dumm1, String dumm2, double dumm3, Date dumm5, 
NestedTestUserClass nestedClass) {}
+
+    public static class NestedTestUserClass {
+        public int dumm1;
+        public String dumm2;
+        public double dumm3;
+        public int[] dumm4;
+
+        public NestedTestUserClass() {}
+
+        public NestedTestUserClass(int dumm1, String dumm2, double dumm3, 
int[] dumm4) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+            this.dumm3 = dumm3;
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dumm1, dumm2, dumm3, dumm4);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof NestedTestUserClass)) {
+                return false;
+            }
+            NestedTestUserClass otherTUC = (NestedTestUserClass) other;
+            if (dumm1 != otherTUC.dumm1) {
+                return false;
+            }
+            if (!dumm2.equals(otherTUC.dumm2)) {
+                return false;
+            }
+            if (dumm3 != otherTUC.dumm3) {
+                return false;
+            }
+            if (dumm4.length != otherTUC.dumm4.length) {
+                return false;
+            }
+            for (int i = 0; i < dumm4.length; i++) {
+                if (dumm4[i] != otherTUC.dumm4[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /** This tests if the hashes returned by the pojo and tuple comparators 
are the same */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Test
+    void testTuplePojoTestEquality() throws IncompatibleKeysException {
+
+        // test with a simple, string-key first.
+        PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type;
+        List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();

Review Comment:
   ```suggestion
           List<FlatFieldDescriptor> result = new ArrayList<>();
   ```
   nit



##########
tools/maven/suppressions-core.xml:
##########
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- suppress check java 17 tests -->
+       <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/>

Review Comment:
   When looking for the reason for this weird checkstyle rule, I found 
https://github.com/checkstyle/checkstyle/issues/8023. I verified that this is 
actually an issue in our current setup.
   
   It's not an issue for this PR because we don't need this feature of records. 
But I'm wondering whether we should start collecting reasons like that to 
initiate a discussion on upgrading the checkstyle version. 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to