Hi,

I agree with @yuxia: you should implement the logic of the lambda function inside your own UDF.
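A minimal sketch of that suggestion, assuming the transformation is known up front (here, adding a constant to every element). The class name ArrayAddFunction and the delta parameter are hypothetical and only for illustration; boxed java.lang.Long is used on the assumption that it is extracted as ARRAY<BIGINT> and BIGINT and lets NULL values pass through.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF for illustration: instead of receiving a lambda, it takes
// a plain BIGINT argument and applies the per-element logic itself.
class ArrayAddFunction extends ScalarFunction {

  // Boxed java.lang.Long is used so that NULL arrays, NULL elements and a
  // NULL delta can be represented (assumption: extracted as ARRAY<BIGINT>
  // and BIGINT by Flink's type inference).
  def eval(a: Array[java.lang.Long], delta: java.lang.Long): Array[java.lang.Long] = {
    if (a == null || delta == null) {
      null
    } else {
      // The logic that would have been the lambda `id -> id + delta`.
      a.map { e =>
        if (e == null) null
        else java.lang.Long.valueOf(e.longValue + delta.longValue)
      }
    }
  }
}
```

Assuming a TableEnvironment named tableEnv, it could be registered with tableEnv.createTemporarySystemFunction("array_add", classOf[ArrayAddFunction]) and then called as: select array_add(ids, CAST(1 AS BIGINT)) from tmp_table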
Best,
Shammon

On Wed, Feb 22, 2023 at 10:32 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Maybe you can try with a non-lambda function.
> But TBH, I haven't seen any Flink UDF that accepts a function as a parameter
> in my previous experience. I'm afraid that it's not allowed to pass a
> function as a parameter.
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Xuekui" <baixue...@foxmail.com>
> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
> *Cc: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>,
> "User" <user@flink.apache.org>
> *Sent: *Tuesday, February 21, 2023, 11:25:48 AM
> *Subject: *Re:Re: Flink SQL support array transform function
>
> Hi YuXia,
>
> Thanks for your advice.
>
> By adding the hint, the type validation can pass.
> But I still can't pass the function to this UDF.
> Here is my query:
>
> select array_transform(ids, id -> id +1) from tmp_table
>
> The lambda function id -> id +1 can't be passed because "->" is not
> supported in Calcite now.
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "- >" at line 3, column 40.
> at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>
>
> Original Email
>
> Sender: "yuxia" <luoyu...@alumni.sjtu.edu.cn>;
> Sent Time: 2023/2/20 10:00
> To: "Xuekui" <baixue...@foxmail.com>;
> Cc recipient: "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>;
> Subject: Re: Flink SQL support array transform function
>
> Hi, Xuekui.
> As said in the exception stack, maybe you can try to provide a hint for
> the function's parameters.
>
> class ArrayTransformFunction extends ScalarFunction {
>
>   def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
>            @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
>     a.map(e => function(e))
>   }
>
> }
>
> Hope it can help.
> For more detail, please refer to the Flink doc[1].
>
> [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Xuekui" <baixue...@foxmail.com>
> *To: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>
> *Cc: *"User" <user@flink.apache.org>
> *Sent: *Thursday, February 16, 2023, 10:54:05 AM
> *Subject: *Re: Flink SQL support array transform function
>
> Hi Caizhi,
>
> I've tried to write a UDF to support this function, but I found I can't pass
> the function parameter to the UDF because the data type of the function is not
> supported. An exception is thrown during SQL validation.
>
> My UDF code:
>
> class ArrayTransformFunction extends ScalarFunction {
>
>   def eval(a: Array[Long], function: Long => Long): Array[Long] = {
>     a.map(e => function(e))
>   }
>
> }
>
> Exception:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. An error occurred in the type inference logic of
> function 'transform'.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
> at SQLTest$.main(SQLTest.scala:44)
> at SQLTest.main(SQLTest.scala)
> Caused by: org.apache.flink.table.api.ValidationException: An error occurred
> in the type inference logic of function 'transform'.
> at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
> at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
> at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.util.Optional.flatMap(Optional.java:241)
> at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
> at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
> ... 6 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract
> a valid type inference for function class 'udf.ArrayTransformFunction'.
> Please check for implementation mistakes and/or provide a corresponding hint.
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
> at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
> at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
> at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
> ... 17 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to output mapping.
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
> at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
> at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
> ... 20 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to extract
> a type inference from method:
> public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
> ... 22 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract
> a data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in
> parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please
> pass the required data type manually or allow RAW types.
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
> at java.util.Optional.orElseGet(Optional.java:267)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
> at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
> at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
> at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
> at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
> ... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract
> a data type from 'scala.Function1<java.lang.Object, java.lang.Object>'.
> Interpreting it as a structured type was also not successful.
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
> ... 43 more
> Caused by: org.apache.flink.table.api.ValidationException: Class
> 'scala.Function1' must not be abstract.
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
> at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
> at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
> ... 44 more
>
> Do you have any advice on this UDF?
>
> Thanks
>
>
> ------------------ Original ------------------
> *From: * "Shengkai Fang" <fskm...@gmail.com>;
> *Send time:* Tuesday, Aug 3, 2021 8:51 PM
> *To:* "Caizhi Weng" <tsreape...@gmail.com>;
> *Cc:* "Xuekui" <baixue...@foxmail.com>; "user" <user@flink.apache.org>;
> *Subject: * Re: Flink SQL support array transform function
>
> Hi, Caizhi. Do you think we should support this? Maybe we can open a Jira
> issue for this, or align with Spark to support more useful built-in
> functions.
>
>
> Caizhi Weng <tsreape...@gmail.com> wrote on Tue, Aug 3, 2021 at 3:42 PM:
>
>> Hi!
>> Currently there is no such built-in function in Flink SQL. You can try to
>> write your own user-defined function[1] to achieve this.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
>>
>> Xuekui <baixue...@foxmail.com> wrote on Tue, Aug 3, 2021 at 3:22 PM:
>>
>>> Hi,
>>>
>>> I'm using Flink SQL and need to do some transformation for one array
>>> column, just like the Spark SQL transform function.
>>> https://spark.apache.org/docs/latest/api/sql/index.html#transform
>>>
>>> I found it's not supported by Flink SQL. Is there any plan for it?
>>>
>>>
>>> Thank you