Hi! Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.
Can you try the following to see if any of those resolves the problem? 1) On the environment, disable the closure cleaner (in the execution config). 2) Let the CullTimeBase class implement java.io.Serializable. Please let us know how it turns out! Greetings, Stephan On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <m.jungha...@mailbox.org> wrote: > Hi Giacomo, > > I ran into the same issue. Seems to be coupled to the serialization > mechanism of UDFs. I solved it by letting the base class implement the UDF > interface (e.g. FlatMapFunction) and in addition make it generic (which > should be necessary in your example). > > public [abstract] class CullTimeBase<IN, OUT> implements > FlatMapFunction<IN, OUT> { > // ... > } > > public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, > RainFallPOJO> { > // ... > } > > This should work. > > Best, > Martin > > > On 16.09.2015 10:41, Giacomo Licari wrote: > > Hi guys, > I'm trying to create a base class which is inherited from classes > implementing FlatMap method on specific POJO types. > > It seems inheritance doesn't work, I can access this.PropertyName or > super.PropertyName from flatMap method but values are always null. > > Here the derived class, using RainfallPOJO: > > public class CullTimeRainfall extends CullTimeBase implements > FlatMapFunction<RainfallPOJO, RainfallPOJO> { > > public CullTimeRainfall(int num, int den, String time_data_name, String > start_time, String end_time, int interval, String time_unit){ > super(num, den, time_data_name, start_time, end_time, interval, time_unit); > } > > public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws > Exception { > DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS"); > try { > Date time = formatter.parse( obj.getTime().replaceAll( > "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) ); > if(time.after(this.startTime) && time.before(this.endTime)){ > coll.collect(obj); > } > } catch(Exception e){ > e.printStackTrace(); > } > } > } > > My Base class is: > > public class CullTimeBase { > > protected int numerator; > protected int denominator; > protected String timeDataName; > protected Date startTime; > protected Date endTime; > protected int interval; > protected String timeUnit; > public CullTimeBase(int num, int den, String time_data_name, String > start_time, String end_time, int interv, String time_unit){ > numerator = num; > denominator = den; > timeDataName = time_data_name; > interval = interv; > timeUnit = time_unit; > DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); > try { > startTime = formatter.parse(start_time); > endTime = formatter.parse(end_time); > } catch (ParseException e) { > e.printStackTrace(); > } > } > > It works only if I declare all variables and methods in only one class, > but so I should repeat the same properties in more classes. I would only > specialize each derived class with a custom flatMap method. which uses a > custom POJO type. > > Thanks a lot, > Giacomo > > >