Thank you, Martin and Stephan, for your help. I implemented java.io.Serializable directly in the base class and it worked perfectly!
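
For anyone reading this thread later, here is a minimal sketch of what that change amounts to, in plain Java with no Flink dependency. `Base`, `Derived`, `roundTrip` and the two fields are hypothetical stand-ins rather than the real CullTimeBase classes, and the round trip only imitates the plain Java serialization that Stephan says is used to distribute UDFs:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// All names here are hypothetical stand-ins used for illustration only.
class SerializableBaseDemo {

    // Base class holding the shared state. Because it implements
    // Serializable, Java serialization writes these fields too.
    static class Base implements Serializable {
        private static final long serialVersionUID = 1L;
        protected final String timeDataName;
        protected final int interval;

        Base(String timeDataName, int interval) {
            this.timeDataName = timeDataName;
            this.interval = interval;
        }
    }

    // Stand-in for a concrete function class such as CullTimeRainfall.
    static class Derived extends Base {
        private static final long serialVersionUID = 1L;

        Derived(String timeDataName, int interval) {
            super(timeDataName, interval);
        }

        String describe() {
            return timeDataName + "/" + interval;
        }
    }

    // Serializes an object to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Derived copy = roundTrip(new Derived("time", 5));
        // The inherited fields survive the round trip.
        System.out.println(copy.describe()); // prints "time/5"
    }
}
```

The Java serialization rules do not write the fields of a superclass that is not Serializable; on deserialization they are re-initialized by that superclass's no-argument constructor instead, so values set through a parameterized constructor are not carried over.
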
Now I can develop more flexible and maintainable code. Thanks a lot, guys.

Greetings,
Giacomo

On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Interesting case. We use plain Java Serialization to distribute UDFs, and
> perform additional "cleaning" of scopes, which may be causing the issue.
>
> Can you try the following to see if either of these resolves the problem?
>
> 1) On the environment, disable the closure cleaner (in the execution
> config).
>
> 2) Let the CullTimeBase class implement java.io.Serializable.
>
> Please let us know how it turns out!
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <m.jungha...@mailbox.org>
> wrote:
>
>> Hi Giacomo,
>>
>> I ran into the same issue. It seems to be coupled to the serialization
>> mechanism of UDFs. I solved it by letting the base class implement the UDF
>> interface (e.g. FlatMapFunction) and, in addition, making it generic (which
>> should be necessary in your example).
>>
>> public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
>>     // ...
>> }
>>
>> public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, RainFallPOJO> {
>>     // ...
>> }
>>
>> This should work.
>>
>> Best,
>> Martin
>>
>>
>> On 16.09.2015 10:41, Giacomo Licari wrote:
>>
>> Hi guys,
>> I'm trying to create a base class which is inherited by classes
>> implementing the flatMap method on specific POJO types.
>>
>> It seems inheritance doesn't work: I can access this.PropertyName or
>> super.PropertyName from the flatMap method, but the values are always null.
>>
>> Here is the derived class, using RainfallPOJO:
>>
>> public class CullTimeRainfall extends CullTimeBase
>>         implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>
>>     public CullTimeRainfall(int num, int den, String time_data_name,
>>             String start_time, String end_time, int interval, String time_unit) {
>>         super(num, den, time_data_name, start_time, end_time, interval, time_unit);
>>     }
>>
>>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
>>         // HH (24-hour clock), matching the pattern used in CullTimeBase
>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>         try {
>>             Date time = formatter.parse(obj.getTime().replaceAll(
>>                     "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
>>             if (time.after(this.startTime) && time.before(this.endTime)) {
>>                 coll.collect(obj);
>>             }
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>> My base class is:
>>
>> public class CullTimeBase {
>>
>>     protected int numerator;
>>     protected int denominator;
>>     protected String timeDataName;
>>     protected Date startTime;
>>     protected Date endTime;
>>     protected int interval;
>>     protected String timeUnit;
>>
>>     public CullTimeBase(int num, int den, String time_data_name,
>>             String start_time, String end_time, int interv, String time_unit) {
>>         numerator = num;
>>         denominator = den;
>>         timeDataName = time_data_name;
>>         interval = interv;
>>         timeUnit = time_unit;
>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>         try {
>>             startTime = formatter.parse(start_time);
>>             endTime = formatter.parse(end_time);
>>         } catch (ParseException e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>> It works only if I declare all variables and methods in a single class,
>> but then I would have to repeat the same properties in several classes. I
>> would like to specialize each derived class only with a custom flatMap
>> method that uses a custom POJO type.
>>
>> Thanks a lot,
>> Giacomo