twalthr commented on a change in pull request #17186: URL: https://github.com/apache/flink/pull/17186#discussion_r705291119
########## File path: docs/data/sql_functions.yml ########## @@ -708,6 +708,42 @@ json: -- '[]' JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR) ``` + - sql: JSON_OBJECT([KEY key VALUE value]* [ { NULL | ABSENT } ON NULL ]) Review comment: is it possible to omit the `KEY`? https://docs.oracle.com/en/database/oracle/oracle-database/12.2/adjsn/generation.html#GUID-1084A518-A44A-4654-A796-C1DD4D8EC2AA ########## File path: docs/data/sql_functions.yml ########## @@ -708,6 +708,42 @@ json: -- '[]' JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR) ``` + - sql: JSON_OBJECT([KEY key VALUE value]* [ { NULL | ABSENT } ON NULL ]) + table: jsonObject(JsonOnNull, keyValues...) + description: | + Builds a JSON object string from a list of key-value pairs. + + Note that keys must be non-`NULL` string literals, while values may be arbitrary expressions. + + The `ON NULL` behavior defines how to treat `NULL` values. If omitted, `NULL ON NULL` is + assumed by default. + + Note that this function only returns a JSON string, not an actual JSON type. Not all JSON can + currently be represented in Flink's type system. + + ``` + -- '{}' + JSON_OBJECT() + + -- '{"K1":"V1","K2":"V2"}' + JSON_OBJECT(KEY 'K1' VALUE 'V1', KEY 'K2' VALUE 'V2') + + -- Expressions as values + JSON_OBJECT(KEY 'orderNo' VALUE orders.orderId) + + -- ON NULL + JSON_OBJECT(KEY 'K1' VALUE NULL NULL ON NULL) -- '{"K1":null}' + JSON_OBJECT(KEY 'K1' VALUE NULL ABSENT ON NULL) -- '{}' + + -- '{"K1":"{\"K2\":\"V\"}"}' + JSON_OBJECT( + KEY 'K1' + VALUE JSON_OBJECT( Review comment: this immediately raises the question how we support nested objects, what we need is `FORMAT JSON`, right? 
maybe we should add some explanation what we see and what we don't support yet ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.ArgumentTypeStrategy; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * {@link InputTypeStrategy} composed of an arbitrarily often repeating list of {@link + * ArgumentTypeStrategy}s. 
+ */ +@Internal +public class RepeatingSequenceInputTypeStrategy implements InputTypeStrategy { + + private final List<ArgumentTypeStrategy> argumentStrategies; + + public RepeatingSequenceInputTypeStrategy(List<ArgumentTypeStrategy> argumentStrategies) { + this.argumentStrategies = argumentStrategies; + } + + @Override + public ArgumentCount getArgumentCount() { + return new ArgumentCount() { + @Override + public boolean isValidCount(int count) { + return count % argumentStrategies.size() == 0; + } + + @Override + public Optional<Integer> getMinCount() { + return Optional.empty(); + } + + @Override + public Optional<Integer> getMaxCount() { + return Optional.empty(); + } + }; + } + + @Override + public Optional<List<DataType>> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + final List<DataType> dataTypes = callContext.getArgumentDataTypes(); + final List<DataType> inferredDataTypes = new ArrayList<>(dataTypes.size()); + + for (int i = 0; i < callContext.getArgumentDataTypes().size(); i++) { + final ArgumentTypeStrategy argumentStrategy = + argumentStrategies.get(i % argumentStrategies.size()); + + final Optional<DataType> inferredDataType = + argumentStrategy.inferArgumentType(callContext, i, throwOnFailure); + if (!inferredDataType.isPresent()) { + return Optional.empty(); + } + + inferredDataTypes.add(inferredDataType.get()); + } + + return Optional.of(inferredDataTypes); + } + + @Override + public List<Signature> getExpectedSignatures(FunctionDefinition definition) { + final List<Signature.Argument> arguments = new ArrayList<>(); + for (int i = 0; i < argumentStrategies.size(); i++) { + final Signature.Argument argument = + argumentStrategies.get(i).getExpectedArgument(definition, i); + arguments.add(argument); + } + + // Unfortunately there is no way to represent the repetition in the signature + final Signature signature = Signature.of(arguments); Review comment: how about `[arg, arg]...` something to indicate the repetition. 
we do already `f([STRING & <LITERAL>], INT)` so a complex one could be `f([[STRING & <LITERAL>], INT]...)` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java ########## @@ -562,6 +563,43 @@ public static ApiExpression withoutColumns(Object head, Object... tail) { return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, tail); } + /** + * Builds a JSON object string from a list of key-value pairs. + * + * <p>{@param keyValues} is an even-numbered list of alternating key/value pairs. Note that keys + * must be non-{@code NULL} string literals, while values may be arbitrary expressions. + * + * <p>The {@link JsonOnNull onNull} behavior defines how to treat {@code NULL} values. + * + * <p>Note that this function only returns a JSON string, not an actual JSON type. Not all JSON + * can currently be represented in Flink's type system. + * + * <p>Examples: + * + * <pre>{@code + * // "{}" + * jsonObject(JsonOnNull.NULL) + * // "{\"K1\":\"V1\",\"K2\":\"V2\"}" + * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2") + * + * // Expressions as values + * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId")) + * + * // ON NULL + * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING())) // "{\"K1\":null}" + * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // "{}" + * + * // "{\"K1\":\"{\\\"K2\\\":\\\"V\\\"}\"}" Review comment: I think for readability, I would drop the outer quote to get rid of one level of escapes. or use single quotes in the upper level. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java ########## @@ -46,6 +55,20 @@ public static final InputTypeStrategy CURRENT_WATERMARK = new CurrentWatermarkInputTypeStrategy(); + /** + * Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. 
+ * + * <p>The first argument defines the on-null behavior and is followed by any number of key-value + * pairs. Keys must be character string literals, while values are arbitrary expressions. + */ + public static final InputTypeStrategy JSON_OBJECT = + compositeSequence() + .argument(symbol(JsonOnNull.class)) + .finishWithVarying( + repeatingSequence( Review comment: since we have this `repeatingSequence` now, we could remove the specific one for the `map()` function? ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala ########## @@ -798,4 +801,17 @@ object StringCallGen { } } + private def generateJsonObject( + ctx: CodeGeneratorContext, + returnType: LogicalType, + operands: Seq[GeneratedExpression]): GeneratedExpression = { + generateCallIfArgsNullable(ctx, returnType, operands) { + terms => + s""" + |$BINARY_STRING.fromString( + | ${qualifyMethod(BuiltInMethods.JSON_OBJECT)}(${safeToStringTerms(terms, operands)})) Review comment: don't we allow ANY here? this looks dangerous to me. ########## File path: docs/data/sql_functions.yml ########## @@ -708,6 +708,41 @@ json: -- '[]' JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR) ``` + - sql: JSON_OBJECT([KEY key VALUE value]* [ { NULL | ABSENT } ON NULL ]) + table: jsonObject(JsonOnNull, keyValues...) + description: | + Builds a JSON string from a list of key-value pairs. + + Note that keys must be non-`NULL` string literals, while values may be arbitrary expressions. + + The `ON NULL` behavior defines how to treat `NULL` values. + + Note that this function only returns a JSON string, not an actual JSON type. Not all JSON can Review comment: I would say this sounds too negative. We also don't have a JSON type. Just write: ``` The function returns a JSON string. ``` `Not all JSON` can you elaborate on this? 
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.expressions.converter.converters; + +import org.apache.flink.table.api.JsonOnNull; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule; +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlJsonConstructorNullClause; + +import java.util.LinkedList; +import java.util.List; + +/** Conversion for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. 
*/ +class JsonObjectConverter extends CustomizedConverter { Review comment: nit: `@Internal` just to make it clear ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java ########## @@ -562,6 +563,43 @@ public static ApiExpression withoutColumns(Object head, Object... tail) { return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, tail); } + /** + * Builds a JSON string from a list of key-value pairs. + * + * <p>{@param keyValues} is an even-numbered list of alternating key/value pairs. Note that keys + * must be non-{@code NULL} string literals, while values may be arbitrary expressions. + * + * <p>The {@link JsonOnNull onNull} behavior defines how to treat {@code NULL} values. + * + * <p>Note that this function only returns a JSON string, not an actual JSON type. Not all JSON + * can currently be represented in Flink's type system. + * + * <p>Examples: + * + * <pre>{@code + * // "{}" + * jsonObject(JsonOnNull.NULL) + * // "{\"K1\":\"V1\",\"K2\":\"V2\"}" + * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2") + * + * // Expressions as values + * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId")) + * + * // ON NULL + * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING())) // "{\"K1\":null}" + * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // "{}" + * + * // "{\"K1\":\"{\\\"K2\\\":\\\"V\\\"}\"}" + * jsonObject(JsonOnNull.NULL, "K1", jsonObject(JsonOnNull.NULL, "K2", "V")) + * }</pre> + */ + public static ApiExpression jsonObject(JsonOnNull onNull, Object... keyValues) { Review comment: Also, we would need to provide similar functionality in Python and Scala APIs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org