twalthr commented on a change in pull request #17186:
URL: https://github.com/apache/flink/pull/17186#discussion_r705291119



##########
File path: docs/data/sql_functions.yml
##########
@@ -708,6 +708,42 @@ json:
       -- '[]'
       JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
       ```
+  - sql: JSON_OBJECT([KEY key VALUE value]* [ { NULL | ABSENT } ON NULL ])

Review comment:
       is it possible to omit the `KEY`? 
   
https://docs.oracle.com/en/database/oracle/oracle-database/12.2/adjsn/generation.html#GUID-1084A518-A44A-4654-A796-C1DD4D8EC2AA

##########
File path: docs/data/sql_functions.yml
##########
@@ -708,6 +708,42 @@ json:
       -- '[]'
       JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
       ```
+  - sql: JSON_OBJECT([KEY key VALUE value]* [ { NULL | ABSENT } ON NULL ])
+    table: jsonObject(JsonOnNull, keyValues...)
+    description: |
+      Builds a JSON object string from a list of key-value pairs.
+
+      Note that keys must be non-`NULL` string literals, while values may be 
arbitrary expressions.
+
+      The `ON NULL` behavior defines how to treat `NULL` values. If omitted, 
`NULL ON NULL` is
+      assumed by default.
+
+      Note that this function only returns a JSON string, not an actual JSON 
type. Not all JSON can
+      currently be represented in Flink's type system.
+
+      ```
+      -- '{}'
+      JSON_OBJECT()
+
+      -- '{"K1":"V1","K2":"V2"}'
+      JSON_OBJECT(KEY 'K1' VALUE 'V1', KEY 'K2' VALUE 'V2')
+
+      -- Expressions as values
+      JSON_OBJECT(KEY 'orderNo' VALUE orders.orderId)
+
+      -- ON NULL
+      JSON_OBJECT(KEY 'K1' VALUE NULL NULL ON NULL)   -- '{"K1":null}'
+      JSON_OBJECT(KEY 'K1' VALUE NULL ABSENT ON NULL) -- '{}'
+
+      -- '{"K1":"{\"K2\":\"V\"}"}'
+      JSON_OBJECT(
+        KEY 'K1'
+        VALUE JSON_OBJECT(

Review comment:
       This immediately raises the question of how we support nested objects. 
What we need is `FORMAT JSON`, right? Maybe we should add some explanation of 
what we see and what we don't support yet.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RepeatingSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} composed of an arbitrarily often repeating list 
of {@link
+ * ArgumentTypeStrategy}s.
+ */
+@Internal
+public class RepeatingSequenceInputTypeStrategy implements InputTypeStrategy {
+
+    private final List<ArgumentTypeStrategy> argumentStrategies;
+
+    public RepeatingSequenceInputTypeStrategy(List<ArgumentTypeStrategy> 
argumentStrategies) {
+        this.argumentStrategies = argumentStrategies;
+    }
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return new ArgumentCount() {
+            @Override
+            public boolean isValidCount(int count) {
+                return count % argumentStrategies.size() == 0;
+            }
+
+            @Override
+            public Optional<Integer> getMinCount() {
+                return Optional.empty();
+            }
+
+            @Override
+            public Optional<Integer> getMaxCount() {
+                return Optional.empty();
+            }
+        };
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> dataTypes = callContext.getArgumentDataTypes();
+        final List<DataType> inferredDataTypes = new 
ArrayList<>(dataTypes.size());
+
+        for (int i = 0; i < callContext.getArgumentDataTypes().size(); i++) {
+            final ArgumentTypeStrategy argumentStrategy =
+                    argumentStrategies.get(i % argumentStrategies.size());
+
+            final Optional<DataType> inferredDataType =
+                    argumentStrategy.inferArgumentType(callContext, i, 
throwOnFailure);
+            if (!inferredDataType.isPresent()) {
+                return Optional.empty();
+            }
+
+            inferredDataTypes.add(inferredDataType.get());
+        }
+
+        return Optional.of(inferredDataTypes);
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+        final List<Signature.Argument> arguments = new ArrayList<>();
+        for (int i = 0; i < argumentStrategies.size(); i++) {
+            final Signature.Argument argument =
+                    argumentStrategies.get(i).getExpectedArgument(definition, 
i);
+            arguments.add(argument);
+        }
+
+        // Unfortunately there is no way to represent the repetition in the 
signature
+        final Signature signature = Signature.of(arguments);

Review comment:
       How about `[arg, arg]...` or something similar to indicate the 
repetition? We already do `f([STRING & <LITERAL>], INT)`, so a complex one 
could be `f([[STRING & <LITERAL>], INT]...)`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
##########
@@ -562,6 +563,43 @@ public static ApiExpression withoutColumns(Object head, 
Object... tail) {
         return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, 
tail);
     }
 
+    /**
+     * Builds a JSON object string from a list of key-value pairs.
+     *
+     * <p>{@param keyValues} is an even-numbered list of alternating key/value 
pairs. Note that keys
+     * must be non-{@code NULL} string literals, while values may be arbitrary 
expressions.
+     *
+     * <p>The {@link JsonOnNull onNull} behavior defines how to treat {@code 
NULL} values.
+     *
+     * <p>Note that this function only returns a JSON string, not an actual 
JSON type. Not all JSON
+     * can currently be represented in Flink's type system.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // "{}"
+     * jsonObject(JsonOnNull.NULL)
+     * // "{\"K1\":\"V1\",\"K2\":\"V2\"}"
+     * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2")
+     *
+     * // Expressions as values
+     * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId"))
+     *
+     * // ON NULL
+     * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING()))   // 
"{\"K1\":null}"
+     * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // "{}"
+     *
+     * // "{\"K1\":\"{\\\"K2\\\":\\\"V\\\"}\"}"

Review comment:
       I think for readability, I would drop the outer quote to get rid of one 
level of escapes, or use single quotes in the upper level.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
##########
@@ -46,6 +55,20 @@
     public static final InputTypeStrategy CURRENT_WATERMARK =
             new CurrentWatermarkInputTypeStrategy();
 
+    /**
+     * Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
+     *
+     * <p>The first argument defines the on-null behavior and is followed by 
any number of key-value
+     * pairs. Keys must be character string literals, while values are 
arbitrary expressions.
+     */
+    public static final InputTypeStrategy JSON_OBJECT =
+            compositeSequence()
+                    .argument(symbol(JsonOnNull.class))
+                    .finishWithVarying(
+                            repeatingSequence(

Review comment:
       Since we have this `repeatingSequence` now, could we remove the specific 
one for the `map()` function?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
##########
@@ -798,4 +801,17 @@ object StringCallGen {
     }
   }
 
+  private def generateJsonObject(
+      ctx: CodeGeneratorContext,
+      returnType: LogicalType,
+      operands: Seq[GeneratedExpression]): GeneratedExpression = {
+    generateCallIfArgsNullable(ctx, returnType, operands) {
+      terms =>
+        s"""
+           |$BINARY_STRING.fromString(
+           |    
${qualifyMethod(BuiltInMethods.JSON_OBJECT)}(${safeToStringTerms(terms, 
operands)}))

Review comment:
       don't we allow ANY here? this looks dangerous to me.

##########
File path: docs/data/sql_functions.yml
##########
@@ -708,6 +708,41 @@ json:
       -- '[]'
       JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
       ```
