Hi Giacomo,

You should set your field as public. If you are set your field as private or 
protected, the class must provide getter and setter to be treated as POJO.

Maybe the documentation in homepage [1] would be helpful.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos

> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
> 
> I run it only implementing java.io.Serializable without disabling the closure 
> cleaner.
> 
> Another question I have is about POJO classes. 
> I would also create a base POJO class with some common proprerties, and then 
> extend it in new classes. These classes are used to convert a CSV into a 
> dataset of POJO objects (of derived class type). 
> 
> In the following example, I create a DataSet of TwitterPOJO, which extends a 
> Base class, adding the new property "tweet".
> 
> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>                    .pojoType(TwitterPOJO.class, "table", "time", "tweet"); 
> 
> I obtain this error:
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> com.Flink.POJO.TwitterPOJO is not a valid POJO type
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.java.typeutils.PojoTypeInfo
> 
> Greetings,
> G.L.
> 
> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
> Could you also try the other variant (disabeling the closure cleaner)? I 
> would be curious if this behavior is expected Java Serialization behavior, or 
> whether our pre-processing code is causing it.
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <giacomo.lic...@gmail.com> 
> wrote:
> Thank you Martin and Stephan for your help.
> I tried directly to implement java.io.Serializable in Base class and it 
> worked perfectly!
> 
> Now I can develop more flexible and maintainable code. Thank you a lot guys.
> 
> Greetings,
> Giacomo
> 
> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> 
> Interesting case. We use plain Java Serialization to distribute UDFs, and 
> perform additional "cleaning" of scopes, which may be causing the issue.
> 
> Can you try the following to see if any of those resolves the problem?
> 
> 1) On the environment, disable the closure cleaner (in the execution config).
> 
> 2) Let the CullTimeBase class implement java.io.Serializable.
> 
> Please let us know how it turns out!
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <m.jungha...@mailbox.org> 
> wrote:
> Hi Giacomo,
> 
> I ran into the same issue. Seems to be coupled to the serialization mechanism 
> of UDFs. I solved it by letting the base class implement the UDF interface 
> (e.g. FlatMapFunction) and in addition make it generic (which should be 
> necessary in your example).
> 
> public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, 
> OUT> {
> // ...
> }
> 
> public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, 
> RainFallPOJO> {
> // ...
> }
> 
> This should work.
> 
> Best,
> Martin
> 
> 
> On 16.09.2015 10:41, Giacomo Licari wrote:
>> Hi guys,
>> I'm trying to create a base class which is inherited from classes 
>> implementing FlatMap method on specific POJO types.
>> 
>> It seems inheritance doesn't work, I can access this.PropertyName or 
>> super.PropertyName from flatMap method but values are always null. 
>> 
>> Here the derived class, using RainfallPOJO:
>> 
>> public class CullTimeRainfall extends CullTimeBase implements 
>> FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>> 
>>      public CullTimeRainfall(int num, int den, String time_data_name, String 
>> start_time, String end_time, int interval, String time_unit){
>>              super(num, den, time_data_name, start_time, end_time, interval, 
>> time_unit);                                     
>>      }       
>> 
>>      public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) 
>> throws Exception {  
>>              DateFormat formatter = new 
>> SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS");
>>              try {
>>                  Date time = formatter.parse( obj.getTime().replaceAll( 
>> "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) );
>>              if(time.after(this.startTime) && time.before(this.endTime)){    
>>                 
>>                              coll.collect(obj);
>>                      }                   
>>              } catch(Exception e){
>>                      e.printStackTrace();
>>              }       
>>      }
>>      
>> }
>> 
>> My Base class is:
>> 
>> public class CullTimeBase {
>> 
>>     protected int numerator;
>>     protected int denominator;
>>     protected String timeDataName;   
>>     protected Date startTime;
>>     protected Date endTime;
>>     protected int interval;
>>     protected String timeUnit;
>>      
>>      public CullTimeBase(int num, int den, String time_data_name, String 
>> start_time, String end_time, int interv, String time_unit){
>>              numerator = num;
>>              denominator = den;
>>              timeDataName = time_data_name;          
>>              interval = interv;
>>              timeUnit = time_unit;
>>              DateFormat formatter = new 
>> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");                               
>>                 
>>              try {
>>                      startTime = formatter.parse(start_time);
>>                      endTime = formatter.parse(end_time);
>>              } catch (ParseException e) {
>>                      e.printStackTrace();
>>              }                               
>>      }
>> 
>> It works only if I declare all variables and methods in only one class, but 
>> so I should repeat the same properties in more classes. I would only 
>> specialize each derived class with a custom flatMap method. which uses a 
>> custom POJO type.
>> 
>> Thanks a lot,
>> Giacomo
> 
> 
> 
> 
> 



Reply via email to