I ran it implementing only java.io.Serializable, without disabling the closure cleaner.
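For readers following along, here is a minimal sketch of the plain Java serialization behavior that would be consistent with this result. It uses only java.io, so it says nothing about what Flink's closure cleaner does, and it is an assumption that the same mechanism was at work in the Flink job. All class names (PlainBase, PlainChild, SerializableBase, SerializableChild, SerializationDemo) are hypothetical stand-ins, not the classes from this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical base class that does NOT implement Serializable.
class PlainBase {
    protected String timeUnit;

    // Java serialization does not write this class's fields. On read, it
    // calls this no-arg constructor instead, so timeUnit is left as null.
    PlainBase() { }

    PlainBase(String timeUnit) { this.timeUnit = timeUnit; }
}

// Only the subclass is Serializable; inherited state is not written.
class PlainChild extends PlainBase implements Serializable {
    private static final long serialVersionUID = 1L;

    PlainChild(String timeUnit) { super(timeUnit); }

    String getTimeUnit() { return timeUnit; }
}

// Hypothetical base class that implements Serializable itself.
class SerializableBase implements Serializable {
    private static final long serialVersionUID = 1L;
    protected String timeUnit;

    SerializableBase(String timeUnit) { this.timeUnit = timeUnit; }
}

// Serializable is inherited, so the base-class field is written too.
class SerializableChild extends SerializableBase {
    private static final long serialVersionUID = 1L;

    SerializableChild(String timeUnit) { super(timeUnit); }

    String getTimeUnit() { return timeUnit; }
}

class SerializationDemo {
    // Serializes obj to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Field inherited from the non-serializable base is lost.
        System.out.println(roundTrip(new PlainChild("hour")).getTimeUnit());        // prints "null"
        // Field inherited from the serializable base survives.
        System.out.println(roundTrip(new SerializableChild("hour")).getTimeUnit()); // prints "hour"
    }
}
```

Note that if the non-serializable base class had no accessible no-arg constructor at all, plain Java deserialization would fail with an exception rather than silently producing null fields.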
Another question I have is about POJO classes. I would also like to create a base POJO class with some common properties, and then extend it in new classes. These classes are used to convert a CSV file into a DataSet of POJO objects (of the derived class type).

In the following example, I create a DataSet of TwitterPOJO, which extends a Base class, adding the new property "tweet".

DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://" + path_twitter)
    .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I obtain this error:

[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo

Greetings,
G.L.

On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:

> Could you also try the other variant (disabling the closure cleaner)? I
> would be curious if this behavior is expected Java Serialization behavior,
> or whether our pre-processing code is causing it.
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <giacomo.lic...@gmail.com>
> wrote:
>
>> Thank you Martin and Stephan for your help.
>> I tried implementing java.io.Serializable directly in the Base class and it
>> worked perfectly!
>>
>> Now I can develop more flexible and maintainable code. Thank you a lot
>> guys.
>>
>> Greetings,
>> Giacomo
>>
>> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Interesting case. We use plain Java Serialization to distribute UDFs,
>>> and perform additional "cleaning" of scopes, which may be causing the issue.
>>>
>>> Can you try the following to see if any of those resolves the problem?
>>>
>>> 1) On the environment, disable the closure cleaner (in the execution
>>> config).
>>>
>>> 2) Let the CullTimeBase class implement java.io.Serializable.
>>>
>>> Please let us know how it turns out!
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <
>>> m.jungha...@mailbox.org> wrote:
>>>
>>>> Hi Giacomo,
>>>>
>>>> I ran into the same issue. It seems to be coupled to the serialization
>>>> mechanism of UDFs. I solved it by letting the base class implement the UDF
>>>> interface (e.g. FlatMapFunction) and, in addition, making it generic (which
>>>> should be necessary in your example).
>>>>
>>>> public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
>>>>     // ...
>>>> }
>>>>
>>>> public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, RainFallPOJO> {
>>>>     // ...
>>>> }
>>>>
>>>> This should work.
>>>>
>>>> Best,
>>>> Martin
>>>>
>>>>
>>>> On 16.09.2015 10:41, Giacomo Licari wrote:
>>>>
>>>> Hi guys,
>>>> I'm trying to create a base class that is inherited by classes
>>>> implementing the flatMap method for specific POJO types.
>>>>
>>>> It seems inheritance doesn't work: I can access this.PropertyName or
>>>> super.PropertyName from the flatMap method, but the values are always null.
>>>>
>>>> Here is the derived class, using RainfallPOJO:
>>>>
>>>> public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>>>
>>>>     public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit){
>>>>         super(num, den, time_data_name, start_time, end_time, interval, time_unit);
>>>>     }
>>>>
>>>>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
>>>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS");
>>>>         try {
>>>>             Date time = formatter.parse( obj.getTime().replaceAll( "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) );
>>>>             if(time.after(this.startTime) && time.before(this.endTime)){
>>>>                 coll.collect(obj);
>>>>             }
>>>>         } catch(Exception e){
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> My Base class is:
>>>>
>>>> public class CullTimeBase {
>>>>
>>>>     protected int numerator;
>>>>     protected int denominator;
>>>>     protected String timeDataName;
>>>>     protected Date startTime;
>>>>     protected Date endTime;
>>>>     protected int interval;
>>>>     protected String timeUnit;
>>>>
>>>>     public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit){
>>>>         numerator = num;
>>>>         denominator = den;
>>>>         timeDataName = time_data_name;
>>>>         interval = interv;
>>>>         timeUnit = time_unit;
>>>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>         try {
>>>>             startTime = formatter.parse(start_time);
>>>>             endTime = formatter.parse(end_time);
>>>>         } catch (ParseException e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> It works only if I declare all variables and methods in a single class,
>>>> but then I would have to repeat the same properties in several classes. I
>>>> would like to specialize each derived class only with a custom flatMap
>>>> method, which uses a custom POJO type.
>>>>
>>>> Thanks a lot,
>>>> Giacomo
>>>>
>>>
>>
>