Hi.

Could you also tell us which Flink version you are using, the schema of the
source table and some test data? With these info, we can debug in our local
environment.

Best,
Shengkai

Tom Thornton <thom...@yelp.com> 于2022年5月27日周五 06:47写道:

> We are migrating from the legacy table planner to the Blink table planner.
> Previously we had a UDF defined like this that worked without issue:
>
> public class ListToString extends DPScalarFunction {
>     public String eval(List list) {
>         return "foo";
>     }
>
> Since moving to the Blink table planner and receiving this error:
>
> Caused by: org.apache.flink.table.api.ValidationException: Given parameters 
> of function 'ListToString' do not match any signature.
> Actual: (java.lang.String[])
> Expected: (java.util.List)
>
>
> We refactored the UDF to take as input an Object[] to match what is
> received from Blink:
>
> public class ListToString extends DPScalarFunction {
>     public String eval(Object[] arr) {        return "foo";
>     }
> }
>
> Now the UDF always fails (including for the simplified example above where
> we return a constant string regardless of input). For example, when we run
> on a query like this one:
>
> SELECT ListToString(`col1`) as col1_string FROM `table`
>
> Produces an IndexOutOfBoundsException:
>
> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for 
> length 0
>       at 
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>       at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>       at 
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>       at java.base/java.util.Objects.checkIndex(Objects.java:372)
>       at java.base/java.util.ArrayList.get(ArrayList.java:459)
>       at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>       at 
> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>       at 
> org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>       at StreamExecCalc$337.processElement(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at SourceConversion$328.processElement(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
>
> Any ideas what may be causing this?
>

Reply via email to