Thanks Dian, that resolved my issues.
On Sun, May 21, 2023 at 6:55 PM Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Tom,
>
> The following statement is incorrect.
> ```
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
> ```
>
> You should define it as follows:
> custom_udf_2.py
> ```
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> class MyUDF(ScalarFunction):
>
>     def __init__(self):
>         self.counter = None
>
>     def open(self, function_context):
>         self.counter = function_context.get_metric_group().counter("my_counter")
>
>     def eval(self, x, y):
>         self.counter.inc()
>         return x + y
>
> my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
> ```
>
> And then use it in SQL as follows:
> ```
> CREATE FUNCTION add AS 'custom_udf_2.my_udf'
> LANGUAGE PYTHON;
> ```
>
> Regards,
> Dian
>
> On Fri, May 19, 2023 at 6:23 AM tom yang <ensc...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to create a Flink SQL program using a Python UDF with
>> metrics. This is my sample Python file:
>>
>> custom_udf_2.py
>>
>> ```
>> from pyflink.table.udf import ScalarFunction, udf
>> from pyflink.table import DataTypes
>>
>> class MyUDF(ScalarFunction):
>>
>>     def __init__(self):
>>         self.counter = None
>>
>>     def open(self, function_context):
>>         self.counter = function_context.get_metric_group().counter("my_counter")
>>
>>     def eval(self, x, y):
>>         self.counter.inc()
>>         return x + y
>>
>> ```
>>
>> This is my SQL script:
>>
>> ```
>> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
>> LANGUAGE PYTHON;
>>
>> CREATE TABLE datagen (
>>     a BIGINT,
>>     b BIGINT
>> ) WITH (
>>     'connector' = 'datagen',
>>     'fields.a.kind'='sequence',
>>     'fields.a.start'='1',
>>     'fields.a.end'='8',
>>     'fields.b.kind'='sequence',
>>     'fields.b.start'='4',
>>     'fields.b.end'='11'
>> );
>>
>> CREATE TABLE print_sink (
>>     `sum` BIGINT
>> ) WITH (
>>     'connector' = 'print'
>> );
>>
>>
>> INSERT into print_sink (
>>     select add(a,b) FROM datagen
>> );
>>
>> ```
>>
>> When I try to execute this program, I get the following:
>>
>>
>> ```
>> /bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py
>>
>> Flink SQL> [INFO] Execute statement succeed.
>>
>> Flink SQL>
>> > CREATE TABLE datagen (
>> > a BIGINT,
>> > b BIGINT
>> > ) WITH (
>> > 'connector' = 'datagen',
>> > 'fields.a.kind'='sequence',
>> > 'fields.a.start'='1',
>> > 'fields.a.end'='8',
>> > 'fields.b.kind'='sequence',
>> > 'fields.b.start'='4',
>> > 'fields.b.end'='11'
>> > )[INFO] Execute statement succeed.
>>
>> Flink SQL>
>> > CREATE TABLE print_sink (
>> > `sum` BIGINT
>> > ) WITH (
>> > 'connector' = 'print'
>> > )[INFO] Execute statement succeed.
>>
>> Flink SQL>
>> >
>> > INSERT into print_sink (
>> > select add(a,b) FROM datagen
>> > )[ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Instantiating python function 'custom_udf_2.MyUDF' failed.
>> ```
>>
>>
>> I've tried multiple variations of:
>>
>> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
>> LANGUAGE PYTHON;
>>
>> CREATE FUNCTION add AS 'MyUDF'
>> LANGUAGE PYTHON;
>>
>>
>> FYI, this is on Flink 1.16.1 and Python 3.9.13.
>>
>> Admittedly, I haven't found this usage anywhere in the official
>> documentation. Is this use case currently supported?
>> I know that it works with SQL if I change the add function to:
>>
>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
>>      result_type=DataTypes.BIGINT())
>> def add(i, j):
>>     return i + j
>>
>> but then it doesn't create any metrics.
>>
>> Does anyone have any idea how I can get this to work, specifically
>> with Flink SQL and Python UDF metrics?
>>
>> Thanks,
>> Tom
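One way to check the counter wiring in `MyUDF` before submitting the job to the SQL client is to call `open()` and `eval()` directly. The sketch below is not from the thread: `StubCounter`, `StubMetricGroup` and `StubFunctionContext` are hypothetical stand-ins for the context and metric group that Flink supplies at runtime, and the class falls back to `object` as its base if PyFlink is not installed. It only exercises the Python logic; it does not test the `CREATE FUNCTION` registration itself.

```python
# Sketch: exercise MyUDF's counter logic locally with stub objects.
# The Stub* classes are hypothetical stand-ins, not PyFlink APIs.
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:  # lets the sketch run without PyFlink installed
    ScalarFunction = object


class MyUDF(ScalarFunction):
    """Same UDF as in the thread: adds two values and counts calls."""

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y


class StubCounter:
    """Hypothetical counter that just records increments."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class StubMetricGroup:
    """Hypothetical metric group handing out one counter per name."""

    def __init__(self):
        self.counters = {}

    def counter(self, name):
        return self.counters.setdefault(name, StubCounter())


class StubFunctionContext:
    """Hypothetical context exposing only get_metric_group()."""

    def __init__(self):
        self.metric_group = StubMetricGroup()

    def get_metric_group(self):
        return self.metric_group


ctx = StubFunctionContext()
f = MyUDF()
f.open(ctx)  # Flink calls open() once before any eval()
results = [f.eval(a, b) for a, b in [(1, 4), (2, 5), (3, 6)]]
print(results)  # [5, 7, 9]
print(ctx.metric_group.counters["my_counter"].count)  # 3
```

In the real job the same class is wrapped with `udf(MyUDF(), result_type=DataTypes.BIGINT())` and registered by the module-level name, as Dian describes.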