Hi folks,

I’ve tried to create some Flink UDAFs that I’m invoking using the Table / SQL 
api.  In these UDAFs I’ve overridden the open() method to perform some setup 
operations (in my case initialize some metric counters). I noticed that this 
open() function isn’t being invoked in either the Dataset or the Datastream 
versions. Incidentally when I tested this out with a UDF, the open method ends 
up getting invoked just fine. Anyone know if this is a known issue else what I 
might be doing incorrectly?

I’ve been able to write a short repo to demonstrate this here - 
https://gist.github.com/piyushnarang/fe562060789ffeb01d59dcc3da375849

I’m not super familiar with the planner code, though I did try to dig into it a 
bit in a debugger for the DataSet scenario and it looks like we are creating a 
DataSetAggFunction<https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala#L46>
 in the AggregateUtil.createDataSetAggregateFunctions. However, the 
DataSetAggFunction.open() doesn’t seem to be invoking the open() / close() 
methods on the underlying functions (not sure if this is intended).

Thanks,

-- Piyush

Reply via email to