Xuekui, I guess you want higher-order function support in Flink SQL, like
Spark's transform (https://spark.apache.org/docs/latest/api/sql/#transform)?
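
If that is the need, one possible stopgap while "->" does not parse is to pass
the condition as ordinary arguments rather than as a function. Below is a
minimal sketch of that idea in plain Scala. The names (User, Condition,
ArrayFilter, filterUsers) are hypothetical and not part of Flink; the eval
method of a single UDF could delegate to something like this.

```scala
// Hypothetical element type mirroring Row<id Long, name String, address String>.
final case class User(id: Long, name: String, address: String)

// A condition described as data (field, operator, operands) instead of a lambda.
final case class Condition(field: String, op: String, operands: Seq[String])

object ArrayFilter {
  // Returns true when the user satisfies the condition. Unknown field/operator
  // combinations are rejected rather than silently ignored.
  def matches(user: User, c: Condition): Boolean = (c.field, c.op) match {
    case ("id", ">")    => user.id > c.operands.head.toLong
    case ("name", "in") => c.operands.contains(user.name)
    case (f, o)         => throw new IllegalArgumentException(s"unsupported condition: $f $o")
  }

  // Keeps only the users that satisfy the condition.
  def filterUsers(users: Seq[User], c: Condition): Seq[User] =
    users.filter(u => matches(u, c))
}
```

With this shape, the two filters from the example below would be expressed as
Condition("id", ">", Seq("1")) and Condition("name", "in", Seq("a", "b")). The
trade-off is that only the operators written into the function are available,
so it is less general than a real lambda.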

Best regards,
Alexey

On Wed, Feb 22, 2023 at 10:31 AM Xuekui <baixue...@foxmail.com> wrote:

> Hi Yuxia and Shammon,
>
> Thanks for your reply.
>
> The requirements are dynamic in my case. If I move the logic into a UDF,
> it's not flexible.
>
> For example, there's a users column in my table whose type is Row<id
> Long, name String, address String>
>
> I could filter it with something like
> array_filter(users, user -> user.id > 1) or
> array_filter(users, user -> user.name in ("a", "b"))
>
> If I could pass a function as a parameter, I would only need one UDF,
> array_filter.
>
> But if I put the lambda's logic into the UDF, I need to write one UDF for
> each condition.
>
> Thanks
> Xuekui
>
> ------------------------------
>
> Original Email
>
> Sender:"Shammon FY"< zjur...@gmail.com >;
>
> Sent Time:2023/2/22 14:28
>
> To:"yuxia"< luoyu...@alumni.sjtu.edu.cn >;
>
> Cc recipient:"Xuekui"< baixue...@foxmail.com >;"fskmine"<
> fskm...@gmail.com >;"Caizhi Weng"< tsreape...@gmail.com >;"User"<
> user@flink.apache.org >;
>
> Subject:Re: Flink SQL support array transform function
>
> Hi,
>
> I agree with @yuxia: you should implement the lambda's logic in your own
> UDF.
>
> Best,
> Shammon
>
>
> On Wed, Feb 22, 2023 at 10:32 AM yuxia <luoyu...@alumni.sjtu.edu.cn>
> wrote:
>
>> Maybe you can try a non-lambda function.
>> But TBH, I haven't seen any Flink UDF that accepts a function as a
>> parameter. I'm afraid that passing a function as a parameter is not
>> allowed.
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From: *"Xuekui" <baixue...@foxmail.com>
>> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
>> *Cc: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>,
>> "User" <user@flink.apache.org>
>> *Sent: *Tuesday, February 21, 2023, 11:25:48 AM
>> *Subject: *Re:Re: Flink SQL support array transform function
>>
>> Hi YuXia,
>>
>> Thanks for your advice.
>>
>> With the hint added, type validation passes.
>> But I still can't pass the function to this UDF.
>> Here is my query:
>>
>> select array_transform(ids, id -> id +1) from tmp_table
>>
>> The lambda function id -> id +1 can't be passed because "->" is not
>> currently supported by Calcite.
>>
>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>> SQL parse failed. Encountered "- >" at line 3, column 40.
>> at
>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>
>>
>>
>>
>>
>>
>> Original Email
>>
>> Sender:"yuxia"< luoyu...@alumni.sjtu.edu.cn >;
>>
>> Sent Time:2023/2/20 10:00
>>
>> To:"Xuekui"< baixue...@foxmail.com >;
>>
>> Cc recipient:"fskmine"< fskm...@gmail.com >;"Caizhi Weng"<
>> tsreape...@gmail.com >;"User"< user@flink.apache.org >;
>>
>> Subject:Re: Flink SQL support array transform function
>>
>> Hi, Xuekui.
>> As the exception stack trace says, maybe you can try providing a hint for
>> the function's parameters.
>>
>>
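>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.functions.ScalarFunction
>>
>> class ArrayTransformFunction extends ScalarFunction {
>>
>>   // Hint both parameters explicitly; the function parameter is declared RAW.
>>   def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
>>            @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
>>     a.map(e => function(e))
>>   }
>>
>> }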
>>
>> Hope this helps.
>> For more details, please refer to the Flink docs [1].
>>
>> [1]:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>>
>>
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From: *"Xuekui" <baixue...@foxmail.com>
>> *To: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>
>> *Cc: *"User" <user@flink.apache.org>
>> *Sent: *Thursday, February 16, 2023, 10:54:05 AM
>> *Subject: *Re:  Flink SQL support array transform function
>>
>> Hi Caizhi,
>>
>> I've tried to write a UDF to support this function, but I found I can't
>> pass the function parameter to the UDF because the function's data type is
>> not supported. An exception is thrown during SQL validation.
>>
>> My UDF code:
>>
>> class ArrayTransformFunction extends ScalarFunction {
>>
>>   def eval(a: Array[Long], function: Long => Long): Array[Long] = {
>>     a.map(e => function(e))
>>   }
>>
>> }
>>
>>
>> Exception:
>>
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> SQL validation failed. An error occurred in the type inference logic of 
>> function 'transform'.
>>      at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>      at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>>      at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>>      at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>>      at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>      at SQLTest$.main(SQLTest.scala:44)
>>      at SQLTest.main(SQLTest.scala)
>> Caused by: org.apache.flink.table.api.ValidationException: An error occurred 
>> in the type inference logic of function 'transform'.
>>      at 
>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>>      at 
>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>>      at 
>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>>      at java.util.Optional.flatMap(Optional.java:241)
>>      at 
>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>>      at 
>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>>      at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
>>      at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
>>      at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
>>      at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
>>      at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>>      at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>>      ... 6 more
>> Caused by: org.apache.flink.table.api.ValidationException: Could not extract 
>> a valid type inference for function class 'udf.ArrayTransformFunction'. 
>> Please check for implementation mistakes and/or provide a corresponding hint.
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>      at 
>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>>      at 
>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
>>      at 
>> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
>>      at 
>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>>      ... 17 more
>> Caused by: org.apache.flink.table.api.ValidationException: Error in 
>> extracting a signature to output mapping.
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>>      at 
>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>>      at 
>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>>      ... 20 more
>> Caused by: org.apache.flink.table.api.ValidationException: Unable to extract 
>> a type inference from method:
>> public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
>>      ... 22 more
>> Caused by: org.apache.flink.table.api.ValidationException: Could not extract 
>> a data type from 'scala.Function1<java.lang.Object, java.lang.Object>' in 
>> parameter 1 of method 'eval' in class 'udf.ArrayTransformFunction'. Please 
>> pass the required data type manually or allow RAW types.
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
>>      at java.util.Optional.orElseGet(Optional.java:267)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
>>      at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
>>      at 
>> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
>>      at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
>>      at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>      at 
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>      at 
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>      at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>      at 
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>>      at 
>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>>      ... 23 more
>> Caused by: org.apache.flink.table.api.ValidationException: Could not extract 
>> a data type from 'scala.Function1<java.lang.Object, java.lang.Object>'. 
>> Interpreting it as a structured type was also not successful.
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
>>      ... 43 more
>> Caused by: org.apache.flink.table.api.ValidationException: Class 
>> 'scala.Function1' must not be abstract.
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
>>      at 
>> org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
>>      at 
>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
>>      ... 44 more
>>
>> Do you have any advice on this udf?
>>
>> Thanks
>>
>>
>> ------------------ Original ------------------
>> *From: * "Shengkai Fang";<fskm...@gmail.com>;
>> *Send time:* Tuesday, Aug 3, 2021 8:51 PM
>> *To:* "Caizhi Weng"<tsreape...@gmail.com>;
>> *Cc:* "Xuekui"<baixue...@foxmail.com>; "user"<user@flink.apache.org>;
>> *Subject: * Re: Flink SQL support array transform function
>>
>> Hi, Caizhi. Do you think we should support this? Maybe we can open a Jira
>> ticket for this, or align with Spark and support more useful built-in
>> functions.
>>
>>
>> On Tue, Aug 3, 2021 at 3:42 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>> Currently there is no such built-in function in Flink SQL. You can try
>>> to write your own user-defined function[1] to achieve this.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
>>>
>>> On Tue, Aug 3, 2021 at 3:22 PM Xuekui <baixue...@foxmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using Flink SQL and need to transform an array column, just like
>>>> the Spark SQL transform function.
>>>> https://spark.apache.org/docs/latest/api/sql/index.html#transform
>>>>
>>>> I found it's not supported by Flink SQL. Is there any plan for it?
>>>>
>>>>
>>>> Thank you
>>>>
>>>>
>>
>>
