I would like to add that if your predicate does some heavyweight computation that you do not want to repeat in each filter, then you can insert a map before the filters that evaluates the predicate once and puts the result into a field. The filters then only need to read that field.
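A minimal sketch of that idea, assuming the Flink Scala DataSet API is on the classpath. The names `Tagged`, `expensivePredicate` and `SplitWithCachedPredicate` are made up for illustration, and the predicate here is just a cheap stand-in for the real computation:

```scala
import org.apache.flink.api.scala._

object SplitWithCachedPredicate {
  // Hypothetical wrapper: the element plus the cached predicate result.
  case class Tagged(value: Int, matches: Boolean)

  // Hypothetical stand-in for a heavyweight predicate.
  def expensivePredicate(x: Int): Boolean = x % 2 == 0

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val xs: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)

    // Evaluate the predicate in a map and store the result in a field.
    val tagged: DataSet[Tagged] = xs.map(x => Tagged(x, expensivePredicate(x)))

    // Both filters only read the stored field; they do not call the predicate.
    val split1: DataSet[Int] = tagged.filter(t => t.matches).map(t => t.value)
    val split2: DataSet[Int] = tagged.filter(t => !t.matches).map(t => t.value)

    // Attach both sinks to the same job so that the map is part of one plan.
    split1.printOnTaskManager("split1")
    split2.printOnTaskManager("split2")
    env.execute("split with cached predicate")
  }
}
```

Both sinks are attached before a single `env.execute()`, so the two filters belong to one job that shares the map. Calling `print()` or `collect()` on each split separately would run one job per call and evaluate the predicate again. The same tagging idea should extend to a third "error" output by storing a three-valued tag instead of a Boolean.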
Best,
Gabor

2016-05-13 11:51 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi,
>
> it is true that Gabor's approach of using two filters has a certain
> overhead.
> However, the overhead should be reasonable. The data stays on the same node
> and the filter can be very lightweight.
>
> I agree that this is not a very nice solution.
> However, modifying the DataSet API such that an operator can have more than
> one output would be a very large change. It would require rewriting large
> portions of the optimizer and job generation. The assumption of a single
> output is made in many places which are not always easy to spot. To be
> honest, I don't think this is possible with reasonable effort. Even if it
> was possible, the change would be so large that somebody would need to
> spend a lot of time reviewing the changes.
>
> I am sorry, this limitation cannot be easily resolved.
>
> Fabian
>
>
> 2016-05-12 19:39 GMT+02:00 CPC <acha...@gmail.com>:
>
>> Hi,
>>
>> if it just require implementing a custom operator(i mean does not require
>> changes to network stack or other engine level changes) i can try to
>> implement it since i am working on optimizer and plan generation for a
>> month. Also we are going to implement our etl framework on flink and this
>> kind of scenario is a good fit and a common requirement in etl like flows.
>> If you can help me which parts of the project I should look for , i can try
>> it.
>>
>> Thanks
>> On May 12, 2016 6:54 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>
>> > Hi,
>> > I agree that this would be very nice. Unfortunately Flink does only allow
>> > one output from an operation right now. Maybe we can extends this somehow
>> > in the future.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Thu, 12 May 2016 at 17:27 CPC <acha...@gmail.com> wrote:
>> >
>> > > Hi Gabor,
>> > >
>> > > Yes functionally this helps. But in this case i am processing an element
>> > > twice and sending whole data to two different operator . What i am trying
>> > > to achieve is like datastream split like functionality or a little bit
>> > > more:
>> > > In filter like scenario i want to do below pseudo operation:
>> > >
>> > > def function(iter: Iterator[URLOutputData],
>> > >              trueEvents: Collector[URLOutputData],
>> > >              falseEvents: Collector[URLOutputData],
>> > >              errEvents: Collector[URLOutputData]) {
>> > >   iter.foreach {
>> > >     i =>
>> > >       try {
>> > >         if (predicate(i))
>> > >           trueEvents.collect(i)
>> > >         else
>> > >           falseEvents.collect(i)
>> > >       } catch {
>> > >         case _ => errEvents.collect(i)
>> > >       }
>> > >   }
>> > > }
>> > >
>> > > Another case could be,suppose i have an input set of web events comes from
>> > > different web apps and i want to split dataset based on application
>> > > category
>> > >
>> > > Thanks,
>> > >
>> > >
>> > > On 12 May 2016 at 17:28, Gábor Gévay <gga...@gmail.com> wrote:
>> > >
>> > > > Hello,
>> > > >
>> > > > You can split a DataSet into two DataSets with two filters:
>> > > >
>> > > > val xs: DataSet[A] = ...
>> > > > val split1: DataSet[A] = xs.filter(f1)
>> > > > val split2: DataSet[A] = xs.filter(f2)
>> > > >
>> > > > where f1 and f2 are true for those elements that should go into the
>> > > > first and second DataSets respectively. So far, the splits will just
>> > > > contain elements from the input DataSet, but you can of course apply
>> > > > some map after one of the filters.
>> > > >
>> > > > Does this help?
>> > > >
>> > > > Best,
>> > > > Gábor
>> > > >
>> > > >
>> > > >
>> > > > 2016-05-12 16:03 GMT+02:00 CPC <acha...@gmail.com>:
>> > > > > Hi folks,
>> > > > >
>> > > > > Is there any way in dataset api to split Dataset[A] to Dataset[A] and
>> > > > > Dataset[B] ? Use case belongs to a custom filter component that we want
>> > > > > to implement. We will want to direct input elements whose result is false
>> > > > > after we apply the predicate. Actually we want to direct input elements
>> > > > > that throw exception to another output as well(demultiplexer like
>> > > > > component).
>> > > > >
>> > > > > Thank you in advance...