sandip-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r2023879996


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -386,6 +442,113 @@ class UnivocityParser(
       }
     }
   }
+
+  /**
+   * This class converts a comma-separated value into a variant column (when the schema contains
+   * variant type) or a variant field (when in singleVariantColumn mode).
+   *
+   * It has a list of scalar types to try (long, decimal, date, timestamp, boolean) and maintains
+   * the current content type. It tries to parse the input as the current content type. If the
+   * parsing fails, it moves to the next type in the list and continues the trial. It never checks
+   * the previous types that have already failed. In the end, it either successfully parses the
+   * input as a specific scalar type, or fails after trying all the types and defaults to the string
+   * type. The state is reset for every input file.
+   *
+   * Floating point types (double, float) are not considered to avoid precision loss.
+   */
+  private final class VariantValueConverter extends ValueConverter {
+    private var currentType: DataType = LongType
+
+    override def apply(s: String): Any = {
+      val builder = new VariantBuilder(false)
+      convertInput(builder, s)
+      val v = builder.result()
+      new VariantVal(v.getValue, v.getMetadata)
+    }
+
+    def convertInput(builder: VariantBuilder, s: String): Unit = {
+      if (s == null || s == options.nullValue) {
+        builder.appendNull()
+        return
+      }
+
+      def parseLong(): DataType = {
+        try {
+          builder.appendLong(s.toLong)
+          // The actual integral type doesn't matter. `appendLong` will use the smallest possible
+          // integral type to store the value.
+          LongType
+        } catch {
+          case NonFatal(_) => parseDecimal()
+        }
+      }
+
+      def parseDecimal(): DataType = {
+        try {
+          var d = decimalParser(s)
+          if (d.scale() < 0) {
+            d = d.setScale(0)
+          }
+          if (d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION &&
+            d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION) {
+            builder.appendDecimal(d)
+            // The actual decimal type doesn't matter. `appendDecimal` will use the smallest
+            // possible decimal type to store the value.
+            DecimalType.USER_DEFAULT
+          } else {
+            parseDate()
+          }
+        } catch {
+          case NonFatal(_) => parseDate()
+        }
+      }
+
+      def parseDate(): DataType = {
+        try {
+          builder.appendDate(dateFormatter.parse(s))
+          DateType
+        } catch {
+          case NonFatal(_) => parseTimestamp()
+        }
+      }
+
+      def parseTimestamp(): DataType = {
+        try {
+          builder.appendTimestamp(timestampFormatter.parse(s))
+          TimestampType
+        } catch {
+          case NonFatal(_) => parseBoolean()
+        }
+      }
+
+      def parseBoolean(): DataType = {
+        if (s == "true") {
+          builder.appendBoolean(true)
+          BooleanType
+        } else if (s == "false") {
+          builder.appendBoolean(false)
+          BooleanType
+        } else {
+          parseString()
+        }
+      }
+
+      def parseString(): DataType = {
+        builder.appendString(s)
+        StringType
+      }
+
+      val newType = currentType match {

Review Comment:
   Add `TimestampNTZType` to the list of scalar types to try.
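
   A minimal sketch of how that step could slot in, as a sibling of `parseTimestamp()` inside `convertInput` (the `timestampNTZFormatter` field and the `VariantBuilder.appendTimestampNtz` call are assumptions based on the surrounding parser, not verified against this PR):
   ```scala
   // Sketch only: try TIMESTAMP_NTZ between the timestamp and boolean steps.
   def parseTimestampNTZ(): DataType = {
     try {
       builder.appendTimestampNtz(timestampNTZFormatter.parse(s))
       TimestampNTZType
     } catch {
       case NonFatal(_) => parseBoolean()
     }
   }
   ```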



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -386,6 +442,113 @@ class UnivocityParser(
       }
     }
   }
+
+  /**
+   * This class converts a comma-separated value into a variant column (when the schema contains
+   * variant type) or a variant field (when in singleVariantColumn mode).
+   *
+   * It has a list of scalar types to try (long, decimal, date, timestamp, boolean) and maintains
+   * the current content type. It tries to parse the input as the current content type. If the
+   * parsing fails, it moves to the next type in the list and continues the trial. It never checks
+   * the previous types that have already failed. In the end, it either successfully parses the
+   * input as a specific scalar type, or fails after trying all the types and defaults to the string
+   * type. The state is reset for every input file.
+   *
+   * Floating point types (double, float) are not considered to avoid precision loss.
+   */
+  private final class VariantValueConverter extends ValueConverter {
+    private var currentType: DataType = LongType
+
+    override def apply(s: String): Any = {
+      val builder = new VariantBuilder(false)
+      convertInput(builder, s)
+      val v = builder.result()
+      new VariantVal(v.getValue, v.getMetadata)
+    }
+
+    def convertInput(builder: VariantBuilder, s: String): Unit = {
+      if (s == null || s == options.nullValue) {
+        builder.appendNull()
+        return
+      }
+
+      def parseLong(): DataType = {
+        try {
+          builder.appendLong(s.toLong)
+          // The actual integral type doesn't matter. `appendLong` will use the smallest possible
+          // integral type to store the value.
+          LongType
+        } catch {
+          case NonFatal(_) => parseDecimal()
+        }
+      }
+
+      def parseDecimal(): DataType = {
+        try {
+          var d = decimalParser(s)
+          if (d.scale() < 0) {
+            d = d.setScale(0)
+          }
+          if (d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION &&
+            d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION) {
+            builder.appendDecimal(d)
+            // The actual decimal type doesn't matter. `appendDecimal` will use the smallest
+            // possible decimal type to store the value.
+            DecimalType.USER_DEFAULT
+          } else {
+            parseDate()
+          }
+        } catch {
+          case NonFatal(_) => parseDate()

Review Comment:
   Call `parseDate()` here only if `options.preferDate` is set; otherwise fall through to `parseTimestampNTZ()`.
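
   A sketch of that dispatch in the catch handler, assuming `options.preferDate` is the existing CSV `preferDate` option and that a `parseTimestampNTZ` step exists (both are assumptions here):
   ```scala
   // Sketch only: respect preferDate before continuing down the chain.
   case NonFatal(_) =>
     if (options.preferDate) parseDate() else parseTimestampNTZ()
   ```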



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -386,6 +442,113 @@ class UnivocityParser(
       }
     }
   }
+
+  /**
+   * This class converts a comma-separated value into a variant column (when the schema contains
+   * variant type) or a variant field (when in singleVariantColumn mode).
+   *
+   * It has a list of scalar types to try (long, decimal, date, timestamp, boolean) and maintains
+   * the current content type. It tries to parse the input as the current content type. If the
+   * parsing fails, it moves to the next type in the list and continues the trial. It never checks
+   * the previous types that have already failed. In the end, it either successfully parses the
+   * input as a specific scalar type, or fails after trying all the types and defaults to the string
+   * type. The state is reset for every input file.
+   *
+   * Floating point types (double, float) are not considered to avoid precision loss.
+   */
+  private final class VariantValueConverter extends ValueConverter {
+    private var currentType: DataType = LongType
+
+    override def apply(s: String): Any = {
+      val builder = new VariantBuilder(false)
+      convertInput(builder, s)
+      val v = builder.result()
+      new VariantVal(v.getValue, v.getMetadata)
+    }
+
+    def convertInput(builder: VariantBuilder, s: String): Unit = {
+      if (s == null || s == options.nullValue) {
+        builder.appendNull()
+        return
+      }
+
+      def parseLong(): DataType = {
+        try {
+          builder.appendLong(s.toLong)
+          // The actual integral type doesn't matter. `appendLong` will use the smallest possible
+          // integral type to store the value.
+          LongType
+        } catch {
+          case NonFatal(_) => parseDecimal()
+        }
+      }
+
+      def parseDecimal(): DataType = {
+        try {
+          var d = decimalParser(s)
+          if (d.scale() < 0) {
+            d = d.setScale(0)
+          }
+          if (d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION &&
+            d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION) {
+            builder.appendDecimal(d)
+            // The actual decimal type doesn't matter. `appendDecimal` will use the smallest
+            // possible decimal type to store the value.
+            DecimalType.USER_DEFAULT
+          } else {
+            parseDate()
+          }
+        } catch {
+          case NonFatal(_) => parseDate()
+        }
+      }
+
+      def parseDate(): DataType = {
+        try {
+          builder.appendDate(dateFormatter.parse(s))
+          DateType
+        } catch {
+          case NonFatal(_) => parseTimestamp()
+        }
+      }
+
+      def parseTimestamp(): DataType = {
+        try {
+          builder.appendTimestamp(timestampFormatter.parse(s))
+          TimestampType
+        } catch {
+          case NonFatal(_) => parseBoolean()
+        }
+      }
+
+      def parseBoolean(): DataType = {
+        if (s == "true") {
+          builder.appendBoolean(true)
+          BooleanType
+        } else if (s == "false") {

Review Comment:
   ```suggestion
           val str = s.toLowerCase(Locale.ROOT)
           if (str == "true") {
             builder.appendBoolean(true)
             BooleanType
           } else if (str == "false") {
   ```
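
   `toLowerCase(Locale.ROOT)` avoids locale-sensitive case mappings (e.g., the Turkish dotless i), so "TRUE" and "True" parse as booleans regardless of the JVM default locale. If `java.util.Locale` is not already imported in `UnivocityParser.scala` (not checked here), the suggestion would also need:
   ```scala
   import java.util.Locale
   ```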



##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -760,12 +760,9 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
     )
 
-    checkError(
-      exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect()
-      },
-      condition = "UNSUPPORTED_DATATYPE",
-      parameters = Map("typeName" -> "\"VARIANT\"")
+    checkAnswer(
+      df.select(from_csv(lit("1,2,3"), valueSchema, Map.empty[String, String])),
+      Seq(Row(Row(1L, "2", new VariantVal(Array[Byte](12, 3), Array[Byte](1, 0, 0)))))
     )

Review Comment:
   Please address this.
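
   For reference, assuming the standard variant binary layout, the expected value in the new assertion decodes as the integer 3; this reading is inferred from the variant spec, not taken from the PR:
   ```scala
   // value    = Array[Byte](12, 3)   // header 12 = INT1 type id (3) << 2; payload byte = 3
   // metadata = Array[Byte](1, 0, 0) // version 1, empty dictionary, single offset 0
   val expected = new VariantVal(Array[Byte](12, 3), Array[Byte](1, 0, 0))
   ```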



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

