I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a 
patch to solve this.
Currently, there is no way to use a derived class with CSV input.

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

> On Sep 17, 2015, at 1:33 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> 
> It seems to be a bug in CsvInputFormat. I succeeded in reproducing it on my 
> local machine.
> I will create a JIRA issue for this and submit a patch to fix it.
> 
> Which version of Flink are you using?
> 
> Regards,
> Chiwan Park
> 
>> On Sep 17, 2015, at 12:20 AM, Giacomo Licari <giacomo.lic...@gmail.com> 
>> wrote:
>> 
>> Yes I did.
>> 
>> If anyone has a workaround, please let us know.
>> 
>> Regards,
>> Giacomo Licari
>> 
>> On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>> Hi Giacomo,
>> 
>> Did you create constructors without arguments in both the base class and the 
>> derived class?
>> If you did, it seems like a bug.
>> 
>> Regards,
>> Chiwan Park
>> 
>>> On Sep 17, 2015, at 12:04 AM, Giacomo Licari <giacomo.lic...@gmail.com> 
>>> wrote:
>>> 
>>> Hi Chiwan,
>>> I followed the instructions in the documentation.
>>> I have a simple base class with some properties (all public).
>>> Then I extend that class with a new public property (tweet in my case), and I 
>>> also provide a getter and setter for that property.
>>> 
>>> Now when I execute:
>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>                   .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>> 
>>> I receive:
>>> There is no field called "table" in com.Flink.POJO.TwitterPOJO
>>> 
>>> table is a field of the base class, declared public, and it also has a 
>>> getter and setter.
>>> 
>>> Thank you for your help.
>>> 
>>> Giacomo
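
One way an error like "There is no field called ..." can arise in reflection-based code is a lookup that only inspects fields declared directly on the class and never walks up to its superclasses. Whether CsvInputFormat does exactly this is an assumption, not something confirmed in this thread. The sketch below uses plain Java reflection only; BasePOJO, TwitterPOJO, declaredOnly and findInHierarchy are hypothetical stand-ins, not Flink code.

```java
import java.lang.reflect.Field;

public class InheritedFieldLookup {

    // Hypothetical stand-ins for the POJOs discussed in this thread.
    public static class BasePOJO {
        public String table;
        public String time;
        public BasePOJO() { }
    }

    public static class TwitterPOJO extends BasePOJO {
        public String tweet;
        public TwitterPOJO() { }
    }

    /** Returns true only if the field is declared directly on the given class. */
    public static boolean declaredOnly(Class<?> type, String name) {
        try {
            type.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    /** Walks up the superclass chain; returns null if no class declares the field. */
    public static Field findInHierarchy(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // Not declared here, keep looking in the superclass.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(declaredOnly(TwitterPOJO.class, "tweet"));            // true
        System.out.println(declaredOnly(TwitterPOJO.class, "table"));            // false
        System.out.println(findInHierarchy(TwitterPOJO.class, "table") != null); // true
    }
}
```

The inherited field "table" is invisible to getDeclaredField on the derived class, while the hierarchy walk finds it on the base class.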
>>> 
>>> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>>> Hi Giacomo,
>>> 
>>> You should declare your fields as public. If a field is private or 
>>> protected, the class must provide a getter and setter for it to be treated 
>>> as a POJO.
>>> 
>>> Maybe the documentation on the homepage [1] would be helpful.
>>> 
>>> Regards,
>>> Chiwan Park
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
>>> 
>>>> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> 
>>>> wrote:
>>>> 
>>>> I ran it with only java.io.Serializable implemented, without disabling the 
>>>> closure cleaner.
>>>> 
>>>> Another question I have is about POJO classes.
>>>> I would also like to create a base POJO class with some common properties, 
>>>> and then extend it in new classes. These classes are used to convert a CSV 
>>>> file into a dataset of POJO objects (of the derived class type).
>>>> 
>>>> In the following example, I create a DataSet of TwitterPOJO, which extends 
>>>> a Base class, adding the new property "tweet".
>>>> 
>>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>>                   .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>>> 
>>>> I obtain this error:
>>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
>>>> com.Flink.POJO.TwitterPOJO is not a valid POJO type
>>>> Exception in thread "main" java.lang.ClassCastException: 
>>>> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
>>>> org.apache.flink.api.java.typeutils.PojoTypeInfo
>>>> 
>>>> Greetings,
>>>> G.L.
>>>> 
>>>> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> Could you also try the other variant (disabling the closure cleaner)? I 
>>>> would be curious whether this is expected Java Serialization behavior, 
>>>> or whether our pre-processing code is causing it.
>>>> 
>>>> Greetings,
>>>> Stephan
>>>> 
>>>> 
>>>> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <giacomo.lic...@gmail.com> 
>>>> wrote:
>>>> Thank you Martin and Stephan for your help.
>>>> I tried implementing java.io.Serializable directly in the base class and it 
>>>> worked perfectly!
>>>> 
>>>> Now I can develop more flexible and maintainable code. Thanks a lot, 
>>>> guys.
>>>> 
>>>> Greetings,
>>>> Giacomo
>>>> 
>>>> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> Hi!
>>>> 
>>>> Interesting case. We use plain Java Serialization to distribute UDFs, and 
>>>> perform additional "cleaning" of scopes, which may be causing the issue.
>>>> 
>>>> Can you try the following to see whether either of these resolves the problem?
>>>> 
>>>> 1) On the environment, disable the closure cleaner (in the execution 
>>>> config).
>>>> 
>>>> 2) Let the CullTimeBase class implement java.io.Serializable.
>>>> 
>>>> Please let us know how it turns out!
>>>> 
>>>> Greetings,
>>>> Stephan
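
Plain Java serialization on its own can produce null fields in this situation: fields declared in a non-serializable superclass are not written to the stream, and on deserialization that superclass is re-initialized through its no-argument constructor. The following is a minimal sketch of that behavior outside Flink. All class names are hypothetical, and it assumes the base class has an accessible no-argument constructor (without one, readObject fails with an InvalidClassException instead). It does not show what the closure cleaner does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Hypothetical base class that does NOT implement Serializable.
    public static class PlainBase {
        protected String timeUnit;
        public PlainBase() { }   // re-run when a serializable subclass is deserialized
        public PlainBase(String timeUnit) { this.timeUnit = timeUnit; }
        public String getTimeUnit() { return timeUnit; }
    }

    // Only the subclass is Serializable, so the inherited field is not written.
    public static class PlainDerived extends PlainBase implements Serializable {
        private static final long serialVersionUID = 1L;
        public PlainDerived(String timeUnit) { super(timeUnit); }
    }

    // Same shape, but the base class itself is Serializable.
    public static class SerializableBase implements Serializable {
        private static final long serialVersionUID = 1L;
        protected String timeUnit;
        public SerializableBase(String timeUnit) { this.timeUnit = timeUnit; }
        public String getTimeUnit() { return timeUnit; }
    }

    public static class FixedDerived extends SerializableBase {
        private static final long serialVersionUID = 1L;
        public FixedDerived(String timeUnit) { super(timeUnit); }
    }

    /** Serializes the object to a byte array and reads it back. */
    public static Object roundTrip(Object o) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PlainDerived lost = (PlainDerived) roundTrip(new PlainDerived("minutes"));
        FixedDerived kept = (FixedDerived) roundTrip(new FixedDerived("minutes"));
        System.out.println(lost.getTimeUnit()); // null
        System.out.println(kept.getTimeUnit()); // minutes
    }
}
```

Making the base class Serializable, as in option 2, is what keeps the inherited field in the stream.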
>>>> 
>>>> 
>>>> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns 
>>>> <m.jungha...@mailbox.org> wrote:
>>>> Hi Giacomo,
>>>> 
>>>> I ran into the same issue. It seems to be related to the serialization 
>>>> mechanism of UDFs. I solved it by letting the base class implement the UDF 
>>>> interface (e.g. FlatMapFunction) and additionally making it generic (which 
>>>> should be necessary in your example).
>>>> 
>>>> public [abstract] class CullTimeBase<IN, OUT> implements 
>>>> FlatMapFunction<IN, OUT> {
>>>> // ...
>>>> }
>>>> 
>>>> public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, 
>>>> RainfallPOJO> {
>>>> // ...
>>>> }
>>>> 
>>>> This should work.
>>>> 
>>>> Best,
>>>> Martin
>>>> 
>>>> 
>>>> On 16.09.2015 10:41, Giacomo Licari wrote:
>>>>> Hi guys,
>>>>> I'm trying to create a base class that is inherited by classes 
>>>>> implementing the flatMap method on specific POJO types.
>>>>> 
>>>>> It seems inheritance doesn't work: I can access this.PropertyName or 
>>>>> super.PropertyName from the flatMap method, but the values are always null.
>>>>> 
>>>>> Here is the derived class, using RainfallPOJO:
>>>>> 
>>>>> public class CullTimeRainfall extends CullTimeBase
>>>>>         implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>>>> 
>>>>>     public CullTimeRainfall(int num, int den, String time_data_name,
>>>>>             String start_time, String end_time, int interval, String time_unit) {
>>>>>         super(num, den, time_data_name, start_time, end_time, interval, time_unit);
>>>>>     }
>>>>> 
>>>>>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
>>>>>         // HH (24-hour clock), matching the pattern used in CullTimeBase
>>>>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>>         try {
>>>>>             // Remove the last colon (e.g. in a +01:00 offset) before parsing
>>>>>             Date time = formatter.parse(obj.getTime().replaceAll(
>>>>>                     "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
>>>>>             if (time.after(this.startTime) && time.before(this.endTime)) {
>>>>>                 coll.collect(obj);
>>>>>             }
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>> 
>>>>> }
>>>>> 
>>>>> My Base class is:
>>>>> 
>>>>> public class CullTimeBase {
>>>>> 
>>>>>     protected int numerator;
>>>>>     protected int denominator;
>>>>>     protected String timeDataName;
>>>>>     protected Date startTime;
>>>>>     protected Date endTime;
>>>>>     protected int interval;
>>>>>     protected String timeUnit;
>>>>> 
>>>>>     public CullTimeBase(int num, int den, String time_data_name,
>>>>>             String start_time, String end_time, int interv, String time_unit) {
>>>>>         numerator = num;
>>>>>         denominator = den;
>>>>>         timeDataName = time_data_name;
>>>>>         interval = interv;
>>>>>         timeUnit = time_unit;
>>>>>         DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>>         try {
>>>>>             startTime = formatter.parse(start_time);
>>>>>             endTime = formatter.parse(end_time);
>>>>>         } catch (ParseException e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>> }
>>>>> 
>>>>> It works only if I declare all variables and methods in a single class, 
>>>>> but then I have to repeat the same properties in several classes. I would 
>>>>> like to specialize each derived class only with a custom flatMap method 
>>>>> that uses a custom POJO type.
>>>>> 
>>>>> Thanks a lot,
>>>>> Giacomo