I remember that we fixed some bug around this topic recently. The legacy
planner should not be affected.
There is another user reporting this:
https://issues.apache.org/jira/browse/FLINK-15040
Regards,
Timo
On 11.12.19 10:34, Dawid Wysakowicz wrote:
Hi Arujit,
Could you also share the query where you use this UDF? It would also
help if you said which version of Flink you are using and which planner.
Best,
Dawid
On 11/12/2019 10:21, Arujit Pradhan wrote:
Hi all,
We are creating some user-defined functions of type AggregateFunction,
and we want to send some static metrics from the *open()* method of the
UDFs, since we can get the *MetricGroup* from the *FunctionContext*,
which is only exposed in the open method. Our code looks something like
this (it is an implementation of count distinct in SQL):
public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {

    @Override
    public DistinctCountAccumulator createAccumulator() {
        return new DistinctCountAccumulator();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        MetricGroup metricGroup = context.getMetricGroup();
        // add some metric to the group here
        System.out.println("in the open of UDF");
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
        System.out.println("in the udf");
        return distinctCountAccumulator.count();
    }

    public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
        if (item == null) {
            return;
        }
        distinctCountAccumulator.add(item);
    }
}
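The DistinctCountAccumulator class is not shown in this message. A minimal sketch, assuming it only needs the add() and count() methods called above and keeps the distinct items in a HashSet (the field name and the set-based representation are assumptions, not the original implementation):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator: the original class is not shown in the thread.
class DistinctCountAccumulator {
    // Distinct items seen so far (assumed representation).
    private final Set<String> items = new HashSet<>();

    // Records an item; duplicates are ignored by the set.
    void add(String item) {
        items.add(item);
    }

    // Returns the number of distinct items recorded.
    int count() {
        return items.size();
    }
}
```

A set held in memory grows with the number of distinct values, so this sketch only suits inputs with modest cardinality.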
But when we use this UDF in Flink SQL, the open method does not seem to
be called at all.
From the Flink UDF documentation we find:
*The |open()| method is called once before the evaluation method. The
|close()| method after the last call to the evaluation method.*
*The |open()| method provides a |FunctionContext| that contains
information about the context in which user-defined functions are
executed, such as the metric group, the distributed cache files, or
the global job parameters.*
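The call order described in that documentation excerpt can be sketched with plain Java stand-ins. All types below (LifecycleFunction, RecordingFunction, LifecycleRunner) are hypothetical and are not Flink classes; the sketch only illustrates the expected contract, with open() once before any evaluation and close() after the last one:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a function with lifecycle hooks; not Flink's API.
abstract class LifecycleFunction {
    void open() {}
    void close() {}
    abstract void eval(String value);
}

// Records every call so the order can be inspected afterwards.
class RecordingFunction extends LifecycleFunction {
    final List<String> calls = new ArrayList<>();

    @Override
    void open() {
        calls.add("open");
    }

    @Override
    void close() {
        calls.add("close");
    }

    @Override
    void eval(String value) {
        calls.add("eval:" + value);
    }
}

// Hypothetical runner that follows the documented order.
class LifecycleRunner {
    static void run(LifecycleFunction fn, List<String> inputs) {
        fn.open(); // once, before the first evaluation
        try {
            for (String input : inputs) {
                fn.eval(input);
            }
        } finally {
            fn.close(); // once, after the last evaluation
        }
    }
}
```

A runtime that skips the first step of run() would produce the behaviour we observe: the evaluation calls happen, but nothing registered in open() ever exists.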
Is there any reason that open is not called in AggregateFunctions?
By the way, it works fine for ScalarFunctions. Is there any alternative
scope where we can register some static metrics in a UDF?
Thanks and regards,
/Arujit/