Re: Can we use transform to exchange data between two streams

2020-02-23 Thread Guozhang Wang
I think a multi-way stream-stream join, beyond the table-table join, would be a good thing to add. About joining with different keys, we have foreign-key join for KTables only at the moment (KIP-213), and maybe we can follow that direction as well. Also in your case, if you can manage to transform your stream-

Re: Can we use transform to exchange data between two streams

2020-02-23 Thread Sachin Mittal
Hi, KIP-150 is indeed cool, and I suppose it would be released as part of 2.5. I can see some use cases of the new API where one can avoid multiple aggregations. Along the same lines, I believe we could introduce two more APIs: 1. to join two streams having different keys. This would help in trying

Re: Can we use transform to exchange data between two streams

2020-02-22 Thread Guozhang Wang
Hi Sachin, You can first check with topology.describe() whether there are indeed some repartition topics in your topology. If there are none, you may also want to check whether the child operators of the split are in the order (AA, AB, B): if the B sub-stream is the first child, then it may also t

Re: Can we use transform to exchange data between two streams

2020-02-21 Thread Sachin Mittal
Hi, so in my example what I notice is that the record order is: [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] Stream is branched as: input .branch( (k, v) -> //filter records of type AA, (k, v) -> //filter records of type AB, (k, v) -> //filter records of type B, ) Topolo
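
For readers following the branching discussion, here is a minimal plain-Java sketch of the routing rule that KStream#branch applies: each record goes to the first predicate it matches, and is dropped if it matches none. This is a simulation, not the Kafka Streams API. The class name BranchSketch and the value-prefix predicates in the usage note are hypothetical, since the thread elides the actual filters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Plain-Java simulation of first-match branching; not the Kafka Streams API.
public class BranchSketch {

    // Returns one output list per predicate, in predicate order.
    // Each record is a {key, value} pair and lands in the output of the
    // first predicate it satisfies; records matching no predicate are dropped.
    @SafeVarargs
    public static List<List<String>> branch(List<String[]> records,
                                            BiPredicate<String, String>... predicates) {
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (String[] kv : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(kv[0], kv[1])) {
                    outputs.get(i).add(kv[1]);
                    break; // first match wins
                }
            }
        }
        return outputs;
    }
}
```

Called with the three records from the message and (assumed) predicates that test whether the value starts with "AA", "AB", or "B", this yields three lists holding AA1, AB1, and B1 respectively. The sketch says nothing about the processing order across the resulting sub-streams, which is the actual question in this thread.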

Re: Can we use transform to exchange data between two streams

2020-02-21 Thread Guozhang Wang
From the description it seems there are no repartition topics throughout your topology; in that case the ordering should be guaranteed. I.e. a record from the input topic would only be processed after all previous records from that same topic have been processed entirely; if there's repartition top

Re: Can we use transform to exchange data between two streams

2020-02-20 Thread Sachin Mittal
Hi, I wanted to understand if in this particular case my solution would work: Say I have source records [timestamp, (K,V)] in the input topic in the following order: .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ... I create multiple streams out of the input stream as: input .branch( (k, v) -> //

Re: Can we use transform to exchange data between two streams

2020-02-20 Thread Guozhang Wang
Hello Sachin, 1) It seems from your source code that in the stream2.transform you are generating a new value and returning a new key-value pair: mutate value = enrich(value, result) return new KeyValue(key, value); --- Anyways, if you do not want to generate a new value object, and ju

Can we use transform to exchange data between two streams

2020-02-19 Thread Sachin Mittal
Hi, I have two streams and I want to enrich stream2 records based on stream1 records. I cannot join those two streams because there is no common key between them. Hence the only way I can do that is by using a timestamp field property. This is how I have built my pipeline. . //create and a
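
The idea in this first message, enriching stream2 records from stream1 records by timestamp rather than by key, can be sketched in plain Java roughly as follows. This is not the poster's pipeline (which is truncated above) and not the Kafka Streams API: a TreeMap stands in for whatever store the transformers would share, and the matching rule (take the latest stream1 value at or before the stream2 timestamp) is an assumption made for illustration. The names TimestampEnricher, onStream1, and onStream2 are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for timestamp-based enrichment; not the Kafka Streams API.
public class TimestampEnricher {

    // Hypothetical store: stream1 values indexed by record timestamp.
    private final TreeMap<Long, String> stream1ByTimestamp = new TreeMap<>();

    // Called for each stream1 record: remember its value under its timestamp.
    public void onStream1(long timestamp, String value) {
        stream1ByTimestamp.put(timestamp, value);
    }

    // Called for each stream2 record: append the latest stream1 value whose
    // timestamp is at or before this record's timestamp (assumed rule).
    // If no such stream1 record exists, the value is returned unchanged.
    public String onStream2(long timestamp, String value) {
        Map.Entry<Long, String> match = stream1ByTimestamp.floorEntry(timestamp);
        return match == null ? value : value + "+" + match.getValue();
    }
}
```

The result depends on stream1 records having been stored before the matching stream2 record arrives, which is why the processing-order question raised later in the thread matters for this approach.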