chenhao-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r1976506479
##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -759,14 +759,6 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
         "sqlExpr" -> "\"to_csv(value)\""),
       context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
     )
-
-    checkError(
-      exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect()

Review Comment:
   Done.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -68,6 +69,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    dataSchema.foreach { field =>
+      if (!supportDataType(field.dataType, allowVariant = false)) {
+        throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError("CSV", field)

Review Comment:
   The plan is to never support it.

##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
     assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as string)"),
+          expected.map(Row(_))
+        )
+        checkAnswer(
+          spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("count(*)"),
+          Row(expected.length)
+        )
+      }
+
+      checkSingleVariant(Map(),
+        """{"_c0":"field 1","_c1":"field2"}""",
+        """{"_c0":1.1,"_c1":"1e9"}""",
+        """{"_c0":null,"_c1":"hello"}""",
+        """{"_c0":"world\"","_c1":"true"}""")
+
+      checkSingleVariant(Map("header" -> "true"),
+        """{"field 1":1.1,"field2":"1e9"}""",
+        """{"field 1":null,"field2":"hello"}""",
+        """{"field 1":"world\"","field2":"true"}""")
+
+      checkSingleVariant(Map("multiLine" -> "true"),
+        """{"_c0":"field 1","_c1":"field2"}""",
+        """{"_c0":1.1,"_c1":"1e9"}""",
+        """{"_c0":null,"_c1":"hello\nworld","_c2":"true"}""")
+
+      checkSingleVariant(Map("multiLine" -> "true", "header" -> "true"),
+        """{"field 1":1.1,"field2":"1e9"}""",
+        """{"field 1":null,"field2":"hello\nworld"}""")
+
+      checkError(
+        exception = intercept[SparkException] {
+          val options = Map("singleVariantColumn" -> "v", "multiLine" -> "true", "header" -> "true",
+            "mode" -> "failfast")
+          spark.read.options(options).csv(path.getCanonicalPath).collect()
+        }.getCause.asInstanceOf[SparkException],
+        condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
+        parameters = Map("badRecord" -> """[{"field 1":null,"field2":"hello\nworld"}]""",
+          "failFastMode" -> "FAILFAST")
+      )
+
+      def checkSchema(options: Map[String, String], expected: (String, String)*): Unit = {
+        checkAnswer(
+          spark.read.options(options).schema("`field 1` variant, field2 variant")

Review Comment:
   Existing test cases with null results can validate this.
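As context for the test above, here is a minimal, self-contained sketch of how the `singleVariantColumn` read path it exercises would be used. Only the `singleVariantColumn` and `header` options and the `cast(v as string)` pattern come from the PR's test; the object name, file path, and driver boilerplate are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver; names and paths are illustrative, not from the PR.
object SingleVariantCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("single-variant-csv-sketch")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()

    // Parse every CSV record into a single VARIANT column named "v".
    // With header=true, the header line supplies each object's field names;
    // without it, positional names (_c0, _c1, ...) are used, as the first
    // checkSingleVariant call in the test asserts.
    val df = spark.read
      .option("singleVariantColumn", "v")
      .option("header", "true")
      .csv("/tmp/example.csv") // hypothetical input path

    // Casting a variant to string yields its JSON text, which is what the
    // test's checkSingleVariant helper compares against.
    df.selectExpr("cast(v as string)").show(truncate = false)

    spark.stop()
  }
}
```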
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3492,6 +3493,100 @@ abstract class CSVSuite
     val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
     assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
+
+  test("csv with variant") {
+    withTempPath { path =>
+      val data =
+        """field 1,field2
+          |1.1,1e9
+          |,"hello
+          |world",true
+          |""".stripMargin
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+
+      def checkSingleVariant(options: Map[String, String], expected: String*): Unit = {
+        val allOptions = options ++ Map("singleVariantColumn" -> "v")
+        checkAnswer(
+          spark.read.options(allOptions).csv(path.getCanonicalPath).selectExpr("cast(v as string)"),

Review Comment:
   The to-string result is already enough for validation.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala:
##########
@@ -48,6 +48,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     val csvDataSource = CSVDataSource(parsedOptions)
+    !parsedOptions.needHeaderForSingleVariantColumn &&

Review Comment:
   It is not very simple to implement. The current way to support splitting a file when `header` is true is to skip the header check when a partition does not start at the beginning of the file. However, when both `singleVariantColumn` and `header` are set, every partition needs to read the first line of the file to obtain the field names, which is not straightforward in the current API (for example, `parseIterator` knows nothing about the "file"; it only sees certain lines). This could be a future optimization.

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -161,9 +164,12 @@ class UnivocityParser(
   // Each input token is placed in each output row's position by mapping these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable)).toArray
-  }
+  private val valueConverters: Array[ValueConverter] =
+    if (options.singleVariantColumn.isDefined) {
+      null
+    } else {
+      requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable)).toArray
+    }

Review Comment:
   I would rather keep my current code. `lazy` is not free.
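To unpack the "`lazy` is not free" remark above: in Scala 2, a `lazy val` compiles to roughly the double-checked-locking shape sketched below (simplified; the real encoding uses a compiler-generated bitmap field, and all names here are illustrative). Every read pays at least a volatile check, and the first read takes a monitor lock:

```scala
// Simplified sketch of what the compiler generates for:
//   lazy val valueConverters: Array[ValueConverter] = makeAll()
// Names and the exact encoding are illustrative assumptions.
class LazyValSketch(makeAll: () => Array[AnyRef]) {
  @volatile private[this] var initialized = false
  private[this] var cached: Array[AnyRef] = _

  def valueConverters: Array[AnyRef] = {
    if (!initialized) {        // volatile read on every access
      synchronized {           // monitor lock taken on first access
        if (!initialized) {
          cached = makeAll()
          initialized = true
        }
      }
    }
    cached
  }
}
```

The eager `if (options.singleVariantColumn.isDefined) null else ...` initializer in the PR avoids this per-access overhead on the row-parsing hot path, at the cost of having to guarantee that `valueConverters` is never dereferenced when `singleVariantColumn` is set, which the PR's variant code path presumably ensures.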