Hi all,

Thank you for the help.

> It seems an exception is thrown when Flink tries to deserialize the object
> output by your udf. So is the object produced by your udf serializable?
> Does it contain any lambda function in the object/class?

The output object of the UDF is the string "foo", which should be
serializable. The exception only occurs when the input to the UDF is not
null; when the input is null, the same output object (still the string
"foo") causes no error or exception, i.e. it serializes fine. There are no
lambda functions in the output object; it is just a String.
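In case it helps narrow things down, below is a minimal sketch of what the
eval signature would look like with an explicit type hint. This is only a
sketch and relies on a few assumptions not confirmed in this thread: that
DPScalarFunction ultimately extends Flink's ScalarFunction, that the Blink
planner's new type inference is in effect, and that col1 is an array of
strings (a guess based on the java.lang.String[] in the original
ValidationException).

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;

    // Sketch only: ScalarFunction stands in here for the DPScalarFunction
    // base class, which is not shown in this thread.
    public class ListToString extends ScalarFunction {
        // Declaring the argument as ARRAY<STRING> should let the planner hand
        // the UDF a plain String[] instead of treating the elements as RAW
        // values that are (de)serialized with Kryo.
        public String eval(@DataTypeHint("ARRAY<STRING>") String[] arr) {
            return "foo";
        }
    }

If col1 holds a different element type, the hint and the Java parameter type
would need to change accordingly.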
Thanks,
Tom

On Thu, May 26, 2022 at 9:36 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> It seems an exception is thrown when Flink tries to deserialize the object
> output by your udf. So is the object produced by your udf serializable?
> Does it contain any lambda function in the object/class?
>
> Best regards,
> Yuxia
>
> ------------------------------
> From: "Tom Thornton" <thom...@yelp.com>
> To: "User" <user@flink.apache.org>
> Sent: Friday, May 27, 2022, 6:47:04 AM
> Subject: Exception when running Java UDF with Blink table planner
>
> We are migrating from the legacy table planner to the Blink table planner.
> Previously we had a UDF defined like this that worked without issue:
>
> public class ListToString extends DPScalarFunction {
>     public String eval(List list) {
>         return "foo";
>     }
> }
>
> Since moving to the Blink table planner, we receive this error:
>
> Caused by: org.apache.flink.table.api.ValidationException: Given parameters
> of function 'ListToString' do not match any signature.
> Actual: (java.lang.String[])
> Expected: (java.util.List)
>
> We refactored the UDF to take an Object[] as input, matching what is
> received from Blink:
>
> public class ListToString extends DPScalarFunction {
>     public String eval(Object[] arr) {
>         return "foo";
>     }
> }
>
> Now the UDF always fails (including for the simplified example above, where
> we return a constant string regardless of input).
> For example, a query like this one:
>
> SELECT ListToString(`col1`) as col1_string FROM `table`
>
> produces an IndexOutOfBoundsException:
>
> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0
>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>     at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>     at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>     at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>     at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>     at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>     at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>     at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>     at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>     at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>     at StreamExecCalc$337.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at SourceConversion$328.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
>
> Any ideas what may be causing this?