Hi,

Thanks @Alexey. I think what @Xuekui needs is quite similar to transform in Spark, right?

Best,
Shammon
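For reference, a minimal, self-contained sketch of the Spark behavior being referred to. This assumes Spark 3.x, where org.apache.spark.sql.functions.transform is available; the session, table, and column names are made up for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, transform}

object SparkTransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // A single-column table whose only column is an array of longs.
    val df = Seq(Seq(1L, 2L, 3L)).toDF("ids")

    // The lambda is a first-class argument to transform.
    df.select(transform(col("ids"), id => id + 1).as("ids_plus_one")).show()

    // The same thing in Spark SQL, where "->" is part of the grammar.
    spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1)").show()
  }
}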
On Wed, Feb 22, 2023 at 10:12 PM Alexey Novakov <ale...@ververica.com> wrote:

Xuekui, I guess you want higher-order function support in Flink SQL, like Spark has: https://spark.apache.org/docs/latest/api/sql/#transform ?

Best regards,
Alexey

On Wed, Feb 22, 2023 at 10:31 AM Xuekui <baixue...@foxmail.com> wrote:

Hi Yuxia and Shammon,

Thanks for your reply.

The requirement is dynamic in my case. If I move the logic into a UDF, it's not flexible.

For example, there's a users column in my table whose type is Row<id Long, name String, address String>.

I can do some filtering like array_filter(users, user -> user.id > 1) or array_filter(users, user -> user.name in ("a", "b")).

If I can pass a function parameter, I only need one UDF, array_filter. But if I put the operation of the lambda function into the UDF, I need to write one UDF for each condition.

Thanks
Xuekui

------------------------------

Xuekui
baixue...@foxmail.com

------------------------------

Original Email

Sender: "Shammon FY" <zjur...@gmail.com>
Sent Time: 2023/2/22 14:28
To: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Cc: "Xuekui" <baixue...@foxmail.com>; "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>
Subject: Re: Flink SQL support array transform function

Hi,

Agree with @yuxia, you should do the operation of the lambda function in your own UDF.

Best,
Shammon

On Wed, Feb 22, 2023 at 10:32 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Maybe you can try with a non-lambda function. But TBH, I haven't seen any Flink UDF that accepts a function as a parameter in my previous experience. I'm afraid it's not allowed to pass a function as a parameter.

Best regards,
Yuxia

------------------------------

From: "Xuekui" <baixue...@foxmail.com>
To: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Cc: "fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>, "User" <user@flink.apache.org>
Sent: Tuesday, February 21, 2023 11:25:48 AM
Subject: Re: Re: Flink SQL support array transform function

Hi Yuxia,

Thanks for your advice.

By adding the hint, the type validation can pass, but I still can't pass the function to this UDF. Here is my query:

select array_transform(ids, id -> id + 1) from tmp_table

The lambda function id -> id + 1 can't be passed because "->" is not supported in Calcite now.

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "- >" at line 3, column 40.
    at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
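To make the limitation concrete: without higher-order function support, each condition from the users example above has to become its own UDF. A minimal sketch of that, assuming Flink's ScalarFunction and Row APIs; the class names ArrayFilterIdGt and ArrayFilterNameIn and the row layout are hypothetical:

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// One UDF per condition, since the predicate cannot be passed in from SQL.
class ArrayFilterIdGt extends ScalarFunction {
  @DataTypeHint("ARRAY<ROW<id BIGINT, name STRING, address STRING>>")
  def eval(
      @DataTypeHint("ARRAY<ROW<id BIGINT, name STRING, address STRING>>") users: Array[Row],
      threshold: Long): Array[Row] =
    // Positional access: field 0 is id, per the ROW hint above.
    users.filter(u => u.getFieldAs[Long](0) > threshold)
}

// A second, nearly identical UDF exists only to change the predicate.
class ArrayFilterNameIn extends ScalarFunction {
  @DataTypeHint("ARRAY<ROW<id BIGINT, name STRING, address STRING>>")
  def eval(
      @DataTypeHint("ARRAY<ROW<id BIGINT, name STRING, address STRING>>") users: Array[Row],
      names: Array[String]): Array[Row] =
    // Field 1 is name, per the ROW hint above.
    users.filter(u => names.contains(u.getFieldAs[String](1)))
}

After registration these would be called as array_filter_id_gt(users, 1) and array_filter_name_in(users, ARRAY['a', 'b']) respectively, instead of one generic array_filter(users, <lambda>).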
------------------------------

Original Email

Sender: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Sent Time: 2023/2/20 10:00
To: "Xuekui" <baixue...@foxmail.com>
Cc: "fskmine" <fskm...@gmail.com>; "Caizhi Weng" <tsreape...@gmail.com>; "User" <user@flink.apache.org>
Subject: Re: Flink SQL support array transform function

Hi, Xuekui.
As said in the exception stack, maybe you can try to provide hints for the function's parameters:

class ArrayTransformFunction extends ScalarFunction {

  def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
           @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Hope it can help. For more detail, please refer to the Flink doc[1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference

Best regards,
Yuxia
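A sketch of how the hinted function would be registered and invoked, assuming the class above is packaged as udf.ArrayTransformFunction and a tmp_table with an ids ARRAY<BIGINT> column exists. As the reply quoted earlier in the thread shows, the hint gets the function past type validation, but the SQL parser still rejects the lambda at the call site:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object RegisterSketch {
  def main(args: Array[String]): Unit = {
    val env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())
    // Registration succeeds once the parameter hints are in place.
    env.createTemporarySystemFunction("array_transform", classOf[udf.ArrayTransformFunction])
    // This still fails to parse: Calcite has no "->" lambda syntax here.
    // env.executeSql("select array_transform(ids, id -> id + 1) from tmp_table")
  }
}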
------------------------------

From: "Xuekui" <baixue...@foxmail.com>
To: "fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>
Cc: "User" <user@flink.apache.org>
Sent: Thursday, February 16, 2023 10:54:05 AM
Subject: Re: Flink SQL support array transform function

Hi Caizhi,

I've tried to write a UDF to support this function, but I found I can't pass the function parameter to the UDF because the data type of the function is not supported. An exception is thrown in SQL validation.

My UDF code:

class ArrayTransformFunction extends ScalarFunction {

  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'.
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
    at SQLTest$.main(SQLTest.scala:44)
    at SQLTest.main(SQLTest.scala)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'.
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
    at java.util.Optional.flatMap(Optional.java:241)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
    ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'udf.ArrayTransformFunction'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
    at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
    ... 17 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 20 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
    ... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please pass the required data type manually or allow RAW types.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
    ... 23 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'scala.Function1<java.lang.Object, java.lang.Object>'. Interpreting it as a structured type was also not successful.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
    ... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'scala.Function1' must not be abstract.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
    at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
    ... 44 more

Do you have any advice on this UDF?

Thanks

------------------ Original ------------------
From: "Shengkai Fang" <fskm...@gmail.com>
Send time: Tuesday, Aug 3, 2021 8:51 PM
To: "Caizhi Weng" <tsreape...@gmail.com>
Cc: "Xuekui" <baixue...@foxmail.com>; "user" <user@flink.apache.org>
Subject: Re: Flink SQL support array transform function

Hi, Caizhi. Do you think we should support this? Maybe we can open a JIRA for it, or align with Spark to support more useful built-in functions.

On Tue, Aug 3, 2021 at 3:42 PM, Caizhi Weng <tsreape...@gmail.com> wrote:

Hi!
Currently there is no such built-in function in Flink SQL. You can try to write your own user-defined function[1] to achieve this.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/

On Tue, Aug 3, 2021 at 3:22 PM, Xuekui <baixue...@foxmail.com> wrote:

Hi,

I'm using Flink SQL and need to do some transformation on one array column, just like the Spark SQL transform function:
https://spark.apache.org/docs/latest/api/sql/index.html#transform

I found it's not supported by Flink SQL. Is there any plan for it?

Thank you
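For completeness, a minimal, self-contained sketch of the custom-UDF route suggested above, with the transformation fixed inside the function (which is exactly the inflexibility discussed earlier in the thread). The name array_add_one and the inline test query are illustrative only:

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction

// The transformation (add one) is hard-coded; no lambda crosses the SQL boundary.
class ArrayAddOne extends ScalarFunction {
  def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long]): Array[Long] =
    a.map(_ + 1L)
}

object ArrayAddOneExample {
  def main(args: Array[String]): Unit = {
    val env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())
    env.createTemporarySystemFunction("array_add_one", classOf[ArrayAddOne])
    // The CAST pins the array element type to BIGINT to match the parameter hint.
    env.executeSql("SELECT array_add_one(ARRAY[CAST(1 AS BIGINT), 2, 3])").print()
  }
}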