twalthr commented on a change in pull request #17522: URL: https://github.com/apache/flink/pull/17522#discussion_r736613111
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.functions.casting.rules.ArrayToArrayCastRule; +import org.apache.flink.table.planner.functions.casting.rules.IdentityCastRule; +import org.apache.flink.table.planner.functions.casting.rules.TimestampToStringCastRule; +import org.apache.flink.table.planner.functions.casting.rules.UpcastToBigIntCastRule; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** This class resolves {@link CastRule} using the input and the target type. 
*/ +@Internal +public class CastRuleProvider { + + /* ------- Entrypoint ------- */ + + /** + * Resolve a {@link CastRule} for the provided input type and target type. Returns {@code null} + * if no rule can be resolved. + */ + public static @Nullable CastRule<?, ?> resolve(LogicalType inputType, LogicalType targetType) { + return INSTANCE.internalResolve(inputType, targetType); + } + + /** @see #resolve(LogicalType, LogicalType) */ + public static @Nullable CastRule<?, ?> resolve( + DataType inputDataType, DataType targetDataType) { + return resolve(inputDataType.getLogicalType(), targetDataType.getLogicalType()); + } + + /* ------ Implementation ------ */ + + private static final CastRuleProvider INSTANCE = new CastRuleProvider(); + + static { Review comment: move this to the top of the class to have it more prominent for readers searching for the list ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base class for {@link CastRule} supporting code generation. This base class implements {@link + * #create(CastRule.Context, LogicalType, LogicalType)} compiling the generated code block into a + * {@link CastExecutor} implementation. + * + * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from {@link Review comment: nit: who is `We`? In this case, just write `It is suggested...` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.types.logical.LogicalType; + +/** + * Cast rule that has code generation capabilities. + * + * @param <IN> Input internal type + * @param <OUT> Output internal type + */ +@Internal +public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> { + + /** + * Generates a code block composed by different statements performing the casting. The returned + * code block must have a variable containing the result and an isNull term. Review comment: you can directly reference `CastCodeBlock#result` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CastRuleProvider; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.CodeGenUtils.className; +import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName; +import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** Array to array casting rule. 
*/ +public class ArrayToArrayCastRule + extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> { + + public static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule(); + + private ArrayToArrayCastRule() { + super( + CastRulePredicate.builder() + .predicate( + (input, target) -> + input.getTypeRoot() == LogicalTypeRoot.ARRAY + && target.getTypeRoot() == LogicalTypeRoot.ARRAY + && isValidArrayCasting( + ((ArrayType) input).getElementType(), + ((ArrayType) target).getElementType())) + .build()); + } + + private static boolean isValidArrayCasting( + LogicalType innerInputType, LogicalType innerTargetType) { + return CastRuleProvider.resolve(innerInputType, innerTargetType) != null; Review comment: This assumes that only casts that have been ported to the new cast rule structure are supported yet, right? So no `ARRAY<INT>` to `ARRAY<DECIMAL(20, 10)>` yet? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/TimestampToStringCastRule.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.functions.SqlDateTimeUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; + +import static org.apache.flink.table.planner.codegen.CodeGenUtils.className; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.TIMESTAMP_TO_STRING_TIME_ZONE; +import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall; + +/** Timestamp and timestamp ltz casting to string. */ +@Internal +public class TimestampToStringCastRule extends AbstractCharacterFamilyTargetRule<TimestampData> { + + public static final TimestampToStringCastRule INSTANCE = new TimestampToStringCastRule(); + + private TimestampToStringCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.TIMESTAMP) + .target(LogicalTypeFamily.CHARACTER_STRING) + .build()); + } + + @Override + public String generateStringExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + String zoneId = + (inputLogicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) Review comment: use `LogicalType.is` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.LogicalType; + +/** + * Base class for cast rules supporting code generation. This class inherits from {@link + * AbstractCodeGeneratorCastRule} and takes care of nullability checks. + */ +@Internal +public abstract class AbstractNullAwareCodeGeneratorCastRule<IN, OUT> + extends AbstractCodeGeneratorCastRule<IN, OUT> { + + protected AbstractNullAwareCodeGeneratorCastRule(CastRulePredicate predicate) { + super(predicate); + } + + /** + * This method doesn't need to take care of null checks handling of input values. + * Implementations should write the cast result in the {@code returnVariable}. 
+ */ + protected abstract String generateCodeBlockInternal( + CodeGeneratorCastRule.Context context, + String inputTerm, + String returnVariable, + LogicalType inputLogicalType, + LogicalType targetLogicalType); + + @Override + public CastCodeBlock generateCodeBlock( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputType, + LogicalType targetType) { + StringBuilder resultCode = new StringBuilder(); + + // Result of a casting can be null only and only if the input is null + boolean isResultNullable = inputType.isNullable(); + String nullTerm; + if (isResultNullable) { + nullTerm = context.declareVariable("boolean", "isNull"); + resultCode.append(nullTerm).append(" = ").append(inputTerm).append(" == null;\n"); + } else { + nullTerm = "false"; + } + + // Create the result value variable + String returnTerm = context.declareVariable(targetType, "result"); + + // Generate the code block + String castCodeBlock = + this.generateCodeBlockInternal( + context, inputTerm, returnTerm, inputType, targetType); + + if (isResultNullable) { + resultCode + .append("if (!") Review comment: Sometimes you use `StringBuilder` and sometimes `+`. Shall we keep https://github.com/apache/flink/pull/17522/files#diff-f6381501ccb5f995a532369eafaaa4d3b0f34bda58e9519216753bfb35e35558R104 consistent? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CastRuleProvider; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.CodeGenUtils.className; +import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName; +import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** Array to array casting rule. 
*/ +public class ArrayToArrayCastRule + extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> { + + public static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule(); + + private ArrayToArrayCastRule() { + super( + CastRulePredicate.builder() + .predicate( + (input, target) -> + input.getTypeRoot() == LogicalTypeRoot.ARRAY + && target.getTypeRoot() == LogicalTypeRoot.ARRAY + && isValidArrayCasting( Review comment: also check `LogicalTypeCasts.supportsExplicitCast`. Also to check: Is some of the higher levels in the call stack checking for `LogicalTypeCasts.supportsAvoidingCast`? Because e.g. a `ROW<a STRING>` and `ROW<b STRING>` don't need a cast. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base class for {@link CastRule} supporting code generation. This base class implements {@link + * #create(CastRule.Context, LogicalType, LogicalType)} compiling the generated code block into a + * {@link CastExecutor} implementation. + * + * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from {@link + * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, or from {@link + * AbstractExpressionCodeGeneratorCastRule} to generate simple expression casts. + */ +public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT> Review comment: Make sure to add `@Internal` annotations to all new classes. This should become a habit similar to adding a license header. 
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import 
java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base class for {@link CastRule} supporting code generation. This base class implements {@link + * #create(CastRule.Context, LogicalType, LogicalType)} compiling the generated code block into a + * {@link CastExecutor} implementation. + * + * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from {@link + * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, or from {@link + * AbstractExpressionCodeGeneratorCastRule} to generate simple expression casts. + */ +public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT> + implements CodeGeneratorCastRule<IN, OUT> { + + protected AbstractCodeGeneratorCastRule(CastRulePredicate predicate) { + super(predicate); + } + + @SuppressWarnings("unchecked") + @Override + public CastExecutor<IN, OUT> create( + CastRule.Context castRuleContext, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + final String inputTerm = "_myInput"; + final String className = CodeGenUtils.newName("GeneratedCastExecutor"); + final String inputTypeTerm = CodeGenUtils.boxedTypeTermForType(inputLogicalType); + final String targetTypeTerm = CodeGenUtils.boxedTypeTermForType(targetLogicalType); + + final CastExecutorCodeGeneratorContext ctx = + new CastExecutorCodeGeneratorContext(castRuleContext); + final CastCodeBlock codeBlock = + generateCodeBlock(ctx, inputTerm, inputLogicalType, targetLogicalType); + + // Class fields can contain type serializers + final String classFieldDecls = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "private final TypeSerializer " + name + ";\n") Review comment: `CodeGenUtils.className`? 
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.functions.casting.rules.ArrayToArrayCastRule; +import org.apache.flink.table.planner.functions.casting.rules.IdentityCastRule; +import org.apache.flink.table.planner.functions.casting.rules.TimestampToStringCastRule; +import org.apache.flink.table.planner.functions.casting.rules.UpcastToBigIntCastRule; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** This class resolves {@link CastRule} using the input and the target type. 
*/ +@Internal +public class CastRuleProvider { + + /* ------- Entrypoint ------- */ + + /** + * Resolve a {@link CastRule} for the provided input type and target type. Returns {@code null} + * if no rule can be resolved. + */ + public static @Nullable CastRule<?, ?> resolve(LogicalType inputType, LogicalType targetType) { + return INSTANCE.internalResolve(inputType, targetType); + } + + /** @see #resolve(LogicalType, LogicalType) */ + public static @Nullable CastRule<?, ?> resolve( Review comment: drop this method, not used by production code ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.types.logical.LogicalType; + +/** + * Cast rule that has code generation capabilities. 
+ * + * @param <IN> Input internal type + * @param <OUT> Output internal type + */ +@Internal +public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> { + + /** + * Generates a code block composed by different statements performing the casting. The returned + * code block must have a variable containing the result and an isNull term. + */ + CastCodeBlock generateCodeBlock( + Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType); + + /** Context for code generation. */ + interface Context { + /** @return the session time zone term */ + String getSessionTimeZoneTerm(); + + /** + * Declare a new variable in the global scope. + * + * @return the variable name + */ + String declareVariable(String type, String variablePrefix); + + default String declareVariable(LogicalType type, String variablePrefix) { + return declareVariable( Review comment: there might be a misunderstanding here. even nullable types should be primitive, it depends on the use case where the generated code is used. `LogicalType` nullability and the concept of boxed/primitive are independent. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CastRuleProvider; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.CodeGenUtils.className; +import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName; +import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** Array to array casting rule. 
*/ +public class ArrayToArrayCastRule + extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> { + + public static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule(); + + private ArrayToArrayCastRule() { + super( + CastRulePredicate.builder() + .predicate( + (input, target) -> + input.getTypeRoot() == LogicalTypeRoot.ARRAY + && target.getTypeRoot() == LogicalTypeRoot.ARRAY + && isValidArrayCasting( + ((ArrayType) input).getElementType(), + ((ArrayType) target).getElementType())) + .build()); + } + + private static boolean isValidArrayCasting( + LogicalType innerInputType, LogicalType innerTargetType) { + return CastRuleProvider.resolve(innerInputType, innerTargetType) != null; + } + + @SuppressWarnings("rawtypes") + @Override + protected String generateCodeBlockInternal( + CodeGeneratorCastRule.Context context, + String inputTerm, + String returnVariable, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + LogicalType innerInputType = ((ArrayType) inputLogicalType).getElementType(); + LogicalType innerTargetType = ((ArrayType) targetLogicalType).getElementType(); + + String innerTargetTypeTerm = arrayElementType(innerTargetType); + String arraySize = inputTerm + ".size()"; + String objArrayTerm = newName("objArray"); + + CodeGeneratorCastRule<?, ?> innerCastRule = + (CodeGeneratorCastRule) CastRuleProvider.resolve(innerInputType, innerTargetType); + + StringBuilder result = new StringBuilder(); + result.append( + innerTargetTypeTerm + + "[] " + + objArrayTerm + + " = new " + + innerTargetTypeTerm + + "[" + + arraySize + + "];\n"); + + result.append("for (int i = 0; i < " + arraySize + "; i++) {\n"); + CastCodeBlock codeBlock = + innerCastRule.generateCodeBlock( + context, + inputTerm + "." 
+ getArrayElement(innerInputType, "i"), + innerInputType.copy(false), // Null check is done at the array access level + innerTargetType); + + String innerElementCode = + codeBlock.getCode() + + "\n" + + objArrayTerm + + "[i] = " + + codeBlock.getReturnTerm() + + ";\n"; + + // Add null check if inner type is nullable + if (innerInputType.isNullable()) { + result.append("if (" + inputTerm + ".isNullAt(i)) {\n") + .append(objArrayTerm + "[i] = null;\n") + .append("} else {\n") + .append(innerElementCode) + .append("}\n"); + } else { + result.append(innerElementCode); + } + + result.append("}\n"); + + result.append( + returnVariable + + " = new " + + className(GenericArrayData.class) + + "(" + + objArrayTerm + + ");\n"); + + return result.toString(); + } + + private static String arrayElementType(LogicalType t) { + if (t.isNullable()) { + return "Object"; + } + switch (t.getTypeRoot()) { + case BOOLEAN: + return "boolean"; + case TINYINT: + return "byte"; + case SMALLINT: + return "short"; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + return "int"; + case BIGINT: + case INTERVAL_DAY_TIME: + return "long"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case DISTINCT_TYPE: + return arrayElementType(((DistinctType) t).getSourceType()); + } + return "Object"; + } + + private static String getArrayElement(LogicalType elementType, String indexVar) { Review comment: same here: don't we have this already somewhere? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CastRuleProvider; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.CodeGenUtils.className; +import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName; +import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** Array to array casting rule. 
*/ +public class ArrayToArrayCastRule + extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> { + + public static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule(); + + private ArrayToArrayCastRule() { + super( + CastRulePredicate.builder() + .predicate( + (input, target) -> + input.getTypeRoot() == LogicalTypeRoot.ARRAY + && target.getTypeRoot() == LogicalTypeRoot.ARRAY + && isValidArrayCasting( + ((ArrayType) input).getElementType(), + ((ArrayType) target).getElementType())) + .build()); + } + + private static boolean isValidArrayCasting( + LogicalType innerInputType, LogicalType innerTargetType) { + return CastRuleProvider.resolve(innerInputType, innerTargetType) != null; + } + + @SuppressWarnings("rawtypes") + @Override + protected String generateCodeBlockInternal( + CodeGeneratorCastRule.Context context, + String inputTerm, + String returnVariable, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + LogicalType innerInputType = ((ArrayType) inputLogicalType).getElementType(); + LogicalType innerTargetType = ((ArrayType) targetLogicalType).getElementType(); + + String innerTargetTypeTerm = arrayElementType(innerTargetType); + String arraySize = inputTerm + ".size()"; + String objArrayTerm = newName("objArray"); + + CodeGeneratorCastRule<?, ?> innerCastRule = + (CodeGeneratorCastRule) CastRuleProvider.resolve(innerInputType, innerTargetType); + + StringBuilder result = new StringBuilder(); + result.append( + innerTargetTypeTerm + + "[] " + + objArrayTerm + + " = new " + + innerTargetTypeTerm + + "[" + + arraySize + + "];\n"); + + result.append("for (int i = 0; i < " + arraySize + "; i++) {\n"); + CastCodeBlock codeBlock = + innerCastRule.generateCodeBlock( + context, + inputTerm + "." 
+ getArrayElement(innerInputType, "i"), + innerInputType.copy(false), // Null check is done at the array access level + innerTargetType); + + String innerElementCode = + codeBlock.getCode() + + "\n" + + objArrayTerm + + "[i] = " + + codeBlock.getReturnTerm() + + ";\n"; + + // Add null check if inner type is nullable + if (innerInputType.isNullable()) { + result.append("if (" + inputTerm + ".isNullAt(i)) {\n") + .append(objArrayTerm + "[i] = null;\n") + .append("} else {\n") + .append(innerElementCode) + .append("}\n"); + } else { + result.append(innerElementCode); + } + + result.append("}\n"); + + result.append( + returnVariable + + " = new " + + className(GenericArrayData.class) + + "(" + + objArrayTerm + + ");\n"); + + return result.toString(); + } + + private static String arrayElementType(LogicalType t) { Review comment: I'm sure we have this switch/case already in existing utils. We should avoid duplicate code. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base class for {@link CastRule} supporting code generation. This base class implements {@link + * #create(CastRule.Context, LogicalType, LogicalType)} compiling the generated code block into a + * {@link CastExecutor} implementation. + * + * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from {@link + * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, or from {@link + * AbstractExpressionCodeGeneratorCastRule} to generate simple expression casts. 
+ */ +public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT> + implements CodeGeneratorCastRule<IN, OUT> { + + protected AbstractCodeGeneratorCastRule(CastRulePredicate predicate) { + super(predicate); + } + + @SuppressWarnings("unchecked") + @Override + public CastExecutor<IN, OUT> create( + CastRule.Context castRuleContext, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + final String inputTerm = "_myInput"; + final String className = CodeGenUtils.newName("GeneratedCastExecutor"); + final String inputTypeTerm = CodeGenUtils.boxedTypeTermForType(inputLogicalType); + final String targetTypeTerm = CodeGenUtils.boxedTypeTermForType(targetLogicalType); + + final CastExecutorCodeGeneratorContext ctx = + new CastExecutorCodeGeneratorContext(castRuleContext); + final CastCodeBlock codeBlock = + generateCodeBlock(ctx, inputTerm, inputLogicalType, targetLogicalType); + + // Class fields can contain type serializers + final String classFieldDecls = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "private final TypeSerializer " + name + ";\n") + .collect(Collectors.joining()); + + final String constructorSignature = + "public " + + className + + "(" + + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "TypeSerializer " + name) + .collect(Collectors.joining(", ")) + + ")"; + final String constructorBody = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "this." 
+ name + " = " + name + ";\n") + .collect(Collectors.joining()); + + // Because janino doesn't support generics, we need to manually cast the input variable of + // the cast method + final String functionSignature = + "@Override public Object cast(Object _myInputObj) throws " + + CodeGenUtils.className(TableException.class); + final String inputVarDecl = + inputTypeTerm + " " + inputTerm + " = (" + inputTypeTerm + ") _myInputObj;\n"; + + final String returnStmt = "return " + codeBlock.getReturnTerm() + ";\n"; + + final String classCode = Review comment: checkstyle allows disabling its checks via comments; not sure whether spotless does as well ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.types.logical.LogicalType; + +/** + * Cast rule that has code generation capabilities. 
+ * + * @param <IN> Input internal type + * @param <OUT> Output internal type + */ +@Internal +public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> { + + /** + * Generates a code block composed by different statements performing the casting. The returned + * code block must have a variable containing the result and an isNull term. + */ + CastCodeBlock generateCodeBlock( + Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType); + + /** Context for code generation. */ + interface Context { + /** @return the session time zone term */ + String getSessionTimeZoneTerm(); + + /** + * Declare a new variable in the global scope. + * + * @return the variable name + */ + String declareVariable(String type, String variablePrefix); Review comment: We should avoid unnecessary JavaDocs. When you write `new variable in the global scope` then why not calling the method `declareGlobalVariable`? In the end only the `@return` might be interesting. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToArrayCastRule.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CastRuleProvider; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.CodeGenUtils.className; +import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName; +import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; + +/** Array to array casting rule. 
*/ +public class ArrayToArrayCastRule + extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, ArrayData> { + + public static final ArrayToArrayCastRule INSTANCE = new ArrayToArrayCastRule(); + + private ArrayToArrayCastRule() { + super( + CastRulePredicate.builder() + .predicate( + (input, target) -> + input.getTypeRoot() == LogicalTypeRoot.ARRAY Review comment: use the new `.is()` ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java ########## @@ -125,7 +125,9 @@ private GenericArrayData copyGenericArray(GenericArrayData array) { LogicalTypeUtils.toInternalConversionClass(eleType), array.size()); for (int i = 0; i < array.size(); i++) { - newArray[i] = eleSer.copy(objectArray[i]); + if (objectArray[i] != null) { Review comment: this is a change in a very crucial component. it can simply fall through during a review of a 2K PR. please put such changes in a separate commit. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/IdentityCastRule.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.Objects; + +/** Identity casting rule. */ +@Internal +public class IdentityCastRule extends AbstractCodeGeneratorCastRule<Object, Object> { + + public static final IdentityCastRule INSTANCE = new IdentityCastRule(); + + private IdentityCastRule() { + super(CastRulePredicate.builder().predicate(Objects::equals).build()); + } + + @Override + public CastCodeBlock generateCodeBlock( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + String isNullTerm = inputLogicalType.isNullable() ? "(" + inputTerm + " == null)" : "false"; + return new CastCodeBlock("", inputTerm, isNullTerm); Review comment: this means that identity casting is more expensive than before, right? Because we need to box for the interface. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastCodeBlock.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** + * Generated cast code block result. This POJO contains the Java code of the block performing the + * cast, the output isNull term and the output variable containing the cast result. It is guaranteed + * that the result and isNull variables can be accessed within the outside scope of the code. Review comment: Use `{@code result} and {@code isNull}`. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting.rules; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.utils.CastExecutor; +import org.apache.flink.table.planner.codegen.CodeGenUtils; +import org.apache.flink.table.planner.functions.casting.CastCodeBlock; +import org.apache.flink.table.planner.functions.casting.CastRule; +import org.apache.flink.table.planner.functions.casting.CastRulePredicate; +import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule; +import org.apache.flink.table.runtime.generated.CompileUtils; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Base class for {@link CastRule} supporting code generation. This base class implements {@link + * #create(CastRule.Context, LogicalType, LogicalType)} compiling the generated code block into a + * {@link CastExecutor} implementation. + * + * <p>We suggest implementing {@link CodeGeneratorCastRule} starting from {@link + * AbstractNullAwareCodeGeneratorCastRule}, which provides nullability checks, or from {@link + * AbstractExpressionCodeGeneratorCastRule} to generate simple expression casts. 
+ */ +public abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<IN, OUT> + implements CodeGeneratorCastRule<IN, OUT> { + + protected AbstractCodeGeneratorCastRule(CastRulePredicate predicate) { + super(predicate); + } + + @SuppressWarnings("unchecked") + @Override + public CastExecutor<IN, OUT> create( + CastRule.Context castRuleContext, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + final String inputTerm = "_myInput"; + final String className = CodeGenUtils.newName("GeneratedCastExecutor"); + final String inputTypeTerm = CodeGenUtils.boxedTypeTermForType(inputLogicalType); + final String targetTypeTerm = CodeGenUtils.boxedTypeTermForType(targetLogicalType); + + final CastExecutorCodeGeneratorContext ctx = + new CastExecutorCodeGeneratorContext(castRuleContext); + final CastCodeBlock codeBlock = + generateCodeBlock(ctx, inputTerm, inputLogicalType, targetLogicalType); + + // Class fields can contain type serializers + final String classFieldDecls = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "private final TypeSerializer " + name + ";\n") + .collect(Collectors.joining()); + + final String constructorSignature = + "public " + + className + + "(" + + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "TypeSerializer " + name) + .collect(Collectors.joining(", ")) + + ")"; + final String constructorBody = + ctx.getDeclaredTypeSerializers().stream() + .map(name -> "this." 
+ name + " = " + name + ";\n") + .collect(Collectors.joining()); + + // Because janino doesn't support generics, we need to manually cast the input variable of + // the cast method + final String functionSignature = + "@Override public Object cast(Object _myInputObj) throws " + + CodeGenUtils.className(TableException.class); + final String inputVarDecl = + inputTypeTerm + " " + inputTerm + " = (" + inputTypeTerm + ") _myInputObj;\n"; + + final String returnStmt = "return " + codeBlock.getReturnTerm() + ";\n"; + + final String classCode = + "public final class " + + className + + " implements " + + CodeGenUtils.className(CastExecutor.class) + + " {\n" + + classFieldDecls + + constructorSignature + + " {\n" + + constructorBody + + "}\n" + + functionSignature + + "{\n" + + inputVarDecl + + String.join("\n", ctx.variableDeclarationStatements) + + codeBlock.getCode() + + "\n" + + returnStmt + + "}\n}"; + + try { + Object[] constructorArgs = + ctx.getTypeSerializersInstances().toArray(new TypeSerializer[0]); + return (CastExecutor<IN, OUT>) + CompileUtils.compile( + Thread.currentThread().getContextClassLoader(), Review comment: This could cause issues in the future and should be avoided. The class loader should be configurable. At least it should come from the context. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org