sandip-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r2023879996

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -386,6 +442,113 @@ class UnivocityParser(
       }
     }
   }
+
+  /**
+   * This class converts a comma-separated value into a variant column (when the schema contains
+   * variant type) or a variant field (when in singleVariantColumn mode).
+   *
+   * It has a list of scalar types to try (long, decimal, date, timestamp, boolean) and maintains
+   * the current content type. It tries to parse the input as the current content type. If the
+   * parsing fails, it moves to the next type in the list and continues the trial. It never checks
+   * the previous types that have already failed. In the end, it either successfully parses the
+   * input as a specific scalar type, or fails after trying all the types and defaults to the string
+   * type. The state is reset for every input file.
+   *
+   * Floating point types (double, float) are not considered to avoid precision loss.
+   */
+  private final class VariantValueConverter extends ValueConverter {
+    private var currentType: DataType = LongType
+
+    override def apply(s: String): Any = {
+      val builder = new VariantBuilder(false)
+      convertInput(builder, s)
+      val v = builder.result()
+      new VariantVal(v.getValue, v.getMetadata)
+    }
+
+    def convertInput(builder: VariantBuilder, s: String): Unit = {
+      if (s == null || s == options.nullValue) {
+        builder.appendNull()
+        return
+      }
+
+      def parseLong(): DataType = {
+        try {
+          builder.appendLong(s.toLong)
+          // The actual integral type doesn't matter. `appendLong` will use the smallest possible
+          // integral type to store the value.
+          LongType
+        } catch {
+          case NonFatal(_) => parseDecimal()
+        }
+      }
+
+      def parseDecimal(): DataType = {
+        try {
+          var d = decimalParser(s)
+          if (d.scale() < 0) {
+            d = d.setScale(0)
+          }
+          if (d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION &&
+            d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION) {
+            builder.appendDecimal(d)
+            // The actual decimal type doesn't matter. `appendDecimal` will use the smallest
+            // possible decimal type to store the value.
+            DecimalType.USER_DEFAULT
+          } else {
+            parseDate()
+          }
+        } catch {
+          case NonFatal(_) => parseDate()
+        }
+      }
+
+      def parseDate(): DataType = {
+        try {
+          builder.appendDate(dateFormatter.parse(s))
+          DateType
+        } catch {
+          case NonFatal(_) => parseTimestamp()
+        }
+      }
+
+      def parseTimestamp(): DataType = {
+        try {
+          builder.appendTimestamp(timestampFormatter.parse(s))
+          TimestampType
+        } catch {
+          case NonFatal(_) => parseBoolean()
+        }
+      }
+
+      def parseBoolean(): DataType = {
+        if (s == "true") {
+          builder.appendBoolean(true)
+          BooleanType
+        } else if (s == "false") {
+          builder.appendBoolean(false)
+          BooleanType
+        } else {
+          parseString()
+        }
+      }
+
+      def parseString(): DataType = {
+        builder.appendString(s)
+        StringType
+      }
+
+      val newType = currentType match {

Review Comment:
   Add `TimestampNTZType`
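A rough sketch of how such a step might slot into the fallback chain, between the date and the timezone-aware timestamp attempts. It assumes `VariantBuilder.appendTimestampNtz` and the parser's `timestampNTZFormatter.parseWithoutTimeZone` are available under these names, which may differ in the actual code base:

```scala
      // Sketch only: a TimestampNTZ attempt tried before the timezone-aware timestamp.
      // `appendTimestampNtz` and `timestampNTZFormatter.parseWithoutTimeZone` are assumed here.
      def parseTimestampNTZ(): DataType = {
        try {
          builder.appendTimestampNtz(timestampNTZFormatter.parseWithoutTimeZone(s, false))
          TimestampNTZType
        } catch {
          case NonFatal(_) => parseTimestamp()
        }
      }
      // The sticky-state dispatch would then also need a matching arm, e.g.:
      //   case TimestampNTZType => parseTimestampNTZ()
```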
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -386,6 +442,113 @@ class UnivocityParser(
+      def parseDecimal(): DataType = {
+        try {
+          var d = decimalParser(s)
+          if (d.scale() < 0) {
+            d = d.setScale(0)
+          }
+          if (d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION &&
+            d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION) {
+            builder.appendDecimal(d)
+            // The actual decimal type doesn't matter. `appendDecimal` will use the smallest
+            // possible decimal type to store the value.
+            DecimalType.USER_DEFAULT
+          } else {
+            parseDate()
+          }
+        } catch {
+          case NonFatal(_) => parseDate()

Review Comment:
   parseDate if options.preferDate is set, else parseTimestampNTZ
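A minimal sketch of what that fallback could look like inside `parseDecimal`, assuming the CSV option is exposed as `options.preferDate` and a `parseTimestampNTZ()` helper like the one sketched above exists:

```scala
        } catch {
          // Sketch only: honor the `preferDate` CSV option (assumed to be available on
          // `options`) when picking the next attempt; `parseTimestampNTZ()` is the
          // hypothetical helper sketched above.
          case NonFatal(_) => if (options.preferDate) parseDate() else parseTimestampNTZ()
        }
```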
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -386,6 +442,113 @@ class UnivocityParser(
+      def parseBoolean(): DataType = {
+        if (s == "true") {
+          builder.appendBoolean(true)
+          BooleanType
+        } else if (s == "false") {

Review Comment:
   ```suggestion
        val str = s.toLowerCase(Locale.ROOT)
        if (str == "true") {
          builder.appendBoolean(true)
          BooleanType
        } else if (str == "false") {
   ```

##########
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##########
@@ -760,12 +760,9 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
     )
 
-    checkError(
-      exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect()
-      },
-      condition = "UNSUPPORTED_DATATYPE",
-      parameters = Map("typeName" -> "\"VARIANT\"")
+    checkAnswer(
+      df.select(from_csv(lit("1,2,3"), valueSchema, Map.empty[String, String])),
+      Seq(Row(Row(1L, "2", new VariantVal(Array[Byte](12, 3), Array[Byte](1, 0, 0)))))
     )

Review Comment:
   Pls address this.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org