Hi Chiwan,

I'm using Flink 0.9.1.

Cheers,
Giacomo

I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a patch to solve this. Currently, there is no way to use a derived class with CSV input.
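
One plausible mechanism behind the `There is no field called "table"` error is a reflective lookup with `Class.getDeclaredField`, which only sees fields declared directly on the given class and never inherited ones. This is an assumption: the thread does not show the CsvInputFormat code, and FLINK-2690 has the actual details. The plain-Java sketch below uses hypothetical stand-ins `Base` and `TwitterPOJO` (names borrowed from the thread, fields reduced to strings). It contrasts that lookup with one that walks up the superclass chain.

```java
import java.lang.reflect.Field;

public class FieldLookupDemo {

    // Hypothetical stand-ins for the POJO classes in the thread.
    public static class Base {
        public String table;
        public String time;

        public Base() {}
    }

    public static class TwitterPOJO extends Base {
        public String tweet;

        public TwitterPOJO() {}
    }

    // Only looks at fields declared directly on clazz, so inherited fields are invisible.
    public static Field declaredOnly(Class<?> clazz, String name) {
        try {
            return clazz.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            return null;
        }
    }

    // Walks from clazz up through its superclasses until one of them declares the field.
    public static Field searchHierarchy(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // Not declared on this class; continue with the superclass.
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(declaredOnly(TwitterPOJO.class, "table"));  // null
        Field table = searchHierarchy(TwitterPOJO.class, "table");
        System.out.println(table.getDeclaringClass().getSimpleName()); // Base

        // The Field found on Base can set the value on a TwitterPOJO instance.
        TwitterPOJO pojo = new TwitterPOJO();
        table.setAccessible(true);
        table.set(pojo, "rainfall");
        System.out.println(pojo.table);                                // rainfall
    }
}
```

`Class.getField` would also find the inherited `table`, but only because it is public. Walking the hierarchy with `getDeclaredField` also covers private or protected fields that a POJO exposes through getters and setters.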

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

> On Sep 17, 2015, at 1:33 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>
> It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my local machine.
> I will create a JIRA issue for this and submit a patch to fix it.
>
> Which version of Flink are you using?
>
> Regards,
> Chiwan Park
>
>> On Sep 17, 2015, at 12:20 AM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
>>
>> Yes, I did.
>>
>> If anyone has a workaround, let us know.
>>
>> Regards,
>> Giacomo Licari
>>
>> On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>> Hi Giacomo,
>>
>> Did you create constructors without arguments in both the base class and the derived class?
>> If you did, it seems like a bug.
>>
>> Regards,
>> Chiwan Park
>>
>>> On Sep 17, 2015, at 12:04 AM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
>>>
>>> Hi Chiwan,
>>> I followed the instructions in the documentation.
>>> I have a simple base class with some properties (all public).
>>> Then I extend that class with a new public property (tweet in my case), and I also provide a getter and setter for that property.
>>>
>>> Now when I execute:
>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>     .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>>
>>> I receive:
>>> There is no field called "table" in com.Flink.POJO.TwitterPOJO
>>>
>>> table is a field of the Base class, declared as public, with a getter and setter as well.
>>>
>>> Thank you for your help.
>>>
>>> Giacomo
>>>
>>> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>>> Hi Giacomo,
>>>
>>> You should declare your fields as public. If you declare them as private or protected, the class must provide getters and setters to be treated as a POJO.
>>>
>>> Maybe the documentation on the homepage [1] would be helpful.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
>>>
>>>> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
>>>>
>>>> I ran it only implementing java.io.Serializable, without disabling the closure cleaner.
>>>>
>>>> Another question I have is about POJO classes.
>>>> I would also like to create a base POJO class with some common properties, and then extend it in new classes. These classes are used to convert a CSV file into a DataSet of POJO objects (of the derived class type).
>>>>
>>>> In the following example, I create a DataSet of TwitterPOJO, which extends a Base class by adding the new property "tweet".
>>>>
>>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>>     .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>>>
>>>> I obtain this error:
>>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
>>>> Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo
>>>>
>>>> Greetings,
>>>> G.L.
>>>>
>>>> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> Could you also try the other variant (disabling the closure cleaner)? I would be curious whether this behavior is expected Java serialization behavior, or whether our pre-processing code is causing it.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
>>>> Thank you Martin and Stephan for your help.
>>>> I tried directly implementing java.io.Serializable in the Base class and it worked perfectly!
>>>>
>>>> Now I can develop more flexible and maintainable code. Thanks a lot, guys.
>>>>
>>>> Greetings,
>>>> Giacomo
>>>>
>>>> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> Hi!
>>>>
>>>> Interesting case. We use plain Java serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.
>>>>
>>>> Can you try the following to see if either of them resolves the problem?
>>>>
>>>> 1) On the environment, disable the closure cleaner (in the execution config).
>>>>
>>>> 2) Let the CullTimeBase class implement java.io.Serializable.
>>>>
>>>> Please let us know how it turns out!
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>>> Hi Giacomo,
>>>>
>>>> I ran into the same issue. It seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and, in addition, making it generic (which should be necessary in your example).
>>>>
>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>>
>>>> // abstract, because flatMap itself is left to the subclasses
>>>> public abstract class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
>>>>     // ... common fields and constructor ...
>>>> }
>>>>
>>>> public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
>>>>     // ... constructor and flatMap(RainfallPOJO, Collector<RainfallPOJO>) ...
>>>> }
>>>>
>>>> This should work.
>>>>
>>>> Best,
>>>> Martin
>>>>
>>>>
>>>> On 16.09.2015 10:41, Giacomo Licari wrote:
>>>>> Hi guys,
>>>>> I'm trying to create a base class that is inherited by classes implementing the flatMap method on specific POJO types.
>>>>>
>>>>> It seems inheritance doesn't work: I can access this.PropertyName or super.PropertyName from the flatMap method, but the values are always null.
>>>>>
>>>>> Here is the derived class, using RainfallPOJO:
>>>>>
>>>>> import java.text.DateFormat;
>>>>> import java.text.SimpleDateFormat;
>>>>> import java.util.Date;
>>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>>>>
>>>>>     public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit) {
>>>>>         super(num, den, time_data_name, start_time, end_time, interval, time_unit);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
>>>>>         // HH (24-hour clock), as in CullTimeBase; "hh" would parse 12:xx as 00:xx
>>>>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>>         try {
>>>>>             // drop the colon from a trailing UTC offset such as +02:00
>>>>>             Date time = formatter.parse(obj.getTime().replaceAll("([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
>>>>>             // keep only records strictly between the configured start and end times
>>>>>             if (time.after(this.startTime) && time.before(this.endTime)) {
>>>>>                 coll.collect(obj);
>>>>>             }
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> My base class is:
>>>>>
>>>>> import java.text.DateFormat;
>>>>> import java.text.ParseException;
>>>>> import java.text.SimpleDateFormat;
>>>>> import java.util.Date;
>>>>>
>>>>> public class CullTimeBase {
>>>>>
>>>>>     protected int numerator;
>>>>>     protected int denominator;
>>>>>     protected String timeDataName;
>>>>>     protected Date startTime;
>>>>>     protected Date endTime;
>>>>>     protected int interval;
>>>>>     protected String timeUnit;
>>>>>
>>>>>     public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit) {
>>>>>         numerator = num;
>>>>>         denominator = den;
>>>>>         timeDataName = time_data_name;
>>>>>         interval = interv;
>>>>>         timeUnit = time_unit;
>>>>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>>         try {
>>>>>             startTime = formatter.parse(start_time);
>>>>>             endTime = formatter.parse(end_time);
>>>>>         } catch (ParseException e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> It works only if I declare all variables and methods in a single class, but then I would have to repeat the same properties in several classes. I would like to specialize each derived class only with a custom flatMap method, which uses a custom POJO type.
>>>>>
>>>>> Thanks a lot,
>>>>> Giacomo
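
Stephan's question, whether the null fields are expected Java serialization behavior, can be checked without Flink. The plain-Java sketch below uses hypothetical class names, with a single String field standing in for the Date fields of CullTimeBase. It round-trips a Serializable subclass through ObjectOutputStream/ObjectInputStream, which roughly approximates shipping a UDF but does not model Flink's closure cleaner. Java serialization writes only the state of Serializable classes in the hierarchy. For the closest non-serializable superclass, it runs that class's no-arg constructor on read, so the superclass's fields come back as defaults. If no accessible no-arg constructor exists, readObject fails with InvalidClassException instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BaseFieldSerializationDemo {

    // Not Serializable, like the original CullTimeBase (hypothetical stand-in).
    // The no-arg constructor is what deserialization calls to "restore" this part of the object.
    public static class PlainBase {
        protected String startTime;

        public PlainBase() {}

        public PlainBase(String startTime) { this.startTime = startTime; }
    }

    // Serializable subclass, like a UDF implementing FlatMapFunction.
    public static class PlainChild extends PlainBase implements Serializable {
        private static final long serialVersionUID = 1L;

        public PlainChild(String startTime) { super(startTime); }

        public String start() { return startTime; }
    }

    // Same hierarchy, but the base class itself implements Serializable.
    public static class SerialBase implements Serializable {
        private static final long serialVersionUID = 1L;
        protected String startTime;

        public SerialBase(String startTime) { this.startTime = startTime; }
    }

    public static class SerialChild extends SerialBase {
        private static final long serialVersionUID = 1L;

        public SerialChild(String startTime) { super(startTime); }

        public String start() { return startTime; }
    }

    // Writes obj to a byte array with Java serialization and reads a copy back.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Base-class state is not written, so the field is reset by PlainBase().
        System.out.println(roundTrip(new PlainChild("2015-09-16")).start());  // null
        // Base class is Serializable, so its field survives the round trip.
        System.out.println(roundTrip(new SerialChild("2015-09-16")).start()); // 2015-09-16
    }
}
```

Flink's Function interface, and therefore FlatMapFunction, extends java.io.Serializable. So letting the base class implement FlatMapFunction (Martin's approach) makes it Serializable, which has the same effect as option 2) in Stephan's reply.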