Xuekui, I guess you want support for higher-order functions in Flink SQL, like Spark has: https://spark.apache.org/docs/latest/api/sql/#transform ?
Best regards,
Alexey

On Wed, Feb 22, 2023 at 10:31 AM Xuekui <baixue...@foxmail.com> wrote:

> Hi Yuxia and Shammon,
>
> Thanks for your reply.
>
> The requirements are dynamic in my case. If I move the logic into a UDF,
> it's not flexible.
>
> For example, there's a users column in my table whose type is
> Row<id Long, name String, address String>
>
> I can do some filtering like array_filter(users, user -> user.id > 1) or
> array_filter(users, user -> user.name in ("a", "b"))
>
> If I could pass a function as a parameter, I would only need one UDF,
> array_filter.
>
> But if I put the logic of the lambda function into the UDF, I need to
> write one UDF for each condition.
>
> Thanks
> Xuekui
>
> ------------------------------
>
> Xuekui
> baixue...@foxmail.com
>
>
> Original Email
>
> Sender: "Shammon FY" <zjur...@gmail.com>;
> Sent Time: 2023/2/22 14:28
> To: "yuxia" <luoyu...@alumni.sjtu.edu.cn>;
> Cc recipient: "Xuekui" <baixue...@foxmail.com>; "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>;
> Subject: Re: Flink SQL support array transform function
>
> Hi
>
> Agree with @yuxia, you should do the operation of the lambda function in
> your own UDF.
>
> Best,
> Shammon
>
>
> On Wed, Feb 22, 2023 at 10:32 AM yuxia <luoyu...@alumni.sjtu.edu.cn>
> wrote:
>
>> Maybe you can try with a non-lambda function.
>> But TBH, I haven't seen any Flink UDF that accepts a function as a
>> parameter in my previous experience. I'm afraid that it's not allowed to
>> pass a function as a parameter.
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From: *"Xuekui" <baixue...@foxmail.com>
>> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
>> *Cc: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>, "User" <user@flink.apache.org>
>> *Sent: *Tuesday, February 21, 2023, 11:25:48 AM
>> *Subject: *Re:Re: Flink SQL support array transform function
>>
>> Hi YuXia,
>>
>> Thanks for your advice.
>>
>> By adding the hint, the type validation can pass.
>> But I still can't pass the function to this UDF.
>> Here is my query:
>>
>> select array_transform(ids, id -> id +1) from tmp_table
>>
>> The lambda function id -> id +1 can't be passed because "->" is not
>> supported in Calcite now.
>>
>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "- >" at line 3, column 40.
>>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>
>>
>> Original Email
>>
>> Sender: "yuxia" <luoyu...@alumni.sjtu.edu.cn>;
>> Sent Time: 2023/2/20 10:00
>> To: "Xuekui" <baixue...@foxmail.com>;
>> Cc recipient: "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>;
>> Subject: Re: Flink SQL support array transform function
>>
>> Hi, Xuekui.
>> As said in the exception stack, maybe you can try to provide a hint for
>> the function's parameters.
>>
>>
>> class ArrayTransformFunction extends ScalarFunction {
>>
>>   def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
>>            @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
>>     a.map(e => function(e))
>>   }
>>
>> }
>> Hope it can help.
>> For more detail, please refer to the Flink doc [1].
>>
>> [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>>
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From: *"Xuekui" <baixue...@foxmail.com>
>> *To: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>
>> *Cc: *"User" <user@flink.apache.org>
>> *Sent: *Thursday, February 16, 2023, 10:54:05 AM
>> *Subject: *Re: Flink SQL support array transform function
>>
>> Hi Caizhi,
>>
>> I've tried to write a UDF to support this function, but I found I can't
>> pass the function parameter to the UDF because the data type of the
>> function is not supported. An exception is thrown in SQL validation.
>>
>> My UDF code:
>>
>> class ArrayTransformFunction extends ScalarFunction {
>>
>>   def eval(a: Array[Long], function: Long => Long): Array[Long] = {
>>     a.map(e => function(e))
>>   }
>>
>> }
>>
>>
>> Exception:
>>
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'.
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>     at SQLTest$.main(SQLTest.scala:44)
>>     at SQLTest.main(SQLTest.scala)
>> Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'.
>>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>>     at java.util.Optional.flatMap(Optional.java:241)
>>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>>     at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>>     ... 6 more
>> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'udf.ArrayTransformFunction'. Please check for implementation mistakes and/or provide a corresponding hint.
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
>>     at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
>>     at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>>     ... 17 more
>> Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>>     at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>>     ... 20 more
>> Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
>> public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
>>     ... 22 more
>> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please pass the required data type manually or allow RAW types.
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
>>     at java.util.Optional.orElseGet(Optional.java:267)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
>>     at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
>>     at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
>>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>>     ... 23 more
>> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>'. Interpreting it as a structured type was also not successful.
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
>>     ... 43 more
>> Caused by: org.apache.flink.table.api.ValidationException: Class 'scala.Function1' must not be abstract.
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
>>     at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
>>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
>>     ... 44 more
>>
>> Do you have any advice on this UDF?
>>
>> Thanks
>>
>>
>> ------------------ Original ------------------
>> *From: * "Shengkai Fang";<fskm...@gmail.com>;
>> *Send time:* Tuesday, Aug 3, 2021 8:51 PM
>> *To:* "Caizhi Weng"<tsreape...@gmail.com>;
>> *Cc:* "Xuekui"<baixue...@foxmail.com>; "user"<user@flink.apache.org>;
>> *Subject: * Re: Flink SQL support array transform function
>>
>> Hi, Caizhi. Do you think we should support this? Maybe we can open a Jira
>> for this, or align with Spark to support more useful built-in
>> functions.
>>
>>
>> Caizhi Weng <tsreape...@gmail.com> wrote on Tue, Aug 3, 2021 at 3:42 PM:
>>
>>> Hi!
>>> Currently there is no such built-in function in Flink SQL. You can try
>>> to write your own user-defined function [1] to achieve this.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
>>>
>>> Xuekui <baixue...@foxmail.com> wrote on Tue, Aug 3, 2021 at 3:22 PM:
>>>
>>>> Hi,
>>>>
>>>> I'm using Flink SQL and need to do some transformation on an array
>>>> column, just like the Spark SQL transform function.
>>>> https://spark.apache.org/docs/latest/api/sql/index.html#transform
>>>>
>>>> I found it's not supported by Flink SQL. Is there any plan for it?
>>>>
>>>>
>>>> Thank you
>>>>
>>>>
>>
>>
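
A note on the "one UDF per condition" concern raised in this thread: since the parser rejects "->", one possible middle ground is to describe the condition with ordinary SQL values (an operator string and an operand) instead of a lambda. The following is only a rough sketch under assumptions. The class name ArrayFilterByIdFunction, the set of supported operators, and the use of a plain BIGINT array (rather than the array of rows in the users example) are all illustrative and not from the thread, and it has not been run against a particular Flink version.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF: the name and the operator set are illustrative only.
// The condition is passed as plain SQL values, so no "->" reaches the parser.
class ArrayFilterByIdFunction extends ScalarFunction {

  // ids:     the array to filter (assumed non-null, as in the thread's UDF)
  // op:      comparison operator given as a string literal, e.g. '>'
  // operand: right-hand side of the comparison
  def eval(ids: Array[Long], op: String, operand: Long): Array[Long] = {
    // Translate the operator string into a predicate once per call.
    val keep: Long => Boolean = op match {
      case ">"  => (x: Long) => x > operand
      case ">=" => (x: Long) => x >= operand
      case "<"  => (x: Long) => x < operand
      case "<=" => (x: Long) => x <= operand
      case "="  => (x: Long) => x == operand
      case other =>
        throw new IllegalArgumentException(s"Unsupported operator: $other")
    }
    ids.filter(keep)
  }
}
```

Assuming it is registered with tableEnv.createTemporarySystemFunction("array_filter_by_id", classOf[ArrayFilterByIdFunction]), a query could then look like: select array_filter_by_id(ids, '>', CAST(1 AS BIGINT)) from tmp_table. This is less flexible than a real lambda, but the condition can change per query without writing a new UDF each time.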