twalthr commented on a change in pull request #17522:
URL: https://github.com/apache/flink/pull/17522#discussion_r733586413



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CastRuleProvider.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.casting.rules.IdentityCastRule$;
+import org.apache.flink.table.data.casting.rules.TimestampToStringCastRule$;
+import org.apache.flink.table.data.casting.rules.UpcastToBigIntCastRule$;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class resolves {@link CastRule} using the input and the target type. 
*/
+@Internal
+public class CastRuleProvider {
+
+    /* ------- Entrypoint ------- */
+
+    /**
+     * Resolve a {@link CastRule} for the provided input data type and target 
data type. Returns
+     * {@code null} if no rule can be resolved.
+     */
+    public static @Nullable CastRule<?, ?> resolve(
+            LogicalType inputDataType, LogicalType targetDataType) {
+        return INSTANCE.internalResolve(inputDataType, targetDataType);
+    }
+
+    /** @see #resolve(LogicalType, LogicalType) */
+    public static @Nullable CastRule<?, ?> resolve(
+            DataType inputDataType, DataType targetDataType) {
+        return resolve(inputDataType.getLogicalType(), 
targetDataType.getLogicalType());
+    }
+
+    /* ------ Implementation ------ */
+
+    private static final CastRuleProvider INSTANCE = new CastRuleProvider();
+
+    static {
+        INSTANCE.addRule(TimestampToStringCastRule$.MODULE$)
+                .addRule(IdentityCastRule$.MODULE$)
+                .addRule(UpcastToBigIntCastRule$.MODULE$)
+                .freeze();
+    }
+
+    // Map<Target family or root, Map<Input family or root, rule>>
+    private Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();
+    private List<CastRule<?, ?>> rulesWithCustomPredicate = new ArrayList<>();

Review comment:
       can be `final`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CastRuleProvider.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.casting.rules.IdentityCastRule$;
+import org.apache.flink.table.data.casting.rules.TimestampToStringCastRule$;
+import org.apache.flink.table.data.casting.rules.UpcastToBigIntCastRule$;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class resolves {@link CastRule} using the input and the target type. 
*/
+@Internal
+public class CastRuleProvider {
+
+    /* ------- Entrypoint ------- */
+
+    /**
+     * Resolve a {@link CastRule} for the provided input data type and target 
data type. Returns
+     * {@code null} if no rule can be resolved.
+     */
+    public static @Nullable CastRule<?, ?> resolve(
+            LogicalType inputDataType, LogicalType targetDataType) {
+        return INSTANCE.internalResolve(inputDataType, targetDataType);
+    }
+
+    /** @see #resolve(LogicalType, LogicalType) */
+    public static @Nullable CastRule<?, ?> resolve(
+            DataType inputDataType, DataType targetDataType) {
+        return resolve(inputDataType.getLogicalType(), 
targetDataType.getLogicalType());
+    }
+
+    /* ------ Implementation ------ */
+
+    private static final CastRuleProvider INSTANCE = new CastRuleProvider();
+
+    static {
+        INSTANCE.addRule(TimestampToStringCastRule$.MODULE$)
+                .addRule(IdentityCastRule$.MODULE$)
+                .addRule(UpcastToBigIntCastRule$.MODULE$)
+                .freeze();
+    }
+
+    // Map<Target family or root, Map<Input family or root, rule>>
+    private Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();
+    private List<CastRule<?, ?>> rulesWithCustomPredicate = new ArrayList<>();
+
+    private CastRuleProvider addRule(CastRule<?, ?> rule) {
+        CastRulePredicate predicate = rule.getPredicateDefinition();
+
+        for (LogicalTypeRoot targetTypeRoot : predicate.getTargetTypes()) {
+            Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetTypeRoot, k -> new 
HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : 
predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+        for (LogicalTypeFamily targetTypeFamily : 
predicate.getTargetTypeFamilies()) {
+            Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetTypeFamily, k -> new 
HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : 
predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+
+        if (predicate.getCustomPredicate() != null) {
+            rulesWithCustomPredicate.add(rule);
+        }
+
+        return this;
+    }
+
+    private CastRule<?, ?> internalResolve(LogicalType inputDataType, 
LogicalType targetDataType) {

Review comment:
       nit: for `LogicalType` we should not use `inputDataType` as the name but 
`inputType`. This also makes the code shorter.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CastRuleProvider.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.casting.rules.IdentityCastRule$;
+import org.apache.flink.table.data.casting.rules.TimestampToStringCastRule$;
+import org.apache.flink.table.data.casting.rules.UpcastToBigIntCastRule$;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class resolves {@link CastRule} using the input and the target type. 
*/
+@Internal
+public class CastRuleProvider {
+
+    /* ------- Entrypoint ------- */
+
+    /**
+     * Resolve a {@link CastRule} for the provided input data type and target 
data type. Returns
+     * {@code null} if no rule can be resolved.
+     */
+    public static @Nullable CastRule<?, ?> resolve(
+            LogicalType inputDataType, LogicalType targetDataType) {
+        return INSTANCE.internalResolve(inputDataType, targetDataType);
+    }
+
+    /** @see #resolve(LogicalType, LogicalType) */
+    public static @Nullable CastRule<?, ?> resolve(
+            DataType inputDataType, DataType targetDataType) {
+        return resolve(inputDataType.getLogicalType(), 
targetDataType.getLogicalType());
+    }
+
+    /* ------ Implementation ------ */
+
+    private static final CastRuleProvider INSTANCE = new CastRuleProvider();
+
+    static {
+        INSTANCE.addRule(TimestampToStringCastRule$.MODULE$)
+                .addRule(IdentityCastRule$.MODULE$)
+                .addRule(UpcastToBigIntCastRule$.MODULE$)
+                .freeze();
+    }
+
+    // Map<Target family or root, Map<Input family or root, rule>>
+    private Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();
+    private List<CastRule<?, ?>> rulesWithCustomPredicate = new ArrayList<>();
+
+    private CastRuleProvider addRule(CastRule<?, ?> rule) {
+        CastRulePredicate predicate = rule.getPredicateDefinition();
+
+        for (LogicalTypeRoot targetTypeRoot : predicate.getTargetTypes()) {
+            Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetTypeRoot, k -> new 
HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : 
predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+        for (LogicalTypeFamily targetTypeFamily : 
predicate.getTargetTypeFamilies()) {
+            Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetTypeFamily, k -> new 
HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : 
predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+
+        if (predicate.getCustomPredicate() != null) {
+            rulesWithCustomPredicate.add(rule);
+        }
+
+        return this;
+    }
+
+    private CastRule<?, ?> internalResolve(LogicalType inputDataType, 
LogicalType targetDataType) {
+        // Try lookup by target type root/type families
+        Map<Object, CastRule<?, ?>> inputTypeToCastRuleMap =
+                lookupTypeInMap(rules, targetDataType.getTypeRoot());
+        CastRule<?, ?> rule;
+        if (inputTypeToCastRuleMap != null) {
+            // Try lookup by input type root/type families
+            rule = lookupTypeInMap(inputTypeToCastRuleMap, 
inputDataType.getTypeRoot());
+            if (rule != null) {
+                return rule;
+            }
+        }
+
+        // Try with the custom predicate rules
+        rule =
+                rulesWithCustomPredicate.stream()
+                        .filter(
+                                r ->
+                                        r.getPredicateDefinition()
+                                                .getCustomPredicate()
+                                                .test(inputDataType, 
targetDataType))
+                        .findFirst()
+                        .orElse(null);
+
+        return rule;
+    }
+
+    private void freeze() {

Review comment:
       why not simply using a `static {}` right after the map member and call 
some `add` methods. we also don't need the `freeze` because the maps are not 
exposed as far as I can see. My main goal here is code simplification.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CastRuleProvider.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.casting.rules.IdentityCastRule$;
+import org.apache.flink.table.data.casting.rules.TimestampToStringCastRule$;
+import org.apache.flink.table.data.casting.rules.UpcastToBigIntCastRule$;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class resolves {@link CastRule} using the input and the target type. 
*/
+@Internal
+public class CastRuleProvider {
+
+    /* ------- Entrypoint ------- */
+
+    /**
+     * Resolve a {@link CastRule} for the provided input data type and target 
data type. Returns
+     * {@code null} if no rule can be resolved.
+     */
+    public static @Nullable CastRule<?, ?> resolve(
+            LogicalType inputDataType, LogicalType targetDataType) {
+        return INSTANCE.internalResolve(inputDataType, targetDataType);
+    }
+
+    /** @see #resolve(LogicalType, LogicalType) */
+    public static @Nullable CastRule<?, ?> resolve(
+            DataType inputDataType, DataType targetDataType) {
+        return resolve(inputDataType.getLogicalType(), 
targetDataType.getLogicalType());
+    }
+
+    /* ------ Implementation ------ */
+
+    private static final CastRuleProvider INSTANCE = new CastRuleProvider();
+
+    static {
+        INSTANCE.addRule(TimestampToStringCastRule$.MODULE$)
+                .addRule(IdentityCastRule$.MODULE$)
+                .addRule(UpcastToBigIntCastRule$.MODULE$)
+                .freeze();
+    }
+
+    // Map<Target family or root, Map<Input family or root, rule>>
+    private Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();

Review comment:
       can be `final`

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/data/casting/rules/TimestampToStringCastRule.scala
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting.rules
+
+import org.apache.flink.table.data.TimestampData
+import org.apache.flink.table.data.casting.{CastRulePredicate, 
CodeGeneratorCastRule}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, 
qualifyMethod}
+import 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.TIMESTAMP_TO_STRING_TIME_ZONE
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils
+import org.apache.flink.table.types.logical.{LocalZonedTimestampType, 
LogicalType, LogicalTypeFamily, LogicalTypeRoot, TimestampType}
+
+object TimestampToStringCastRule extends 
AbstractCharacterFamilyTargetRule[TimestampData](
+  CastRulePredicate.builder()
+    .input(LogicalTypeFamily.TIMESTAMP)
+    .target(LogicalTypeFamily.CHARACTER_STRING)
+    .build()
+) {
+  override def generateStringExpression(
+                                         context: 
CodeGeneratorCastRule.Context,
+                                         inputArgumentName: String,
+                                         inputLogicalType: LogicalType,
+                                         targetLogicalType: LogicalType): 
String = {
+    val zoneId = inputLogicalType.getTypeRoot match {
+      case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE => 
context.getSessionTimeZoneTerm
+      case _ => s"${className[SqlDateTimeUtils]}.UTC_ZONE"
+    }
+    val precision = inputLogicalType match {

Review comment:
       use `LogicalTypeChecks.getPrecision()` 

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/data/casting/CastRulesTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.TestFactory;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * This class runs unit tests of {@link 
org.apache.flink.table.data.casting.CastRule}

Review comment:
       nit: import classes in JavaDocs to improve the reading flow

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/casting/CastRule.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.ZoneId;
+
+/**
+ * Casting executor factory performs the pre-flight of a casting operation.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+@Internal
+public interface CastRule<IN, OUT> {
+
+    /** @see CastRulePredicate for more details about a cast rule predicate 
definition */
+    CastRulePredicate getPredicateDefinition();
+
+    /**
+     * Create a casting executor starting from the provided input type. The 
returned {@link
+     * CastExecutor} assumes the input value is using the internal data type, 
and it's a valid value
+     * for the provided {@code targetLogicalType}.
+     */
+    CastExecutor<IN, OUT> create(

Review comment:
       why do both `CastRuleProvider` and `CastRule` take source and target 
type? isn't this redundant? the `CastRuleProvider` can directly return the 
`CastExecutor` and we would save an additional indirection. We could convert 
the `CastRuleProvider` into a `CastExecutorProvider` that would only work with 
`CastRule`s internally.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/casting/CastRule.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.ZoneId;
+
+/**
+ * Casting executor factory performs the pre-flight of a casting operation.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+@Internal
+public interface CastRule<IN, OUT> {

Review comment:
       `CastExecutor` could be in the `data.utils` package then

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/data/casting/rules/AbstractCharacterFamilyTargetRule.scala
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting.rules
+
+import org.apache.flink.table.data.StringData
+import org.apache.flink.table.data.binary.BinaryStringData
+import org.apache.flink.table.data.casting.{CastExpression, CastRulePredicate, 
CodeGeneratorCastRule}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.className
+import org.apache.flink.table.types.logical.{CharType, LogicalType, 
VarCharType}
+
+abstract class AbstractCharacterFamilyTargetRule[IN](predicate: 
CastRulePredicate)
+  extends AbstractCodeGeneratorCastRule[IN, StringData](predicate) {
+
+  def generateStringExpression(context: CodeGeneratorCastRule.Context,

Review comment:
       please follow this style for indention:
   https://github.com/databricks/scala-style-guide#indent
   
   we can discuss to introduce a strict code style in the future

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/data/casting/rules/AbstractCastRule.scala
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting.rules
+
+import org.apache.flink.table.data.casting.{CastRule, CastRulePredicate}
+
+abstract class AbstractCastRule[IN, OUT](predicate: CastRulePredicate) extends 
CastRule[IN, OUT] {

Review comment:
       let's try to implement most rules in Java. so far the rules are not 
overly complicated. we can still fallback in Scala if very important utilities 
cannot be migrated

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CodeGeneratedCastExecutor.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ExpressionEvaluator;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Cast executor which can be instantiated starting from an expression code.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+class CodeGeneratedCastExecutor<IN, OUT> implements CastExecutor<IN, OUT> {
+
+    private final ExpressionEvaluator ee;
+
+    // To reuse for invocations
+    private final Object[] inputArray;
+
+    CodeGeneratedCastExecutor(
+            String code, String inputArgumentName, Class<IN> inputClass, 
Class<OUT> outputClass) {
+        this.ee = new ExpressionEvaluator();
+        // Input args
+        ee.setParameters(new String[] {inputArgumentName}, new Class[] 
{inputClass});
+        // Result type
+        ee.setExpressionType(outputClass);
+        try {
+            // Compile
+            ee.cook(code);

Review comment:
       compilation can be quite expensive and take a lot of time. usually we 
use our compiler utils to speed things up, and cache reoccuring code. maybe we 
should do the same here

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CodeGeneratedCastExecutor.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ExpressionEvaluator;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Cast executor which can be instantiated starting from an expression code.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+class CodeGeneratedCastExecutor<IN, OUT> implements CastExecutor<IN, OUT> {
+
+    private final ExpressionEvaluator ee;

Review comment:
       for code readbility, don't use abbreviations like `ee` but rather just 
`evaluator`

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/casting/CastRule.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.ZoneId;
+
+/**
+ * Casting executor factory performs the pre-flight of a casting operation.
+ *
+ * @param <IN> Input internal type
+ * @param <OUT> Output internal type
+ */
+@Internal
+public interface CastRule<IN, OUT> {

Review comment:
       if we need to call code generator for `PrintUtils` anyway that sits in 
the planner, we can also move this interface to the planner. only the 
`CastExecutor` would stay in common.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/data/casting/CastRuleProvider.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.casting;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.casting.rules.IdentityCastRule$;
+import org.apache.flink.table.data.casting.rules.TimestampToStringCastRule$;
+import org.apache.flink.table.data.casting.rules.UpcastToBigIntCastRule$;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class resolves {@link CastRule} using the input and the target type. 
*/
+@Internal
+public class CastRuleProvider {
+
+    /* ------- Entrypoint ------- */
+
+    /**
+     * Resolve a {@link CastRule} for the provided input data type and target 
data type. Returns
+     * {@code null} if no rule can be resolved.
+     */
+    public static @Nullable CastRule<?, ?> resolve(
+            LogicalType inputDataType, LogicalType targetDataType) {
+        return INSTANCE.internalResolve(inputDataType, targetDataType);
+    }
+
+    /** @see #resolve(LogicalType, LogicalType) */
+    public static @Nullable CastRule<?, ?> resolve(
+            DataType inputDataType, DataType targetDataType) {
+        return resolve(inputDataType.getLogicalType(), 
targetDataType.getLogicalType());
+    }
+
+    /* ------ Implementation ------ */
+
+    private static final CastRuleProvider INSTANCE = new CastRuleProvider();
+
+    static {
+        INSTANCE.addRule(TimestampToStringCastRule$.MODULE$)
+                .addRule(IdentityCastRule$.MODULE$)
+                .addRule(UpcastToBigIntCastRule$.MODULE$)
+                .freeze();
+    }
+
+    // Map<Target family or root, Map<Input family or root, rule>>
+    private Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();
+    private List<CastRule<?, ?>> rulesWithCustomPredicate = new ArrayList<>();
+
+    private CastRuleProvider addRule(CastRule<?, ?> rule) {
+        CastRulePredicate predicate = rule.getPredicateDefinition();
+
+        for (LogicalTypeRoot targetTypeRoot : predicate.getTargetTypes()) {
+            Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetTypeRoot, k -> new 
HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : 
predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+        for (LogicalTypeFamily targetTypeFamily : 
predicate.getTargetTypeFamilies()) {
+            Map<Object, CastRule<?, ?>> map =
+                    rules.computeIfAbsent(targetTypeFamily, k -> new 
HashMap<>());
+            for (LogicalTypeRoot inputTypeRoot : predicate.getInputTypes()) {
+                map.put(inputTypeRoot, rule);
+            }
+            for (LogicalTypeFamily inputTypeFamily : 
predicate.getInputTypeFamilies()) {
+                map.put(inputTypeFamily, rule);
+            }
+        }
+
+        if (predicate.getCustomPredicate() != null) {
+            rulesWithCustomPredicate.add(rule);
+        }
+
+        return this;
+    }
+
+    private CastRule<?, ?> internalResolve(LogicalType inputDataType, 
LogicalType targetDataType) {
+        // Try lookup by target type root/type families
+        Map<Object, CastRule<?, ?>> inputTypeToCastRuleMap =

Review comment:
       nit: I would really encourage to use `final` also for local variable. It 
makes it clearer when the content of a variable is changed again or not. e.g. 
`rule` is reassigned whereas `inputTypeToCastRuleMap` is not. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to