Hi Shengkai,

To reproduce the issue, the input argument type must be
`Object[]`. Also, DPScalarFunction was a typo and should be ScalarFunction.
Can you observe the error with the input type changed as follows?

public static class ListToString extends ScalarFunction {
    public String eval(Object[] arr) {
        return "foo";
    }
}

The UDF should be able to take as input any type of array (e.g., array of
strings, ints, or longs) and still return a string regardless of the input.
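
To illustrate the intent in plain Java, here is a minimal sketch with no
Flink involved (the class name ObjectArrayArgSketch is made up for this
example). It only shows that an `Object[]` parameter accepts arrays of any
reference type. It says nothing about how the Blink planner converts SQL
ARRAY values before calling eval:

```java
public class ObjectArrayArgSketch {
    // Same shape as the eval method above, minus the ScalarFunction base class.
    public static String eval(Object[] arr) {
        return "foo";
    }

    public static void main(String[] args) {
        // Arrays of reference types are covariant, so each of these compiles.
        System.out.println(eval(new String[] {"a", "b"}));
        System.out.println(eval(new Integer[] {1, 2}));
        System.out.println(eval(new Long[] {1L, 2L}));
        // A primitive array is not an Object[], so this would not compile:
        // eval(new int[] {1, 2});
    }
}
```

So "ints" and "longs" above mean the boxed Integer[] and Long[] arrays.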

We are blocked on upgrading due to some unrelated issues, so we are hoping to
determine whether this is a legitimate issue in 1.11.

Thanks,
Tom

On Tue, May 31, 2022 at 8:12 PM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, Tom.
>
> I cannot reproduce the exception on master. I am not sure whether the
> problem has been fixed or I am missing something.
>
> The only difference is that my test UDF extends ScalarFunction rather than
> DPScalarFunction, and I use String[] as the input type.
>
> ```
>
> public static class ListToString extends ScalarFunction {
>     public String eval(String[] arr) {
>         return "foo";
>     }
> }
>
> ```
>
> I think you can also debug it this way:
> 0. Open the Flink repo and check out the release-1.11 branch
> 1. Create the UDF in JavaUserDefinedScalarFunctions
> 2. Find a test in the table ITCase, e.g. TableSinkITCase.scala
> 3. Add a new test to verify the results. I just added the following code:
> ```
>   @Test
>   def test(): Unit = {
>
>     val dataId = TestValuesTableFactory.registerRowData(
>       Seq(GenericRowData.of(
>         new GenericArrayData(Array(StringData.fromString("3")).toArray[Object]))))
>
>     tEnv.executeSql(
>       s"""
>          |CREATE TABLE test2   (person ARRAY<STRING>) WITH(
>          |  'connector' = 'values',
>          |  'data-id' = '$dataId',
>          |  'register-internal-data' = 'true'
>          |)
>          |""".stripMargin
>     )
>     tEnv.createFunction("ListToString", classOf[ListToString])
>     tEnv.executeSql(
>       "SELECT ListToString(`person`) as col1_string FROM `test2`").print()
>   }
> ```
> 4. Then you can debug the case in IDEA.
>
> Considering that Flink 1.11 is no longer maintained by the community, would
> you mind upgrading to one of the latest versions (1.13/1.14/1.15)?
>
>
>
> Best,
> Shengkai
>
> On Wed, Jun 1, 2022 at 2:06 AM Tom Thornton <thom...@yelp.com> wrote:
>
>> Hi all,
>>
>> Thank you for the help.
>>
>>> It seems an exception is thrown when Flink tries to deserialize the object
>>> output by your UDF. Is the object produced by your UDF serializable?
>>> Does it contain any lambda functions in the object/class?
>>
>>
>> The output object of the UDF is the string "foo" which should be
>> serializable. This exception only occurs when the input to the UDF is not
>> null. However, when the input is null, then the output object (which is
>> still the string "foo") does not cause any error or exception (i.e. it is
>> able to be serialized). There are no lambda functions in the output object
>> (it is just a string object).
>>
>> Thanks,
>> Tom
>>
>> On Thu, May 26, 2022 at 9:36 PM yuxia <luoyu...@alumni.sjtu.edu.cn>
>> wrote:
>>
>>> It seems an exception is thrown when Flink tries to deserialize the object
>>> output by your UDF. Is the object produced by your UDF serializable?
>>> Does it contain any lambda functions in the object/class?
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> *From: *"Tom Thornton" <thom...@yelp.com>
>>> *To: *"User" <user@flink.apache.org>
>>> *Sent: *Friday, May 27, 2022 6:47:04 AM
>>> *Subject: *Exception when running Java UDF with Blink table planner
>>>
>>> We are migrating from the legacy table planner to the Blink table
>>> planner. Previously we had a UDF defined like this that worked without
>>> issue:
>>>
>>> public class ListToString extends DPScalarFunction {
>>>     public String eval(List list) {
>>>         return "foo";
>>>     }
>>> }
>>>
>>> Since moving to the Blink table planner, we have been receiving this error:
>>>
>>> Caused by: org.apache.flink.table.api.ValidationException: Given parameters of function 'ListToString' do not match any signature.
>>> Actual: (java.lang.String[])
>>> Expected: (java.util.List)
>>>
>>>
>>> We refactored the UDF to take as input an Object[] to match what is
>>> received from Blink:
>>>
>>> public class ListToString extends DPScalarFunction {
>>>     public String eval(Object[] arr) {
>>>         return "foo";
>>>     }
>>> }
>>>
>>> Now the UDF always fails (including for the simplified example above
>>> where we return a constant string regardless of input). For example,
>>> consider a query like this one:
>>>
>>> SELECT ListToString(`col1`) as col1_string FROM `table`
>>>
>>> It produces an IndexOutOfBoundsException:
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0
>>>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>>>     at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>>>     at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>>>     at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>>>     at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>>>     at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>>>     at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>>>     at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>>>     at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>>>     at StreamExecCalc$337.processElement(Unknown Source)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>     at SourceConversion$328.processElement(Unknown Source)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
>>>
>>> Any ideas what may be causing this?
>>>
>>>
