XComp commented on code in PR #23490:
URL: https://github.com/apache/flink/pull/23490#discussion_r1377868319
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java: ##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utility class for constructing Java records in the {@link PojoSerializer}. */
+@Internal
+final class JavaRecordBuilderFactory<T> {
+
+    private final Constructor<T> canonicalConstructor;
+
+    /**
+     * Record constructor parameter index mapping in case the new constructor has a different
+     * parameter order than the serialized data. Used for schema evolution or `null` if no schema
+     * evolution is applied for that record class.
+     */
+    @Nullable private final int[] paramIndexMapping;
+
+    /**
+     * Default record args used for newly introduced primitive fields during schema evolution.
+     * `null` if no schema evolution is applied for that record class.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordBuilderFactory(Constructor<T> canonicalConstructor) {
+        this(canonicalConstructor, null, null);
+    }
+
+    private JavaRecordBuilderFactory(
+            Constructor<T> canonicalConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {

Review Comment:
```suggestion
            @Nullable int[] argIndexMapping,
            @Nullable Object[] defaultConstructorArgs) {
```
...to be consistent with the `@Nullable` fields these parameters are assigned to.

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -473,25 +519,41 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
             }
         }
-        if ((flags & NO_SUBCLASS) != 0) {
+        if (isRecord) {
             try {
+                JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordHelper.newBuilder();
                 for (int i = 0; i < numFields; i++) {
                     boolean isNull = source.readBoolean();
                     if (fields[i] != null) {
                         if (isNull) {
-                            fields[i].set(reuse, null);
+                            builder.setField(i, null);
                         } else {
-                            Object field;
+                            Object reuseField = reuse == null ? null : fields[i].get(reuse);

Review Comment:
`reuse` will never be `null` here. We should revisit this code, because it introduces quite a bit of redundancy. The `for` loop and the exception handling are the same in both branches; the only difference is how the field values are set. We could use a consumer to outsource that part and move the rest of the code into its own method. WDYT?
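A minimal, JDK-only sketch of the consumer-based refactoring proposed in the comment above: the loop and the exception handling live in one method, and each caller passes only a setter. All names (`FieldSetter`, `FieldLoop`, `PersonPojo`, `PersonValue`, `FieldLoopDemo`) are hypothetical, and `readUTF()` stands in for the per-field `TypeSerializer`. This is an illustration of the idea, not Flink's `PojoSerializer` code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;

/** Hypothetical callback (not a Flink API) that stores the value read for field {@code index}. */
@FunctionalInterface
interface FieldSetter {
    void set(int index, Object value) throws IllegalAccessException;
}

final class FieldLoop {
    /** The loop and exception handling shared by both code paths; only the setter differs. */
    static void readFields(DataInput source, int numFields, FieldSetter setter) throws IOException {
        try {
            for (int i = 0; i < numFields; i++) {
                boolean isNull = source.readBoolean();
                // Stand-in for fieldSerializers[i].deserialize(source) in the real serializer.
                Object value = isNull ? null : source.readUTF();
                setter.set(i, value);
            }
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Error during deserialization.", e);
        }
    }
}

// Mutable POJO: values are written into its fields via reflection.
final class PersonPojo {
    public String name;
    public String city;
}

// Immutable, record-like class: values are buffered, then passed to the constructor.
final class PersonValue {
    final String name;
    final String city;
    PersonValue(String name, String city) {
        this.name = name;
        this.city = city;
    }
}

final class FieldLoopDemo {
    // Writes each value as an isNull flag followed by the UTF string (if present).
    static byte[] encode(String... values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String v : values) {
                out.writeBoolean(v == null);
                if (v != null) {
                    out.writeUTF(v);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static PersonPojo readPojo(byte[] data) {
        PersonPojo target = new PersonPojo();
        try {
            Field[] fields = {PersonPojo.class.getField("name"), PersonPojo.class.getField("city")};
            // POJO path: write each value straight into the instance.
            FieldLoop.readFields(new DataInputStream(new ByteArrayInputStream(data)),
                    fields.length, (i, v) -> fields[i].set(target, v));
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    static PersonValue readValue(byte[] data) {
        Object[] args = new Object[2];
        try {
            // Record path: buffer the values, then call the constructor once.
            FieldLoop.readFields(new DataInputStream(new ByteArrayInputStream(data)),
                    args.length, (i, v) -> args[i] = v);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new PersonValue((String) args[0], (String) args[1]);
    }

    public static void main(String[] args) {
        byte[] data = encode("Ada", null);
        System.out.println(readPojo(data).name + " / " + readValue(data).name);
    }
}
```

With this shape, the POJO branch and the record branch would differ only in the lambda they pass. Whether a checked exception such as `IllegalAccessException` belongs in the callback signature is a design choice the real code would still have to make.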

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -400,21 +444,23 @@ public T deserialize(DataInputView source) throws IOException {
             target = createInstance();
         }
-        if ((flags & NO_SUBCLASS) != 0) {
+        if (isRecord) {
+            JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordHelper.newBuilder();

Review Comment:
Did we forget the try/catch block in this `if` block?

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -473,25 +519,41 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
             }
         }
-        if ((flags & NO_SUBCLASS) != 0) {
+        if (isRecord) {

Review Comment:
tbh, I feel like the `TypeSerializer` interface could be cleaned up in this regard. The `reuse` parameters should be marked as `@Nullable`, and each implementation should handle that case (i.e. instantiating a new instance) rather than providing two separate implementations. `copy(T)` and `deserialize(DataInputView)` would then get default implementations. But that's out of scope for this PR.

########## tools/maven/suppressions-core.xml: ##########
@@ -121,4 +121,7 @@ under the License.
     <suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+    <!-- suppress check java 17 tests -->
+    <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/>

Review Comment:
I'm curious about that one, because you didn't mark it as resolved either. Just making sure you didn't miss it accidentally.
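To make the out-of-scope `TypeSerializer` idea above more concrete, here is a sketch of a hypothetical interface in which `reuse` may be `null`, so that each serializer implements only the reusing variants and the non-reusing ones become default methods. `NullableReuseSerializer`, `Counter`, `CounterSerializer` and `NullableReuseDemo` are invented for illustration; plain `java.io.DataInput` replaces Flink's `DataInputView`, and the `@Nullable` contract is only documented in comments because the JDK ships no such annotation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical contract (not Flink's TypeSerializer): {@code reuse} may be null. */
interface NullableReuseSerializer<T> {
    /** Deserializes into {@code reuse}, or into a new instance if {@code reuse} is null. */
    T deserialize(T reuse, DataInput source) throws IOException;

    /** Copies {@code from} into {@code reuse}, or into a new instance if {@code reuse} is null. */
    T copy(T from, T reuse);

    // The non-reusing variants no longer need their own implementations.
    default T deserialize(DataInput source) throws IOException {
        return deserialize(null, source);
    }

    default T copy(T from) {
        return copy(from, null);
    }
}

final class Counter {
    long value;
}

final class CounterSerializer implements NullableReuseSerializer<Counter> {
    @Override
    public Counter deserialize(Counter reuse, DataInput source) throws IOException {
        Counter target = reuse != null ? reuse : new Counter();
        target.value = source.readLong();
        return target;
    }

    @Override
    public Counter copy(Counter from, Counter reuse) {
        Counter target = reuse != null ? reuse : new Counter();
        target.value = from.value;
        return target;
    }
}

final class NullableReuseDemo {
    static byte[] encode(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Uses the default one-argument method when there is nothing to reuse.
    static Counter read(byte[] data, Counter reuse) {
        CounterSerializer serializer = new CounterSerializer();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        try {
            return reuse == null ? serializer.deserialize(in) : serializer.deserialize(reuse, in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Counter fresh = read(encode(42L), null);
        Counter reused = read(encode(7L), fresh);
        System.out.println(fresh.value + " " + (reused == fresh));
    }
}
```

The trade-off is that every implementation must handle `reuse == null` correctly, which is exactly the case the separate non-reusing methods currently cover.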

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -473,25 +519,41 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
             }
         }
-        if ((flags & NO_SUBCLASS) != 0) {
+        if (isRecord) {

Review Comment:
Correct me if I'm wrong, but I have the impression that `deserialize(DataInputView source)` can be implemented in the following way:
```java
public T deserialize(DataInputView source) throws IOException {
    return deserialize(null, source);
}
```
It's a bit tricky to investigate because the `if` conditions are implemented differently. But as far as I can see, the code paths match those of `deserialize(T reuse, DataInputView source)` with `reuse == null`. This would reduce the amount of code redundancy in this class tremendously (it might deserve its own commit, I guess? :thinking:). The same seems to be true for `copy(T from)`, which could delegate to `copy(T from, T reuse)`. Refactoring that would reduce the diff by quite a bit as well. WDYT?

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
     private transient ClassLoader cl;
+    private final boolean isRecord;
+
+    private transient JavaRecordBuilderFactory<T> recordHelper;

Review Comment:
```suggestion
    // this field is null if the class that's subject to serialization isn't a Java record
    private transient JavaRecordBuilderFactory<T> recordHelper;
```
Adding `@Nullable` here would introduce other warnings, and fixing those would make the code unnecessarily complex. But we could still add a comment documenting the field's contract.
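The `recordHelper` field holds the `JavaRecordBuilderFactory` from the first hunk. As a rough, JDK-only illustration of what its schema-evolution fields (`paramIndexMapping`, `defaultConstructorArgs`) are for, here is a sketch that requires Java 16+ for records. `RecordBuilderSketch`, `Point` and `RecordBuilderDemo` are hypothetical, the assumption that the mapping goes from a field's position in the serialized data to its constructor parameter position is ours, and primitive defaults are computed with `java.lang.reflect.Array` instead of Guava's `Defaults`. The PR's actual implementation may differ.

```java
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.lang.reflect.RecordComponent;
import java.util.Arrays;

/** Hypothetical stand-in for JavaRecordBuilderFactory; mapping direction is an assumption. */
final class RecordBuilderSketch<T extends Record> {
    private final Constructor<T> canonicalConstructor;
    private final int[] paramIndexMapping; // null: no schema evolution, orders match
    private final Object[] defaultConstructorArgs; // null: no schema evolution

    RecordBuilderSketch(Class<T> recordClass, int[] paramIndexMapping) {
        // The canonical constructor takes the record components' types in declaration order.
        Class<?>[] types =
                Arrays.stream(recordClass.getRecordComponents())
                        .map(RecordComponent::getType)
                        .toArray(Class<?>[]::new);
        try {
            this.canonicalConstructor = recordClass.getDeclaredConstructor(types);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("No canonical constructor found", e);
        }
        this.paramIndexMapping = paramIndexMapping;
        this.defaultConstructorArgs = paramIndexMapping == null ? null : defaultsFor(types);
    }

    // Zero/false for primitive parameters (they cannot receive null), null otherwise.
    private static Object[] defaultsFor(Class<?>[] types) {
        Object[] defaults = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            if (types[i].isPrimitive()) {
                // A fresh one-element primitive array holds the type's default value.
                defaults[i] = Array.get(Array.newInstance(types[i], 1), 0);
            }
        }
        return defaults;
    }

    Builder newBuilder() {
        return new Builder();
    }

    /** Collects the constructor arguments for one record instance. */
    final class Builder {
        private final Object[] args =
                defaultConstructorArgs == null
                        ? new Object[canonicalConstructor.getParameterCount()]
                        : defaultConstructorArgs.clone();

        // Stores the value of the field at position serializedIndex in the serialized data.
        Builder setField(int serializedIndex, Object value) {
            int param =
                    paramIndexMapping == null ? serializedIndex : paramIndexMapping[serializedIndex];
            args[param] = value;
            return this;
        }

        T build() {
            try {
                return canonicalConstructor.newInstance(args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not create record", e);
            }
        }
    }
}

// Current schema: `y` is new and `label` moved to the end.
record Point(int x, int y, String label) {}

final class RecordBuilderDemo {
    public static void main(String[] args) {
        // Old data was written in the order (label, x): position 0 -> param 2, 1 -> param 0.
        RecordBuilderSketch<Point> evolved =
                new RecordBuilderSketch<>(Point.class, new int[] {2, 0});
        Point p = evolved.newBuilder().setField(0, "a").setField(1, 7).build();
        System.out.println(p); // y falls back to its default of 0
    }
}
```

Without a default, the new primitive `y` would be passed as `null` and `Constructor.newInstance` would reject it, which is presumably why the factory keeps defaults for newly introduced primitive fields.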

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
     private transient ClassLoader cl;
+    private final boolean isRecord;
+
+    private transient JavaRecordSerializationHelper<T> recordHelper;
+
     /** Constructor to create a new {@link PojoSerializer}. */
     @SuppressWarnings("unchecked")
     public PojoSerializer(

Review Comment:
Any opinion on that proposal?

########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
     private transient ClassLoader cl;
+    private final boolean isRecord;

Review Comment:
That one I'm still curious about. The `isRecord` field could be replaced by a method that checks for `null`:
```java
private boolean isRecord() {
    return this.recordHelper != null;
}
```
That would mean one field fewer to worry about in this class. WDYT?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org