You can do conditional branching by using KStream.branch(Predicate...). You can then merge multiple streams using KStreamBuilder.merge(KStream...).
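Below is a minimal plain-Java sketch of the branch-then-merge idea, applied to the Source->A->B->C->D->E->Sink example from this thread. It does not use the Kafka Streams library. The hypothetical `branch` and `merge` helpers only imitate the documented behavior of `KStream.branch` and of merging: each record goes to the output of the first predicate it matches, records matching no predicate are dropped, and merging is a union of streams. The `Rec` class and the tagging rule (even ids bypass B, C and D) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BranchMergeSketch {

    // Hypothetical record: an id, a routing tag set by operator A, and the operators visited so far.
    static final class Rec {
        final int id;
        final boolean skipMiddle; // tag: true means "bypass B, C and D"
        final String path;

        Rec(int id, boolean skipMiddle, String path) {
            this.id = id;
            this.skipMiddle = skipMiddle;
            this.path = path;
        }

        Rec through(String op) {
            return new Rec(id, skipMiddle, path + "->" + op);
        }
    }

    // Imitates KStream.branch: each record lands in the output of the FIRST matching predicate;
    // records that match no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> input, Predicate<T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T rec : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(rec)) {
                    outputs.get(i).add(rec);
                    break;
                }
            }
        }
        return outputs;
    }

    // Imitates merging as a plain union; this version simply concatenates the inputs.
    @SafeVarargs
    static <T> List<T> merge(List<T>... streams) {
        List<T> out = new ArrayList<>();
        for (List<T> s : streams) {
            out.addAll(s);
        }
        return out;
    }

    // Applies a stateless operator to every record by appending its name to the path.
    static List<Rec> apply(List<Rec> in, String op) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            out.add(r.through(op));
        }
        return out;
    }

    static List<Rec> run(List<Integer> ids) {
        // Source -> A: A tags each record (hypothetical rule: even ids skip B, C and D).
        List<Rec> afterA = new ArrayList<>();
        for (int id : ids) {
            afterA.add(new Rec(id, id % 2 == 0, "A"));
        }
        // Branch 0 takes tagged records; branch 1 (catch-all) takes everything else.
        List<List<Rec>> branches = branch(afterA, r -> r.skipMiddle, r -> true);
        List<Rec> shortcut = branches.get(0);                                     // A -> E
        List<Rec> fullPath = apply(apply(apply(branches.get(1), "B"), "C"), "D"); // A -> B -> C -> D
        // Merge both branches in front of E; E then feeds the sink.
        return apply(merge(fullPath, shortcut), "E");
    }

    public static void main(String[] args) {
        for (Rec r : run(List.of(1, 2, 3, 4))) {
            System.out.println(r.id + " " + r.path);
        }
    }
}
```

Running `main` prints ids 1 and 3 with the path A->B->C->D->E and ids 2 and 4 with A->E. In a real topology the two branches would be separate KStream objects and the merged stream would feed E; Kafka's merge interleaves its inputs without an ordering guarantee across them, whereas this list-based version just concatenates.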
-Yasuhiro

On Mon, Jun 20, 2016 at 4:45 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Guozhang,
>
> Thank you for your reply. Yes, that is correct. Your solution matches my
> use case. I will try to use the topology you mentioned in a more dynamic
> way.
>
>
> Thanks
> Jeyhun
>
> On Mon, Jun 20, 2016 at 1:59 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Jeyhun,
> >
> > Another way to do this "dynamic routing" is to specify your topology using
> > the lower-level Processor API:
> >
> > http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api
> >
> > More specifically, you can, for example, specify both A and D as parents of
> > E when adding processor E, and then in processor A you can use
> > "forward(K key, V value, String childName)" to pass the record to a
> > specific child (either B or E) by its processor name.
> >
> >
> > As for TelegraphCQ and its underlying query processor (i.e., the Eddy
> > model, http://db.cs.berkeley.edu/papers/sigmod00-eddy.pdf), my
> > understanding is that it is conceptually any-to-any routable and the query
> > processor will try to schedule at a per-record granularity depending on
> > the query selectivity, etc. But this is not fully controllable by the
> > users. Is that correct?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 19, 2016 at 7:16 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the clarification. I still don't have a better answer than
> > > before.
> > >
> > > How much overhead my suggestion adds is hard to predict. However, the
> > > filter operators will run in the same thread (it's more or less just
> > > another chained method call), so it should not be too large.
> > > Furthermore, it should never be necessary to write tagged records to
> > > Kafka, so it would only be some main-memory overhead. But you would
> > > need to test and measure.
> > >
> > > -Matthias
> > >
> > > On 06/18/2016 08:13 PM, Jeyhun Karimov wrote:
> > > > Hi Matthias,
> > > >
> > > > Thank you for your answer. In my use case, depending on the statistics
> > > > of every operator, some tuples can skip specific operators, so that we
> > > > get approximate but faster results. I think this is somewhat similar to
> > > > TelegraphCQ in the dynamism of its operators.
> > > > In my case, the goal is to get rid of the transmission and processing
> > > > overhead of some tuples for some operators (at runtime) to get
> > > > approximate results. However, it seems the possible solution can bring
> > > > extra overhead to the system in some cases.
> > > >
> > > > Jeyhun
> > > >
> > > > On Sat, Jun 18, 2016 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> Hi Jeyhun,
> > > >>
> > > >> there is no support for this in the library itself. But you could build
> > > >> a custom solution by building the DAG with all required edges (i.e.,
> > > >> additional edges A->E, B->sink, etc.). For this, each output message
> > > >> from A would be duplicated and sent to both B and E. Therefore, A should
> > > >> "tag" each message with the designated receiver (B or E), and you add an
> > > >> additional filter step on both edges (i.e., a filter between A->F1->B
> > > >> and A->F2->E) that drops messages whose "tag" does not match the
> > > >> downstream operator.
> > > >>
> > > >> Does this make sense? Of course, depending on your use case, you might
> > > >> get a huge number of edges (plus filters), and your DAG might be quite
> > > >> complex. I don't see any other solution, though.
> > > >>
> > > >> Hope this helps.
> > > >>
> > > >> One question, though: how would changing the DAG at runtime help you?
> > > >> Do you mean you would dynamically change the edge between A->B and
> > > >> A->sink? I guess this would be a very special pattern, and I doubt that
> > > >> any library or system can offer this.
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 06/18/2016 05:33 PM, Jeyhun Karimov wrote:
> > > >>> Hi community,
> > > >>>
> > > >>> Is there a way in Kafka Streams to change the order of operators at
> > > >>> runtime? For example, I have operators
> > > >>>
> > > >>> Source->A->B->C->D->E->Sink
> > > >>>
> > > >>> and I want to forward some tuples from A to E, from B to Sink, etc.
> > > >>> As far as I know, the stream execution graph is computed at compile
> > > >>> time and does not change at runtime. Can there be an indirect
> > > >>> solution for this specific case?
> > > >>>
> > > >>> Jeyhun
> > > >>>
> > > >>
> > > >> --
> > > > -Cheers
> > > >
> > > > Jeyhun
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
> --
> -Cheers
>
> Jeyhun
>
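Guozhang's Processor API suggestion (register E with both A and D as parents, then let A pick one child per record via `forward(key, value, childName)`) can be imitated in plain Java to show the routing it enables. This is a hedged sketch that does not depend on Kafka: the `Node` class, its `router` function and the even/odd routing rule are hypothetical stand-ins, and `Node.forward` only mirrors the idea of the 0.10-era `ProcessorContext.forward(K key, V value, String childName)` method. In the real API the extra A->E edge would come from listing both "A" and "D" as parent names when adding processor E to the topology.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class NamedChildRoutingSketch {

    // Minimal stand-in for a processor node (NOT the Kafka Streams Processor interface).
    static final class Node {
        final String name;
        final Map<String, Node> children = new LinkedHashMap<>();
        // Picks the child to forward a key to; null means "forward to all children".
        final Function<Integer, String> router;

        Node(String name, Function<Integer, String> router) {
            this.name = name;
            this.router = router;
        }

        void addChild(Node child) {
            children.put(child.name, child);
        }

        void process(int key, String path, List<String> sinkOut) {
            String here = path.isEmpty() ? name : path + "->" + name;
            if (children.isEmpty()) { // a leaf node acts as the sink
                sinkOut.add(key + ":" + here);
                return;
            }
            String target = router.apply(key);
            if (target == null) {
                for (Node child : children.values()) {
                    child.process(key, here, sinkOut);
                }
            } else {
                forward(key, here, target, sinkOut);
            }
        }

        // Analogue of forward(key, value, childName): deliver to exactly one named child.
        void forward(int key, String path, String childName, List<String> sinkOut) {
            Node child = children.get(childName);
            if (child == null) {
                throw new IllegalArgumentException(name + " has no child named " + childName);
            }
            child.process(key, path, sinkOut);
        }
    }

    static List<String> run(List<Integer> keys) {
        Node sink = new Node("Sink", k -> null);
        Node e = new Node("E", k -> null);
        Node d = new Node("D", k -> null);
        Node c = new Node("C", k -> null);
        Node b = new Node("B", k -> null);
        // Hypothetical routing rule for A: even keys bypass B, C and D.
        Node a = new Node("A", k -> k % 2 == 0 ? "E" : "B");
        a.addChild(b);
        a.addChild(e); // A has two children: B and E
        b.addChild(c);
        c.addChild(d);
        d.addChild(e); // so E has two parents: A and D
        e.addChild(sink);
        List<String> out = new ArrayList<>();
        for (int k : keys) {
            a.process(k, "", out);
        }
        return out;
    }

    public static void main(String[] args) {
        run(List.of(1, 2, 3, 4)).forEach(System.out::println);
    }
}
```

The DAG itself stays fixed; only the path each record takes changes, decided per record inside A. Compared with the tag-and-filter approach, no record is duplicated onto an edge it will not use.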