Hi, if this only requires implementing a custom operator (I mean, no changes to the network stack or other engine-level components), I can try to implement it, since I have been working on the optimizer and plan generation for a month. We are also going to implement our ETL framework on Flink, and this kind of scenario is a good fit and a common requirement in ETL-like flows. If you can point me to the parts of the project I should look at, I can give it a try.
Thanks

On May 12, 2016 6:54 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
> Hi,
> I agree that this would be very nice. Unfortunately, Flink only allows
> one output from an operation right now. Maybe we can extend this somehow
> in the future.
>
> Cheers,
> Aljoscha
>
> On Thu, 12 May 2016 at 17:27 CPC <acha...@gmail.com> wrote:
>
> > Hi Gabor,
> >
> > Yes, functionally this helps. But in this case I am processing each element
> > twice and sending the whole data set to two different operators. What I am
> > trying to achieve is something like the DataStream split functionality, or a
> > little more. In a filter-like scenario, I want to do the following
> > pseudo-operation:
> >
> >   def function(iter: Iterator[URLOutputData],
> >                trueEvents: Collector[URLOutputData],
> >                falseEvents: Collector[URLOutputData],
> >                errEvents: Collector[URLOutputData]): Unit = {
> >     iter.foreach { i =>
> >       try {
> >         if (predicate(i))
> >           trueEvents.collect(i)
> >         else
> >           falseEvents.collect(i)
> >       } catch {
> >         case _: Exception => errEvents.collect(i)
> >       }
> >     }
> >   }
> >
> > Another case: suppose I have an input set of web events coming from
> > different web apps, and I want to split the data set based on application
> > category.
> >
> > Thanks,
> >
> > On 12 May 2016 at 17:28, Gábor Gévay <gga...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > You can split a DataSet into two DataSets with two filters:
> > >
> > > val xs: DataSet[A] = ...
> > > val split1: DataSet[A] = xs.filter(f1)
> > > val split2: DataSet[A] = xs.filter(f2)
> > >
> > > where f1 and f2 are true for those elements that should go into the
> > > first and second DataSets respectively. So far, the splits will just
> > > contain elements from the input DataSet, but you can of course apply
> > > some map after one of the filters.
> > >
> > > Does this help?
> > >
> > > Best,
> > > Gábor
> > >
> > > 2016-05-12 16:03 GMT+02:00 CPC <acha...@gmail.com>:
> > > > Hi folks,
> > > >
> > > > Is there any way in the DataSet API to split a DataSet[A] into a
> > > > DataSet[A] and a DataSet[B]? The use case is a custom filter component
> > > > that we want to implement. We want to direct the input elements for
> > > > which the predicate returns false to another output. Actually, we also
> > > > want to direct input elements that throw an exception to another output
> > > > (a demultiplexer-like component).
> > > >
> > > > Thank you in advance...
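Given the one-output-per-operation restriction, one way to avoid evaluating the predicate twice is to classify each element once (true, false, or threw) and then split on that cheap tag. The sketch below shows only the routing logic, using plain Scala collections instead of Flink so it runs standalone. `Demux`, `Route`, `Split`, `classify`, and `demux` are hypothetical names, not Flink API, and `NonFatal` is used in place of the thread's catch-all error branch.

```scala
import scala.util.control.NonFatal

object Demux {
  // The three outputs from the thread: predicate true, predicate false, predicate threw.
  sealed trait Route
  case object Matched extends Route
  case object Unmatched extends Route
  case object Failed extends Route

  // Hypothetical result holder: one collection per output.
  final case class Split[A](matched: Vector[A], unmatched: Vector[A], failed: Vector[A])

  // Evaluate the predicate exactly once and decide where the element goes.
  // NonFatal lets fatal errors (e.g. OutOfMemoryError) propagate instead of being routed.
  def classify[A](predicate: A => Boolean)(a: A): Route =
    try {
      if (predicate(a)) Matched else Unmatched
    } catch {
      case NonFatal(_) => Failed
    }

  // Single pass over the input, appending each element to exactly one output.
  def demux[A](input: Iterator[A])(predicate: A => Boolean): Split[A] = {
    val matched = Vector.newBuilder[A]
    val unmatched = Vector.newBuilder[A]
    val failed = Vector.newBuilder[A]
    input.foreach { a =>
      classify(predicate)(a) match {
        case Matched   => matched += a
        case Unmatched => unmatched += a
        case Failed    => failed += a
      }
    }
    Split(matched.result(), unmatched.result(), failed.result())
  }
}
```

For example, `Demux.demux(Iterator("1", "x", "-3", "7"))(_.toInt > 0)` puts `"1"` and `"7"` in `matched`, `"-3"` in `unmatched`, and `"x"` (whose `toInt` throws) in `failed`. In a DataSet job, the same `classify` function could presumably be applied in a `map` that pairs each element with its route, followed by one `filter` per route. That evaluates the predicate only once, but each tagged element is still shipped to every filter, so it remains a workaround rather than the true multi-output operator discussed in the thread.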