Hi Shengkai,

In order to reproduce the issue, the input argument type must be `Object[]`. Also, DPScalarFunction is a typo and should be ScalarFunction. Are you able to observe the error if you try with the changed input type:

public static class ListToString extends ScalarFunction {
    public String eval(Object[] arr) {
        return "foo";
    }
}

The UDF should be able to take any type of array as input (e.g., an array of strings, ints, or longs) and still return a string regardless of the input. We are blocked on upgrading due to some unrelated issues, so I am hoping to determine whether this is a legitimate issue with 1.11 or not.
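
For reference, the rough shape we are after is sketched below. This is illustrative only: it assumes the annotation-based type hints (org.apache.flink.table.annotation.DataTypeHint with InputGroup.ANY) are usable in our 1.11 setup, and I have not verified how they interact with the error.

```
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch of the intended behavior: accept any input (and therefore any
// array element type) and always return a constant string.
public class ListToString extends ScalarFunction {

    // InputGroup.ANY passes the argument through without binding it to a
    // specific array class, so ARRAY<STRING>, ARRAY<INT>, etc. all match.
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object arr) {
        return "foo";
    }
}
```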

Thanks,
Tom

On Tue, May 31, 2022 at 8:12 PM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, Tom.
>
> I could not reproduce the exception on master. I am not sure whether the
> problem has been fixed or whether I am missing something.
>
> The only difference is that my test UDF extends ScalarFunction rather than
> DPScalarFunction, and I use String[] as the input type.
>
> ```
> public static class ListToString extends ScalarFunction {
>     public String eval(String[] arr) {
>         return "foo";
>     }
> }
> ```
>
> I think you can also debug it this way:
> 0. Open the Flink repo and check out release-1.11.
> 1. Create the UDF in JavaUserDefinedScalarFunctions.
> 2. Find a test among the table ITCases, e.g. TableSinkITCase.scala.
> 3. Add a new test to verify the results. I just added the following code:
>
> ```
> @Test
> def test(): Unit = {
>   val dataId = TestValuesTableFactory.registerRowData(
>     Seq(GenericRowData.of(
>       new GenericArrayData(Array(StringData.fromString("3")).toArray[Object]))))
>
>   tEnv.executeSql(
>     s"""
>        |CREATE TABLE test2 (person ARRAY<STRING>) WITH(
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'register-internal-data' = 'true'
>        |)
>        |""".stripMargin
>   )
>   tEnv.createFunction("ListToString", classOf[ListToString])
>   tEnv.executeSql("SELECT ListToString(`person`) as col1_string FROM `test2`").print()
> }
> ```
>
> 4. Then you can debug the case in your IDE.
>
> Considering that Flink 1.11 is no longer maintained by the community, would
> you mind upgrading to the latest version (1.13/1.14/1.15)?
>
> Best,
> Shengkai
>
> Tom Thornton <thom...@yelp.com> wrote on Wed, Jun 1, 2022 at 02:06:
>
>> Hi all,
>>
>> Thank you for the help.
>>
>>> It seems an exception is thrown when Flink tries to deserialize the
>>> object output by your UDF. So, is the object produced by your UDF
>>> serializable? Does it contain any lambda functions in the object/class?
>>
>> The output object of the UDF is the string "foo", which should be
>> serializable. This exception only occurs when the input to the UDF is not
>> null. When the input is null, the output object (which is still the string
>> "foo") does not cause any error or exception, i.e. it is able to be
>> serialized. There are no lambda functions in the output object (it is just
>> a string object).
>>
>> Thanks,
>> Tom
>>
>> On Thu, May 26, 2022 at 9:36 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>>> It seems an exception is thrown when Flink tries to deserialize the
>>> object output by your UDF. So, is the object produced by your UDF
>>> serializable? Does it contain any lambda functions in the object/class?
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> *From:* "Tom Thornton" <thom...@yelp.com>
>>> *To:* "User" <user@flink.apache.org>
>>> *Sent:* Friday, May 27, 2022, 6:47:04 AM
>>> *Subject:* Exception when running Java UDF with Blink table planner
>>>
>>> We are migrating from the legacy table planner to the Blink table planner.
>>> Previously we had a UDF defined like this that worked without issue:
>>>
>>> public class ListToString extends DPScalarFunction {
>>>     public String eval(List list) {
>>>         return "foo";
>>>     }
>>> }
>>>
>>> Since moving to the Blink table planner, we have been receiving this error:
>>>
>>> Caused by: org.apache.flink.table.api.ValidationException: Given parameters
>>> of function 'ListToString' do not match any signature.
>>> Actual: (java.lang.String[])
>>> Expected: (java.util.List)
>>>
>>> We refactored the UDF to take an Object[] as input, to match what is
>>> received from Blink:
>>>
>>> public class ListToString extends DPScalarFunction {
>>>     public String eval(Object[] arr) {
>>>         return "foo";
>>>     }
>>> }
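>>>
>>> For context, a minimal sketch of how the function gets wired up under the
>>> Blink planner is below. This is illustrative only: the class name and
>>> registration call are placeholders rather than our exact job code, and the
>>> source table (which is backed by Kafka) is omitted.
>>>
>>> ```
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>>
>>> public class Job {
>>>     public static void main(String[] args) {
>>>         // Blink planner in streaming mode (Flink 1.11).
>>>         StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>             .useBlinkPlanner()
>>>             .inStreamingMode()
>>>             .build();
>>>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>
>>>         // Register the UDF under the name used in the failing query below.
>>>         tEnv.createTemporarySystemFunction("ListToString", ListToString.class);
>>>
>>>         // The source table (`table`, with an ARRAY column `col1`) is
>>>         // created elsewhere from a Kafka source; omitted for brevity.
>>>     }
>>> }
>>> ```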
>>>
>>> Now the UDF always fails, including for the simplified example above where
>>> we return a constant string regardless of the input. For example, when we
>>> run a query like this one:
>>>
>>> SELECT ListToString(`col1`) as col1_string FROM `table`
>>>
>>> it produces an IndexOutOfBoundsException:
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0
>>>   at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>>   at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>>   at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>>   at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>>   at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>>   at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>>   at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>>>   at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>>>   at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>>>   at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>>>   at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>>>   at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>>>   at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>>>   at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>>>   at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>>>   at StreamExecCalc$337.processElement(Unknown Source)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>   at SourceConversion$328.processElement(Unknown Source)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>   at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
>>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
>>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
>>>
>>> Any ideas what may be causing this?