Hi Gabor, Yes functionally this helps. But in this case i am processing an element twice and sending whole data to two different operator . What i am trying to achieve is like datastream split like functionality or a little bit more: In filter like scenario i want to do below pseudo operation:
def function(iter: Iterator[URLOutputData], trueEvents: >> Collector[URLOutputData], falseEvents: Collector[URLOutputData], errEvents: >> Collector[URLOutputData]) { > > iter.foreach { > > i => > > try { > > if (predicate(i)) > > trueEvents.collect(i) > > else > > falseEvents.collect(i) > > } catch { > > case _ => errEvents.collect(i) > > } > > } > > } > > Another case could be,suppose i have an input set of web events comes from different web apps and i want to split dataset based on application category Thanks, On 12 May 2016 at 17:28, Gábor Gévay <gga...@gmail.com> wrote: > Hello, > > You can split a DataSet into two DataSets with two filters: > > val xs: DataSet[A] = ... > val split1: DataSet[A] = xs.filter(f1) > val split2: DataSet[A] = xs.filter(f2) > > where f1 and f2 are true for those elements that should go into the > first and second DataSets respectively. So far, the splits will just > contain elements from the input DataSet, but you can of course apply > some map after one of the filters. > > Does this help? > > Best, > Gábor > > > > 2016-05-12 16:03 GMT+02:00 CPC <acha...@gmail.com>: > > Hi folks, > > > > Is there any way in dataset api to split Dataset[A] to Dataset[A] and > > Dataset[B] ? Use case belongs to a custom filter component that we want > to > > implement. We will want to direct input elements whose result is false > > after we apply the predicate. Actually we want to direct input elements > > that throw exception to another output as well(demultiplexer like > > component). > > > > Thank you in advance... >