At least I hope it has been fixed. Which version and planner are you using?
On 11.12.19 11:47, Arujit Pradhan wrote:
Hi Timo,
Thanks for the bug reference.
You mentioned that this bug has been fixed. Is the fix available for
flink 1.9+ and default query planner.
Thanks and regards,
/Arujit/
On Wed, Dec 11, 2019 at 3:56 PM Timo Walther <twal...@apache.org
<mailto:twal...@apache.org>> wrote:
I remember that we fixed some bug around this topic recently. The
legacy
planner should not be affected.
There is another user reporting this:
https://issues.apache.org/jira/browse/FLINK-15040
Regards,
Timo
On 11.12.19 10:34, Dawid Wysakowicz wrote:
> Hi Arujit,
>
> Could you also share the query where you use this UDF? It would also
> help if you said which version of Flink you are using and which
planner.
>
> Best,
>
> Dawid
>
> On 11/12/2019 10:21, Arujit Pradhan wrote:
>> Hi all,
>>
>> So we are creating some User Defined Functions of type
>> AggregateFunction. And we want to send some static metrics from the
>> *open()* method of the UDFs as we can get *MetricGroup *by
>> *FunctionContext *which is only exposed in the open method. Our
code
>> looks something like this(Which is an implementation of count
distinct
>> in SQL) :
>>
>> public class DistinctCount extends AggregateFunction<Integer,
>> DistinctCountAccumulator> { @Override public
DistinctCountAccumulator
>> createAccumulator() { return new DistinctCountAccumulator(); }
>> @Override public void open(FunctionContext context) throws
Exception { super.open(context); MetricGroup metricGroup =
context.getMetricGroup(); // add some metric to the group here
>> System.out.println("in the open of UDF"); } @Override public void
>> close() throws Exception { super.close(); } @Override public
Integer
>> getValue(DistinctCountAccumulator distinctCountAccumulator) {
System.out.println("in the udf"); return
distinctCountAccumulator.count(); } public void
accumulate(DistinctCountAccumulator distinctCountAccumulator, String
item) { if (item== null) { return; }
distinctCountAccumulator.add(item); } }
>>
>> But when we use this UDF in FlinkSQL, it seems like the open
method is
>> not being called at all.
>>
>> From the filnk UDF documentation we find :
>>
>> *The |open()| method is called once before the evaluation
method. The
>> |close()| method after the last call to the evaluation method.*
>>
>> *The |open()| method provides a |FunctionContext| that contains
>> information about the context in which user-defined functions are
>> executed, such as the metric group, the distributed cache files, or
>> the global job parameters.*
>>
>> Then is there any reason that open is not working in
>> AggragateFunctions. Btw it works fine in case of
ScalarFunctions. Is
>> there any alternative scope where we can register some static
metrics
>> in a UDF.
>>
>>
>> Thanks and regards,
>> /Arujit/
>>