Thanks Max and Till for the answers. However, I still don't fully understand the difference. Here are my doubts:
- If I don't register any of my POJO classes, they will be serialized with Kryo (black box for Flink)
- If I register all of my POJO using env.registerType they will be serialized as POJO (which is slower than Tuple serialization but much faster than Kryo)
- What if I call env.registerTypeWithKryoSerializer()? Why should I specify a serializer for Kryo?

Best,
Flavio

On Tue, Feb 23, 2016 at 4:08 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Registering a data type is only relevant for the Kryo serializer or if you
> want to serialize a subclass of a POJO. Registering has the advantage that
> you assign an id to the class which is written instead of the full class
> name. The latter is usually much longer than the id.
>
> Cheers,
> Till
>
> On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels <m...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> I think the point is that Flink can use its serialization tools if you
>> register the class in advance. If you don't do that, it will use Kryo
>> as a fall-back which is slightly less efficient.
>>
>> Equals and hash code have to be implemented correctly if you compare
>> Pojos. For standard types like String or Integer, this is done
>> automatically. For Pojos, Flink doesn't know whether it is implemented
>> correctly or not. Every object in Java has a default equals and
>> hashCode implementation.
>>
>> Cheers,
>> Max
>>
>> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
>> <pomperma...@okkam.it> wrote:
>> > Hi Max,
>> > why do I need to register them? My job runs without problem also without
>> > that.
>> > The only problem with my POJOs was that I had to implement equals and hash
>> > correctly, Flink didn't enforce me to do it but then results were wrong :(
>> >
>> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <m...@apache.org> wrote:
>> >>
>> >> Hi Flavio,
>> >>
>> >> Stephan was referring to
>> >>
>> >> env.registerType(ExtendedClass1.class);
>> >> env.registerType(ExtendedClass2.class);
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>> >> <pomperma...@okkam.it> wrote:
>> >> > What do you mean exactly? Probably I'm missing something here. Remember
>> >> > that I can specify the right subclass only after the last flatMap; after
>> >> > the first map neither I nor Flink can know the exact subclass of BaseClass
>> >> >
>> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <se...@apache.org> wrote:
>> >> >>
>> >> >> Class hierarchies should definitely work, even if the base class has no
>> >> >> fields.
>> >> >>
>> >> >> They work more efficiently if you register the subclasses at the
>> >> >> execution environment (Flink cannot infer them from the function
>> >> >> signatures because the function signatures only contain the abstract
>> >> >> base class).
>> >> >>
>> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>> >> >> <pomperma...@okkam.it> wrote:
>> >> >>>
>> >> >>> Because the classes are not related to each other. Do you think it's a
>> >> >>> good idea to have something like this?
>> >> >>>
>> >> >>> abstract class BaseClass {
>> >> >>>     String someField;
>> >> >>> }
>> >> >>>
>> >> >>> class ExtendedClass1 extends BaseClass {
>> >> >>>     String someOtherField11;
>> >> >>>     String someOtherField12;
>> >> >>>     String someOtherField13;
>> >> >>>     ...
>> >> >>> }
>> >> >>>
>> >> >>> class ExtendedClass2 extends BaseClass {
>> >> >>>     Integer someOtherField21;
>> >> >>>     Double someOtherField22;
>> >> >>>     Integer someOtherField23;
>> >> >>>     ...
>> >> >>> }
>> >> >>>
>> >> >>> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
>> >> >>> flatMap that can be used to generate the specific datasets?
>> >> >>> Doesn't this cause problems for Flink? The classes can be very different
>> >> >>> from each other, so maybe this can cause problems with the plan
>> >> >>> generation, can't it?
>> >> >>>
>> >> >>> Thanks Fabian and Stephan for the support!
>> >> >>>
>> >> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <se...@apache.org>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> Why not use an abstract base class and N subclasses?
>> >> >>>>
>> >> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhue...@gmail.com>
>> >> >>>> wrote:
>> >> >>>>>
>> >> >>>>> Unfortunately, there is no Either<1,...,n>.
>> >> >>>>> You could implement something like a Tuple3<Option<Type1>,
>> >> >>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an
>> >> >>>>> Option type (comes with Java8). You would need to implement it yourself
>> >> >>>>> incl. TypeInfo and Serializer. You can get some inspiration from the
>> >> >>>>> Either type info /serializer, if you want to go this way.
>> >> >>>>>
>> >> >>>>> Using a byte array would also work but doesn't look much easier than
>> >> >>>>> the Option approach to me.
>> >> >>>>>
>> >> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> >> >>>>>>
>> >> >>>>>> Yes, the intermediate datasets I create are then joined again among
>> >> >>>>>> themselves. What I'd need is an Either<1,...,n>. Is that possible to
>> >> >>>>>> add?
>> >> >>>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in
>> >> >>>>>> the subsequent filter+map/flatMap, deserializing only those elements I
>> >> >>>>>> want to group together (e.g. "someEventType".equals(t.f0)) in order to
>> >> >>>>>> generate the typed dataset.
>> >> >>>>>> Which one do you think is the best solution?
>> >> >>>>>>
>> >> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhue...@gmail.com>
>> >> >>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>> Hi Flavio,
>> >> >>>>>>>
>> >> >>>>>>> I did not completely understand which objects should go where, but
>> >> >>>>>>> here are some general guidelines:
>> >> >>>>>>>
>> >> >>>>>>> - early filtering is mostly a good idea (unless evaluating the filter
>> >> >>>>>>> expression is very expensive)
>> >> >>>>>>> - you can use a flatMap function to combine a map and a filter
>> >> >>>>>>> - applying multiple functions on the same data set does not
>> >> >>>>>>> necessarily materialize the data set (in memory or on disk). In most
>> >> >>>>>>> cases it prevents chaining, hence there is serialization overhead. In
>> >> >>>>>>> some cases where the forked data streams are joined again, the data set
>> >> >>>>>>> must be materialized in order to avoid deadlocks.
>> >> >>>>>>> - it is not possible to write a map that generates two different
>> >> >>>>>>> types, but you could implement a mapper that returns an
>> >> >>>>>>> Either<First, Second> type.
>> >> >>>>>>>
>> >> >>>>>>> Hope this helps,
>> >> >>>>>>> Fabian
>> >> >>>>>>>
>> >> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier
>> >> >>>>>>> <pomperma...@okkam.it>:
>> >> >>>>>>>>
>> >> >>>>>>>> Any help on this?
>> >> >>>>>>>>
>> >> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pomperma...@okkam.it>
>> >> >>>>>>>> wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>> Hi to all,
>> >> >>>>>>>>>
>> >> >>>>>>>>> in my program I have a Dataset that generated different types of
>> >> >>>>>>>>> object wrt the incoming element.
>> >> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
>> >> >>>>>>>>> In order to type the different generated datasets I do
>> >> >>>>>>>>> something:
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<Tuple2> start =...
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>> >> >>>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>> >> >>>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>> >> >>>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>> >> >>>>>>>>>
>> >> >>>>>>>>> However this is very inefficient (I think because Flink needs to
>> >> >>>>>>>>> materialize the entire source dataset for every slot).
>> >> >>>>>>>>>
>> >> >>>>>>>>> It's much more efficient to group the generation of objects of
>> >> >>>>>>>>> the same type. E.g.:
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<Tuple2> start =..
>> >> >>>>>>>>>
>> >> >>>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
>> >> >>>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
>> >> >>>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
>> >> >>>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
>> >> >>>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
>> >> >>>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
>> >> >>>>>>>>>
>> >> >>>>>>>>> Increasing the number of slots per task manager make things worse
>> >> >>>>>>>>> and worse :)
>> >> >>>>>>>>> Is there a way to improve this situation? Is it possible to write a
>> >> >>>>>>>>> "map" generating different type of object and then filter them
>> >> >>>>>>>>> by generated class type?
>> >> >>>>>>>>>
>> >> >>>>>>>>> Best,
>> >> >>>>>>>>> Flavio
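
As a footnote to Till's point that a registered class is written as an id instead of its full class name, here is a minimal sketch in plain Java of why that saves space. It does not use Flink or Kryo and does not reproduce their actual wire format. The ClassTagSketch class, its register and writeClassTag methods, and the marker-byte layout are all hypothetical, made up only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: NOT Flink's or Kryo's real wire format.
final class ClassTagSketch {
    // Hypothetical registry: class -> small integer id, in registration order.
    private final Map<Class<?>, Integer> ids = new HashMap<>();

    // Assigns the next free id to the class unless it is already registered.
    void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size());
    }

    // Returns the bytes that identify the class of one serialized record.
    byte[] writeClassTag(Class<?> type) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = ids.get(type);
            if (id != null) {
                out.writeBoolean(true);        // marker byte: an id follows
                out.writeInt(id);              // fixed 4 bytes
            } else {
                out.writeBoolean(false);       // marker byte: a name follows
                out.writeUTF(type.getName());  // 2-byte length + encoded name
            }
        } catch (IOException e) {
            // An in-memory stream should not fail; rethrow unchecked if it does.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        ClassTagSketch registry = new ClassTagSketch();
        registry.register(StringBuilder.class);
        int registered = registry.writeClassTag(StringBuilder.class).length;
        int unregistered = registry.writeClassTag(
                java.util.concurrent.ConcurrentHashMap.class).length;
        System.out.println("registered tag bytes:   " + registered);
        System.out.println("unregistered tag bytes: " + unregistered);
    }
}
```

In this sketch the tag of a registered class always costs a marker byte plus a 4-byte id, while the tag of an unregistered class costs a marker byte plus the length-prefixed class name, so it grows with the length of the fully qualified name and is paid again for every record.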