Qinghui Xu created FLINK-35637:
----------------------------------

             Summary: ScalarFunctionCallGen does not handle complex argument type properly
                 Key: FLINK-35637
                 URL: https://issues.apache.org/jira/browse/FLINK-35637
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Qinghui Xu


When using a UDF whose argument is declared as `Array<RowData>`, the following 
error is raised:

```
java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.table.data.RawValueData
	at org.apache.flink.table.data.GenericArrayData.getRawValue(GenericArrayData.java:223)
	at org.apache.flink.table.data.ArrayData.lambda$createElementGetter$95d74a6c$1(ArrayData.java:224)
	at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1223)
	at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:106)
	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1175)
	at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1115)
	at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:419)
	at StreamExecCalc$1560.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
	at MyUDFExpectingRowDataArray$$anonfun$run$1.apply
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at com.criteo.featureflow.flink.datadisco.test.JsonFileRowDataSource.run(TestBlinkGlupTableSource.scala:65)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
```

Digging into `ScalarFunctionCallGen` shows that it treats the argument as a 
`RAW` type, whereas it should be a `ROW`.

The root cause seems to be that the codegen relies solely on the 
`ScalarFunction` signature to infer the type, which yields only the "external 
type". It should instead take the type of the operand into account and bridge 
it to the external type.
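
To illustrate the suspected failure mode, here is a toy model in plain Java. It is only a sketch: `BridgeSketch`, `Kind`, `RowValue`, `RawValue` and the two `kindFrom...` helpers are hypothetical stand-ins invented for this example and are not Flink classes or the actual codegen logic. It assumes that signature-only inference falls back to a `RAW` accessor, while operand-aware inference keeps the operand's `ROW` kind and uses the parameter class only as the conversion target.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: every type here is a hypothetical stand-in, not a Flink class.
final class BridgeSketch {

    /** Simplified logical kinds of an array element. */
    enum Kind { ROW, RAW }

    /** Stand-in for an internal structured row value. */
    static final class RowValue {
        final Object[] fields;
        RowValue(Object... fields) { this.fields = fields; }
    }

    /** Stand-in for an internal opaque (serialized) value. */
    static final class RawValue {
        final Object payload;
        RawValue(Object payload) { this.payload = payload; }
    }

    /** Reads one array element with the accessor implied by the given kind. */
    static Object readElement(Object element, Kind kind) {
        if (kind == Kind.ROW) {
            return (RowValue) element;
        }
        // Throws ClassCastException when the element is actually a RowValue.
        return (RawValue) element;
    }

    /** Assumed signature-only inference: the parameter class alone looks opaque. */
    static Kind kindFromSignature(Class<?> parameterElementClass) {
        return Kind.RAW;
    }

    /** Assumed operand-aware inference: the operand's kind decides the accessor. */
    static Kind kindFromOperand(Kind operandKind, Class<?> parameterElementClass) {
        return operandKind;
    }

    /** Returns true if every element can be read with the given kind. */
    static boolean canReadAll(List<Object> elements, Kind kind) {
        try {
            for (Object e : elements) {
                readElement(e, kind);
            }
            return true;
        } catch (ClassCastException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Object> operand =
                Arrays.<Object>asList(new RowValue(1, "a"), new RowValue(2, "b"));
        // Signature-only path picks the RAW accessor for row elements.
        System.out.println(canReadAll(operand, kindFromSignature(RowValue.class)));
        // Operand-aware path picks the ROW accessor.
        System.out.println(canReadAll(operand, kindFromOperand(Kind.ROW, RowValue.class)));
    }
}
```

In this model the cast failure comes from choosing the accessor from the parameter class alone, which mirrors the `GenericRowData` to `RawValueData` cast in the trace above. Whether the real fix belongs in `ScalarFunctionCallGen` or elsewhere in the planner is left open here.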



--
This message was sent by Atlassian Jira
(v8.20.10#820010)