twalthr commented on code in PR #23811: URL: https://github.com/apache/flink/pull/23811#discussion_r1407512557
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java: ########## @@ -216,6 +217,17 @@ public String asSummaryString() { return getFunctionName() + argList; } + @Override + public String asSerializableString() { + if (functionDefinition instanceof BuiltInFunctionDefinition) { + return ((BuiltInFunctionDefinition) functionDefinition) Review Comment: nit: use local variable and cast once ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java: ########## @@ -216,6 +217,17 @@ public String asSummaryString() { return getFunctionName() + argList; } + @Override + public String asSerializableString() { + if (functionDefinition instanceof BuiltInFunctionDefinition) { + return ((BuiltInFunctionDefinition) functionDefinition) + .getCallSyntax() + .unparse(((BuiltInFunctionDefinition) functionDefinition).getSqlName(), args); + } else { + return CallSyntax.FUNCTION.unparse(getFunctionName(), args); Review Comment: We should throw an exception when `functionIdentifier` is null. Something like "Only functions that have been registered before are serializable"? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java: ########## @@ -69,23 +69,31 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { private final boolean isInternal; + private final CallSyntax callSyntax; Review Comment: Should we call this `SqlCallSyntax`? 
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ########## @@ -2092,6 +2262,19 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition IS_JSON = BuiltInFunctionDefinition.newBuilder() .name("IS_JSON") + .callSyntax( + (sqlName, operands) -> { + final String s = + String.format( + "%s IS JSON", + CallSyntaxUtils.asSerializableOperand( + operands.get(0))); + if (operands.size() > 1) { Review Comment: Can you add more comments for these special cases? It's hard to understand what is going on here. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java: ########## @@ -219,6 +219,11 @@ public String asSummaryString() { return stringifyValue(value); } + @Override + public String asSerializableString() { + return stringifyValue(value); Review Comment: are you sure that this is safe for all data types? A quick scan revealed that arrays could be an issue? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ########## @@ -679,6 +720,26 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition DISTINCT = BuiltInFunctionDefinition.newBuilder() .name("distinct") + .callSyntax( + (sqlName, operands) -> { + final CallExpression callExpression = + (CallExpression) operands.get(0); + if (callExpression.getFunctionDefinition() Review Comment: Can you add some comments here? This does not look straightforward compared to the other functions. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java: ########## @@ -327,12 +351,43 @@ public Builder internal() { return this; } + /** + * Overwrites the syntax used for unparsing a function into a SQL string. If not specified. 
Review Comment: nit: `If not specified,` also below ########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java: ########## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Expressions; +import org.apache.flink.table.api.JsonExistsOnError; +import org.apache.flink.table.api.JsonOnNull; +import org.apache.flink.table.api.JsonQueryOnEmptyOrError; +import org.apache.flink.table.api.JsonQueryWrapper; +import org.apache.flink.table.api.JsonType; +import org.apache.flink.table.api.JsonValueOnEmptyOrError; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.expressions.resolver.ExpressionResolver; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.operations.ValuesQueryOperation; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeFactoryMock; +import org.apache.flink.table.utils.FunctionLookupMock; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.negative; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for serializing {@link BuiltInFunctionDefinitions} into a SQL string. */ +public class ExpressionSerializationTest { + + public static Stream<TestSpec> testData() { Review Comment: Will we also add a test in the planner that reuses the spec and checks whether the SQL is correct? We could also extend the `BuiltInFunctionTestBase` and test serializability for all Table API expressions. 
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ########## @@ -1904,6 +2055,25 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition GET = BuiltInFunctionDefinition.newBuilder() .name("get") + .callSyntax( + (sqlName, operands) -> { + final Optional<String> fieldName = + ((ValueLiteralExpression) operands.get(1)) + .getValueAs(String.class); + + return fieldName + .map( + n -> + String.format( + "%s.`%s`", Review Comment: not sure if the serialization format is correct here: ``` `%s` ``` shouldn't this be properly escaped in the argument already ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ########## @@ -763,6 +827,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition SUBSTR = BuiltInFunctionDefinition.newBuilder() .name("substr") + .callSyntax("SUBSTR", CallSyntax.SUBSTRING) Review Comment: Serialize into `SUBSTRING`? I don't understand why this synonym actually exists. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/CallSyntaxUtils.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; + +/** Utility functions that can be used for writing {@link CallSyntax}. */ +@Internal +class CallSyntaxUtils { + + /** + * Converts the given {@link ResolvedExpression} into a SQL string. Wraps the string with + * parenthesis if the expression is not a leaf expression such as e.g. {@link + * ValueLiteralExpression} or {@link FieldReferenceExpression}. + */ + static String asSerializableOperand(ResolvedExpression expression) { Review Comment: Why are we not calling this method for all functions? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/CallSyntax.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.ResolvedExpression; + +import java.util.List; +import java.util.stream.Collectors; + +/** Provides a format for unparsing {@link BuiltInFunctionDefinitions} into a SQL string. */ +@Internal +public interface CallSyntax { + + String unparse(String sqlName, List<ResolvedExpression> operands); + + default String unparseDistinct(String sqlName, List<ResolvedExpression> operands) { + throw new UnsupportedOperationException( + "Only the FUNCTION syntax supports the DISTINCT clause."); + } + + CallSyntax FUNCTION = Review Comment: add concrete examples to each syntax ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ########## @@ -406,6 +414,13 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition IF = BuiltInFunctionDefinition.newBuilder() .name("ifThenElse") + .callSyntax( + (sqlName, operands) -> + String.format( + "IF %s THEN %s ELSE %s", Review Comment: is that valid SQL? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ########## @@ -406,6 +414,13 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition IF = BuiltInFunctionDefinition.newBuilder() .name("ifThenElse") + .callSyntax( + (sqlName, operands) -> + String.format( + "IF %s THEN %s ELSE %s", Review Comment: Why not `asSerializableOperand`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org