I ran it with only java.io.Serializable implemented, without disabling the
closure cleaner.
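
For reference, here is a minimal sketch of why making the base class
Serializable matters. It uses plain Java serialization only, outside Flink,
so it says nothing about the closure cleaner. All class names are
hypothetical stand-ins, and the non-serializable base is given a no-arg
constructor (which CullTimeBase in this thread does not have) so that plain
Java can deserialize it at all.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableBaseDemo {

    /** Hypothetical base class that is NOT Serializable. */
    public static class PlainBase {
        protected String timeUnit;
        public PlainBase() {}                    // run again on deserialization
        public PlainBase(String timeUnit) { this.timeUnit = timeUnit; }
        public String getTimeUnit() { return timeUnit; }
    }

    /** Only the subclass is Serializable, so inherited fields are not written. */
    public static class PlainDerived extends PlainBase implements Serializable {
        private static final long serialVersionUID = 1L;
        public PlainDerived(String timeUnit) { super(timeUnit); }
    }

    /** Hypothetical base class that IS Serializable. */
    public static class SerializableBase implements Serializable {
        private static final long serialVersionUID = 1L;
        protected String timeUnit;
        public SerializableBase(String timeUnit) { this.timeUnit = timeUnit; }
        public String getTimeUnit() { return timeUnit; }
    }

    /** Inherits Serializable, so the base class fields travel with it. */
    public static class SerializableDerived extends SerializableBase {
        private static final long serialVersionUID = 1L;
        public SerializableDerived(String timeUnit) { super(timeUnit); }
    }

    /** Serializes obj to a byte array and reads it back. */
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("plain base: "
                + roundTrip(new PlainDerived("minutes")).getTimeUnit());
        System.out.println("serializable base: "
                + roundTrip(new SerializableDerived("minutes")).getTimeUnit());
    }
}
```

With the non-serializable base, the inherited field comes back as null
after the round trip; with the serializable base it keeps its value.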

Another question I have is about POJO classes.
I would also like to create a base POJO class with some common properties
and then extend it in new classes. These classes are used to convert a CSV
file into a DataSet of POJO objects (of the derived class type).

In the following example, I create a DataSet of TwitterPOJO, which extends
a Base class, adding the new property "tweet".

DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
                   .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I obtain this error:
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
com.Flink.POJO.TwitterPOJO is not a valid POJO type
Exception in thread "main" java.lang.ClassCastException:
org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to
org.apache.flink.api.java.typeutils.PojoTypeInfo
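
The source of TwitterPOJO is not shown here, so the actual cause is not
known. As a sketch of what Flink's documented POJO rules ask for (public
class, public no-argument constructor, every field either public or
reachable through a getter and setter), the following plain-Java example
may help. BasePOJO, BrokenPOJO and the looksLikePojo helper are
hypothetical; the helper only approximates the rules with reflection and is
not Flink's TypeExtractor.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoShapeCheck {

    /** Hypothetical base class holding the common properties. */
    public static class BasePOJO {
        private String table;
        private String time;
        public BasePOJO() {}
        public String getTable() { return table; }
        public void setTable(String table) { this.table = table; }
        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }
    }

    /** Derived class adding the "tweet" property. */
    public static class TwitterPOJO extends BasePOJO {
        private String tweet;
        public TwitterPOJO() {}
        public String getTweet() { return tweet; }
        public void setTweet(String tweet) { this.tweet = tweet; }
    }

    /** Counter-example: no no-arg constructor, private field without accessors. */
    public static class BrokenPOJO {
        private String tweet;
        public BrokenPOJO(String tweet) { this.tweet = tweet; }
    }

    /** Rough, simplified approximation of the POJO rules (an assumption, not Flink code). */
    public static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor();              // public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk the class hierarchy so inherited fields are checked too.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                    continue;
                }
                if (!Modifier.isPublic(mod) && !hasAccessors(clazz, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    /** True if clazz exposes public getX() and setX(type) for field x. */
    private static boolean hasAccessors(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("TwitterPOJO: " + looksLikePojo(TwitterPOJO.class));
        System.out.println("BrokenPOJO: " + looksLikePojo(BrokenPOJO.class));
    }
}
```

If the real Base class or TwitterPOJO misses one of these points (for
example, a constructor with arguments only), that could explain why the
type falls back to GenericTypeInfo.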

Greetings,
G.L.

On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:

> Could you also try the other variant (disabling the closure cleaner)? I
> would be curious whether this behavior is expected Java Serialization
> behavior, or whether our pre-processing code is causing it.
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <giacomo.lic...@gmail.com>
> wrote:
>
>> Thank you Martin and Stephan for your help.
>> I tried implementing java.io.Serializable directly in the Base class and
>> it worked perfectly!
>>
>> Now I can develop more flexible and maintainable code. Thanks a lot,
>> guys.
>>
>> Greetings,
>> Giacomo
>>
>> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Interesting case. We use plain Java Serialization to distribute UDFs,
>>> and perform additional "cleaning" of scopes, which may be causing the issue.
>>>
>>> Can you try the following to see if any of those resolves the problem?
>>>
>>> 1) On the environment, disable the closure cleaner (in the execution
>>> config).
>>>
>>> 2) Let the CullTimeBase class implement java.io.Serializable.
>>>
>>> Please let us know how it turns out!
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <
>>> m.jungha...@mailbox.org> wrote:
>>>
>>>> Hi Giacomo,
>>>>
>>>> I ran into the same issue. Seems to be coupled to the serialization
>>>> mechanism of UDFs. I solved it by letting the base class implement the UDF
>>>> interface (e.g. FlatMapFunction) and in addition make it generic (which
>>>> should be necessary in your example).
>>>>
>>>> public [abstract] class CullTimeBase<IN, OUT> implements
>>>> FlatMapFunction<IN, OUT> {
>>>> // ...
>>>> }
>>>>
>>>> public class CullTimeRainfall extends CullTimeBase<RainfallPOJO,
>>>> RainfallPOJO> {
>>>> // ...
>>>> }
>>>>
>>>> This should work.
>>>>
>>>> Best,
>>>> Martin
>>>>
>>>>
>>>> On 16.09.2015 10:41, Giacomo Licari wrote:
>>>>
>>>> Hi guys,
>>>> I'm trying to create a base class that is inherited by classes which
>>>> implement the flatMap method on specific POJO types.
>>>>
>>>> It seems inheritance doesn't work: I can access this.PropertyName or
>>>> super.PropertyName from the flatMap method, but the values are always
>>>> null.
>>>>
>>>> Here is the derived class, using RainfallPOJO:
>>>>
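>>>> import java.text.DateFormat;
>>>> import java.text.SimpleDateFormat;
>>>> import java.util.Date;
>>>>
>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public class CullTimeRainfall extends CullTimeBase implements
>>>>         FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>>>
>>>>     public CullTimeRainfall(int num, int den, String time_data_name,
>>>>             String start_time, String end_time, int interval,
>>>>             String time_unit) {
>>>>         super(num, den, time_data_name, start_time, end_time, interval,
>>>>                 time_unit);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll)
>>>>             throws Exception {
>>>>         // HH (0-23) matches the pattern in CullTimeBase; hh is 12-hour.
>>>>         DateFormat formatter =
>>>>                 new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>         try {
>>>>             // Drop the colon in a trailing offset, e.g. +01:00.
>>>>             Date time = formatter.parse(obj.getTime().replaceAll(
>>>>                     "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
>>>>             // Emit only records strictly between startTime and endTime.
>>>>             if (time.after(this.startTime) && time.before(this.endTime)) {
>>>>                 coll.collect(obj);
>>>>             }
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }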
>>>>
>>>> My Base class is:
>>>>
>>>> import java.text.DateFormat;
>>>> import java.text.ParseException;
>>>> import java.text.SimpleDateFormat;
>>>> import java.util.Date;
>>>>
>>>> public class CullTimeBase {
>>>>
>>>>     protected int numerator;
>>>>     protected int denominator;
>>>>     protected String timeDataName;
>>>>     protected Date startTime;
>>>>     protected Date endTime;
>>>>     protected int interval;
>>>>     protected String timeUnit;
>>>>
>>>>     public CullTimeBase(int num, int den, String time_data_name,
>>>>             String start_time, String end_time, int interv,
>>>>             String time_unit) {
>>>>         numerator = num;
>>>>         denominator = den;
>>>>         timeDataName = time_data_name;
>>>>         interval = interv;
>>>>         timeUnit = time_unit;
>>>>         DateFormat formatter =
>>>>                 new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>         try {
>>>>             startTime = formatter.parse(start_time);
>>>>             endTime = formatter.parse(end_time);
>>>>         } catch (ParseException e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> It works only if I declare all variables and methods in a single class,
>>>> but then I would have to repeat the same properties in several classes.
>>>> I would like each derived class to specialize only the flatMap method,
>>>> which uses its own POJO type.
>>>>
>>>> Thanks a lot,
>>>> Giacomo
>>>>
>>>>
>>>>
>>>
>>
>
