sandip-db commented on code in PR #50517:
URL: https://github.com/apache/spark/pull/50517#discussion_r2030644453


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala:
##########
@@ -71,4 +76,38 @@ object DataSourceOptions {
   // as a single VARIANT type column in the table with the given column name.
   // E.g. spark.read.format("<data-source-format>").option("singleVariantColumn", "colName")
   val SINGLE_VARIANT_COLUMN = "singleVariantColumn"
+  // The common option name for all data sources that support corrupt records. In case of a
+  // parsing error, the record will be stored as a string in the column with the given name.
+  // Theoretically, the behavior of this option is not affected by the parsing mode
+  // (PERMISSIVE/FAILFAST/DROPMALFORMED). However, the corrupt record is only visible to the
+  // user in PERMISSIVE mode, because the query fails in FAILFAST mode and the row containing
+  // the corrupt record is dropped in DROPMALFORMED mode.
+  val COLUMN_NAME_OF_CORRUPT_RECORD = "columnNameOfCorruptRecord"
+
+  // When `singleVariantColumn` is enabled and there is a user-specified schema, the schema must
+  // either be a single variant field, or a variant field plus a corrupt record column field.
+  def validateSingleVariantColumn(
+      options: CaseInsensitiveMap[String],
+      userSpecifiedSchema: Option[StructType]): Unit = {
+    (options.get(SINGLE_VARIANT_COLUMN), userSpecifiedSchema) match {
+      case (Some(col), Some(schema)) =>
+        var valid = schema.fields.exists { f =>
+          f.dataType.isInstanceOf[VariantType] && f.name == col && f.nullable
+        }
+        schema.length match {
+          case 1 =>
+          case 2 =>
+            val corruptRecordName = options.getOrElse(

Review Comment:
   nit:
   ```suggestion
               val corruptRecordColumnName = options.getOrElse(
   ```

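For context, a hedged sketch of the two schema shapes this validation is meant to accept (field names such as `var` and `_corrupt_record` are illustrative, not taken from the PR):

```scala
import org.apache.spark.sql.types._

// Accepted: a single nullable variant field named by singleVariantColumn.
val variantOnly = StructType(Seq(
  StructField("var", VariantType, nullable = true)))

// Also accepted: the variant field plus a string corrupt record column
// named by columnNameOfCorruptRecord.
val variantWithCorrupt = StructType(Seq(
  StructField("var", VariantType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))

// Any other user-specified schema should be rejected when
// singleVariantColumn is set, e.g.
// spark.read.format("json")
//   .option("singleVariantColumn", "var")
//   .schema(someOtherSchema)
```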


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala:
##########
@@ -396,7 +396,7 @@ object CSVOptions extends DataSourceOptions {
   val EMPTY_VALUE = newOption("emptyValue")
   val LINE_SEP = newOption("lineSep")
   val INPUT_BUFFER_SIZE = newOption("inputBufferSize")
-  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
+  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)

Review Comment:
   Add `from_csv` and CSV file reader tests that exercise the corrupt record column together with the variant column.

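A rough sketch of the kind of test being requested; the column names, expected behavior, and test-suite helpers are assumptions, not part of this PR:

```scala
test("from_csv with singleVariantColumn and corrupt record column") {
  val options = Map(
    "singleVariantColumn" -> "var",
    "columnNameOfCorruptRecord" -> "_corrupt_record")
  val schema = StructType(Seq(
    StructField("var", VariantType),
    StructField("_corrupt_record", StringType)))
  // One well-formed row and one malformed row (unclosed quote).
  val df = Seq("1,a", "\"unclosed").toDF("csv")
    .select(from_csv($"csv", schema, options).as("parsed"))
  // Expectation: in PERMISSIVE mode the malformed input surfaces in
  // _corrupt_record, while the well-formed row parses into the variant.
}
```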


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala:
##########
@@ -4007,6 +4007,7 @@ abstract class JsonSuite
       "true",
       """{"a": [], "b": null}""",
       """{"a": 1}""",
+      "bad json",

Review Comment:
   Add a `from_json` test covering the corrupt record column as well.

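Likewise for `from_json`; a hedged sketch assuming the malformed literal `"bad json"` should land in the corrupt record column in PERMISSIVE mode:

```scala
test("from_json with singleVariantColumn and corrupt record column") {
  val options = Map(
    "singleVariantColumn" -> "var",
    "columnNameOfCorruptRecord" -> "_corrupt_record")
  val schema = StructType(Seq(
    StructField("var", VariantType),
    StructField("_corrupt_record", StringType)))
  val df = Seq("""{"a": 1}""", "bad json").toDF("json")
    .select(from_json($"json", schema, options).as("parsed"))
  // Expectation: {"a": 1} parses into the variant column and
  // "bad json" is stored verbatim in _corrupt_record.
}
```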


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
