Hi Tom,

The following statement is incorrect:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF' LANGUAGE PYTHON;
```
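To see why that name does not work: as far as I understand, the string in `AS '...'` is resolved by treating everything before the last dot as a Python module, importing it, and reading the attribute after the dot. The helper below is a simplified, hypothetical sketch of that lookup (`resolve_qualified_name` is not a PyFlink function, and this is not PyFlink's actual code). It is demonstrated on a standard-library name instead of `custom_udf_2`, and it also shows why a bare `'MyUDF'` cannot work: there is no module part to import.

```python
import importlib
from typing import Any


def resolve_qualified_name(name: str) -> Any:
    """Resolve '<module>.<object>' to the object it names.

    Hypothetical helper that mimics the lookup implied by
    CREATE FUNCTION ... AS '<module>.<object>' LANGUAGE PYTHON.
    """
    module_name, sep, object_name = name.rpartition(".")
    if not sep or not module_name:
        # A bare name such as 'MyUDF' has no module part to import.
        raise ValueError(f"expected '<module>.<object>', got {name!r}")
    module = importlib.import_module(module_name)
    # Returns whatever the attribute is: a class, an instance, a function...
    return getattr(module, object_name)


if __name__ == "__main__":
    # Stand-in using the standard library instead of custom_udf_2.
    join = resolve_qualified_name("os.path.join")
    print(join("a", "b"))
    try:
        resolve_qualified_name("MyUDF")
    except ValueError as err:
        print(err)
```

Applied to `'custom_udf_2.MyUDF'`, this lookup simply returns the `MyUDF` class, not an object that declares a result type.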
You should define it as follows:

custom_udf_2.py

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes


class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        # Register the metric once, when the function is opened.
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y


# Wrap an instance with udf() so that the result type is declared;
# this wrapper object is what CREATE FUNCTION should refer to.
my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
```

And then use it in SQL as follows:

```
CREATE FUNCTION add AS 'custom_udf_2.my_udf' LANGUAGE PYTHON;
```

The name in CREATE FUNCTION has to refer to the object returned by udf(...), which carries the result type, rather than to the MyUDF class itself.

Regards,
Dian

On Fri, May 19, 2023 at 6:23 AM tom yang <ensc...@gmail.com> wrote:

> Hi,
>
> I am trying to create a Flink SQL program using a Python UDF with
> metrics. This is my sample Python file:
>
> custom_udf_2.py
>
> ```
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> class MyUDF(ScalarFunction):
>
>     def __init__(self):
>         self.counter = None
>
>     def open(self, function_context):
>         self.counter = function_context.get_metric_group().counter("my_counter")
>
>     def eval(self, x, y):
>         self.counter.inc()
>         return x + y
> ```
>
> This is my SQL script:
>
> ```
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
>
> CREATE TABLE datagen (
>   a BIGINT,
>   b BIGINT
> ) WITH (
>   'connector' = 'datagen',
>   'fields.a.kind'='sequence',
>   'fields.a.start'='1',
>   'fields.a.end'='8',
>   'fields.b.kind'='sequence',
>   'fields.b.start'='4',
>   'fields.b.end'='11'
> );
>
> CREATE TABLE print_sink (
>   `sum` BIGINT
> ) WITH (
>   'connector' = 'print'
> );
>
> INSERT into print_sink (
>   select add(a,b) FROM datagen
> );
> ```
>
> When I try to execute this program, I get the following:
>
> ```
> /bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py
>
> Flink SQL> [INFO] Execute statement succeed.
>
> Flink SQL>
> > CREATE TABLE datagen (
> > a BIGINT,
> > b BIGINT
> > ) WITH (
> > 'connector' = 'datagen',
> > 'fields.a.kind'='sequence',
> > 'fields.a.start'='1',
> > 'fields.a.end'='8',
> > 'fields.b.kind'='sequence',
> > 'fields.b.start'='4',
> > 'fields.b.end'='11'
> > )[INFO] Execute statement succeed.
>
> Flink SQL>
> > CREATE TABLE print_sink (
> > `sum` BIGINT
> > ) WITH (
> > 'connector' = 'print'
> > )[INFO] Execute statement succeed.
>
> Flink SQL>
> >
> > INSERT into print_sink (
> > select add(a,b) FROM datagen
> > )[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Instantiating python function 'custom_udf_2.MyUDF' failed.
> ```
>
> I've tried multiple variations, such as:
>
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
>
> CREATE FUNCTION add AS 'MyUDF'
> LANGUAGE PYTHON;
>
> FYI, this is on Flink 1.16.1 and Python 3.9.13.
>
> Admittedly, I haven't found anything in the official documentation
> about this usage. Is this use case currently supported?
> I know that it works with SQL if I change the add function to:
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
>      result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
>
> but then it doesn't create any metrics.
>
> Does anyone have any idea how I can get this to work specifically with
> Flink SQL and Python UDF metrics?
>
> Thanks,
> Tom