Hi Tom,

The following statement is incorrect.
```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;
```

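The name after `AS` must refer to the function object returned by `udf(...)`, not to the `ScalarFunction` subclass itself. You should define it as follows: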
custom_udf_2.py
```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

  def __init__(self):
    self.counter = None

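  def open(self, function_context):
    # Register the counter once, when the function is opened.
    self.counter = function_context.get_metric_group().counter("my_counter")

  def eval(self, x, y):
    self.counter.inc()
    return x + y

# Wrap an instance with udf(), declaring the BIGINT result type.
my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())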
```

Then use it in SQL as follows:
```
CREATE FUNCTION add AS 'custom_udf_2.my_udf'
LANGUAGE PYTHON;
```

Regards,
Dian

On Fri, May 19, 2023 at 6:23 AM tom yang <ensc...@gmail.com> wrote:

> Hi
>
>
> I am trying to create a Flink SQL program that uses a Python UDF with
> metrics. This is my sample Python file:
>
> custom_udf_2.py
>
> ```
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> class MyUDF(ScalarFunction):
>
>   def __init__(self):
>     self.counter = None
>
>   def open(self, function_context):
>     self.counter = function_context.get_metric_group().counter("my_counter")
>
>   def eval(self, x, y):
>     self.counter.inc()
>     return x + y
>
> ```
>
> This is my SQL script:
>
> ```
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
>
> CREATE TABLE datagen (
>     a BIGINT,
>     b BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.a.kind'='sequence',
>     'fields.a.start'='1',
>     'fields.a.end'='8',
>     'fields.b.kind'='sequence',
>     'fields.b.start'='4',
>     'fields.b.end'='11'
> );
>
> CREATE TABLE print_sink (
>     `sum` BIGINT
> ) WITH (
>     'connector' = 'print'
> );
>
>
> INSERT into print_sink (
>     select add(a,b) FROM datagen
> );
>
> ```
>
> When I try to execute this program, I get the following:
>
>
> ```
> /bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py
>
> Flink SQL> [INFO] Execute statement succeed.
>
> Flink SQL>
> > CREATE TABLE datagen (
> >     a BIGINT,
> >     b BIGINT
> > ) WITH (
> >     'connector' = 'datagen',
> >     'fields.a.kind'='sequence',
> >     'fields.a.start'='1',
> >     'fields.a.end'='8',
> >     'fields.b.kind'='sequence',
> >     'fields.b.start'='4',
> >     'fields.b.end'='11'
> > )[INFO] Execute statement succeed.
>
> Flink SQL>
> > CREATE TABLE print_sink (
> >     `sum` BIGINT
> > ) WITH (
> >     'connector' = 'print'
> > )[INFO] Execute statement succeed.
>
> Flink SQL>
> >
> > INSERT into print_sink (
> >     select add(a,b) FROM datagen
> > )[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Instantiating python function 'custom_udf_2.MyUDF' failed.
> ```
>
>
> I've tried multiple variations, such as:
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
>
> CREATE FUNCTION add AS 'MyUDF'
> LANGUAGE PYTHON;
>
>
> FYI, this is on Flink 1.16.1 and Python 3.9.13.
>
> Admittedly, I haven't found any documentation of this usage in the official
> docs. Is this use case currently supported?
> I know that it works with SQL if I change the add function to:
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
> def add(i, j):
>   return i + j
>
> but then it doesn't create any metrics.
>
> Does anyone have any idea how I can get this to work specifically with
> Flink SQL and Python UDF metrics?
>
> Thanks,
> Tom
>
