Hi Max, why do I need to register them? My job also runs without problems if I don't. The only problem with my POJOs was that I had to implement equals and hashCode correctly: Flink didn't force me to, but without them the results were wrong :(
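To make the equals/hashCode point concrete, here is a minimal plain-Java sketch. It is not Flink code and says nothing about Flink internals; it only shows that any hash-based grouping needs value-based equality on the key. EventKey and EventKeyDemo are hypothetical names invented for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key type, used only for this illustration.
final class EventKey {
    private final String type;
    private final int id;

    EventKey(String type, int id) {
        this.type = type;
        this.id = id;
    }

    // Value-based equality: two keys with the same fields are the same key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof EventKey)) return false;
        EventKey other = (EventKey) o;
        return id == other.id && Objects.equals(type, other.type);
    }

    // Must agree with equals: equal keys produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(type, id);
    }
}

final class EventKeyDemo {
    // Counts occurrences per key, the way a hash-based grouping would.
    static Map<EventKey, Integer> countByKey(EventKey... keys) {
        Map<EventKey, Integer> counts = new HashMap<>();
        for (EventKey k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<EventKey, Integer> counts = countByKey(
                new EventKey("click", 1),
                new EventKey("click", 1),
                new EventKey("view", 2));
        // Two distinct keys; the two "click" keys fall into one group.
        System.out.println(counts.size());
    }
}
```

If the two overrides are removed, the default identity-based equals and hashCode apply, every EventKey instance becomes its own group, and the counts come out wrong without any error being raised.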
On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <m...@apache.org> wrote:

> Hi Flavio,
>
> Stephan was referring to
>
> env.registerType(ExtendedClass1.class);
> env.registerType(ExtendedClass2.class);
>
> Cheers,
> Max
>
> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
> <pomperma...@okkam.it> wrote:
> > What do you mean exactly..? Probably I'm missing something here..remember
> > that I can specify the right subclass only after the last flatMap; after
> > the first map neither I nor Flink can know the exact subclass of BaseClass
> >
> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Class hierarchies should definitely work, even if the base class has no
> >> fields.
> >>
> >> They work more efficiently if you register the subclasses at the
> >> execution environment (Flink cannot infer them from the function
> >> signatures because the function signatures only contain the abstract
> >> base class).
> >>
> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
> >> <pomperma...@okkam.it> wrote:
> >>>
> >>> Because the classes are not related to each other. Do you think it's a
> >>> good idea to have something like this?
> >>>
> >>> abstract class BaseClass {
> >>>     String someField;
> >>> }
> >>>
> >>> class ExtendedClass1 extends BaseClass {
> >>>     String someOtherField11;
> >>>     String someOtherField12;
> >>>     String someOtherField13;
> >>>     ...
> >>> }
> >>>
> >>> class ExtendedClass2 extends BaseClass {
> >>>     Integer someOtherField21;
> >>>     Double someOtherField22;
> >>>     Integer someOtherField23;
> >>>     ...
> >>> }
> >>>
> >>> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
> >>> flatMap that can be used to generate the specific datasets?
> >>> Doesn't this cause problems for Flink? The classes can be very
> >>> different from each other..maybe this can cause problems with the plan
> >>> generation?
> >>>
> >>> Thanks Fabian and Stephan for the support!
> >>>
> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <se...@apache.org>
> >>> wrote:
> >>>>
> >>>> Why not use an abstract base class and N subclasses?
> >>>>
> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhue...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Unfortunately, there is no Either<1,...,n>.
> >>>>> You could implement something like a Tuple3<Option<Type1>,
> >>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an
> >>>>> Option type (comes with Java 8). You would need to implement it
> >>>>> yourself incl. TypeInfo and Serializer. You can get some inspiration
> >>>>> from the Either type info/serializer, if you want to go this way.
> >>>>>
> >>>>> Using a byte array would also work but doesn't look much easier than
> >>>>> the Option approach to me.
> >>>>>
> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
> >>>>>>
> >>>>>> Yes, the intermediate datasets I create are then joined with each
> >>>>>> other again. What I'd need is an Either<1,...,n>. Is it possible to
> >>>>>> add that?
> >>>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]> and,
> >>>>>> in the subsequent filter+map/flatMap, deserializing only those
> >>>>>> elements I want to group together (e.g. t.f0=="someEventType") in
> >>>>>> order to generate the typed dataset based.
> >>>>>> Which one do you think is the best solution?
> >>>>>>
> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhue...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Flavio,
> >>>>>>>
> >>>>>>> I did not completely understand which objects should go where, but
> >>>>>>> here are some general guidelines:
> >>>>>>>
> >>>>>>> - early filtering is mostly a good idea (unless evaluating the
> >>>>>>>   filter expression is very expensive)
> >>>>>>> - you can use a flatMap function to combine a map and a filter
> >>>>>>> - applying multiple functions on the same data set does not
> >>>>>>>   necessarily materialize the data set (in memory or on disk). In
> >>>>>>>   most cases it prevents chaining, hence there is serialization
> >>>>>>>   overhead. In some cases where the forked data streams are joined
> >>>>>>>   again, the data set must be materialized in order to avoid
> >>>>>>>   deadlocks.
> >>>>>>> - it is not possible to write a map that generates two different
> >>>>>>>   types, but you could implement a mapper that returns an
> >>>>>>>   Either<First, Second> type.
> >>>>>>>
> >>>>>>> Hope this helps,
> >>>>>>> Fabian
> >>>>>>>
> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
> >>>>>>>>
> >>>>>>>> Any help on this?
> >>>>>>>>
> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pomperma...@okkam.it>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi to all,
> >>>>>>>>>
> >>>>>>>>> in my program I have a DataSet that generates different types of
> >>>>>>>>> objects depending on the incoming element.
> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
> >>>>>>>>> In order to type the different generated datasets I do something
> >>>>>>>>> like:
> >>>>>>>>>
> >>>>>>>>> DataSet<Tuple2> start =...
> >>>>>>>>>
> >>>>>>>>> DataSet<MyObj1> ds1 = start.filter().map(..);
> >>>>>>>>> DataSet<MyObj1> ds2 = start.filter().map(..);
> >>>>>>>>> DataSet<MyObj3> ds3 = start.filter().map(..);
> >>>>>>>>> DataSet<MyObj3> ds4 = start.filter().map(..);
> >>>>>>>>>
> >>>>>>>>> However this is very inefficient (I think because Flink needs to
> >>>>>>>>> materialize the entire source dataset for every slot).
> >>>>>>>>>
> >>>>>>>>> It's much more efficient to group the generation of objects of
> >>>>>>>>> the same type. E.g.:
> >>>>>>>>>
> >>>>>>>>> DataSet<Tuple2> start =..
> >>>>>>>>>
> >>>>>>>>> DataSet<MyObj1> tmp1 = start.map(..);
> >>>>>>>>> DataSet<MyObj3> tmp2 = start.map(..);
> >>>>>>>>> DataSet<MyObj1> ds1 = tmp1.filter();
> >>>>>>>>> DataSet<MyObj1> ds2 = tmp1.filter();
> >>>>>>>>> DataSet<MyObj3> ds3 = tmp2.filter();
> >>>>>>>>> DataSet<MyObj3> ds4 = tmp2.filter();
> >>>>>>>>>
> >>>>>>>>> Increasing the number of slots per task manager makes things
> >>>>>>>>> worse and worse :)
> >>>>>>>>> Is there a way to improve this situation? Is it possible to write
> >>>>>>>>> a "map" generating different types of objects and then filter
> >>>>>>>>> them by generated class type?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Flavio
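
The "abstract base class and N subclasses" idea from the quoted thread can be sketched in plain Java, with no Flink API involved. The sketch below makes one pass that produces mixed subclasses and then selects each typed subset by class. BaseClass, ExtendedClass1 and ExtendedClass2 come from the thread (with their field lists shortened); SplitByTypeDemo, toTyped, select and the "type1"/"type2" event names are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

abstract class BaseClass {
    String someField;
}

class ExtendedClass1 extends BaseClass {
    String someOtherField11;
}

class ExtendedClass2 extends BaseClass {
    Integer someOtherField21;
}

final class SplitByTypeDemo {
    // Hypothetical mapping rule: the event type decides which subclass is built.
    static BaseClass toTyped(String eventType, String payload) {
        if ("type1".equals(eventType)) {
            ExtendedClass1 r = new ExtendedClass1();
            r.someField = eventType;
            r.someOtherField11 = payload;
            return r;
        }
        if ("type2".equals(eventType)) {
            ExtendedClass2 r = new ExtendedClass2();
            r.someField = eventType;
            r.someOtherField21 = payload.length();
            return r;
        }
        throw new IllegalArgumentException("unknown event type: " + eventType);
    }

    // Keeps only the elements of the requested subclass, already cast to it.
    static <T extends BaseClass> List<T> select(List<BaseClass> all, Class<T> type) {
        return all.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Single pass that generates objects of different subclasses.
        List<BaseClass> mixed = new ArrayList<>();
        mixed.add(toTyped("type1", "a"));
        mixed.add(toTyped("type2", "bb"));
        mixed.add(toTyped("type1", "c"));

        // Typed subsets, selected by generated class.
        List<ExtendedClass1> ds1 = select(mixed, ExtendedClass1.class);
        List<ExtendedClass2> ds2 = select(mixed, ExtendedClass2.class);
        System.out.println(ds1.size() + " " + ds2.size());
    }
}
```

In a Flink job the same shape would presumably be a map to BaseClass followed by one filter/flatMap per subclass, with the subclasses registered via env.registerType as Max describes; that part is not shown here because it depends on the actual job.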