I think multi-way stream-stream join, beyond table-table join, would be good to add. As for joining on different keys, we only have foreign-key joins for KTables at the moment (KIP-213), and maybe we can follow that direction as well.

Also, in your case, if you can manage to transform your stream-stream join into a table-table join, the existing API may be more appealing to apply :)

Guozhang

On Sun, Feb 23, 2020 at 8:29 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> KIP-150 is indeed cool, and I suppose it would be released as part of 2.5.
> I can see some use cases of the new API where one can avoid multiple
> aggregations.
>
> Along the same lines, I believe we can introduce two more APIs.
>
> 1. An API to join two streams having different keys. This would help
> avoid transforming the streams to matchable keys.
> It can be something like:
>
> KStream<KA,VA> stream
>     .join(
>         KStream<KB,VB> otherStream,
>         (ka, kb) -> ..., // return true/false (true means the records can be joined)
>         (va, vb) -> ...
>     )
>
> So basically we introduce a key joiner which compares two different keys
> and returns true/false to indicate whether their values can be joined or
> not. The resultant stream can be of type (KA, VA+VB).
> So here we have basically enriched streamA with values of streamB using a
> simple join.
>
> 2. In the lines of KIP-150, an API to join multiple streams, something
> like:
>
> KStream<K,VA> stream
>     .join(
>         KStream<K,VB> otherStream1,
>         KStream<K,VC> otherStream2,
>         ...
>         (va, vb, vc, ...) -> ...
>     )
>
> This way we can join multiple streams, and the resultant stream is of
> type (K, VA+VB+VC...).
>
> Let me know if something like this can be useful.
>
> Thanks
> Sachin
>
>
> On Sun, Feb 23, 2020 at 5:40 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > You can check first via topology.describe() whether there are indeed
> > some repartition topics in your topology. If there are none, you may
> > also want to check whether the children operators of the split are in
> > the order (AA, AB, B): if the B sub-stream is the first child, then it
> > may also trigger the fetch before the join.
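The predicate-based ("key joiner") join proposed earlier in this thread can be illustrated with a minimal in-memory sketch. This is plain Java over lists, not the Kafka Streams API; the names (KeyJoinerSketch, KV) are made up for illustration, and a real streaming implementation would additionally need windowing and state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

public class KeyJoinerSketch {
    // Simple key-value pair; stands in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Join records of streamA with records of streamB whenever the key
    // joiner predicate says their (differently typed) keys match.
    static <KA, VA, KB, VB, VR> List<KV<KA, VR>> join(
            List<KV<KA, VA>> streamA,
            List<KV<KB, VB>> streamB,
            BiPredicate<KA, KB> keyJoiner,
            BiFunction<VA, VB, VR> valueJoiner) {
        List<KV<KA, VR>> out = new ArrayList<>();
        for (KV<KA, VA> a : streamA) {
            for (KV<KB, VB> b : streamB) {
                if (keyJoiner.test(a.key(), b.key())) {
                    // Result keeps streamA's key: (KA, VA+VB).
                    out.add(new KV<>(a.key(), valueJoiner.apply(a.value(), b.value())));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys differ in shape ("user-1" vs "1") but the key joiner can
        // still relate them.
        List<KV<String, String>> a = List.of(new KV<>("user-1", "A1"));
        List<KV<String, String>> b = List.of(new KV<>("1", "B1"), new KV<>("2", "B2"));
        List<KV<String, String>> joined =
                join(a, b, (ka, kb) -> ka.endsWith(kb), (va, vb) -> va + "+" + vb);
        System.out.println(joined);  // one enriched record: user-1 -> A1+B1
    }
}
```

Note the nested loop: without a shared key there is no partitioning to exploit, so every pair must be tested. That cost is one reason such an API would likely have to be bounded by a window, as in the time-range approach discussed later in this thread.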
> >
> > If that's the case, I'd suggest you consider some multi-way join
> > operation similar to "co-group" (KIP-150): I mean, you could implement
> > something like that (not exactly that) rather than relying on the
> > existing built-in operators, because what you've described here really
> > sounds like a three-way join to me, except that streamB is joined not
> > by key but by time window.
> >
> > Guozhang
> >
> >
> > On Fri, Feb 21, 2020 at 8:31 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > So in my example what I notice is that the record order is:
> > > [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)]
> > > The stream is branched as:
> > >
> > > input
> > >     .branch(
> > >         (k, v) -> ..., // filter records of type AA
> > >         (k, v) -> ..., // filter records of type AB
> > >         (k, v) -> ...  // filter records of type B
> > >     )
> > >
> > > The topology is:
> > >
> > > streamAA.join(streamAB).process(/* put joined value to state store */)
> > > streamB.transformValues(/* fetch joined value from state store and
> > > enrich the value */)
> > >
> > > Once I look at the logs I see the following order:
> > >
> > > (KA, AA1)
> > > (KA, AB1)
> > > fetch for joined data
> > > (KB, B1)
> > > join [ (KA, AA1), (KA, AB1) ]
> > > put joined record
> > >
> > > Clearly it is trying to fetch the data before it is made available to
> > > the state store, and that would be because the join operation happens
> > > after the streamB record is read.
> > > You mentioned that the ordering may not hold if there is
> > > repartitioning. I wanted to know when Kafka automatically
> > > repartitions the topics.
> > > Just for simplicity in the example, I had omitted a lot of steps:
> > > after branching I actually re-key each stream using map, then apply a
> > > transformer and re-key them again.
> > >
> > > If this causes repartitioning of these streams and the order may not
> > > hold, then what would be a good way to ensure streamB records are
> > > enriched with the joined data of streamAA and streamAB?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Sat, Feb 22, 2020 at 2:33 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > From the description it seems there are no repartition topics
> > > > throughout your topology, in which case the ordering should be
> > > > guaranteed. I.e., a record from the input topic would only be
> > > > processed after all previous records from that same topic have been
> > > > processed entirely; if there are repartition topics in between,
> > > > though, this may not hold.
> > > >
> > > > Also, a minor thing: if your streamAA.join(streamAB) results are
> > > > only needed for populating the store, you can use `process(..)`
> > > > instead of `transform(..)`, as it does not need a return value
> > > > since it is the end of this branch of the topology.
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Feb 20, 2020 at 7:32 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > I wanted to understand if in this particular case my solution
> > > > > would work.
> > > > > Say I have source records [timestamp, (K,V)] in the input topic
> > > > > in the following order:
> > > > > .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ...
> > > > >
> > > > > I create multiple streams out of the input stream as:
> > > > >
> > > > > input
> > > > >     .branch(
> > > > >         (k, v) -> ..., // filter records of type AA
> > > > >         (k, v) -> ..., // filter records of type AB
> > > > >         (k, v) -> ...  // filter records of type B
> > > > >     )
> > > > >
> > > > > Then my topology is written in the following way:
> > > > >
> > > > > // join stream of type AA with AB and push the joined value into
> > > > > // the state store
> > > > > streamAA.join(streamAB).transform(new TransformerSupplier() {
> > > > >     transform(K key, V value) {
> > > > >         // timestamp = extract timestamp from (key, value)
> > > > >         enrichMessageStore.put(key, value, timestamp);
> > > > >         return new KeyValue(key, value);
> > > > >     }
> > > > > }, "enrich-message-store")
> > > > >
> > > > > // fetch the data from that state store and enrich streamB
> > > > > streamB.transform(new TransformerSupplier() {
> > > > >     transform(K key, V value) {
> > > > >         // (from, to) = extract (from, to) from (key, value)
> > > > >         result = enrichMessageStore.fetchAll(from, to);
> > > > >         // mutate value using value.enrich(result)
> > > > >         return new KeyValue(key, value);
> > > > >     }
> > > > > }, "enrich-message-store");
> > > > >
> > > > > So does Kafka Streams ensure that records of streamB would be
> > > > > processed only after records of streamAA and streamAB are joined,
> > > > > since they are in order? Because if the streamAA.join(streamAB)
> > > > > operation happens after streamB.transform(), then it will not
> > > > > work.
> > > > > I am assuming that since the streamAA and streamAB types of
> > > > > records come before the streamB type of record in the input
> > > > > topic, the join will also happen before. If this assumption is
> > > > > not safe, then is there any other way of ensuring this?
> > > > >
> > > > > For now let's assume there is a single partition of the input
> > > > > topic.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Sachin,
> > > > > >
> > > > > > 1) It seems from your source code that in the stream2.transform
> > > > > > you are generating a new value and returning a new key-value
> > > > > > pair:
> > > > > >
> > > > > > mutate value = enrich(value, result)
> > > > > > return new KeyValue(key, value);
> > > > > >
> > > > > > ---------------
> > > > > >
> > > > > > Anyway, if you do not want to generate a new value object, and
> > > > > > just have a field function like this:
> > > > > >
> > > > > > value.enrich(result)
> > > > > > return new KeyValue(key, value);
> > > > > >
> > > > > > ---------------
> > > > > >
> > > > > > That actually still works, as long as your serde function
> > > > > > recognizes the optional enriched fields and can encode / decode
> > > > > > the value object with / without them.
> > > > > >
> > > > > > 2) And regarding your join scenario: if your join does not
> > > > > > depend on any field, but only on the time range (since you used
> > > > > > fetchAll(), which returns ALL the key-values falling into that
> > > > > > range), I think this would be the way to do it.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > I have two streams and I want to enrich stream2 records based
> > > > > > > off stream1 records.
> > > > > > > The reason I cannot join those two streams is that there is
> > > > > > > no common key between them.
> > > > > > > Hence the only way I can do that is using a timestamp field
> > > > > > > property.
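Guozhang's point about mutating the value in place being safe as long as the serde can encode/decode with or without the enrichment can be sketched with a toy codec. Everything below (OptionalFieldCodec, the field names, the pipe-delimited string encoding) is made up purely for illustration; it is not a Kafka Serde.

```java
import java.util.Optional;

public class OptionalFieldCodec {
    // Toy value type: a mandatory base field plus an optional enrichment.
    record Value(String base, Optional<String> enriched) {}

    // Append the enriched field only when it is present.
    static String serialize(Value v) {
        return v.enriched().map(e -> v.base() + "|" + e).orElse(v.base());
    }

    // Decode a value with or without the optional enriched field.
    static Value deserialize(String s) {
        int i = s.indexOf('|');
        return i < 0
                ? new Value(s, Optional.empty())
                : new Value(s.substring(0, i), Optional.of(s.substring(i + 1)));
    }

    public static void main(String[] args) {
        Value plain = new Value("order-42", Optional.empty());
        Value enriched = new Value("order-42", Optional.of("customer-7"));
        // Both shapes round-trip through the same codec.
        System.out.println(deserialize(serialize(plain)));
        System.out.println(deserialize(serialize(enriched)));
    }
}
```

The same idea carries over to a real serde (e.g. JSON or Avro with an optional field): the transformer can enrich the value in place, and the serializer simply emits the extra field when it is set.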
> > > > > > >
> > > > > > > This is how I have built my pipeline:
> > > > > > >
> > > > > > > .....
> > > > > > > // create and add the state store
> > > > > > > final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
> > > > > > >     Stores.windowStoreBuilder(
> > > > > > >         Stores.persistentWindowStore("enrich-message-store",
> > > > > > >             retentionSize, windowSize, false), ...);
> > > > > > > builder.addStateStore(enrichStoreBuilder)
> > > > > > > .....
> > > > > > > // push the data to the state store
> > > > > > > stream1.transform(new TransformerSupplier() {
> > > > > > >     transform(K key, V value) {
> > > > > > >         // timestamp = extract timestamp from (key, value)
> > > > > > >         enrichMessageStore.put(key, value, timestamp);
> > > > > > >         return new KeyValue(key, value);
> > > > > > >     }
> > > > > > > }, "enrich-message-store");
> > > > > > > .....
> > > > > > > // fetch the data from the state store
> > > > > > > stream2.transform(new TransformerSupplier() {
> > > > > > >     transform(K key, V value) {
> > > > > > >         // (from, to) = extract (from, to) from (key, value)
> > > > > > >         result = enrichMessageStore.fetchAll(from, to);
> > > > > > >         // mutate value = enrich(value, result)
> > > > > > >         return new KeyValue(key, value);
> > > > > > >     }
> > > > > > > }, "enrich-message-store");
> > > > > > > .......
> > > > > > >
> > > > > > > So is this something that would work? Basically, in one
> > > > > > > stream transformation a state store is updated, and in the
> > > > > > > second stream transformation values from that state store are
> > > > > > > fetched and the stream2 value is enriched with those values.
> > > > > > > The enriched (mutated) value is returned back downstream.
> > > > > > >
> > > > > > > So will this work, and most importantly, can we mutate the
> > > > > > > input value itself in the transform function and return the
> > > > > > > same, or should we always create new instances of it?
> > > > > > >
> > > > > > > Is there any better way of doing this?
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
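The timestamp-based enrichment pipeline discussed in this thread can be sketched end-to-end in memory, with a TreeMap standing in for the persistent window store. The put() and range-lookup methods mirror the WindowStore calls in spirit only; the names here are illustrative, and a real WindowStore also keys by record key and expires old windows, which this sketch omits.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class TimeRangeEnrichSketch {
    // timestamp -> joined stream1 value; stands in for the window store.
    private final NavigableMap<Long, String> store = new TreeMap<>();

    // stream1.transform(): push each record's value into the store by timestamp.
    void put(long timestamp, String value) {
        store.put(timestamp, value);
    }

    // stream2.transform(): fetch everything in [from, to] and enrich the value.
    String enrich(String value, long from, long to) {
        Map<Long, String> result = store.subMap(from, true, to, true);
        return result.isEmpty() ? value : value + " enriched-with " + result.values();
    }

    public static void main(String[] args) {
        TimeRangeEnrichSketch s = new TimeRangeEnrichSketch();
        s.put(1L, "AA1+AB1");                        // joined stream1 record at t=1
        System.out.println(s.enrich("B1", 0L, 2L));  // range covers the put: enriched
        System.out.println(s.enrich("B2", 5L, 9L));  // nothing in range: unchanged
    }
}
```

The second lookup comes back unenriched, which is exactly the ordering hazard raised in this thread: if the streamB fetch runs before the joined data has been put (or its range misses it), the enrichment silently yields nothing.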