It seems like a bug of CsvInputFormat. I succeed in reproducing in my local machine. I will create a JIRA issue for this and submit a patch to fix it.
Which version of Flink are used? Regards, Chiwan Park > On Sep 17, 2015, at 12:20 AM, Giacomo Licari <giacomo.lic...@gmail.com> wrote: > > Yes I did. > > if anyone has a bypass solution, let us know. > > Regards, > Giacomo Licari > > On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <chiwanp...@apache.org> wrote: > Hi Giacomo, > > Did you create constructors without arguments in both base class and derived > class? > If you do, it seems like a bug. > > Regards, > Chiwan Park > > > On Sep 17, 2015, at 12:04 AM, Giacomo Licari <giacomo.lic...@gmail.com> > > wrote: > > > > Hi Chiwan, > > I followed instructions in documentation. > > I have a simple base class with some properties (all public). > > Then I extend that class with a new public property (tweet in my case), I > > provide also getter and setter for that property. > > > > Now when I execute: > > DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter) > > .pojoType(TwitterPOJO.class, "table", "time", "tweet"); > > > > I receive: > > There is no field called "table" in com.Flink.POJO.TwitterPOJO > > > > table is a field of the Base class, declared as public with also getter and > > setter. > > > > Thank you for your help. > > > > Giacomo > > > > On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <chiwanp...@apache.org> wrote: > > Hi Giacomo, > > > > You should set your field as public. If you are set your field as private > > or protected, the class must provide getter and setter to be treated as > > POJO. > > > > Maybe the documentation in homepage [1] would be helpful. > > > > Regards, > > Chiwan Park > > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos > > > > > On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> > > > wrote: > > > > > > I run it only implementing java.io.Serializable without disabling the > > > closure cleaner. > > > > > > Another question I have is about POJO classes. > > > I would also create a base POJO class with some common proprerties, and > > > then extend it in new classes. These classes are used to convert a CSV > > > into a dataset of POJO objects (of derived class type). > > > > > > In the following example, I create a DataSet of TwitterPOJO, which > > > extends a Base class, adding the new property "tweet". > > > > > > DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter) > > > .pojoType(TwitterPOJO.class, "table", "time", "tweet"); > > > > > > I obtain this error: > > > [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class > > > com.Flink.POJO.TwitterPOJO is not a valid POJO type > > > Exception in thread "main" java.lang.ClassCastException: > > > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to > > > org.apache.flink.api.java.typeutils.PojoTypeInfo > > > > > > Greetings, > > > G.L. > > > > > > On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote: > > > Could you also try the other variant (disabeling the closure cleaner)? I > > > would be curious if this behavior is expected Java Serialization > > > behavior, or whether our pre-processing code is causing it. > > > > > > Greetings, > > > Stephan > > > > > > > > > On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari > > > <giacomo.lic...@gmail.com> wrote: > > > Thank you Martin and Stephan for your help. > > > I tried directly to implement java.io.Serializable in Base class and it > > > worked perfectly! > > > > > > Now I can develop more flexible and maintainable code. Thank you a lot > > > guys. > > > > > > Greetings, > > > Giacomo > > > > > > On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote: > > > Hi! > > > > > > Interesting case. We use plain Java Serialization to distribute UDFs, and > > > perform additional "cleaning" of scopes, which may be causing the issue. > > > > > > Can you try the following to see if any of those resolves the problem? > > > > > > 1) On the environment, disable the closure cleaner (in the execution > > > config). > > > > > > 2) Let the CullTimeBase class implement java.io.Serializable. > > > > > > Please let us know how it turns out! > > > > > > Greetings, > > > Stephan > > > > > > > > > On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns > > > <m.jungha...@mailbox.org> wrote: > > > Hi Giacomo, > > > > > > I ran into the same issue. Seems to be coupled to the serialization > > > mechanism of UDFs. I solved it by letting the base class implement the > > > UDF interface (e.g. FlatMapFunction) and in addition make it generic > > > (which should be necessary in your example). > > > > > > public [abstract] class CullTimeBase<IN, OUT> implements > > > FlatMapFunction<IN, OUT> { > > > // ... > > > } > > > > > > public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, > > > RainFallPOJO> { > > > // ... > > > } > > > > > > This should work. > > > > > > Best, > > > Martin > > > > > > > > > On 16.09.2015 10:41, Giacomo Licari wrote: > > >> Hi guys, > > >> I'm trying to create a base class which is inherited from classes > > >> implementing FlatMap method on specific POJO types. > > >> > > >> It seems inheritance doesn't work, I can access this.PropertyName or > > >> super.PropertyName from flatMap method but values are always null. > > >> > > >> Here the derived class, using RainfallPOJO: > > >> > > >> public class CullTimeRainfall extends CullTimeBase implements > > >> FlatMapFunction<RainfallPOJO, RainfallPOJO> { > > >> > > >> public CullTimeRainfall(int num, int den, String time_data_name, > > >> String start_time, String end_time, int interval, String time_unit){ > > >> super(num, den, time_data_name, start_time, end_time, > > >> interval, time_unit); > > >> } > > >> > > >> public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) > > >> throws Exception { > > >> DateFormat formatter = new > > >> SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS"); > > >> try { > > >> Date time = formatter.parse( obj.getTime().replaceAll( > > >> "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) ); > > >> if(time.after(this.startTime) && time.before(this.endTime)){ > > >> coll.collect(obj); > > >> } > > >> } catch(Exception e){ > > >> e.printStackTrace(); > > >> } > > >> } > > >> > > >> } > > >> > > >> My Base class is: > > >> > > >> public class CullTimeBase { > > >> > > >> protected int numerator; > > >> protected int denominator; > > >> protected String timeDataName; > > >> protected Date startTime; > > >> protected Date endTime; > > >> protected int interval; > > >> protected String timeUnit; > > >> > > >> public CullTimeBase(int num, int den, String time_data_name, String > > >> start_time, String end_time, int interv, String time_unit){ > > >> numerator = num; > > >> denominator = den; > > >> timeDataName = time_data_name; > > >> interval = interv; > > >> timeUnit = time_unit; > > >> DateFormat formatter = new > > >> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); > > >> try { > > >> startTime = formatter.parse(start_time); > > >> endTime = formatter.parse(end_time); > > >> } catch (ParseException e) { > > >> e.printStackTrace(); > > >> } > > >> } > > >> > > >> It works only if I declare all variables and methods in only one class, > > >> but so I should repeat the same properties in more classes. I would only > > >> specialize each derived class with a custom flatMap method. which uses a > > >> custom POJO type. > > >> > > >> Thanks a lot, > > >> Giacomo > > > > > > > > > > > > > > > > > > > > > > > > > > > >