Qinghui Xu created FLINK-35637:
----------------------------------

Summary: ScalarFunctionCallGen does not handle complex argument type properly
Key: FLINK-35637
URL: https://issues.apache.org/jira/browse/FLINK-35637
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Qinghui Xu
When using a UDF that expects an argument of type `Array<RowData>`, the following error is raised:

```
java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.table.data.RawValueData
    at org.apache.flink.table.data.GenericArrayData.getRawValue(GenericArrayData.java:223)
    at org.apache.flink.table.data.ArrayData.lambda$createElementGetter$95d74a6c$1(ArrayData.java:224)
    at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1223)
    at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:106)
    at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1175)
    at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1115)
    at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:419)
    at StreamExecCalc$1560.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
    at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
    at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.criteo.featureflow.flink.datadisco.test.JsonFileRowDataSource.run(TestBlinkGlupTableSource.scala:65)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
```

Digging into `ScalarFunctionCallGen` shows that it treats the argument's element type as `RAW` when it should be `ROW` (hence the `getRawValue` call on a `GenericRowData` element in the trace). The root cause seems to be that the code generator infers the argument type solely from the `ScalarFunction` signature, i.e. from the "external type". It should instead take the operand's type into account and bridge it to the external type.
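To make the suspected root cause concrete, here is a toy model in plain Java with no Flink dependency. It is a hypothetical simplification, not Flink's actual `ScalarFunctionCallGen` code: `LogicalKind`, the local `RowData` marker interface, `inferFromSignature` and `inferFromOperand` are invented names. It assumes the signature-only path only recognizes a fixed set of external classes and falls back to `RAW` for anything else, which matches the `getRawValue` call in the stack trace.

```java
import java.util.Map;

// Hypothetical, simplified model of the issue; NOT Flink's API or codegen.
class ArgTypeInferenceSketch {

    // Simplified stand-ins for logical type roots.
    enum LogicalKind { INT, STRING, ROW, RAW }

    // Local marker interface standing in for org.apache.flink.table.data.RowData.
    interface RowData {}

    // External classes that the signature-only path knows how to map (assumption).
    private static final Map<Class<?>, LogicalKind> KNOWN_CLASSES = Map.of(
            Integer.class, LogicalKind.INT,
            String.class, LogicalKind.STRING);

    // Signature-only inference: looks at the eval() parameter's element class
    // alone and falls back to RAW for anything unrecognized, such as RowData.
    static LogicalKind inferFromSignature(Class<?> elementClass) {
        return KNOWN_CLASSES.getOrDefault(elementClass, LogicalKind.RAW);
    }

    // Operand-aware inference: trust the logical type already derived for the
    // operand, and use the signature only when no operand type is available.
    static LogicalKind inferFromOperand(LogicalKind operandElementKind, Class<?> elementClass) {
        return operandElementKind != null ? operandElementKind : inferFromSignature(elementClass);
    }

    public static void main(String[] args) {
        // The UDF parameter is RowData[]; the SQL operand is ARRAY<ROW<...>>.
        Class<?> elementClass = RowData[].class.getComponentType();
        LogicalKind operandElementKind = LogicalKind.ROW;

        System.out.println("signature only: " + inferFromSignature(elementClass));
        System.out.println("operand-aware:  " + inferFromOperand(operandElementKind, elementClass));
    }
}
```

Running `main` prints `RAW` for the signature-only path and `ROW` for the operand-aware one. Under this reading of the issue, the operand's type decides which internal data structure is read from the array, and the external class from the signature should only drive the conversion (bridging) to the Java object handed to the UDF.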