twalthr commented on a change in pull request #17186:
URL: https://github.com/apache/flink/pull/17186#discussion_r707353810



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
primitiveTypeTermForType, typeTerm}
+import 
org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNullNode, 
createPrimitiveNodeTerm, createRawNodeTerm, isJsonFunctionOperand}
+import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
+import org.apache.flink.table.types.logical.LogicalType
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
+
+import org.apache.calcite.rex.RexCall
+import org.apache.calcite.sql.SqlJsonConstructorNullClause
+import org.apache.calcite.sql.SqlJsonConstructorNullClause.{ABSENT_ON_NULL, 
NULL_ON_NULL}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_OBJECT]].
+ *
+ * <code>JSON_OBJECT</code> returns a character string. However, this creates 
an issue when nesting

Review comment:
       `<code>` is not Scala

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
##########
@@ -28,24 +28,42 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawV
 
 import org.apache.calcite.rex.{RexCall, RexNode}
 
+import java.time.format.DateTimeFormatter
+
 /** Utility for generating JSON function calls. */
 object JsonGenerateUtils {
 
   /**
    * Returns a term which wraps the given <code>valueExpr</code> into a 
[[JsonNode]] of the
    * appropriate type.
    *
+   * @param ctx Context object.
    * @param containerNodeTerm Name of the [[ContainerNode]] from which to 
create the node.
    * @param valueExpr Generated expression of the value which should be 
wrapped.
    * @return Generated code fragment creating the appropriate node.
    */
-  def createPrimitiveNodeTerm(containerNodeTerm: String, valueExpr: 
GeneratedExpression): String = {
+  def createPrimitiveNodeTerm(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      valueExpr: GeneratedExpression): String = {
     valueExpr.resultType.getTypeRoot match {
       case CHAR | VARCHAR => 
s"$containerNodeTerm.textNode(${valueExpr.resultTerm}.toString())"
       case BOOLEAN => 
s"$containerNodeTerm.booleanNode(${valueExpr.resultTerm})"
       case DECIMAL => 
s"$containerNodeTerm.numberNode(${valueExpr.resultTerm}.toBigDecimal())"
       case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE =>
         s"$containerNodeTerm.numberNode(${valueExpr.resultTerm})"
+      case TIMESTAMP_WITHOUT_TIME_ZONE =>
+        val formatter = 
s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME"
+        
s"$containerNodeTerm.textNode(${valueExpr.resultTerm}.toLocalDateTime().format($formatter))"

Review comment:
       fyi: with this we have different timestamp semantics in our formats vs 
functions.
   
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/json/#json-timestamp-format-standard

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
##########
@@ -28,24 +28,42 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawV
 
 import org.apache.calcite.rex.{RexCall, RexNode}
 
+import java.time.format.DateTimeFormatter
+
 /** Utility for generating JSON function calls. */
 object JsonGenerateUtils {
 
   /**
    * Returns a term which wraps the given <code>valueExpr</code> into a 
[[JsonNode]] of the
    * appropriate type.
    *
+   * @param ctx Context object.
    * @param containerNodeTerm Name of the [[ContainerNode]] from which to 
create the node.
    * @param valueExpr Generated expression of the value which should be 
wrapped.
    * @return Generated code fragment creating the appropriate node.
    */
-  def createPrimitiveNodeTerm(containerNodeTerm: String, valueExpr: 
GeneratedExpression): String = {
+  def createPrimitiveNodeTerm(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      valueExpr: GeneratedExpression): String = {
     valueExpr.resultType.getTypeRoot match {
       case CHAR | VARCHAR => 
s"$containerNodeTerm.textNode(${valueExpr.resultTerm}.toString())"
       case BOOLEAN => 
s"$containerNodeTerm.booleanNode(${valueExpr.resultTerm})"
       case DECIMAL => 
s"$containerNodeTerm.numberNode(${valueExpr.resultTerm}.toBigDecimal())"
       case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE =>
         s"$containerNodeTerm.numberNode(${valueExpr.resultTerm})"
+      case TIMESTAMP_WITHOUT_TIME_ZONE =>
+        val formatter = 
s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME"
+        
s"$containerNodeTerm.textNode(${valueExpr.resultTerm}.toLocalDateTime().format($formatter))"
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+        val timeZone = ctx.addReusableSessionTimeZone()
+        val formatter = 
s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME"
+        s"""
+           |$containerNodeTerm.textNode(
+           |    
${valueExpr.resultTerm}.toInstant().atZone($timeZone.toZoneId()).format($formatter))

Review comment:
       we don't need to evaluate the time zone here, we can simply use `Z` 
suffix

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
primitiveTypeTermForType, typeTerm}
+import 
org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNullNode, 
createPrimitiveNodeTerm, createRawNodeTerm, isJsonFunctionOperand}
+import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
+import org.apache.flink.table.types.logical.LogicalType
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
+
+import org.apache.calcite.rex.RexCall
+import org.apache.calcite.sql.SqlJsonConstructorNullClause
+import org.apache.calcite.sql.SqlJsonConstructorNullClause.{ABSENT_ON_NULL, 
NULL_ON_NULL}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_OBJECT]].
+ *
+ * <code>JSON_OBJECT</code> returns a character string. However, this creates 
an issue when nesting
+ * calls to this function with the intention of creating a nested JSON 
structure. Instead of a
+ * nested JSON object, a JSON string would be inserted, i.e.
+ * <code>JSON_OBJECT(KEY 'K' VALUE JSON_OBJECT(KEY 'A' VALUE 'B'))</code> 
would result in
+ * <code>{"K":"{\"A\":\"B\"}"}</code> instead of the intended 
<code>{"K":{"A":"B"}}</code>. We
+ * remedy this by treating nested calls to this function differently and 
inserting the value as a
+ * raw node instead of as a string node.
+ */
+class JsonObjectCallGen(call: RexCall) extends CallGenerator {
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType): GeneratedExpression = {
+
+    val jsonUtils = typeTerm(classOf[SqlJsonUtils])
+    val onNull = getNullBehavior(operands.head)
+
+    val nodeTerm = ctx.addReusableLocalVariable(typeTerm(classOf[ObjectNode]), 
"node")
+    val populateNodeCode = operands.zipWithIndex.drop(1).grouped(2).map {
+      case Seq((keyExpr, _), (valueExpr, valueIdx)) =>
+        val valueTerm = if 
(isJsonFunctionOperand(call.operands.get(valueIdx))) {
+          createRawNodeTerm(nodeTerm, valueExpr)
+        } else {
+          createPrimitiveNodeTerm(nodeTerm, valueExpr)
+        }
+
+        onNull match {
+          case NULL_ON_NULL =>
+            s"""
+               |if (${valueExpr.nullTerm}) {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), 
${createNullNode(nodeTerm)});

Review comment:
       can we reuse the null node?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
primitiveTypeTermForType, typeTerm}
+import 
org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNullNode, 
createPrimitiveNodeTerm, createRawNodeTerm, isJsonFunctionOperand}
+import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
+import org.apache.flink.table.types.logical.LogicalType
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
+
+import org.apache.calcite.rex.RexCall
+import org.apache.calcite.sql.SqlJsonConstructorNullClause
+import org.apache.calcite.sql.SqlJsonConstructorNullClause.{ABSENT_ON_NULL, 
NULL_ON_NULL}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_OBJECT]].
+ *
+ * <code>JSON_OBJECT</code> returns a character string. However, this creates 
an issue when nesting
+ * calls to this function with the intention of creating a nested JSON 
structure. Instead of a
+ * nested JSON object, a JSON string would be inserted, i.e.
+ * <code>JSON_OBJECT(KEY 'K' VALUE JSON_OBJECT(KEY 'A' VALUE 'B'))</code> 
would result in
+ * <code>{"K":"{\"A\":\"B\"}"}</code> instead of the intended 
<code>{"K":{"A":"B"}}</code>. We
+ * remedy this by treating nested calls to this function differently and 
inserting the value as a
+ * raw node instead of as a string node.
+ */
+class JsonObjectCallGen(call: RexCall) extends CallGenerator {
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType): GeneratedExpression = {
+
+    val jsonUtils = typeTerm(classOf[SqlJsonUtils])
+    val onNull = getNullBehavior(operands.head)
+
+    val nodeTerm = ctx.addReusableLocalVariable(typeTerm(classOf[ObjectNode]), 
"node")
+    val populateNodeCode = operands.zipWithIndex.drop(1).grouped(2).map {
+      case Seq((keyExpr, _), (valueExpr, valueIdx)) =>
+        val valueTerm = if 
(isJsonFunctionOperand(call.operands.get(valueIdx))) {
+          createRawNodeTerm(nodeTerm, valueExpr)
+        } else {
+          createPrimitiveNodeTerm(nodeTerm, valueExpr)
+        }
+
+        onNull match {
+          case NULL_ON_NULL =>
+            s"""
+               |if (${valueExpr.nullTerm}) {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), 
${createNullNode(nodeTerm)});
+               |} else {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), 
$valueTerm);
+               |}
+               |""".stripMargin
+          case ABSENT_ON_NULL =>
+            s"""
+               |if (!${valueExpr.nullTerm}) {
+               |    $nodeTerm.set(${keyExpr.resultTerm}.toString(), 
$valueTerm);
+               |}
+               |""".stripMargin
+        }
+    }.mkString
+
+    val resultTerm = 
ctx.addReusableLocalVariable(primitiveTypeTermForType(returnType), "result")

Review comment:
       I think we had this discussion before but are we sure to 
use`addReusableLocalVariable`? It doesn't need to be reusable?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
primitiveTypeTermForType, typeTerm}
+import 
org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNullNode, 
createPrimitiveNodeTerm, createRawNodeTerm, isJsonFunctionOperand}
+import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
+import org.apache.flink.table.types.logical.LogicalType
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
+
+import org.apache.calcite.rex.RexCall
+import org.apache.calcite.sql.SqlJsonConstructorNullClause
+import org.apache.calcite.sql.SqlJsonConstructorNullClause.{ABSENT_ON_NULL, 
NULL_ON_NULL}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_OBJECT]].
+ *
+ * <code>JSON_OBJECT</code> returns a character string. However, this creates 
an issue when nesting
+ * calls to this function with the intention of creating a nested JSON 
structure. Instead of a
+ * nested JSON object, a JSON string would be inserted, i.e.
+ * <code>JSON_OBJECT(KEY 'K' VALUE JSON_OBJECT(KEY 'A' VALUE 'B'))</code> 
would result in
+ * <code>{"K":"{\"A\":\"B\"}"}</code> instead of the intended 
<code>{"K":{"A":"B"}}</code>. We
+ * remedy this by treating nested calls to this function differently and 
inserting the value as a
+ * raw node instead of as a string node.
+ */
+class JsonObjectCallGen(call: RexCall) extends CallGenerator {
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType): GeneratedExpression = {
+
+    val jsonUtils = typeTerm(classOf[SqlJsonUtils])
+    val onNull = getNullBehavior(operands.head)
+
+    val nodeTerm = ctx.addReusableLocalVariable(typeTerm(classOf[ObjectNode]), 
"node")

Review comment:
       can we make this a member variable and reuse it for this call?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to