chenhao-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r1976506479


##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -759,14 +759,6 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
         "sqlExpr" -> "\"to_csv(value)\""),
       context = ExpectedContext(fragment = "to_csv", 
getCurrentClassCallSitePattern)
     )
-
-    checkError(
-      exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, 
String])).collect()

Review Comment:
   Done.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -68,6 +69,11 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    dataSchema.foreach { field =>
+      if (!supportDataType(field.dataType, allowVariant = false)) {
+        throw 
QueryCompilationErrors.dataTypeUnsupportedByDataSourceError("CSV", field)

Review Comment:
   The plan is to never support it.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = 
malformedCSVException.getCause.asInstanceOf[TextParsingException]
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): 
Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as 
string)"),
+          expected.map(Row(_))
+        )
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("count(*)"),
+          Row(expected.length)
+        )
+      }
+
+      checkSingleVariant(Map(),
+        """{"_c0":"field 1","_c1":"field2"}""",
+        """{"_c0":1.1,"_c1":"1e9"}""",
+        """{"_c0":null,"_c1":"hello"}""",
+        """{"_c0":"world\"","_c1":"true"}""")
+
+      checkSingleVariant(Map("header" -> "true"),
+        """{"field 1":1.1,"field2":"1e9"}""",
+        """{"field 1":null,"field2":"hello"}""",
+        """{"field 1":"world\"","field2":"true"}""")
+
+      checkSingleVariant(Map("multiLine" -> "true"),
+        """{"_c0":"field 1","_c1":"field2"}""",
+        """{"_c0":1.1,"_c1":"1e9"}""",
+        """{"_c0":null,"_c1":"hello\nworld","_c2":"true"}""")
+
+      checkSingleVariant(Map("multiLine" -> "true", "header" -> "true"),
+        """{"field 1":1.1,"field2":"1e9"}""",
+        """{"field 1":null,"field2":"hello\nworld"}""")
+
+      checkError(
+        exception = intercept[SparkException] {
+          val options = Map("singleVariantColumn" -> "v", "multiLine" -> 
"true", "header" -> "true",
+            "mode" -> "failfast")
+          spark.read.options(options).csv(path.getCanonicalPath).collect()
+        }.getCause.asInstanceOf[SparkException],
+        condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
+        parameters = Map("badRecord" -> """[{"field 
1":null,"field2":"hello\nworld"}]""",
+          "failFastMode" -> "FAILFAST")
+      )
+
+      def checkSchema(options: Map[String, String], expected: (String, 
String)*): Unit = {
+        checkAnswer(
+          spark.read.options(options).schema("`field 1` variant, field2 
variant")

Review Comment:
   Existing test cases with null results can validate this.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = 
malformedCSVException.getCause.asInstanceOf[TextParsingException]
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): 
Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          
spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as 
string)"),

Review Comment:
   The to-string result is already enough for validation.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -48,6 +48,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     val csvDataSource = CSVDataSource(parsedOptions)
+    !parsedOptions.needHeaderForSingleVariantColumn &&

Review Comment:
   This is not simple to implement. The current way of handling a split file 
when `header` is true is to skip the header check for any split that does not 
begin at the start of the file. However, when both `singleVariantColumn` and 
`header` are set, we need to read the first line of the file, which is not 
straightforward with the current API (for example, `parseIterator` doesn't know 
anything about the "file"; it can only see certain lines). This could be a 
future optimization.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -161,9 +164,12 @@ class UnivocityParser(
   // Each input token is placed in each output row's position by mapping 
these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, 
f.nullable)).toArray
-  }
+  private val valueConverters: Array[ValueConverter] =
+    if (options.singleVariantColumn.isDefined) {
+      null
+    } else {
+      requiredSchema.map(f => makeConverter(f.name, f.dataType, 
f.nullable)).toArray
+    }

Review Comment:
   I'd prefer to keep my current code. `lazy` is not free.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to