There are multiple ways of having a more generic UDF. I will use pseudo code here:

// supports any input
def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): String = {
  ...
}

// or you use no annotations at all and simply define a strategy
// default input strategy is wildcard
def eval(data: java.util.Map[Row, Integer]): String = {
  ...
}

override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
  TypeInference.newBuilder()
    .outputTypeStrategy(callContext => Optional.of(DataTypes.STRING()))
    .build()
}

You can also check the logical type using the callContext in outputTypeStrategy().
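
A fuller version of the second variant might look like the sketch below. It is untested against 1.12.0 and rests on a few assumptions: the class name `MultisetToString` is made up for illustration, the output strategy is written as an anonymous `TypeStrategy` rather than a lambda so that it does not depend on Scala SAM conversion, and returning `Optional.empty()` is used to reject a call whose single argument is not a MULTISET.

```scala
import java.util.Optional

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.inference.{CallContext, TypeInference, TypeStrategy}
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.flink.types.Row

// Hypothetical function name; not part of Flink.
class MultisetToString extends ScalarFunction {

  // No annotation: the input strategy is left at its default.
  def eval(data: java.util.Map[Row, Integer]): String = {
    if (data == null || data.isEmpty) ""
    else data.keySet().toArray.mkString(",")
  }

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    TypeInference.newBuilder()
      .outputTypeStrategy(new TypeStrategy {
        override def inferType(callContext: CallContext): Optional[DataType] = {
          val args = callContext.getArgumentDataTypes
          // Accept exactly one argument whose logical type root is MULTISET.
          val isMultiset = args.size() == 1 &&
            args.get(0).getLogicalType.getTypeRoot == LogicalTypeRoot.MULTISET
          if (isMultiset) Optional.of(DataTypes.STRING()) else Optional.empty()
        }
      })
      .build()
  }
}
```

Because the check only looks at the type root, the same function would be accepted for MULTISET<ROW<...>> with any field layout, which is the generality asked for in the thread.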

Regards,
Timo


On 10.11.21 07:25, JING ZHANG wrote:
Hi, vtygoss
I added the UDF from my latest email to `AggregateITCase` on the Flink master branch, and it works well. I need some time to find out why it doesn't work on branch 1.12.0. I will get back to you as soon as possible.

Best regards,
JING zhang


vtygoss <vtyg...@126.com <mailto:vtyg...@126.com>> wrote on Wed, Nov 10, 2021 at 11:53 AM:

    Hi, JING ZHANG!

    I tried leaving the annotation off the eval method entirely, but that
    failed with the exception quoted below. I could only make it work by
    defining the annotation.


    - 1. Is there a way to avoid defining the annotation? This is very
    important for keeping the UDF generic.

    - 2. I am using Flink 1.12.0. Which version are you using where it
    works without the annotation?



    Best Regards!


    ```

    def eval(data: java.util.Map[Row, Integer]): String = {
      if (data == null || data.size() == 0) {
        return ""
      }
      data.keySet().toArray().mkString(",")
    }

    ```






    On Nov 10, 2021 at 11:33, JING ZHANG <beyond1...@gmail.com
    <mailto:beyond1...@gmail.com>> wrote:

    Hi vtygoss,
    I'm a little confused.
    The UDF should already work without a `DataTypeHint` annotation.
    Why do you put a `DataTypeHint` annotation on the input parameter
    of the `eval` method?

    Best,
    JING ZHANG

    vtygoss <vtyg...@126.com <mailto:vtyg...@126.com>> wrote on Tue, Nov 9,
    2021 at 8:17 PM:

        Hi, JING ZHANG!

        Thanks for your repeated help.


        I have tried COLLECT(ROW(id, name)) and want to store the result
        as a String (for a POC test). So I defined a UDF, but the
        annotation on the eval function has to be declared as
        "MULTISET<ROW<...>>" as shown below; otherwise the exception "..RAW/MAP
        expected. but MULTISET<ROW<`EXPR$0` STRING NOT NULL, `EXPR$1`
        STRING, `EXPR$2` STRING, `EXPR$3` STRING, `EXPR$4` STRING> NOT
        NULL> NOT NULL passed" is thrown.


        I think annotating the UDF this way is not convenient for the
        general case. I can't define a separate UDF for every
        RowData structure, such as Row<id>, Row<id, name>, Row<.....>


        Is there any way to define the annotation for a dynamic RowData
        structure?

        Thanks for your suggestions again.


        Best Regards!


        ```

        def eval(@DataTypeHint("MULTISET<ROW<" +
           "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
           "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
           ">") data: JMAP[Row, Integer]): String = {
           if (data == null || data.size() == 0) {
             return ""
           }
           data.keySet().toArray().mkString(",")
        }

        ```





        On Nov 8, 2021 at 21:26, JING ZHANG <beyond1...@gmail.com
        <mailto:beyond1...@gmail.com>> wrote:

        Hi Vtygoss,
        You could try the following SQL:
        ```

        select COLLECT(ROW(id, name)) as info

        from table

        group by ...;

        ```

        In the above SQL, the result type of `COLLECT(ROW(id, name))` is
        MULTISET<ROW>.

        `CollectAggFunction` stores the data in a MapState. The key has the
        element type and holds the row value; the value is an Integer
        that holds the count of that row.


        If you need to define a UDF that handles the result of
        `COLLECT(ROW(id, name))`, you can use Map<Row, Integer> as the
        input parameter type.
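
        The element-to-count layout described above can be illustrated
        with plain Scala collections. This is only a sketch of the idea,
        not Flink's implementation; `MultisetDemo` and `toMultiset` are
        hypothetical names and no Flink classes are involved.

        ```scala
        object MultisetDemo {
          // Collapse a sequence with duplicates into element -> occurrence count.
          def toMultiset[T](elements: Seq[T]): Map[T, Int] =
            elements.groupBy(identity).map { case (element, copies) => element -> copies.size }
        }
        ```

        For example, `MultisetDemo.toMultiset(Seq(("1", "a"), ("2", "b"), ("1", "a")))`
        maps `("1", "a")` to 2 and `("2", "b")` to 1, which mirrors how a
        duplicated row shows up as a single key with a count of 2.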

        The following code is a demo. Hope it helps.

        tEnv.registerFunction("TestFunc", TestFunc)
        tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")

        ....

        @SerialVersionUID(1L)
        object TestFunc extends ScalarFunction {
           // keySet() is a java.util.Set, so convert to an array before mkString
           def eval(s: java.util.Map[Row, Integer]): String =
             s.keySet().toArray.mkString("\n")
        }

        Best regards,

        JING ZHANG


        vtygoss <vtyg...@126.com <mailto:vtyg...@126.com>> wrote on Mon, Nov 8,
        2021 at 7:00 PM:

            Hi, flink community!


            I am working on migrating a data production pipeline from
            SparkSQL to FlinkSQL (1.12.0), and I have run into a problem with
            MULTISET<STRUCT<....>>.


            ```

            Spark SQL


            select COLLECT_LIST(named_struct('id', id, 'name', name)) as
            info

            from table

            group by ...;


            ```


            - 1. How can I express and store this data structure in Flink?

            I tried to express it as MULTISET<ROW<id long, name string>> in
            FlinkSQL, but it seems that the ORC / JSON / AVRO formats cannot
            store this type.

            - 2. How can I read MULTISET<ROW<id long, name string>> in
            FlinkSQL?

            If I need to define a function, which type should
            MULTISET<ROW<id long, name string>> be bridged to?


            Is there a more convenient way to solve this problem?

            Thanks very much for any suggestions or replies.


            Best Regards!

