Hi

Thanks @Alexey. I think what @Xuekui needs is quite similar to transform in
Spark, right?

Best,
Shammon


On Wed, Feb 22, 2023 at 10:12 PM Alexey Novakov <ale...@ververica.com>
wrote:

> Xuekui, I guess you want higher-order function support in Flink SQL like
> Spark has: https://spark.apache.org/docs/latest/api/sql/#transform ?
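>
> For reference, a tiny self-contained Spark sketch of what that built-in
> does (the session setup is only for illustration):
>
> import org.apache.spark.sql.SparkSession
>
> object TransformDemo {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
>     // transform applies the lambda to every element of the array
>     spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1) AS plus_one").show()
>     // prints [2, 3, 4]
>     spark.stop()
>   }
> }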
>
> Best regards,
> Alexey
>
> On Wed, Feb 22, 2023 at 10:31 AM Xuekui <baixue...@foxmail.com> wrote:
>
>> Hi Yuxia and Shammon,
>>
>> Thanks for your reply.
>>
>> The requirement is dynamic in my case. If I move the logic into the UDF,
>> it's not flexible.
>>
>> For example, there's a users column in my table whose type is
>> Array<Row<id Long, name String, address String>>.
>>
>> I can apply filters like array_filter(users, user -> user.id > 1) or
>> array_filter(users, user -> user.name in ("a", "b")).
>>
>> If I can pass a function parameter, I only need one UDF, array_filter.
>>
>> But if I put the lambda's logic inside the UDF, I need to write one UDF
>> for each condition.
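>>
>> To make the duplication concrete, here's a rough sketch (the class names
>> and field positions are made up, and in practice each eval would also
>> need a type hint for the Array<Row> argument):
>>
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>>
>> // One UDF class per condition, because the predicate can't be passed in.
>> class FilterUsersById extends ScalarFunction {
>>   def eval(users: Array[Row]): Array[Row] =
>>     users.filter(_.getField(0).asInstanceOf[Long] > 1)   // id at position 0
>> }
>>
>> class FilterUsersByName extends ScalarFunction {
>>   def eval(users: Array[Row]): Array[Row] =
>>     users.filter(u => Set("a", "b")(u.getField(1).asInstanceOf[String]))  // name at 1
>> }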
>>
>> Thanks
>> Xuekui
>>
>> ------------------------------
>>
>> Xuekui
>> baixue...@foxmail.com
>>
>>
>>
>>
>>
>> Original Email
>>
>> Sender:"Shammon FY"< zjur...@gmail.com >;
>>
>> Sent Time:2023/2/22 14:28
>>
>> To:"yuxia"< luoyu...@alumni.sjtu.edu.cn >;
>>
>> Cc recipient:"Xuekui"< baixue...@foxmail.com >;"fskmine"<
>> fskm...@gmail.com >;"Caizhi Weng"< tsreape...@gmail.com >;"User"<
>> user@flink.apache.org >;
>>
>> Subject:Re: Flink SQL support array transform function
>>
>> Hi
>>
>> Agree with @yuxia, you should implement the lambda's logic inside your
>> own UDF.
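>>
>> For example, a rough sketch that keeps a single UDF by passing the varying
>> pieces as ordinary SQL values instead of a function (the class name and
>> the operation tags are made up for illustration):
>>
>> import org.apache.flink.table.functions.ScalarFunction
>>
>> // The operation is selected by a string tag, so only plain SQL types
>> // cross the UDF boundary and no function-typed parameter is needed.
>> class ArrayMap extends ScalarFunction {
>>   def eval(a: Array[Long], op: String, operand: Long): Array[Long] =
>>     op match {
>>       case "add" => a.map(_ + operand)
>>       case "mul" => a.map(_ * operand)
>>       case _     => a
>>     }
>> }
>> // e.g. SELECT array_map(ids, 'add', 1) FROM tmp_table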
>>
>> Best,
>> Shammon
>>
>>
>> On Wed, Feb 22, 2023 at 10:32 AM yuxia <luoyu...@alumni.sjtu.edu.cn>
>> wrote:
>>
>>> Maybe you can try with a non-lambda function.
>>> But TBH, I haven't seen any Flink UDF that accepts a function as a
>>> parameter in my previous experience. I'm afraid it's not allowed to pass
>>> a function as a parameter.
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> *From: *"Xuekui" <baixue...@foxmail.com>
>>> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
>>> *Cc: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>,
>>> "User" <user@flink.apache.org>
>>> *Sent: *Tuesday, Feb 21, 2023, 11:25:48 AM
>>> *Subject: *Re: Re: Flink SQL support array transform function
>>>
>>> Hi YuXia,
>>>
>>> Thanks for your advice.
>>>
>>> After adding the hint, type validation passes, but I still can't pass
>>> the function to this UDF. Here is my query:
>>>
>>> select array_transform(ids, id -> id + 1) from tmp_table
>>>
>>> The lambda function id -> id + 1 can't be passed because "->" is not
>>> supported in Calcite yet:
>>>
>>> Exception in thread "main"
>>> org.apache.flink.table.api.SqlParserException: SQL parse failed.
>>> Encountered "- >" at line 3, column 40.
>>> at
>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
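>>>
>>> The only workaround I can think of is to bake the lambda in at
>>> registration time, since the function instance is constructed in Scala
>>> rather than parsed from SQL, but then the operation is fixed per
>>> registered name (a sketch, assuming a StreamTableEnvironment tEnv and a
>>> serializable lambda):
>>>
>>> import org.apache.flink.table.functions.ScalarFunction
>>>
>>> // The lambda is captured in the constructor, so SQL never has to parse it.
>>> class ArrayTransformWith(f: Long => Long) extends ScalarFunction {
>>>   def eval(a: Array[Long]): Array[Long] = a.map(f)
>>> }
>>>
>>> tEnv.createTemporarySystemFunction("array_plus_one", new ArrayTransformWith(_ + 1))
>>> // SELECT array_plus_one(ids) FROM tmp_table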
>>>
>>>
>>>
>>>
>>>
>>>
>>> Original Email
>>>
>>> Sender:"yuxia"< luoyu...@alumni.sjtu.edu.cn >;
>>>
>>> Sent Time:2023/2/20 10:00
>>>
>>> To:"Xuekui"< baixue...@foxmail.com >;
>>>
>>> Cc recipient:"fskmine"< fskm...@gmail.com >;"Caizhi Weng"<
>>> tsreape...@gmail.com >;"User"< user@flink.apache.org >;
>>>
>>> Subject:Re: Flink SQL support array transform function
>>>
>>> Hi, Xuekui.
>>> As said in the exception stack, maybe you can try to provide a hint for
>>> the function's parameters.
>>>
>>>
>>> import org.apache.flink.table.annotation.DataTypeHint
>>> import org.apache.flink.table.functions.ScalarFunction
>>>
>>> class ArrayTransformFunction extends ScalarFunction {
>>>   // RAW tells the type extractor to treat the function argument as an
>>>   // opaque serialized object instead of deriving a SQL type for it.
>>>   def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
>>>            @DataTypeHint("RAW") function: Long => Long): Array[Long] =
>>>     a.map(e => function(e))
>>> }
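>>>
>>> For completeness, registering it would look something like this (assuming
>>> a TableEnvironment named tEnv):
>>>
>>> tEnv.createTemporarySystemFunction("array_transform", new ArrayTransformFunction)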
>>> Hope it helps.
>>> For more detail, please refer to the Flink docs [1].
>>>
>>> [1]:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>>>
>>>
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> *From: *"Xuekui" <baixue...@foxmail.com>
>>> *To: *"fskmine" <fskm...@gmail.com>, "Caizhi Weng" <tsreape...@gmail.com>
>>> *Cc: *"User" <user@flink.apache.org>
>>> *Sent: *Thursday, Feb 16, 2023, 10:54:05 AM
>>> *Subject: *Re: Flink SQL support array transform function
>>>
>>> Hi Caizhi,
>>>
>>> I've tried to write a UDF to support this function, but I found I can't
>>> pass the function parameter to the UDF because the data type of the
>>> function is not supported. An exception is thrown during SQL validation.
>>>
>>> My UDF code:
>>>
>>> import org.apache.flink.table.functions.ScalarFunction
>>>
>>> class ArrayTransformFunction extends ScalarFunction {
>>>   // Apply the given function to every element of the array.
>>>   def eval(a: Array[Long], function: Long => Long): Array[Long] =
>>>     a.map(e => function(e))
>>> }
>>>
>>>
>>> Exception:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> SQL validation failed. An error occurred in the type inference logic of 
>>> function 'transform'.
>>>     at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>>     at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>>>     at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>>>     at 
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>>>     at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>>     at SQLTest$.main(SQLTest.scala:44)
>>>     at SQLTest.main(SQLTest.scala)
>>> Caused by: org.apache.flink.table.api.ValidationException: An error 
>>> occurred in the type inference logic of function 'transform'.
>>>     at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>>>     at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>>>     at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>>>     at java.util.Optional.flatMap(Optional.java:241)
>>>     at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>>>     at 
>>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>>>     at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
>>>     at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
>>>     at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
>>>     at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
>>>     at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>>>     at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>>>     ... 6 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not 
>>> extract a valid type inference for function class 
>>> 'udf.ArrayTransformFunction'. Please check for implementation mistakes 
>>> and/or provide a corresponding hint.
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>>     at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>>>     at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
>>>     at 
>>> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
>>>     at 
>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>>>     ... 17 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Error in 
>>> extracting a signature to output mapping.
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>>>     at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>>>     at 
>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>>>     ... 20 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Unable to 
>>> extract a type inference from method:
>>> public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
>>>     ... 22 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not 
>>> extract a data type from 'scala.Function1<java.lang.Object, 
>>> java.lang.Object>' in parameter 1 of method 'eval' in class 
>>> 'udf.ArrayTransformFunction'. Please pass the required data type manually 
>>> or allow RAW types.
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:220)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:198)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:174)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromMethodParameter(DataTypeExtractor.java:128)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractDataTypeArgument(FunctionMappingExtractor.java:409)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$null$10(FunctionMappingExtractor.java:385)
>>>     at java.util.Optional.orElseGet(Optional.java:267)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$extractArgumentTemplates$11(FunctionMappingExtractor.java:383)
>>>     at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
>>>     at 
>>> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
>>>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
>>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>>     at 
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>>     at 
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>     at 
>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractArgumentTemplates(FunctionMappingExtractor.java:387)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createParameterSignatureExtraction$9(FunctionMappingExtractor.java:364)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:324)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
>>>     at 
>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
>>>     ... 23 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not 
>>> extract a data type from 'scala.Function1<java.lang.Object, 
>>> java.lang.Object>'. Interpreting it as a structured type was also not 
>>> successful.
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:270)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:212)
>>>     ... 43 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Class 
>>> 'scala.Function1' must not be abstract.
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
>>>     at 
>>> org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:162)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:453)
>>>     at 
>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:268)
>>>     ... 44 more
>>>
>>> Do you have any advice on this UDF?
>>>
>>> Thanks
>>>
>>>
>>> ------------------ Original ------------------
>>> *From: * "Shengkai Fang";<fskm...@gmail.com>;
>>> *Send time:* Tuesday, Aug 3, 2021 8:51 PM
>>> *To:* "Caizhi Weng"<tsreape...@gmail.com>;
>>> *Cc:* "Xuekui"<baixue...@foxmail.com>; "user"<user@flink.apache.org>;
>>> *Subject: * Re: Flink SQL support array transform function
>>>
>>> Hi, Caizhi. Do you think we should support this? Maybe we can open a
>>> JIRA for it, or align with Spark and support more useful built-in
>>> functions.
>>>
>>>
>>> On Tue, Aug 3, 2021 at 3:42 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> Currently there is no such built-in function in Flink SQL. You can try
>>>> to write your own user-defined function [1] to achieve this.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
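>>>>
>>>> As a starting point, a minimal scalar UDF along the lines of the docs
>>>> above could look like this (the class name is made up):
>>>>
>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>
>>>> // Adds 1 to every element of a BIGINT array.
>>>> class ArrayPlusOne extends ScalarFunction {
>>>>   def eval(a: Array[Long]): Array[Long] = a.map(_ + 1)
>>>> }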
>>>>
>>>> On Tue, Aug 3, 2021 at 3:22 PM Xuekui <baixue...@foxmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm using Flink SQL and need to do some transformation on an array
>>>>> column, just like the Spark SQL transform function:
>>>>> https://spark.apache.org/docs/latest/api/sql/index.html#transform
>>>>>
>>>>> I found it's not supported by Flink SQL. Is there any plan to add it?
>>>>>
>>>>>
>>>>> Thank you
>>>>>
>>>>>
>>>
>>>
