[ https://issues.apache.org/jira/browse/FLINK-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680849#comment-17680849 ]
Sergii Mikhtoniuk edited comment on FLINK-17804 at 1/26/23 1:13 AM:
--------------------------------------------------------------------

I think this ticket can be closed. Flink 1.16 still has issues following the Parquet spec:
- FLINK-25565 with the timestamp representation
- using {{byte_array}} for decimals instead of {{fixed_len_byte_array}} as the spec prescribes (when writing via {{AvroSchemaConverter}} and {{RowDataToAvroConverters}}), which causes an incompatibility with Apache Spark

But the decoding of decimals seems to be fixed.


> Follow the spec when decoding Parquet logical DECIMAL type
> ----------------------------------------------------------
>
>                  Key: FLINK-17804
>                  URL: https://issues.apache.org/jira/browse/FLINK-17804
>              Project: Flink
>           Issue Type: Bug
>           Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>     Affects Versions: 1.10.1
>             Reporter: Sergii Mikhtoniuk
>             Priority: Not a Priority
>               Labels: auto-deprioritized-major, auto-deprioritized-minor, decimal, parquet, pull-request-available, spark
>
> When reading a Parquet file (produced by Spark 2.4.0 with default configuration), Flink's {{ParquetRowInputFormat}} fails with a {{NumberFormatException}}.
> After debugging this, it seems that Flink doesn't follow the Parquet spec on [encoding the DECIMAL logical type|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal].
> The Parquet schema for this field is:
> {code}
> optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
> {code}
> If I understand the spec correctly, the value should contain the binary representation of an unscaled decimal. Flink's [RowConverter|https://github.com/apache/flink/blob/release-1.10.1/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java#L202], however, treats it as a base-10 UTF-8 string.
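> To make the mismatch concrete, here is a minimal, self-contained Scala sketch using the same sample bytes as the snippets below (the expected value 19500.0000 is my own arithmetic from the unscaled integer 195000000 and the schema scale 4, not taken from the original file):
> {code}
> import java.math.{BigDecimal, BigInteger}
>
> // Sample fixed_len_byte_array(9) value for price_usd (DECIMAL(19,4))
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
>
> // Spec-compliant decoding: big-endian two's-complement unscaled integer,
> // with the scale taken from the schema
> val unscaled = new BigInteger(binary)      // 195000000
> val decimal  = new BigDecimal(unscaled, 4) // 19500.0000
> println(decimal)                           // prints "19500.0000"
>
> // The base-10 UTF-8 reading fails because the raw bytes are not ASCII
> // digits, which is exactly the NumberFormatException in the stacktrace:
> // new BigDecimal(new String(binary, "UTF-8").toCharArray)  // throws
> {code}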
> What Flink is essentially doing:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)
> {code}
> What I think the spec suggests:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val unscaled = new java.math.BigInteger(binary)
> // the second argument is the scale declared in the schema, DECIMAL(19,4)
> val decimal = new java.math.BigDecimal(unscaled, 4)
> {code}
> Error stacktrace:
> {code}
> java.lang.NumberFormatException
>     at java.math.BigDecimal.<init>(BigDecimal.java:497)
>     at java.math.BigDecimal.<init>(BigDecimal.java:383)
>     at java.math.BigDecimal.<init>(BigDecimal.java:680)
>     at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(RowConverter.java:202)
>     at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
>     at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
>     at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>     at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
> {code}
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)