Yes, the intermediate datasets I create are then joined with each other again. What I'd need is an Either<1,...,n>. Would it be possible to add that? Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in the subsequent filter+map/flatMap, deserializing only those elements I want to group together (e.g. "someEventType".equals(t.f0)) in order to generate the typed dataset. Which one do you think is the better solution?
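To make the second option concrete, here is a rough sketch of what I have in mind. It is plain Java with no Flink classes: `Tagged` stands in for Tuple2<String,byte[]>, while `MyObj1`'s single field, the UTF-8 "deserialization" and the tag values are assumptions made up for illustration. The loop plays the role of a single flatMap that combines the filter and the map, so only elements with the wanted tag are ever deserialized.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the Tuple2<String, byte[]> idea; no Flink API is used here.
// Tagged, MyObj1 and the tag values are hypothetical names chosen for illustration.
class TaggedSplitSketch {

    // Stand-in for Tuple2<String, byte[]>: f0 is the type tag, f1 the serialized payload.
    static final class Tagged {
        final String f0;
        final byte[] f1;
        Tagged(String f0, byte[] f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Hypothetical typed element obtained after deserialization.
    static final class MyObj1 {
        final String value;
        MyObj1(String value) { this.value = value; }
    }

    // Counts deserializations so it is visible that skipped elements cost nothing here.
    static int deserializations = 0;

    // Assumed payload format: a UTF-8 string. A real job would use its own serializer.
    static MyObj1 deserialize(byte[] bytes) {
        deserializations++;
        return new MyObj1(new String(bytes, StandardCharsets.UTF_8));
    }

    // Filter and map in one pass: check the tag first, deserialize matching elements only.
    static List<MyObj1> selectAndDeserialize(List<Tagged> start, String wantedTag) {
        List<MyObj1> out = new ArrayList<>();
        for (Tagged t : start) {
            if (wantedTag.equals(t.f0)) { // equals(), not ==, for String comparison
                out.add(deserialize(t.f1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> start = Arrays.asList(
            new Tagged("someEventType", "a".getBytes(StandardCharsets.UTF_8)),
            new Tagged("otherEventType", "b".getBytes(StandardCharsets.UTF_8)),
            new Tagged("someEventType", "c".getBytes(StandardCharsets.UTF_8)));

        List<MyObj1> ds1 = selectAndDeserialize(start, "someEventType");
        System.out.println(ds1.size() + " elements, " + deserializations + " deserializations");
    }
}
```

The trade-off compared to an Either type is that the byte[] payload is untyped until it is deserialized, so a wrong tag only shows up at runtime.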
On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Flavio,
>
> I did not completely understand which objects should go where, but here
> are some general guidelines:
>
> - early filtering is mostly a good idea (unless evaluating the filter
> expression is very expensive)
> - you can use a flatMap function to combine a map and a filter
> - applying multiple functions on the same data set does not necessarily
> materialize the data set (in memory or on disk). In most cases it prevents
> chaining, hence there is serialization overhead. In some cases where the
> forked data streams are joined again, the data set must be materialized in
> order to avoid deadlocks.
> - it is not possible to write a map that generates two different types,
> but you could implement a mapper that returns an Either<First, Second> type.
>
> Hope this helps,
> Fabian
>
> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Any help on this?
>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>>
>>> in my program I have a Dataset that generated different types of object
>>> wrt the incoming element.
>>> Thus it's like a Map<Tuple2,Object>.
>>> In order to type the different generated datasets I do something:
>>>
>>> Dataset<Tuple2> start =...
>>>
>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>>>
>>> However this is very inefficient (I think because Flink needs to
>>> materialize the entire source dataset for every slot).
>>>
>>> It's much more efficient to group the generation of objects of the same
>>> type. E.g.:
>>>
>>> Dataset<Tuple2> start =..
>>>
>>> Dataset<MyObj1> tmp1 = start.map(..);
>>> Dataset<MyObj3> tmp2 = start.map(..);
>>> Dataset<MyObj1> ds1 = tmp1.filter();
>>> Dataset<MyObj1> ds2 = tmp1.filter();
>>> Dataset<MyObj3> ds3 = tmp2.filter();
>>> Dataset<MyObj3> ds4 = tmp2.filter();
>>>
>>> Increasing the number of slots per task manager make things worse and
>>> worse :)
>>> Is there a way to improve this situation? Is it possible to write a
>>> "map" generating different type of object and then filter them by generated
>>> class type?
>>>
>>> Best,
>>> Flavio