Zongwen Li created FLINK-24726:
----------------------------------

             Summary: Hive lookup join on Parquet DECIMAL column throws UnsupportedOperationException
                 Key: FLINK-24726
                 URL: https://issues.apache.org/jira/browse/FLINK-24726
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
    Affects Versions: 1.13.3, 1.14.0
            Reporter: Zongwen Li


Exception trace:
{code:java}
java.lang.UnsupportedOperationException: org.apache.flink.hive.shaded.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
    at org.apache.flink.hive.shaded.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
    at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetDictionary.decodeToLong(ParquetDictionary.java:42)
    at org.apache.flink.table.data.vector.heap.HeapLongVector.getLong(HeapLongVector.java:47)
    at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetDecimalVector.getDecimal(ParquetDecimalVector.java:48)
    at org.apache.flink.table.data.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:116)
    at org.apache.flink.table.data.ColumnarRowData.getDecimal(ColumnarRowData.java:119)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138)
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
    at LookupFunction$159.flatMap(Unknown Source)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at StreamExecCalc$913.processElement_split139(Unknown Source)
    at StreamExecCalc$913.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:334)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:219)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:124)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
{code}
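
For context on the top frame: a Parquet dictionary decodes only to the physical type it actually stores, and the base class throws {{UnsupportedOperationException}} for every other {{decodeTo*}} call. The sketch below is a self-contained stand-in (the class names mirror, but are not, the shaded Parquet sources) showing why {{decodeToLong}} fails once a binary dictionary ends up behind a long vector:
{code:java}
// Stand-in for the shaded Parquet Dictionary contract (illustrative only):
// a concrete dictionary overrides just the decodeTo* method matching the
// physical type it stores; the base class throws for everything else.
abstract class Dictionary {
    long decodeToLong(int id) {
        // This default is the top frame of the trace above (Dictionary.java:49).
        throw new UnsupportedOperationException(getClass().getName());
    }

    byte[] decodeToBinary(int id) {
        throw new UnsupportedOperationException(getClass().getName());
    }
}

// Stand-in for PlainValuesDictionary$PlainBinaryDictionary: it decodes to
// binary only, and inherits the throwing decodeToLong.
class PlainBinaryDictionary extends Dictionary {
    @Override
    byte[] decodeToBinary(int id) {
        return new byte[] {0, 1, 2, 3, 4}; // 5-byte fixed-length decimal payload
    }
}

public class DictionaryMismatchDemo {
    public static void main(String[] args) {
        Dictionary dictionary = new PlainBinaryDictionary();
        // HeapLongVector.getLong effectively performs this call once the
        // binary dictionary has been installed on the long vector:
        dictionary.decodeToLong(0); // throws UnsupportedOperationException
    }
}
{code}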
Hive column type (Parquet physical type):

fixed_len_byte_array(5) list_price (DECIMAL(10,2));

A lookup join on a Hive table containing this column type fails with the exception above.
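
A minimal sketch of the reproduction path, assuming a registered Hive catalog {{myhive}}, a probe table {{orders}} with a processing-time attribute, and a Hive dimension table {{dim.products}} whose {{list_price}} is the decimal column above; all of these names are illustrative, not taken from this report:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HiveLookupJoinRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hive catalog registration and the definition of `orders` are
        // omitted; see the Flink Hive connector documentation.
        tEnv.executeSql(
                        "SELECT o.id, p.list_price "
                                + "FROM orders AS o "
                                + "JOIN myhive.dim.products FOR SYSTEM_TIME AS OF o.proc_time AS p "
                                + "ON o.product_id = p.id")
                .print();
        // The exception above is thrown while the lookup cache is loaded
        // (FileSystemLookupFunction#checkCacheReload), not at planning time.
    }
}
{code}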

 

Root cause:
{code:java}
// org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader#readToVector
public final void readToVector(int readNumber, VECTOR vector) throws IOException {
    int rowId = 0;
    WritableIntVector dictionaryIds = null;
    if (dictionary != null) {
        dictionaryIds = vector.reserveDictionaryIds(readNumber);
    }
    while (readNumber > 0) {
        // ......
        if (isCurrentPageDictionaryEncoded) {
            runLenDecoder.readDictionaryIds(
                    num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);

            if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
                // The problem is here: for a fixed-length decimal column this
                // installs a PlainBinaryDictionary on a HeapLongVector, and the
                // later HeapLongVector.getLong -> Dictionary.decodeToLong call throws.
                vector.setDictionary(new ParquetDictionary(dictionary));
            } else {
                readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
            }
        } else {
            if (vector.hasDictionary() && rowId != 0) {
                readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
            }
            vector.setDictionary(null);
            readBatch(rowId, num, vector);
        }

        valuesRead += num;
        rowId += num;
        readNumber -= num;
    }
}
{code}
Because the lazy-decode branch installs the raw dictionary instead of decoding the values, {{FixedLenBytesColumnReader#readBatch}} and {{FixedLenBytesColumnReader#readBatchFromDictionaryIds}} (in {{org.apache.flink.formats.parquet.vector.reader}}) are never called; the fixed-length bytes are therefore never decoded into decimals, and the exception is thrown when the column is first read.
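
One possible fix direction, sketched below under the assumption that eager decoding is acceptable for this reader (a sketch, not a committed patch): let {{FixedLenBytesColumnReader}} opt out of lazy dictionary decoding so that {{readToVector}} takes the {{readBatchFromDictionaryIds}} branch.
{code:java}
// Sketch only: opt FixedLenBytesColumnReader out of lazy dictionary decoding.
// With supportLazyDecode() returning false, readToVector no longer installs
// the PlainBinaryDictionary on the long vector and instead materializes the
// page through readBatchFromDictionaryIds, which knows how to decode the
// fixed_len_byte_array bytes.
public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector>
        extends AbstractColumnReader<VECTOR> {

    // ... existing constructor, readBatch and readBatchFromDictionaryIds
    // unchanged ...

    @Override
    protected boolean supportLazyDecode() {
        return false;
    }
}
{code}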


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
