sandip-db commented on code in PR #50517: URL: https://github.com/apache/spark/pull/50517#discussion_r2030644453
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala: ########## @@ -71,4 +76,38 @@ object DataSourceOptions { // as a single VARIANT type column in the table with the given column name. // E.g. spark.read.format("<data-source-format>").option("singleVariantColumn", "colName") val SINGLE_VARIANT_COLUMN = "singleVariantColumn" + // The common option name for all data sources that support corrupt records. In case of a parsing + // error, the record will be stored as a string in the column with the given name. + // Theoretically, the behavior of this option is not affected by the parsing mode + // (PERMISSIVE/FAILFAST/DROPMALFORMED). However, the corrupt record is only visible to the user + // when in PERMISSIVE mode, because the queries will fail in FAILFAST mode, or the row containing + // the corrupt record will be dropped in DROPMALFORMED mode. + val COLUMN_NAME_OF_CORRUPT_RECORD = "columnNameOfCorruptRecord" + + // When `singleVariantColumn` is enabled and there is a user-specified schema, the schema must + // either be a variant field, or a variant field plus a corrupt column field. 
+ def validateSingleVariantColumn( + options: CaseInsensitiveMap[String], + userSpecifiedSchema: Option[StructType]): Unit = { + (options.get(SINGLE_VARIANT_COLUMN), userSpecifiedSchema) match { + case (Some(col), Some(schema)) => + var valid = schema.fields.exists { f => + f.dataType.isInstanceOf[VariantType] && f.name == col && f.nullable + } + schema.length match { + case 1 => + case 2 => + val corruptRecordName = options.getOrElse( Review Comment: nit: ```suggestion val corruptRecordColumnName = options.getOrElse( ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala: ########## @@ -396,7 +396,7 @@ object CSVOptions extends DataSourceOptions { val EMPTY_VALUE = newOption("emptyValue") val LINE_SEP = newOption("lineSep") val INPUT_BUFFER_SIZE = newOption("inputBufferSize") - val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord") + val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD) Review Comment: Add from_csv and csv file reader test with corrupt column and variant ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala: ########## @@ -4007,6 +4007,7 @@ abstract class JsonSuite "true", """{"a": [], "b": null}""", """{"a": 1}""", + "bad json", Review Comment: Add from_json test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org