Sergii Mikhtoniuk created FLINK-17804:
-----------------------------------------
             Summary: Follow the spec when decoding Parquet logical DECIMAL type
                 Key: FLINK-17804
                 URL:
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.10.1
            Reporter: Sergii Mikhtoniuk


When reading a Parquet file (produced by Spark 2.4.0 with default configuration), Flink's {{ParquetRowInputFormat}} fails with {{NumberFormatException}}.

After debugging this, it seems that Flink doesn't follow the Parquet spec on [encoding DECIMAL logical type|].

The Parquet schema for this field is:
{code}
optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
{code}

If I understand the spec correctly, it says that the value should contain a binary (big-endian two's complement) representation of the unscaled decimal value. Flink's [RowConverter|], however, treats it as a base-10 UTF-8 string.

What Flink is essentially doing:
{code}
val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
// Interprets the raw bytes as text, which throws NumberFormatException
val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)
{code}

What I think the spec suggests:
{code}
val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
// The bytes hold the unscaled value as a big-endian two's-complement integer
val unscaled = new java.math.BigInteger(binary)
// The scale (4) comes from the DECIMAL(19,4) annotation in the schema
val decimal = new java.math.BigDecimal(unscaled, 4)
{code}

Error stacktrace:
{code}
java.lang.NumberFormatException
    at java.math.BigDecimal.<init>(
    at java.math.BigDecimal.<init>(
    at java.math.BigDecimal.<init>(
    at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(
    at
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(
    at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$
{code}

Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
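
For reference, here is a minimal sketch of what spec-conformant decoding of the sample value could look like. It assumes the scale is taken from the logical type annotation in the schema (4 for DECIMAL(19,4)). {{ParquetDecimalSketch}} and {{decodeDecimal}} are hypothetical names used only for illustration; they are not existing Flink or Parquet APIs, and this is not the actual fix.

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}

object ParquetDecimalSketch {

  // Hypothetical helper (not part of Flink or Parquet): decodes a DECIMAL value
  // stored as big-endian two's-complement bytes, applying the given scale.
  def decodeDecimal(bytes: Array[Byte], scale: Int): JBigDecimal = {
    // BigInteger(byte[]) reads the array as a big-endian two's-complement integer.
    val unscaled = new BigInteger(bytes)
    // value = unscaled * 10^(-scale)
    new JBigDecimal(unscaled, scale)
  }

  def main(args: Array[String]): Unit = {
    // The bytes from the report: unscaled value 195000000, scale 4.
    val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
    println(decodeDecimal(binary, 4)) // prints 19500.0000

    // All bits set is -1 in two's complement, so negative values also decode.
    val negative = Array.fill[Byte](9)(-1)
    println(decodeDecimal(negative, 4)) // prints -0.0001
  }
}
```

Passing the scale explicitly matters: without it the sample bytes decode to 195000000 rather than 19500.0000.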