Zongwen Li created FLINK-24726: ---------------------------------- Summary: Hive Lookup Join SQL: decimal type UnsupportedOperationException Key: FLINK-24726 URL: https://issues.apache.org/jira/browse/FLINK-24726 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime Affects Versions: 1.13.3, 1.14.0 Reporter: Zongwen Li
Exception trace: {code:java} java.lang.UnsupportedOperationException: org.apache.flink.hive.shaded.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary at org.apache.flink.hive.shaded.parquet.column.Dictionary.decodeToLong(Dictionary.java:49) at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetDictionary.decodeToLong(ParquetDictionary.java:42) at org.apache.flink.table.data.vector.heap.HeapLongVector.getLong(HeapLongVector.java:47) at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetDecimalVector.getDecimal(ParquetDecimalVector.java:48) at org.apache.flink.table.data.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:116) at org.apache.flink.table.data.ColumnarRowData.getDecimal(ColumnarRowData.java:119) at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233) at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138) at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) at LookupFunction$159.flatMap(Unknown Source) at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at StreamExecCalc$913.processElement_split139(Unknown Source) at StreamExecCalc$913.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:334) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:219) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:124) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183) at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) {code} The primitive type of the affected Hive column is: fixed_len_byte_array(5) list_price (DECIMAL(10,2)). A lookup join against a table that contains a column of this type fails with the exception above. The root cause is in the following code: {code:java} //org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader#readToVector public final void readToVector(int readNumber, VECTOR vector) throws IOException { int rowId = 0; WritableIntVector dictionaryIds = null; if (dictionary != null) { dictionaryIds = vector.reserveDictionaryIds(readNumber); } while (readNumber > 0) { // ...... 
if (isCurrentPageDictionaryEncoded) { runLenDecoder.readDictionaryIds( num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder); if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) { // The problem is here: // PlainBinaryDictionary has been added to HeapLongVector vector.setDictionary(new ParquetDictionary(dictionary)); } else { readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds); } } else { if (vector.hasDictionary() && rowId != 0) { readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds()); } vector.setDictionary(null); readBatch(rowId, num, vector); } valuesRead += num; rowId += num; readNumber -= num; } } {code} The exception is thrown because neither {{#readBatch}} nor {{#readBatchFromDictionaryIds}} of {{org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader}} is called on this path. Instead, the PlainBinaryDictionary is attached to the HeapLongVector for lazy decoding, and the later {{HeapLongVector#getLong}} call asks that binary dictionary to {{decodeToLong}}, which it does not support. -- This message was sent by Atlassian Jira (v8.3.4#803005)