Thanks Dian, that resolved my issues.
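
For anyone who finds this thread later: the fix quoted below points the SQL at `my_udf`, the object returned by `udf(...)`, instead of at the class `MyUDF`. A quick way to see which of the two a dotted name refers to is sketched here. This is only an illustration using the standard library, not Flink's actual loader. `describe_target` and the `custom_udf_2_demo` module are hypothetical names, and the plain `MyUDF()` instance stands in for the real `udf(MyUDF(), result_type=DataTypes.BIGINT())` so the snippet runs without PyFlink installed.

```python
import importlib
import inspect
import sys
import types


def describe_target(dotted_name):
    """Resolve 'module.attribute' and report whether it is a class or an object."""
    module_name, _, attr_name = dotted_name.rpartition(".")
    module = importlib.import_module(module_name)
    target = getattr(module, attr_name)
    return "class" if inspect.isclass(target) else "object"


# Hypothetical stand-in for custom_udf_2.py (no PyFlink dependency).
class MyUDF:
    def eval(self, x, y):
        return x + y


demo = types.ModuleType("custom_udf_2_demo")
demo.MyUDF = MyUDF
# In the real file this line would be: udf(MyUDF(), result_type=DataTypes.BIGINT())
demo.my_udf = MyUDF()
sys.modules[demo.__name__] = demo

# The name from the failing statement resolves to a class ...
kind_of_class_name = describe_target("custom_udf_2_demo.MyUDF")
# ... while the name from the working statement resolves to an object.
kind_of_object_name = describe_target("custom_udf_2_demo.my_udf")
print(kind_of_class_name, kind_of_object_name)
```

Running the same check against the real `custom_udf_2` module (with it on the Python path) should show whether the name passed to `CREATE FUNCTION ... LANGUAGE PYTHON` is the wrapped object or still the bare class.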

On Sun, May 21, 2023 at 6:55 PM Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Tom,
>
> The following statement is incorrect.
> ```
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
> ```
>
> You should define it as follows:
> custom_udf_2.py
> ```
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> class MyUDF(ScalarFunction):
>
>   def __init__(self):
>     self.counter = None
>
>   def open(self, function_context):
>     self.counter = function_context.get_metric_group().counter("my_counter")
>
>   def eval(self, x, y):
>     self.counter.inc()
>     return x + y
>
> my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
> ```
>
> And then use it in SQL as follows:
> ```
> CREATE FUNCTION add AS 'custom_udf_2.my_udf'
> LANGUAGE PYTHON;
> ```
>
> Regards,
> Dian
>
> On Fri, May 19, 2023 at 6:23 AM tom yang <ensc...@gmail.com> wrote:
>>
>> Hi
>>
>>
>> I am trying to create a Flink SQL program that uses a Python UDF with
>> metrics. This is my sample Python file:
>>
>> custom_udf_2.py
>>
>> ```
>> from pyflink.table.udf import ScalarFunction, udf
>> from pyflink.table import DataTypes
>>
>> class MyUDF(ScalarFunction):
>>
>>   def __init__(self):
>>     self.counter = None
>>
>>   def open(self, function_context):
>>     self.counter = function_context.get_metric_group().counter("my_counter")
>>
>>   def eval(self, x, y):
>>     self.counter.inc()
>>     return x + y
>>
>> ```
>>
>> This is my SQL script:
>>
>> ```
>> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
>> LANGUAGE PYTHON;
>>
>> CREATE TABLE datagen (
>>     a BIGINT,
>>     b BIGINT
>> ) WITH (
>>     'connector' = 'datagen',
>>     'fields.a.kind'='sequence',
>>     'fields.a.start'='1',
>>     'fields.a.end'='8',
>>     'fields.b.kind'='sequence',
>>     'fields.b.start'='4',
>>     'fields.b.end'='11'
>> );
>>
>> CREATE TABLE print_sink (
>>     `sum` BIGINT
>> ) WITH (
>>     'connector' = 'print'
>> );
>>
>>
>> INSERT into print_sink (
>>     select add(a,b) FROM datagen
>> );
>>
>> ```
>>
>> When I try to execute this program, I get the following:
>>
>>
>> ```
>> /bin/sql-client.sh -f ~/python_udf_lab.sql
>>     --pyFiles ~/custom_udf_2.py
>>
>> Flink SQL> [INFO] Execute statement succeed.
>>
>> Flink SQL>
>> > CREATE TABLE datagen (
>> >     a BIGINT,
>> >     b BIGINT
>> > ) WITH (
>> >     'connector' = 'datagen',
>> >     'fields.a.kind'='sequence',
>> >     'fields.a.start'='1',
>> >     'fields.a.end'='8',
>> >     'fields.b.kind'='sequence',
>> >     'fields.b.start'='4',
>> >     'fields.b.end'='11'
>> > )[INFO] Execute statement succeed.
>>
>> Flink SQL>
>> > CREATE TABLE print_sink (
>> >     `sum` BIGINT
>> > ) WITH (
>> >     'connector' = 'print'
>> > )[INFO] Execute statement succeed.
>>
>> Flink SQL>
>> >
>> > INSERT into print_sink (
>> >     select add(a,b) FROM datagen
>> > )[ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Instantiating python function
>> 'custom_udf_2.MyUDF' failed.
>> ```
>>
>>
>> I've tried multiple variations, such as:
>> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
>> LANGUAGE PYTHON;
>>
>> CREATE FUNCTION add AS 'MyUDF'
>> LANGUAGE PYTHON;
>>
>>
>> FYI, this is on Flink 1.16.1 and Python 3.9.13.
>>
>> Admittedly, I haven’t found any documentation for this usage in the
>> official docs. Is this use case currently supported?
>> I know that it works with SQL if I change the add function to:
>>
>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
>> result_type=DataTypes.BIGINT())
>> def add(i, j):
>>   return i + j
>>
>> but then it doesn’t create any metrics.
>>
>> Does anyone have any idea how I can get this to work, specifically
>> Flink SQL with Python UDF metrics?
>>
>> Thanks,
>> Tom
