Hi,

The new UDF registration interface introduced in 1.11 uses the new UDF type inference mechanism, which is why you see the problem above.
You can refer to the new UDF type inference documentation [1] and try adding a type hint.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#type-inference
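
For example, the hints could look roughly like this (a sketch only; the `ROW<f0 STRING, f1 INT, f2 STRING>` field names and types are my assumption based on what your eval methods appear to produce, so adjust them to your actual schema):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Declare the row type of the table function's output explicitly,
// since a plain Row carries no field information for the extractor.
@FunctionHint(output = @DataTypeHint("ROW<f0 STRING, f1 INT, f2 STRING>"))
public static class UDTF extends TableFunction<Row> {
    public void eval(String input) {
        collect(Row.of(input, input.length(), input + 2));
    }
}

// Declare the data type of the Row *parameter* explicitly; this is
// the part the "Cannot extract a data type from a pure Row class"
// error in your stack trace is complaining about.
public static class UDF extends ScalarFunction {
    public String eval(
            @DataTypeHint("ROW<f0 STRING, f1 INT, f2 STRING>") Row row,
            Integer index) {
        return String.valueOf(row.getField(index));
    }
}
```

With these annotations the functions should register fine via `createTemporarySystemFunction`.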

On Fri, Aug 7, 2020 at 11:00 AM zz zhang <[email protected]> wrote:

> Running the following code throws an exception; switching back to the old
> StreamTableEnvironment.registerFunction method works fine.
> Flink version: 1.11.1
>
> package com.test;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
>
> public class TestUTDFOk {
>     public static class UDTF extends TableFunction<Row> {
>
>         public void eval(String input) {
>             Row row = new Row(3);
>             row.setField(0, input);
>             row.setField(1, input.length());
>             row.setField(2, input +  2);
>             collect(row);
>         }
>     }
>
>     public static  class UDF extends ScalarFunction {
>         public String eval(Row row, Integer index) {
>             try {
>                 return String.valueOf(row.getField(index));
>             } catch (Exception e) {
>                 throw e;
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>                 env,
> EnvironmentSettings.newInstance().useBlinkPlanner().build());
> //        tEnv.registerFunction("udtf", new UDTF());
> //        tEnv.registerFunction("udf", new UDF());
>         tEnv.createTemporarySystemFunction("udtf", new UDTF());
>         tEnv.createTemporarySystemFunction("udf", new UDF());
>
>         tEnv.createTemporaryView("source", tEnv.fromValues("a", "b",
> "c").as("f0"));
>         String sinkDDL = "create table sinkTable ("
>                 + "f0 String"
>                 + ", x String"
>                 + ", y String"
>                 + ", z String"
>                 + ") with ("
>                 + "    'connector.type' = 'filesystem',"
>                 + "    'format.type' = 'csv',"
>                 + "    'connector.path' =
> 'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'"
>                 + ")";
>         String udtfCall = "insert into sinkTable SELECT S.f0"
>                 + ", udf(f1, 0) as x"
>                 + ", udf(f1, 1) as y"
>                 + ", udf(f1, 2) as z"
>                 + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";
>
>         tEnv.executeSql(sinkDDL);
>         tEnv.executeSql(udtfCall);
>     }
> }
>
> The exception is as follows:
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> An error occurred in the type inference logic of function 'udf'.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.test.TestUTDFOk.main(TestUTDFOk.java:64)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'udf'.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.util.Optional.flatMap(Optional.java:241)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1318)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1052)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
> ... 7 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'com.test.TestUTDFOk$UDF'. Please check for implementation mistakes
> and/or provide a corresponding hint.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:160)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:85)
> at
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:144)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:162)
> ... 19 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to output mapping.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158)
> ... 22 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public java.lang.String
> com.test.TestUTDFOk$UDF.eval(org.apache.flink.types.Row,java.lang.Integer)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110)
> ... 24 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a data type from 'class org.apache.flink.types.Row' in
> parameter 0 of method 'eval' in class 'com.test.TestUTDFOk$UDF'.
> Please pass the required data type manually or allow RAW types.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:147)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:396)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:375)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:375)
> at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
> at
> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
> at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:376)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:354)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:314)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164)
> ... 25 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot
> extract a data type from a pure 'org.apache.flink.types.Row' class.
> Please use annotations to define field names and field types.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238)
> ... 45 more
>
> Process finished with exit code 1
>
> --
> Best,
> zz zhang
>


-- 

Best,
Benchao Li
