Hi,

if it only requires implementing a custom operator (i.e. no changes to the
network stack or other engine-level changes), I can try to implement it,
since I have been working on the optimizer and plan generation for a month.
Also, we are going to implement our ETL framework on Flink, and this kind of
scenario is a good fit and a common requirement in ETL-like flows. If you
can point me to the parts of the project I should look at, I can give it a
try.

Thanks
On May 12, 2016 6:54 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:

> Hi,
> I agree that this would be very nice. Unfortunately, Flink only allows
> one output from an operation right now. Maybe we can extend this somehow
> in the future.
>
> Cheers,
> Aljoscha
>
> On Thu, 12 May 2016 at 17:27 CPC <acha...@gmail.com> wrote:
>
> > Hi Gabor,
> >
> > Yes, functionally this helps. But in this case I am processing every
> > element twice and sending the whole data to two different operators.
> > What I am trying to achieve is something like the DataStream split
> > functionality, or a little bit more. In a filter-like scenario I want
> > to do the pseudo-operation below:
> >
> > > def function(iter: Iterator[URLOutputData],
> > >              trueEvents: Collector[URLOutputData],
> > >              falseEvents: Collector[URLOutputData],
> > >              errEvents: Collector[URLOutputData]): Unit = {
> > >   iter.foreach { i =>
> > >     try {
> > >       if (predicate(i))
> > >         trueEvents.collect(i)
> > >       else
> > >         falseEvents.collect(i)
> > >     } catch {
> > >       // anything that fails during evaluation goes to the error output
> > >       case _: Throwable => errEvents.collect(i)
> > >     }
> > >   }
> > > }
> > >
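> > A rough approximation of this with the current DataSet API is to
> > classify each element once into a tagged pair and then separate the
> > tags with cheap filters. This is only a sketch under the assumptions of
> > the pseudo-code above: `predicate` and `URLOutputData` are its
> > placeholders, and `input` stands for however the DataSet is produced.
> >
> > import org.apache.flink.api.scala._
> >
> > val input: DataSet[URLOutputData] = ??? // placeholder source
> >
> > // Evaluate the predicate once per element; 0 = true, 1 = false, 2 = error.
> > val tagged: DataSet[(Int, URLOutputData)] = input.map { i =>
> >   try {
> >     if (predicate(i)) (0, i) else (1, i)
> >   } catch {
> >     case _: Throwable => (2, i)
> >   }
> > }
> >
> > // The tagged set is still scanned once per output, but the (possibly
> > // expensive) predicate is not re-evaluated.
> > val trueEvents  = tagged.filter(_._1 == 0).map(_._2)
> > val falseEvents = tagged.filter(_._1 == 1).map(_._2)
> > val errEvents   = tagged.filter(_._1 == 2).map(_._2)
> >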
> > Another case could be: suppose I have an input set of web events coming
> > from different web apps and I want to split the dataset based on
> > application category.
> >
> > Thanks,
> >
> >
> > On 12 May 2016 at 17:28, Gábor Gévay <gga...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > You can split a DataSet into two DataSets with two filters:
> > >
> > > val xs: DataSet[A] = ...
> > > val split1: DataSet[A] = xs.filter(f1)
> > > val split2: DataSet[A] = xs.filter(f2)
> > >
> > > where f1 and f2 are true for those elements that should go into the
> > > first and second DataSet, respectively. As such, the splits just
> > > contain elements of the input DataSet, but you can of course apply
> > > a map after one of the filters.
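> > > For example (just a sketch: `g` here is a hypothetical A => B mapping
> > > function, not something defined in this thread):
> > >
> > > val mapped: DataSet[B] = split2.map(g)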
> > >
> > > Does this help?
> > >
> > > Best,
> > > Gábor
> > >
> > >
> > >
> > > 2016-05-12 16:03 GMT+02:00 CPC <acha...@gmail.com>:
> > > > Hi folks,
> > > >
> > > > Is there any way in the DataSet API to split a DataSet[A] into a
> > > > DataSet[A] and a DataSet[B]? The use case is a custom filter
> > > > component that we want to implement. We want to direct input
> > > > elements for which the predicate is false to a separate output, and
> > > > we actually want to direct input elements that throw an exception
> > > > to another output as well (a demultiplexer-like component).
> > > >
> > > > Thank you in advance...
> > >
> >
>
