[ https://issues.apache.org/jira/browse/FLINK-23671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395717#comment-17395717 ]
Shengkai Fang commented on FLINK-23671: --------------------------------------- [~twalthr] I have updated the description. > Failed to inference type in correlate > -------------------------------------- > > Key: FLINK-23671 > URL: https://issues.apache.org/jira/browse/FLINK-23671 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.13.2 > Reporter: Shengkai Fang > Priority: Major > > {code:java} > CREATE FUNCTION func111 AS > 'org.apache.flink.table.client.gateway.utils.CPDetailOriginMatchV2UDF'; > CREATE TABLE side( > `id2` VARCHAR, > PRIMARY KEY (`id2`) NOT ENFORCED > ) WITH ( > 'connector' = 'values' > ); > CREATE TABLE main( > `id` VARCHAR, > `proctime` as proctime() > ) WITH ( > 'connector' = 'datagen', > 'number-of-rows' = '10' > ); > CREATE TABLE blackhole( > `id` VARCHAR > ) WITH ( > 'connector' = 'blackhole' > ); > INSERT INTO blackhole > SELECT `id` > FROM main > JOIN side FOR SYSTEM_TIME AS OF main.`proctime` ON main.`id` = side.`id2` > INNER join lateral table(func111(side.`id2`)) as T(`is_match`, > `match_bizline`, `match_page_id`, `source_type`) ON 1 = 1; > {code} > The implementation of the udf is as follow > {code:java} > package org.apache.flink.table.client.gateway.utils; > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.catalog.DataTypeFactory; > import org.apache.flink.table.functions.TableFunction; > import org.apache.flink.table.types.DataType; > import org.apache.flink.table.types.inference.TypeInference; > import org.apache.flink.types.Row; > import java.util.Optional; > public class CPDetailOriginMatchV2UDF extends TableFunction<Row> { > public void eval(String original) { > collect(null); > } > // is_matched, match_bizline, match_page_id, scene > @Override > public TypeInference getTypeInference(DataTypeFactory typeFactory) { > return TypeInference.newBuilder() > .outputTypeStrategy( > callContext -> { > DataType[] array = new DataType[4]; > array[0] = DataTypes.BOOLEAN(); > array[1] = DataTypes.STRING(); > // page_id 是Long类型, BIGINT 是否可以支持? > array[2] = DataTypes.BIGINT(); > array[3] = DataTypes.STRING(); > return Optional.of(DataTypes.ROW(array)); > }) > .build(); > } > } > {code} > The exception stack as follows. > {code:java} > org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of > function's argument data type 'STRING NOT NULL' and actual argument type > 'STRING'. > at > org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:323) > at > org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$$anonfun$verifyArgumentTypes$1.apply(BridgingFunctionGenUtil.scala:320) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.verifyArgumentTypes(BridgingFunctionGenUtil.scala:320) > at > org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:95) > at > org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:65) > at > org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:73) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:861) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:537) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:57) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157) > at > org.apache.flink.table.planner.codegen.CorrelateCodeGenerator$.generateOperator(CorrelateCodeGenerator.scala:127) > at > org.apache.flink.table.planner.codegen.CorrelateCodeGenerator$.generateCorrelateTransformation(CorrelateCodeGenerator.scala:75) > at > org.apache.flink.table.planner.codegen.CorrelateCodeGenerator.generateCorrelateTransformation(CorrelateCodeGenerator.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate.translateToPlanInternal(CommonExecCorrelate.java:102) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:210) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:289) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.lambda$translateInputToPlan$5(ExecNodeBase.java:244) > > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)