You can do conditional branching with KStream.branch(Predicate...).
You can then merge multiple streams with KStreamBuilder.merge(KStream...).
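
To make the branch-then-merge idea concrete, here is a minimal sketch in plain Java. It deliberately does not use the Kafka Streams API: lists stand in for streams, and the names BranchMergeSketch, branch and merge, as well as the even/odd routing rule, are made up for illustration. It only mirrors the documented behaviour of KStream.branch, where each record goes to the first predicate it matches and records matching none are dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of "branch, then merge". Lists stand in for streams;
// this is NOT the Kafka Streams API, only an illustration of the routing.
class BranchMergeSketch {

    // Send each record to the FIRST predicate it matches, in order.
    // Records that match no predicate are dropped.
    @SafeVarargs
    public static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    // Combine several inputs into one. A real stream merge interleaves
    // records as they arrive; this model simply concatenates the lists.
    @SafeVarargs
    public static <T> List<T> merge(List<T>... inputs) {
        List<T> merged = new ArrayList<>();
        for (List<T> input : inputs) {
            merged.addAll(input);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Integer> outputOfA = Arrays.asList(1, 2, 3, 4, 5, 6);

        // Hypothetical rule: even values skip straight to E,
        // everything else takes the normal path through B, C and D.
        List<List<Integer>> routed = branch(outputOfA, v -> v % 2 == 0, v -> true);
        List<Integer> shortcutToE = routed.get(0);
        List<Integer> viaB = routed.get(1);

        // B, C and D are omitted here; E reads the merge of both paths.
        List<Integer> inputOfE = merge(shortcutToE, viaB);
        System.out.println("input of E: " + inputOfE);
    }
}
```

In the real library the predicates would be fixed when the topology is built, so any "dynamic" behaviour has to live inside the predicates themselves, for example by consulting state that changes at runtime.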

-Yasuhiro

On Mon, Jun 20, 2016 at 4:45 AM, Jeyhun Karimov <je.kari...@gmail.com>
wrote:

> Hi Guozhang,
>
> Thank you for your reply. Yes, that is correct. Your solution is a match for
> my use case. I will try to use the topology you mentioned in a more dynamic
> way.
>
>
> Thanks
> Jeyhun
>
> On Mon, Jun 20, 2016 at 1:59 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Jeyhun,
> >
> > Another way to do this "dynamic routing" is to specify your topology using
> > the lower-level processor API:
> >
> >
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api
> >
> > More specifically, you can, for example, specify both A and D as parents of
> > E when adding processor E, and then in processor A you can use
> > "forward(K key, V value, String childName)" to pass the record to a specific
> > child (either B or E) by its processor name.
> >
> >
> > As for TelegraphCQ and its underlying query processor (i.e. the Eddy model,
> > http://db.cs.berkeley.edu/papers/sigmod00-eddy.pdf), my understanding is
> > that it is conceptually any-to-any routable and the query processor will
> > try to schedule at a per-record granularity depending on the query
> > selectivity, etc. But this is not fully controllable by the users. Is that
> > correct?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 19, 2016 at 7:16 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the clarification. I still don't have a better answer than
> > > before.
> > >
> > > How much overhead my suggestion adds is hard to predict. However, the
> > > filter operators will run in the same thread (it's more or less just
> > > another chained method call), so it should not be too large.
> > > Furthermore, it should never be required to write tagged records to
> > > Kafka, so it would only be some main-memory overhead. But you would
> > > need to test and measure.
> > >
> > > -Matthias
> > >
> > > On 06/18/2016 08:13 PM, Jeyhun Karimov wrote:
> > > > Hi Matthias,
> > > >
> > > > Thank you for your answer. In my use case, depending on the statistics
> > > > of every operator, some tuples can skip specific operators, so that we
> > > > can get an approximate but faster result. I think this is somewhat
> > > > similar to TelegraphCQ in the dynamism of its operators.
> > > > In my case, the goal is to get rid of the transmission and processing
> > > > overhead of some tuples for some operators (at runtime) to get
> > > > approximate results. However, it seems the possible solution can add
> > > > extra overhead to the system in some cases.
> > > >
> > > > Jeyhun
> > > >
> > > > On Sat, Jun 18, 2016 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> Hi Jeyhun,
> > > >>
> > > >> There is no support in the library itself. But you could build a custom
> > > >> solution by building the DAG with all required edges (i.e., additional
> > > >> edges from A->E, B->sink, etc.). For this, each output message from A
> > > >> would be duplicated and sent to both B and E. Therefore, A should "tag"
> > > >> each message with the designated receiver (B or E), and you add an
> > > >> additional filter step on both edges (i.e., filters F1 and F2, giving
> > > >> A->F1->B and A->F2->E) that drops messages if the "tag" does not match
> > > >> the downstream operator.
> > > >>
> > > >> Does this make sense? Of course, depending on your use case, you might
> > > >> get a huge number of edges (plus filters) and your DAG might be quite
> > > >> complex. I don't see any other solution, though.
> > > >>
> > > >> Hope this helps.
> > > >>
> > > >> One question though: how would changing the DAG at runtime help you?
> > > >> Do you mean you would dynamically change the edge between A->B and
> > > >> A->sink? I guess this would be a very special pattern, and I doubt that
> > > >> any library or system can offer this.
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 06/18/2016 05:33 PM, Jeyhun Karimov wrote:
> > > >>> Hi community,
> > > >>>
> > > >>> Is there a way in Kafka Streams to change the order of operators at
> > > >>> runtime? For example, I have operators
> > > >>>
> > > >>> Source->A->B->C->D->E->Sink
> > > >>>
> > > >>> and I want to forward some tuples from A to E, from B to Sink, etc. As
> > > >>> far as I know, the stream execution graph is computed at compile time
> > > >>> and does not change at runtime. Can there be an indirect solution for
> > > >>> this specific case?
> > > >>>
> > > >>> Jeyhun
> > > >>>
> > > >>
> > > >> --
> > > > -Cheers
> > > >
> > > > Jeyhun
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
> --
> -Cheers
>
> Jeyhun
>