+  - sql: JSON_OBJECT([KEY key VALUE value]* [ { NULL | ABSENT } ON NULL ])
+    table: jsonObject(JsonOnNull, keyValues...)
+    description: |
+      Builds a JSON string from a list of key-value pairs.
+
+      Note that keys must be non-`NULL` string literals, while values may be 
arbitrary expressions.
+
+      The `ON NULL` behavior defines how to treat `NULL` values.
+
+      Note that this function only returns a JSON string, not an actual JSON 
type. Not all JSON can

Review comment:
       I would say this sounds too negative. We also don't have a JSON type. 
Just write:
   ```
   The function returns a JSON string.
   ```
   
   `Not all JSON`: can you elaborate on this?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonObjectConverter.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.table.api.JsonOnNull;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import 
org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/** Conversion for {@link BuiltInFunctionDefinitions#JSON_OBJECT}. */
+class JsonObjectConverter extends CustomizedConverter {

Review comment:
       nit: `@Internal` just to make it clear

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
##########
@@ -562,6 +563,43 @@ public static ApiExpression withoutColumns(Object head, 
Object... tail) {
         return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, 
tail);
     }
 
+    /**
+     * Builds a JSON string from a list of key-value pairs.
+     *
+     * <p>{@param keyValues} is an even-numbered list of alternating key/value 
pairs. Note that keys
+     * must be non-{@code NULL} string literals, while values may be arbitrary 
expressions.
+     *
+     * <p>The {@link JsonOnNull onNull} behavior defines how to treat {@code 
NULL} values.
+     *
+     * <p>Note that this function only returns a JSON string, not an actual 
JSON type. Not all JSON
+     * can currently be represented in Flink's type system.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // "{}"
+     * jsonObject(JsonOnNull.NULL)
+     * // "{\"K1\":\"V1\",\"K2\":\"V2\"}"
+     * jsonObject(JsonOnNull.NULL, "K1", "V1", "K2", "V2")
+     *
+     * // Expressions as values
+     * jsonObject(JsonOnNull.NULL, "orderNo", $("orderId"))
+     *
+     * // ON NULL
+     * jsonObject(JsonOnNull.NULL, "K1", nullOf(DataTypes.STRING()))   // 
"{\"K1\":null}"
+     * jsonObject(JsonOnNull.ABSENT, "K1", nullOf(DataTypes.STRING())) // "{}"
+     *
+     * // "{\"K1\":\"{\\\"K2\\\":\\\"V\\\"}\"}"
+     * jsonObject(JsonOnNull.NULL, "K1", jsonObject(JsonOnNull.NULL, "K2", 
"V"))
+     * }</pre>
+     */
+    public static ApiExpression jsonObject(JsonOnNull onNull, Object... 
keyValues) {

Review comment:
       Also, we would need to provide similar functionality in Python and Scala 
APIs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to